aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
diff options
context:
space:
mode:
authorSteven Phillips <sphillips@maprtech.com>2014-07-23 20:03:07 -0700
committerSteven Phillips <sphillips@maprtech.com>2014-07-25 18:33:53 -0700
commitc331aed81e73d16ea29bf8c94863591b212aa644 (patch)
tree3887590400bc633bd459f9606ad3fbc8de983850 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
parent5e482c17d20bcc957be50d570d03f1a5fdfca75e (diff)
DRILL-991: Limit should terminate upstream fragments immediately upon completion
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java27
1 files changed, 19 insertions, 8 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index fcbd95449..3141aed1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -51,7 +51,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -71,7 +70,7 @@ public abstract class PartitionerTemplate implements Partitioner {
}
@Override
- public List<? extends PartitionStatsBatch> getOutgoingBatches() {
+ public List<? extends PartitionOutgoingBatch> getOutgoingBatches() {
return outgoingBatches;
}
@@ -203,7 +202,7 @@ public abstract class PartitionerTemplate implements Partitioner {
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
public abstract int doEval(@Named("inIndex") int inIndex);
- public class OutgoingRecordBatch implements PartitionStatsBatch, VectorAccessible {
+ public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible {
private final DataTunnel tunnel;
private final HashPartitionSender operator;
@@ -214,6 +213,8 @@ public abstract class PartitionerTemplate implements Partitioner {
private final int oppositeMinorFragmentId;
private boolean isLast = false;
+ private volatile boolean terminated = false;
+ private boolean dropAll = false;
private BatchSchema outSchema;
private int recordCount;
private int totalRecords;
@@ -247,6 +248,11 @@ public abstract class PartitionerTemplate implements Partitioner {
return false;
}
+ @Override
+ public void terminate() {
+ terminated = true;
+ }
+
@RuntimeOverridden
protected void doSetup(@Named("incoming") RecordBatch incoming, @Named("outgoing") VectorAccessible outgoing) {};
@@ -254,9 +260,13 @@ public abstract class PartitionerTemplate implements Partitioner {
protected boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { return false; };
public void flush() throws IOException {
- final ExecProtos.FragmentHandle handle = context.getHandle();
+ if (dropAll) {
+ vectorContainer.zeroVectors();
+ return;
+ }
+ final FragmentHandle handle = context.getHandle();
- if (recordCount != 0) {
+ if (recordCount != 0 && !terminated) {
for(VectorWrapper<?> w : vectorContainer){
w.getValueVector().getMutator().setValueCount(recordCount);
@@ -280,9 +290,9 @@ public abstract class PartitionerTemplate implements Partitioner {
this.sendCount.increment();
} else {
logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
- if (isLast) {
+ if (isLast || terminated) {
// send final (empty) batch
- FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
+ FragmentWritableBatch writableBatch = new FragmentWritableBatch(true,
handle.getQueryId(),
handle.getMajorFragmentId(),
handle.getMinorFragmentId(),
@@ -296,7 +306,8 @@ public abstract class PartitionerTemplate implements Partitioner {
stats.stopWait();
}
this.sendCount.increment();
- vectorContainer.clear();
+ vectorContainer.zeroVectors();
+ dropAll = true;
return;
}
}