diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-07-23 20:03:07 -0700 |
---|---|---|
committer | Steven Phillips <sphillips@maprtech.com> | 2014-07-25 18:33:53 -0700 |
commit | c331aed81e73d16ea29bf8c94863591b212aa644 (patch) | |
tree | 3887590400bc633bd459f9606ad3fbc8de983850 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | |
parent | 5e482c17d20bcc957be50d570d03f1a5fdfca75e (diff) |
DRILL-991: Limit should terminate upstream fragments immediately upon completion
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | 57 |
1 files changed, 43 insertions, 14 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 69be256ec..14cf092ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -19,6 +19,8 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; @@ -31,22 +33,16 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; -import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; -import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.UserBitShared.MetricValue; -import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.vector.CopyUtil; @@ -66,6 +62,10 @@ public class PartitionSenderRootExec extends BaseRootExec { private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final StatusHandler statusHandler; + + private final AtomicIntegerArray remainingReceivers; + private final AtomicInteger remaingReceiverCount; + private volatile boolean done = false; long minReceiverRecordCount = Long.MAX_VALUE; long maxReceiverRecordCount = Long.MIN_VALUE; @@ -94,6 +94,17 @@ public class PartitionSenderRootExec extends BaseRootExec { this.outGoingBatchCount = operator.getDestinations().size(); this.popConfig = operator; this.statusHandler = new StatusHandler(sendCount, context); + this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount); + this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount); + } + + private boolean done() { + for (int i = 0; i < remainingReceivers.length(); i++) { + if (remainingReceivers.get(i) == 0) { + return false; + } + } + return true; } @Override @@ -106,7 +117,13 @@ public class PartitionSenderRootExec extends BaseRootExec { return false; } - RecordBatch.IterOutcome out = next(incoming); + IterOutcome out; + if (!done) { + out = next(incoming); + } else { + incoming.kill(true); + out = IterOutcome.NONE; + } logger.debug("Partitioner.next(): got next record batch with status {}", out); switch(out){ @@ -119,7 +136,7 @@ public class PartitionSenderRootExec extends BaseRootExec { sendEmptyBatch(); } } catch (IOException e) { - incoming.kill(); + incoming.kill(false); logger.error("Error while creating partitioning sender or flushing outgoing batches", e); context.fail(e); } @@ -140,12 +157,12 @@ public class PartitionSenderRootExec extends BaseRootExec { } createPartitioner(); } catch (IOException e) { - incoming.kill(); + incoming.kill(false); logger.error("Error while flushing outgoing batches", e); context.fail(e); return false; } catch (SchemaChangeException e) { - incoming.kill(); + incoming.kill(false); logger.error("Error while setting up partitioner", e); context.fail(e); return false; @@ -155,7 +172,7 @@ public class PartitionSenderRootExec extends BaseRootExec { partitioner.partitionBatch(incoming); } catch (IOException e) { context.fail(e); - incoming.kill(); + incoming.kill(false); return false; } for (VectorWrapper<?> v : incoming) { @@ -206,9 +223,9 @@ public class PartitionSenderRootExec extends BaseRootExec { } } - public void updateStats(List<? extends PartitionStatsBatch> outgoing) { + public void updateStats(List<? extends PartitionOutgoingBatch> outgoing) { long records = 0; - for (PartitionStatsBatch o : outgoing) { + for (PartitionOutgoingBatch o : outgoing) { long totalRecords = o.getTotalRecords(); minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords); maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords); @@ -220,6 +237,18 @@ public class PartitionSenderRootExec extends BaseRootExec { stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount); stats.setLongStat(Metric.N_RECEIVERS, outgoing.size()); } + + @Override + public void receivingFragmentFinished(FragmentHandle handle) { + int id = handle.getMinorFragmentId(); + if (remainingReceivers.compareAndSet(id, 0, 1)) { + partitioner.getOutgoingBatches().get(handle.getMinorFragmentId()).terminate(); + int remaining = remaingReceiverCount.decrementAndGet(); + if (remaining == 0) { + done = true; + } + } + } public void stop() { logger.debug("Partition sender stopping."); |