aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
diff options
context:
space:
mode:
authorSteven Phillips <sphillips@maprtech.com>2014-07-23 20:03:07 -0700
committerSteven Phillips <sphillips@maprtech.com>2014-07-25 18:33:53 -0700
commitc331aed81e73d16ea29bf8c94863591b212aa644 (patch)
tree3887590400bc633bd459f9606ad3fbc8de983850 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
parent5e482c17d20bcc957be50d570d03f1a5fdfca75e (diff)
DRILL-991: Limit should terminate upstream fragments immediately upon completion
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java57
1 files changed, 43 insertions, 14 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 69be256ec..14cf092ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.physical.impl.partitionsender;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -31,22 +33,16 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.physical.impl.SendingAccountor;
-import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared.MetricValue;
-import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.vector.CopyUtil;
@@ -66,6 +62,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
private final int outGoingBatchCount;
private final HashPartitionSender popConfig;
private final StatusHandler statusHandler;
+
+ private final AtomicIntegerArray remainingReceivers;
+ private final AtomicInteger remaingReceiverCount;
+ private volatile boolean done = false;
long minReceiverRecordCount = Long.MAX_VALUE;
long maxReceiverRecordCount = Long.MIN_VALUE;
@@ -94,6 +94,17 @@ public class PartitionSenderRootExec extends BaseRootExec {
this.outGoingBatchCount = operator.getDestinations().size();
this.popConfig = operator;
this.statusHandler = new StatusHandler(sendCount, context);
+ this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
+ this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
+ }
+
+ private boolean done() {
+ for (int i = 0; i < remainingReceivers.length(); i++) {
+ if (remainingReceivers.get(i) == 0) {
+ return false;
+ }
+ }
+ return true;
}
@Override
@@ -106,7 +117,13 @@ public class PartitionSenderRootExec extends BaseRootExec {
return false;
}
- RecordBatch.IterOutcome out = next(incoming);
+ IterOutcome out;
+ if (!done) {
+ out = next(incoming);
+ } else {
+ incoming.kill(true);
+ out = IterOutcome.NONE;
+ }
logger.debug("Partitioner.next(): got next record batch with status {}", out);
switch(out){
@@ -119,7 +136,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
sendEmptyBatch();
}
} catch (IOException e) {
- incoming.kill();
+ incoming.kill(false);
logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
context.fail(e);
}
@@ -140,12 +157,12 @@ public class PartitionSenderRootExec extends BaseRootExec {
}
createPartitioner();
} catch (IOException e) {
- incoming.kill();
+ incoming.kill(false);
logger.error("Error while flushing outgoing batches", e);
context.fail(e);
return false;
} catch (SchemaChangeException e) {
- incoming.kill();
+ incoming.kill(false);
logger.error("Error while setting up partitioner", e);
context.fail(e);
return false;
@@ -155,7 +172,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
partitioner.partitionBatch(incoming);
} catch (IOException e) {
context.fail(e);
- incoming.kill();
+ incoming.kill(false);
return false;
}
for (VectorWrapper<?> v : incoming) {
@@ -206,9 +223,9 @@ public class PartitionSenderRootExec extends BaseRootExec {
}
}
- public void updateStats(List<? extends PartitionStatsBatch> outgoing) {
+ public void updateStats(List<? extends PartitionOutgoingBatch> outgoing) {
long records = 0;
- for (PartitionStatsBatch o : outgoing) {
+ for (PartitionOutgoingBatch o : outgoing) {
long totalRecords = o.getTotalRecords();
minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords);
maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords);
@@ -220,6 +237,18 @@ public class PartitionSenderRootExec extends BaseRootExec {
stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount);
stats.setLongStat(Metric.N_RECEIVERS, outgoing.size());
}
+
+ @Override
+ public void receivingFragmentFinished(FragmentHandle handle) {
+ int id = handle.getMinorFragmentId();
+ if (remainingReceivers.compareAndSet(id, 0, 1)) {
+ partitioner.getOutgoingBatches().get(handle.getMinorFragmentId()).terminate();
+ int remaining = remaingReceiverCount.decrementAndGet();
+ if (remaining == 0) {
+ done = true;
+ }
+ }
+ }
public void stop() {
logger.debug("Partition sender stopping.");