path: root/exec/java-exec/src
author     Steven Phillips <sphillips@maprtech.com>    2014-07-23 20:03:07 -0700
committer  Steven Phillips <sphillips@maprtech.com>    2014-07-25 18:33:53 -0700
commit     c331aed81e73d16ea29bf8c94863591b212aa644 (patch)
tree       3887590400bc633bd459f9606ad3fbc8de983850 /exec/java-exec/src
parent     5e482c17d20bcc957be50d570d03f1a5fdfca75e (diff)
DRILL-991: Limit should terminate upstream fragments immediately upon completion
Diffstat (limited to 'exec/java-exec/src')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java  6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java  8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java  8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java  20
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java  13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java  6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java  6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java  6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java  9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java  10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java  4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java  52
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java  12
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java)  4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java  57
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java  3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java  27
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java  11
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java  10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java  47
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java  4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java  10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java  6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java  6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java  22
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java  22
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java  5
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java  6
32 files changed, 305 insertions, 103 deletions
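
Taken together, the changes below do two things: RecordBatch.kill() gains a sendUpstream flag so termination can be propagated toward the sending fragments instead of only releasing local resources, and RootExec gains a receivingFragmentFinished() hook so a sender can be told that a downstream receiver needs no more data. As orientation only, a minimal sketch of the two hooks is shown here; these are illustrative plain-Java interfaces, not the actual Drill types, and the String handle stands in for the real ExecProtos.FragmentHandle protobuf.

// Minimal sketch of the two hooks this commit introduces (illustrative, not Drill's interfaces).
interface RecordBatchSketch {
  // sendUpstream == true: ask the operators feeding this batch to stop producing;
  // sendUpstream == false: only release resources held locally.
  void kill(boolean sendUpstream);
}

interface RootExecSketch {
  // Invoked when the receiving fragment identified by receiverHandle reports that
  // it is finished and needs no more data from this sender.
  void receivingFragmentFinished(String receiverHandle);
}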
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index fa6c99781..c2c314436 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.ops.OpProfileDef;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
@@ -72,4 +73,9 @@ public abstract class BaseRootExec implements RootExec {
}
public abstract boolean innerNext();
+
+ @Override
+ public void receivingFragmentFinished(FragmentHandle handle) {
+ logger.warn("Currently not handling FinishedFragment message");
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index fcc10aa18..42ac4f66d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.physical.impl;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+
/**
* A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
* output nodes and storage nodes. They are there driving force behind the completion of a query.
@@ -35,5 +37,11 @@ public interface RootExec {
* Inform all children to clean up and go away.
*/
public void stop();
+
+ /**
+ * Inform sender that receiving fragment is finished and doesn't need any more data
+ * @param handle
+ */
+ public void receivingFragmentFinished(FragmentHandle handle);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index a8881f090..21a580b99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -119,8 +119,12 @@ public class ScanBatch implements RecordBatch {
}
@Override
- public void kill() {
- releaseAssets();
+ public void kill(boolean sendUpstream) {
+ if (sendUpstream) {
+ done = true;
+ } else {
+ releaseAssets();
+ }
}
private void releaseAssets() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 325e315aa..26aa5abd2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -56,6 +56,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
private int recMajor;
private FragmentContext context;
private volatile boolean ok = true;
+ private volatile boolean done = false;
private final SendingAccountor sendCount = new SendingAccountor();
public enum Metric implements MetricDef {
@@ -81,11 +82,18 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public boolean innerNext() {
if(!ok){
- incoming.kill();
+ incoming.kill(false);
return false;
}
- IterOutcome out = next(incoming);
+
+ IterOutcome out;
+ if (!done) {
+ out = next(incoming);
+ } else {
+ incoming.kill(true);
+ out = IterOutcome.NONE;
+ }
// logger.debug("Outcome of sender next {}", out);
switch(out){
case STOP:
@@ -132,8 +140,12 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
oContext.close();
incoming.cleanup();
}
-
-
+
+ @Override
+ public void receivingFragmentFinished(FragmentHandle handle) {
+ done = true;
+ }
+
private class RecordSendFailure extends BaseRpcOutcomeListener<Ack>{
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 0132e8544..fb9554c10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -101,11 +101,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
@Override
- public void kill() {
- incoming.kill();
- }
-
- @Override
public SelectionVector2 getSelectionVector2() {
throw new UnsupportedOperationException();
}
@@ -203,7 +198,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
return IterOutcome.OK_NEW_SCHEMA;
}catch(SchemaChangeException | ClassTransformationException | IOException ex){
- kill();
+ kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
return IterOutcome.STOP;
@@ -297,8 +292,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
@Override
- protected void killIncoming() {
- incoming.kill();
+ protected void killIncoming(boolean sendUpstream) {
+ incoming.kill(sendUpstream);
}
@@ -334,7 +329,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
@Override
- public void kill() {
+ public void kill(boolean sendUpstream) {
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 43e0dd4c7..29b346ddd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -67,8 +67,8 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
}
@Override
- protected void killIncoming() {
- incoming.kill();
+ protected void killIncoming(boolean sendUpstream) {
+ incoming.kill(sendUpstream);
}
@Override
@@ -100,7 +100,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
try{
setupNewSchema();
}catch(Exception ex){
- kill();
+ kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
return IterOutcome.STOP;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index b30a357a1..393fa4f58 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -180,7 +180,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
}catch(SchemaChangeException | ClassTransformationException | IOException ex){
context.fail(ex);
container.clear();
- incoming.kill();
+ incoming.kill(false);
return false;
}finally{
stats.stopSetup();
@@ -301,8 +301,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
}
@Override
- protected void killIncoming() {
- incoming.kill();
+ protected void killIncoming(boolean sendUpstream) {
+ incoming.kill(sendUpstream);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 2f71bf9e2..39131125d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -157,7 +157,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}catch(SchemaChangeException | ClassTransformationException | IOException ex){
context.fail(ex);
container.clear();
- incoming.kill();
+ incoming.kill(false);
return false;
}finally{
stats.stopSetup();
@@ -338,8 +338,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
@Override
- protected void killIncoming() {
- incoming.kill();
+ protected void killIncoming(boolean sendUpstream) {
+ incoming.kill(sendUpstream);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 46f7d51a4..7233f69bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -242,6 +242,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
for (VectorWrapper<?> wrapper : left) {
wrapper.getValueVector().clear();
}
+ left.kill(true);
leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
for (VectorWrapper<?> wrapper : left) {
@@ -260,7 +261,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
return IterOutcome.NONE;
} catch (ClassTransformationException | SchemaChangeException | IOException e) {
context.fail(e);
- killIncoming();
+ killIncoming(false);
return IterOutcome.STOP;
}
}
@@ -483,9 +484,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
@Override
- public void killIncoming() {
- this.left.kill();
- this.right.kill();
+ public void killIncoming(boolean sendUpstream) {
+ this.left.kill(sendUpstream);
+ this.right.kill(sendUpstream);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 0c6657cc2..24ca46384 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -172,7 +172,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
first = true;
} catch (ClassTransformationException | IOException | SchemaChangeException e) {
context.fail(new SchemaChangeException(e));
- kill();
+ kill(false);
return IterOutcome.STOP;
} finally {
stats.stopSetup();
@@ -191,7 +191,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
setRecordCountInContainer();
return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK;
case FAILURE:
- kill();
+ kill(false);
return IterOutcome.STOP;
case NO_MORE_DATA:
logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" :"NONE")));
@@ -233,9 +233,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
@Override
- protected void killIncoming() {
- left.kill();
- right.kill();
+ protected void killIncoming(boolean sendUpstream) {
+ left.kill(sendUpstream);
+ right.kill(sendUpstream);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 078c4c449..12ee40617 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -83,8 +83,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
@Override
public IterOutcome innerNext() {
if(!noEndLimit && recordsLeft <= 0) {
- // don't kill incoming batches or call cleanup yet, as this could close allocators before the buffers have been cleared
- // Drain the incoming record batch and clear the memory
+ incoming.kill(true);
+
IterOutcome upStream = incoming.next();
while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) {
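
The hunk above is the core of DRILL-991: once the limit is exhausted, the operator first calls incoming.kill(true) so the stop request travels all the way to the upstream senders, and only then drains the batches already in flight. A minimal, self-contained sketch of that signal-then-drain pattern follows; it uses a plain queue and flag standing in for the exchange and is not Drill code.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class SignalThenDrain {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<int[]> inFlight = new ArrayBlockingQueue<>(4);
    AtomicBoolean stopRequested = new AtomicBoolean(false);     // stands in for kill(true)

    Thread sender = new Thread(() -> {
      try {
        while (!stopRequested.get()) {
          inFlight.put(new int[1024]);                          // pretend record batch
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    sender.start();

    for (int received = 0; received < 3; received++) {          // "limit 3"
      inFlight.take();
    }

    stopRequested.set(true);                                    // signal upstream first...
    while (sender.isAlive() || !inFlight.isEmpty()) {
      inFlight.poll(10, TimeUnit.MILLISECONDS);                 // ...then drain what is already in flight
    }
    sender.join();
    System.out.println("limit reached, upstream stopped, buffers drained");
  }
}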
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 313fdecad..b8e18afa5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
+import io.netty.buffer.ByteBuf;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
@@ -43,6 +44,9 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.MergingReceiverPOP;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
@@ -60,6 +64,8 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
@@ -87,6 +93,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private BatchSchema schema;
private VectorContainer outgoingContainer;
private MergingReceiverGeneratorBase merger;
+ private MergingReceiverPOP config;
private boolean hasRun = false;
private boolean prevBatchWasFull = false;
private boolean hasMoreIncoming = true;
@@ -119,6 +126,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
this.context = context;
this.outgoingContainer = new VectorContainer();
this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
+ this.config = config;
}
private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
@@ -437,15 +445,49 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
@Override
- public void kill() {
- cleanup();
- for (RawFragmentBatchProvider provider : fragProviders) {
- provider.kill(context);
+ public void kill(boolean sendUpstream) {
+ if (sendUpstream) {
+ informSenders();
+ } else {
+ cleanup();
+ for (RawFragmentBatchProvider provider : fragProviders) {
+ provider.kill(context);
+ }
+ }
+ }
+
+ private void informSenders() {
+ FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+ .setMajorFragmentId(config.getOppositeMajorFragmentId())
+ .setQueryId(context.getHandle().getQueryId())
+ .build();
+ for (int i = 0; i < config.getNumSenders(); i++) {
+ FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+ .setMinorFragmentId(i)
+ .build();
+ FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+ .setReceiver(context.getHandle())
+ .setSender(sender)
+ .build();
+ context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver);
+ }
+ }
+
+ private class OutcomeListener implements RpcOutcomeListener<Ack> {
+
+ @Override
+ public void failed(RpcException ex) {
+ logger.warn("Failed to inform upstream that receiver is finished");
+ }
+
+ @Override
+ public void success(Ack value, ByteBuf buffer) {
+ // Do nothing
}
}
@Override
- protected void killIncoming() {
+ protected void killIncoming(boolean sendUpstream) {
//No op
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index f677e5439..45f32cff4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -300,7 +300,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
partitionVectors.add(w.getValueVector());
}
} catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
- kill();
+ kill(false);
logger.error("Failure while building final partition table.", ex);
context.fail(ex);
return false;
@@ -419,8 +419,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
@Override
- protected void killIncoming() {
- incoming.kill();
+ protected void killIncoming(boolean sendUpstream) {
+ incoming.kill(sendUpstream);
}
@Override
@@ -441,7 +441,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Must set up a new schema each time, because ValueVectors are not reused between containers in queue
setupNewSchema(vc);
} catch (SchemaChangeException ex) {
- kill();
+ kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
return IterOutcome.STOP;
@@ -474,7 +474,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
try {
setupNewSchema(vc);
} catch (SchemaChangeException ex) {
- kill();
+ kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
return IterOutcome.STOP;
@@ -504,7 +504,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
try {
setupNewSchema(incoming);
} catch (SchemaChangeException ex) {
- kill();
+ kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
return IterOutcome.STOP;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java
index 85ccffbe5..71a1590bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java
@@ -18,7 +18,9 @@
package org.apache.drill.exec.physical.impl.partitionsender;
-public interface PartitionStatsBatch {
+public interface PartitionOutgoingBatch {
public long getTotalRecords();
+
+ public void terminate();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 69be256ec..14cf092ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.physical.impl.partitionsender;
import java.io.IOException;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -31,22 +33,16 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.physical.impl.SendingAccountor;
-import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared.MetricValue;
-import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.vector.CopyUtil;
@@ -66,6 +62,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
private final int outGoingBatchCount;
private final HashPartitionSender popConfig;
private final StatusHandler statusHandler;
+
+ private final AtomicIntegerArray remainingReceivers;
+ private final AtomicInteger remaingReceiverCount;
+ private volatile boolean done = false;
long minReceiverRecordCount = Long.MAX_VALUE;
long maxReceiverRecordCount = Long.MIN_VALUE;
@@ -94,6 +94,17 @@ public class PartitionSenderRootExec extends BaseRootExec {
this.outGoingBatchCount = operator.getDestinations().size();
this.popConfig = operator;
this.statusHandler = new StatusHandler(sendCount, context);
+ this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount);
+ this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount);
+ }
+
+ private boolean done() {
+ for (int i = 0; i < remainingReceivers.length(); i++) {
+ if (remainingReceivers.get(i) == 0) {
+ return false;
+ }
+ }
+ return true;
}
@Override
@@ -106,7 +117,13 @@ public class PartitionSenderRootExec extends BaseRootExec {
return false;
}
- RecordBatch.IterOutcome out = next(incoming);
+ IterOutcome out;
+ if (!done) {
+ out = next(incoming);
+ } else {
+ incoming.kill(true);
+ out = IterOutcome.NONE;
+ }
logger.debug("Partitioner.next(): got next record batch with status {}", out);
switch(out){
@@ -119,7 +136,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
sendEmptyBatch();
}
} catch (IOException e) {
- incoming.kill();
+ incoming.kill(false);
logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
context.fail(e);
}
@@ -140,12 +157,12 @@ public class PartitionSenderRootExec extends BaseRootExec {
}
createPartitioner();
} catch (IOException e) {
- incoming.kill();
+ incoming.kill(false);
logger.error("Error while flushing outgoing batches", e);
context.fail(e);
return false;
} catch (SchemaChangeException e) {
- incoming.kill();
+ incoming.kill(false);
logger.error("Error while setting up partitioner", e);
context.fail(e);
return false;
@@ -155,7 +172,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
partitioner.partitionBatch(incoming);
} catch (IOException e) {
context.fail(e);
- incoming.kill();
+ incoming.kill(false);
return false;
}
for (VectorWrapper<?> v : incoming) {
@@ -206,9 +223,9 @@ public class PartitionSenderRootExec extends BaseRootExec {
}
}
- public void updateStats(List<? extends PartitionStatsBatch> outgoing) {
+ public void updateStats(List<? extends PartitionOutgoingBatch> outgoing) {
long records = 0;
- for (PartitionStatsBatch o : outgoing) {
+ for (PartitionOutgoingBatch o : outgoing) {
long totalRecords = o.getTotalRecords();
minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords);
maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords);
@@ -220,6 +237,18 @@ public class PartitionSenderRootExec extends BaseRootExec {
stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount);
stats.setLongStat(Metric.N_RECEIVERS, outgoing.size());
}
+
+ @Override
+ public void receivingFragmentFinished(FragmentHandle handle) {
+ int id = handle.getMinorFragmentId();
+ if (remainingReceivers.compareAndSet(id, 0, 1)) {
+ partitioner.getOutgoingBatches().get(handle.getMinorFragmentId()).terminate();
+ int remaining = remaingReceiverCount.decrementAndGet();
+ if (remaining == 0) {
+ done = true;
+ }
+ }
+ }
public void stop() {
logger.debug("Partition sender stopping.");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 53528bac8..c5fe154ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch;
import java.io.IOException;
import java.util.List;
@@ -44,7 +43,7 @@ public interface Partitioner {
public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException;
public abstract void initialize();
public abstract void clear();
- public abstract List<? extends PartitionStatsBatch> getOutgoingBatches();
+ public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches();
public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index fcbd95449..3141aed1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -51,7 +51,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -71,7 +70,7 @@ public abstract class PartitionerTemplate implements Partitioner {
}
@Override
- public List<? extends PartitionStatsBatch> getOutgoingBatches() {
+ public List<? extends PartitionOutgoingBatch> getOutgoingBatches() {
return outgoingBatches;
}
@@ -203,7 +202,7 @@ public abstract class PartitionerTemplate implements Partitioner {
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException;
public abstract int doEval(@Named("inIndex") int inIndex);
- public class OutgoingRecordBatch implements PartitionStatsBatch, VectorAccessible {
+ public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible {
private final DataTunnel tunnel;
private final HashPartitionSender operator;
@@ -214,6 +213,8 @@ public abstract class PartitionerTemplate implements Partitioner {
private final int oppositeMinorFragmentId;
private boolean isLast = false;
+ private volatile boolean terminated = false;
+ private boolean dropAll = false;
private BatchSchema outSchema;
private int recordCount;
private int totalRecords;
@@ -247,6 +248,11 @@ public abstract class PartitionerTemplate implements Partitioner {
return false;
}
+ @Override
+ public void terminate() {
+ terminated = true;
+ }
+
@RuntimeOverridden
protected void doSetup(@Named("incoming") RecordBatch incoming, @Named("outgoing") VectorAccessible outgoing) {};
@@ -254,9 +260,13 @@ public abstract class PartitionerTemplate implements Partitioner {
protected boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { return false; };
public void flush() throws IOException {
- final ExecProtos.FragmentHandle handle = context.getHandle();
+ if (dropAll) {
+ vectorContainer.zeroVectors();
+ return;
+ }
+ final FragmentHandle handle = context.getHandle();
- if (recordCount != 0) {
+ if (recordCount != 0 && !terminated) {
for(VectorWrapper<?> w : vectorContainer){
w.getValueVector().getMutator().setValueCount(recordCount);
@@ -280,9 +290,9 @@ public abstract class PartitionerTemplate implements Partitioner {
this.sendCount.increment();
} else {
logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
- if (isLast) {
+ if (isLast || terminated) {
// send final (empty) batch
- FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
+ FragmentWritableBatch writableBatch = new FragmentWritableBatch(true,
handle.getQueryId(),
handle.getMajorFragmentId(),
handle.getMinorFragmentId(),
@@ -296,7 +306,8 @@ public abstract class PartitionerTemplate implements Partitioner {
stats.stopWait();
}
this.sendCount.increment();
- vectorContainer.clear();
+ vectorContainer.zeroVectors();
+ dropAll = true;
return;
}
}
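
With the terminated and dropAll fields added above, an outgoing batch that has been terminated stops shipping data: the next flush sends a single final empty batch, and every flush after that zeroes the vectors and discards the data. A reduced sketch of that state machine, not the Drill class and with println standing in for the actual send, is shown below.

public class OutgoingBatchSketch {
  private volatile boolean terminated = false;
  private boolean dropAll = false;
  private int recordCount = 0;

  public void addRecord() { recordCount++; }
  public void terminate() { terminated = true; }

  public void flush() {
    if (dropAll) {
      recordCount = 0;                      // drop everything; nothing is sent downstream
      return;
    }
    if (recordCount != 0 && !terminated) {
      System.out.println("send data batch with " + recordCount + " records");
    } else if (terminated) {
      System.out.println("send final empty batch (last = true)");
      dropAll = true;                       // subsequent flushes discard their data
    }
    recordCount = 0;
  }

  public static void main(String[] args) {
    OutgoingBatchSketch batch = new OutgoingBatchSketch();
    batch.addRecord();
    batch.flush();                          // normal data batch
    batch.terminate();
    batch.addRecord();
    batch.flush();                          // final empty batch, then drop-all mode
    batch.addRecord();
    batch.flush();                          // dropped
  }
}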
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 2dae5023f..f091aa90c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -182,7 +182,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
@Override
- protected void killIncoming() {
+ protected void killIncoming(boolean sendUpstream) {
producer.interrupt();
stop = true;
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index f21673d0e..dbb547dca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -74,11 +74,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
}
@Override
- public void kill() {
- incoming.kill();
- }
-
- @Override
public SelectionVector2 getSelectionVector2() {
throw new UnsupportedOperationException();
}
@@ -148,7 +143,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return IterOutcome.OK_NEW_SCHEMA;
}catch(SchemaChangeException | ClassTransformationException | IOException ex){
- kill();
+ kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
return IterOutcome.STOP;
@@ -209,8 +204,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
}
@Override
- protected void killIncoming() {
- incoming.kill();
+ protected void killIncoming(boolean sendUpstream) {
+ incoming.kill(sendUpstream);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index ddee38af8..1f2f8430d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -59,21 +59,21 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
}
@Override
- public void kill() {
+ public void kill(boolean sendUpstream) {
if(current != null){
- current.kill();
+ current.kill(sendUpstream);
current = null;
}
for(;incomingIterator.hasNext();){
- incomingIterator.next().kill();
+ incomingIterator.next().kill(sendUpstream);
}
}
@Override
- protected void killIncoming() {
+ protected void killIncoming(boolean sendUpstream) {
for (int i = 0; i < incoming.size(); i++) {
RecordBatch in = incoming.get(i);
- in.kill();
+ in.kill(sendUpstream);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 79669fae6..16a68b809 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.unorderedreceiver;
import java.io.IOException;
import java.util.Iterator;
+import io.netty.buffer.ByteBuf;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.OutOfMemoryException;
@@ -28,6 +29,9 @@ import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.OpProfileDef;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -40,6 +44,9 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.control.ControlTunnel.ReceiverFinished;
public class UnorderedReceiverBatch implements RecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
@@ -50,6 +57,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
private BatchSchema schema;
private OperatorStats stats;
private boolean first = true;
+ private UnorderedReceiver config;
public enum Metric implements MetricDef {
BYTES_RECEIVED,
@@ -70,6 +78,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null);
this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
+ this.config = config;
}
@Override
@@ -88,8 +97,12 @@ public class UnorderedReceiverBatch implements RecordBatch {
}
@Override
- public void kill() {
- fragProvider.kill(context);
+ public void kill(boolean sendUpstream) {
+ if (sendUpstream) {
+ informSenders();
+ } else {
+ fragProvider.kill(context);
+ }
}
@Override
@@ -188,4 +201,34 @@ public class UnorderedReceiverBatch implements RecordBatch {
throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));
}
+ private void informSenders() {
+ FragmentHandle handlePrototype = FragmentHandle.newBuilder()
+ .setMajorFragmentId(config.getOppositeMajorFragmentId())
+ .setQueryId(context.getHandle().getQueryId())
+ .build();
+ for (int i = 0; i < config.getNumSenders(); i++) {
+ FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype)
+ .setMinorFragmentId(i)
+ .build();
+ FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder()
+ .setReceiver(context.getHandle())
+ .setSender(sender)
+ .build();
+ context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver);
+ }
+ }
+
+ private class OutcomeListener implements RpcOutcomeListener<Ack> {
+
+ @Override
+ public void failed(RpcException ex) {
+ logger.warn("Failed to inform upstream that receiver is finished");
+ }
+
+ @Override
+ public void success(Ack value, ByteBuf buffer) {
+ // Do nothing
+ }
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 20e4de4fd..14110e374 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -82,8 +82,8 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
}
@Override
- public void kill() {
- incoming.kill();
+ public void kill(boolean sendUpstream) {
+ incoming.kill(sendUpstream);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 08219a159..d4b10014f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -132,8 +132,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
@Override
- public void kill() {
- incoming.kill();
+ public void kill(boolean sendUpstream) {
+ incoming.kill(sendUpstream);
}
@Override
@@ -324,7 +324,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return IterOutcome.OK_NEW_SCHEMA;
}catch(SchemaChangeException | ClassTransformationException | IOException ex){
- kill();
+ kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
return IterOutcome.STOP;
@@ -577,8 +577,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
@Override
- protected void killIncoming() {
- incoming.kill();
+ protected void killIncoming(boolean sendUpstream) {
+ incoming.kill(sendUpstream);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index fd584cb69..931301870 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -44,7 +44,7 @@ public class PlannerSettings implements FrameworkContext{
public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true);
public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 1000000);
public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, 100, 1.0d);
- public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", true);
+ public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false);
public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10);
public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 088b1204c..e8ad3114c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -106,11 +106,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
@Override
- public void kill() {
- killIncoming();
+ public void kill(boolean sendUpstream) {
+ killIncoming(sendUpstream);
}
- protected abstract void killIncoming();
+ protected abstract void killIncoming(boolean sendUpstream);
public void cleanup(){
container.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 721755d1c..bea7bbfe9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -36,8 +36,8 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
}
@Override
- protected void killIncoming() {
- incoming.kill();
+ protected void killIncoming(boolean sendUpstream) {
+ incoming.kill(sendUpstream);
}
@Override
@@ -65,7 +65,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
stats.startSetup();
setupNewSchema();
}catch(SchemaChangeException ex){
- kill();
+ kill(false);
logger.error("Failure during query", ex);
context.fail(ex);
return IterOutcome.STOP;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 7617d9185..9b2817983 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -79,7 +79,7 @@ public interface RecordBatch extends VectorAccessible {
* Inform child nodes that this query should be terminated. Child nodes should utilize the QueryContext to determine
* what has happened.
*/
- public void kill();
+ public void kill(boolean sendUpstream);
public abstract SelectionVector2 getSelectionVector2();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
index b398e473c..9953e5fbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.control;
import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.BitControl.RpcType;
@@ -37,6 +38,7 @@ public class ControlRpcConfig {
.add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class)
.add(RpcType.REQ_INIATILIZE_FRAGMENT, PlanFragment.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class)
+ .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class)
.add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class)
.build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 9a2603965..d035c10fa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.rpc.control;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.BitControl.RpcType;
@@ -56,6 +57,11 @@ public class ControlTunnel {
manager.runCommand(b);
}
+ public void informReceiverFinished(RpcOutcomeListener<Ack> outcomeListener, FinishedReceiver finishedReceiver){
+ ReceiverFinished b = new ReceiverFinished(outcomeListener, finishedReceiver);
+ manager.runCommand(b);
+ }
+
public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
SendFragmentStatus b = new SendFragmentStatus(status);
manager.runCommand(b);
@@ -84,6 +90,21 @@ public class ControlTunnel {
}
+
+ public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection> {
+ final FinishedReceiver finishedReceiver;
+
+ public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver finishedReceiver) {
+ super(listener);
+ this.finishedReceiver = finishedReceiver;
+ }
+
+ @Override
+ public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
+ connection.send(outcomeListener, RpcType.REQ_RECEIVER_FINISHED, finishedReceiver, Ack.class);
+ }
+ }
+
public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> {
final FragmentHandle handle;
@@ -127,5 +148,4 @@ public class ControlTunnel {
connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, QueryProfile.class);
}
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index afd3fa266..893aec8b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.BitControl.RpcType;
@@ -74,6 +75,11 @@ public class ControlHandlerImpl implements ControlMessageHandler {
cancelFragment(handle);
return DataRpcConfig.OK;
+ case RpcType.REQ_RECEIVER_FINISHED_VALUE:
+ FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
+ receivingFragmentFinished(finishedReceiver);
+ return DataRpcConfig.OK;
+
case RpcType.REQ_FRAGMENT_STATUS_VALUE:
bee.getContext().getWorkBus().status( get(pBody, FragmentStatus.PARSER));
// TODO: Support a type of message that has no response.
@@ -159,6 +165,22 @@ public class ControlHandlerImpl implements ControlMessageHandler {
return Acks.OK;
}
+ public Ack receivingFragmentFinished(FinishedReceiver finishedReceiver) {
+ FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());
+
+ FragmentExecutor executor;
+ if(manager != null) {
+ executor = manager.getRunnable();
+ } else {
+ // then try local cancel.
+ executor = bee.getFragmentRunner(finishedReceiver.getSender());
+ }
+ if (executor != null) {
+ executor.receivingFragmentFinished(finishedReceiver.getReceiver());
+ }
+
+ return Acks.OK;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 735e66332..c5c08e276 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
@@ -71,6 +72,10 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
}
}
+ public void receivingFragmentFinished(FragmentHandle handle) {
+ root.receivingFragmentFinished(handle);
+ }
+
public UserClientConnection getClient(){
return context.getConnection();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 7dce6e035..db8ff8e22 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.TypedFieldId;
@@ -85,6 +86,11 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{
}
@Override
+ public void receivingFragmentFinished(FragmentHandle handle) {
+ //no op
+ }
+
+ @Override
public Iterator<ValueVector> iterator() {
List<ValueVector> vv = Lists.newArrayList();
for(VectorWrapper<?> vw : incoming){