diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-07-23 20:03:07 -0700 |
---|---|---|
committer | Steven Phillips <sphillips@maprtech.com> | 2014-07-25 18:33:53 -0700 |
commit | c331aed81e73d16ea29bf8c94863591b212aa644 (patch) | |
tree | 3887590400bc633bd459f9606ad3fbc8de983850 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java | |
parent | 5e482c17d20bcc957be50d570d03f1a5fdfca75e (diff) |
DRILL-991: Limit should terminate upstream fragments immediately upon completion
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java | 27 |
1 files changed, 19 insertions, 8 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index fcbd95449..3141aed1b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -51,7 +51,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -71,7 +70,7 @@ public abstract class PartitionerTemplate implements Partitioner { } @Override - public List<? extends PartitionStatsBatch> getOutgoingBatches() { + public List<? extends PartitionOutgoingBatch> getOutgoingBatches() { return outgoingBatches; } @@ -203,7 +202,7 @@ public abstract class PartitionerTemplate implements Partitioner { public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; public abstract int doEval(@Named("inIndex") int inIndex); - public class OutgoingRecordBatch implements PartitionStatsBatch, VectorAccessible { + public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible { private final DataTunnel tunnel; private final HashPartitionSender operator; @@ -214,6 +213,8 @@ public abstract class PartitionerTemplate implements Partitioner { private final int oppositeMinorFragmentId; private boolean isLast = false; + private volatile boolean terminated = false; + private boolean dropAll = false; private BatchSchema outSchema; private int recordCount; private int totalRecords; @@ -247,6 +248,11 @@ public abstract class PartitionerTemplate implements Partitioner { return false; } + @Override + public void terminate() { + terminated = true; + } + @RuntimeOverridden protected void doSetup(@Named("incoming") RecordBatch incoming, @Named("outgoing") VectorAccessible outgoing) {}; @@ -254,9 +260,13 @@ public abstract class PartitionerTemplate implements Partitioner { protected boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { return false; }; public void flush() throws IOException { - final ExecProtos.FragmentHandle handle = context.getHandle(); + if (dropAll) { + vectorContainer.zeroVectors(); + return; + } + final FragmentHandle handle = context.getHandle(); - if (recordCount != 0) { + if (recordCount != 0 && !terminated) { for(VectorWrapper<?> w : vectorContainer){ w.getValueVector().getMutator().setValueCount(recordCount); @@ -280,9 +290,9 @@ public abstract class PartitionerTemplate implements Partitioner { this.sendCount.increment(); } else { logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : "")); - if (isLast) { + if (isLast || terminated) { // send final (empty) batch - FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, + FragmentWritableBatch writableBatch = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), @@ -296,7 +306,8 @@ public abstract class PartitionerTemplate implements Partitioner { stats.stopWait(); } this.sendCount.increment(); - vectorContainer.clear(); + vectorContainer.zeroVectors(); + dropAll = true; return; } } |