diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-07-23 20:03:07 -0700 |
---|---|---|
committer | Steven Phillips <sphillips@maprtech.com> | 2014-07-25 18:33:53 -0700 |
commit | c331aed81e73d16ea29bf8c94863591b212aa644 (patch) | |
tree | 3887590400bc633bd459f9606ad3fbc8de983850 /exec/java-exec/src | |
parent | 5e482c17d20bcc957be50d570d03f1a5fdfca75e (diff) |
DRILL-991: Limit should terminate upstream fragments immediately upon completion
Diffstat (limited to 'exec/java-exec/src')
32 files changed, 305 insertions, 103 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index fa6c99781..c2c314436 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; @@ -72,4 +73,9 @@ public abstract class BaseRootExec implements RootExec { } public abstract boolean innerNext(); + + @Override + public void receivingFragmentFinished(FragmentHandle handle) { + logger.warn("Currently not handling FinishedFragment message"); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java index fcc10aa18..42ac4f66d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.physical.impl; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; + /** * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange * output nodes and storage nodes. They are there driving force behind the completion of a query. @@ -35,5 +37,11 @@ public interface RootExec { * Inform all children to clean up and go away. */ public void stop(); + + /** + * Inform sender that receiving fragment is finished and doesn't need any more data + * @param handle + */ + public void receivingFragmentFinished(FragmentHandle handle); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index a8881f090..21a580b99 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -119,8 +119,12 @@ public class ScanBatch implements RecordBatch { } @Override - public void kill() { - releaseAssets(); + public void kill(boolean sendUpstream) { + if (sendUpstream) { + done = true; + } else { + releaseAssets(); + } } private void releaseAssets() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 325e315aa..26aa5abd2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -56,6 +56,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ private int recMajor; private FragmentContext context; private volatile boolean ok = true; + private volatile boolean done = false; private final SendingAccountor sendCount = new SendingAccountor(); public enum Metric implements MetricDef { @@ -81,11 +82,18 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ @Override public boolean innerNext() { if(!ok){ - incoming.kill(); + incoming.kill(false); return false; } - IterOutcome out = next(incoming); + + IterOutcome out; + if (!done) { + out = next(incoming); + } else { + incoming.kill(true); + out = IterOutcome.NONE; + } // logger.debug("Outcome of sender next {}", out); switch(out){ case STOP: @@ -132,8 +140,12 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ oContext.close(); incoming.cleanup(); } - - + + @Override + public void receivingFragmentFinished(FragmentHandle handle) { + done = true; + } + private class RecordSendFailure extends BaseRpcOutcomeListener<Ack>{ @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 0132e8544..fb9554c10 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -101,11 +101,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - public void kill() { - incoming.kill(); - } - - @Override public SelectionVector2 getSelectionVector2() { throw new UnsupportedOperationException(); } @@ -203,7 +198,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { return IterOutcome.OK_NEW_SCHEMA; }catch(SchemaChangeException | ClassTransformationException | IOException ex){ - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; @@ -297,8 +292,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } @@ -334,7 +329,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - public void kill() { + public void kill(boolean sendUpstream) { } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 43e0dd4c7..29b346ddd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -67,8 +67,8 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } @Override @@ -100,7 +100,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { try{ setupNewSchema(); }catch(Exception ex){ - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index b30a357a1..393fa4f58 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -180,7 +180,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { }catch(SchemaChangeException | ClassTransformationException | IOException ex){ context.fail(ex); container.clear(); - incoming.kill(); + incoming.kill(false); return false; }finally{ stats.stopSetup(); @@ -301,8 +301,8 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index 2f71bf9e2..39131125d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -157,7 +157,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { }catch(SchemaChangeException | ClassTransformationException | IOException ex){ context.fail(ex); container.clear(); - incoming.kill(); + incoming.kill(false); return false; }finally{ stats.stopSetup(); @@ -338,8 +338,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 46f7d51a4..7233f69bf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -242,6 +242,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { for (VectorWrapper<?> wrapper : left) { wrapper.getValueVector().clear(); } + left.kill(true); leftUpstream = next(HashJoinHelper.LEFT_INPUT, left); while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { for (VectorWrapper<?> wrapper : left) { @@ -260,7 +261,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { return IterOutcome.NONE; } catch (ClassTransformationException | SchemaChangeException | IOException e) { context.fail(e); - killIncoming(); + killIncoming(false); return IterOutcome.STOP; } } @@ -483,9 +484,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } @Override - public void killIncoming() { - this.left.kill(); - this.right.kill(); + public void killIncoming(boolean sendUpstream) { + this.left.kill(sendUpstream); + this.right.kill(sendUpstream); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 0c6657cc2..24ca46384 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -172,7 +172,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { first = true; } catch (ClassTransformationException | IOException | SchemaChangeException e) { context.fail(new SchemaChangeException(e)); - kill(); + kill(false); return IterOutcome.STOP; } finally { stats.stopSetup(); @@ -191,7 +191,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { setRecordCountInContainer(); return first ? IterOutcome.OK_NEW_SCHEMA : IterOutcome.OK; case FAILURE: - kill(); + kill(false); return IterOutcome.STOP; case NO_MORE_DATA: logger.debug("NO MORE DATA; returning {}", (status.getOutPosition() > 0 ? (first ? "OK_NEW_SCHEMA" : "OK") : (first ? "OK_NEW_SCHEMA" :"NONE"))); @@ -233,9 +233,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } @Override - protected void killIncoming() { - left.kill(); - right.kill(); + protected void killIncoming(boolean sendUpstream) { + left.kill(sendUpstream); + right.kill(sendUpstream); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 078c4c449..12ee40617 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -83,8 +83,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { @Override public IterOutcome innerNext() { if(!noEndLimit && recordsLeft <= 0) { - // don't kill incoming batches or call cleanup yet, as this could close allocators before the buffers have been cleared - // Drain the incoming record batch and clear the memory + incoming.kill(true); + IterOutcome upStream = incoming.next(); while (upStream == IterOutcome.OK || upStream == IterOutcome.OK_NEW_SCHEMA) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 313fdecad..b8e18afa5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -24,6 +24,7 @@ import java.util.Iterator; import java.util.List; import java.util.PriorityQueue; +import io.netty.buffer.ByteBuf; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; @@ -43,6 +44,9 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.config.MergingReceiverPOP; +import org.apache.drill.exec.proto.BitControl.FinishedReceiver; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; @@ -60,6 +64,8 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.vector.CopyUtil; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; @@ -87,6 +93,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private BatchSchema schema; private VectorContainer outgoingContainer; private MergingReceiverGeneratorBase merger; + private MergingReceiverPOP config; private boolean hasRun = false; private boolean prevBatchWasFull = false; private boolean hasMoreIncoming = true; @@ -119,6 +126,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> this.context = context; this.outgoingContainer = new VectorContainer(); this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); + this.config = config; } private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{ @@ -437,15 +445,49 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } @Override - public void kill() { - cleanup(); - for (RawFragmentBatchProvider provider : fragProviders) { - provider.kill(context); + public void kill(boolean sendUpstream) { + if (sendUpstream) { + informSenders(); + } else { + cleanup(); + for (RawFragmentBatchProvider provider : fragProviders) { + provider.kill(context); + } + } + } + + private void informSenders() { + FragmentHandle handlePrototype = FragmentHandle.newBuilder() + .setMajorFragmentId(config.getOppositeMajorFragmentId()) + .setQueryId(context.getHandle().getQueryId()) + .build(); + for (int i = 0; i < config.getNumSenders(); i++) { + FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype) + .setMinorFragmentId(i) + .build(); + FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder() + .setReceiver(context.getHandle()) + .setSender(sender) + .build(); + context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver); + } + } + + private class OutcomeListener implements RpcOutcomeListener<Ack> { + + @Override + public void failed(RpcException ex) { + logger.warn("Failed to inform upstream that receiver is finished"); + } + + @Override + public void success(Ack value, ByteBuf buffer) { + // Do nothing } } @Override - protected void killIncoming() { + protected void killIncoming(boolean sendUpstream) { //No op } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index f677e5439..45f32cff4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -300,7 +300,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart partitionVectors.add(w.getValueVector()); } } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) { - kill(); + kill(false); logger.error("Failure while building final partition table.", ex); context.fail(ex); return false; @@ -419,8 +419,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } @Override @@ -441,7 +441,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Must set up a new schema each time, because ValueVectors are not reused between containers in queue setupNewSchema(vc); } catch (SchemaChangeException ex) { - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; @@ -474,7 +474,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart try { setupNewSchema(vc); } catch (SchemaChangeException ex) { - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; @@ -504,7 +504,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart try { setupNewSchema(incoming); } catch (SchemaChangeException ex) { - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java index 85ccffbe5..71a1590bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionStatsBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionOutgoingBatch.java @@ -18,7 +18,9 @@ package org.apache.drill.exec.physical.impl.partitionsender; -public interface PartitionStatsBatch { +public interface PartitionOutgoingBatch { public long getTotalRecords(); + + public void terminate(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 69be256ec..14cf092ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -19,6 +19,8 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.io.IOException; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerArray; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; @@ -31,22 +33,16 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; -import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; -import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.UserBitShared.MetricValue; -import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.vector.CopyUtil; @@ -66,6 +62,10 @@ public class PartitionSenderRootExec extends BaseRootExec { private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final StatusHandler statusHandler; + + private final AtomicIntegerArray remainingReceivers; + private final AtomicInteger remaingReceiverCount; + private volatile boolean done = false; long minReceiverRecordCount = Long.MAX_VALUE; long maxReceiverRecordCount = Long.MIN_VALUE; @@ -94,6 +94,17 @@ public class PartitionSenderRootExec extends BaseRootExec { this.outGoingBatchCount = operator.getDestinations().size(); this.popConfig = operator; this.statusHandler = new StatusHandler(sendCount, context); + this.remainingReceivers = new AtomicIntegerArray(outGoingBatchCount); + this.remaingReceiverCount = new AtomicInteger(outGoingBatchCount); + } + + private boolean done() { + for (int i = 0; i < remainingReceivers.length(); i++) { + if (remainingReceivers.get(i) == 0) { + return false; + } + } + return true; } @Override @@ -106,7 +117,13 @@ public class PartitionSenderRootExec extends BaseRootExec { return false; } - RecordBatch.IterOutcome out = next(incoming); + IterOutcome out; + if (!done) { + out = next(incoming); + } else { + incoming.kill(true); + out = IterOutcome.NONE; + } logger.debug("Partitioner.next(): got next record batch with status {}", out); switch(out){ @@ -119,7 +136,7 @@ public class PartitionSenderRootExec extends BaseRootExec { sendEmptyBatch(); } } catch (IOException e) { - incoming.kill(); + incoming.kill(false); logger.error("Error while creating partitioning sender or flushing outgoing batches", e); context.fail(e); } @@ -140,12 +157,12 @@ public class PartitionSenderRootExec extends BaseRootExec { } createPartitioner(); } catch (IOException e) { - incoming.kill(); + incoming.kill(false); logger.error("Error while flushing outgoing batches", e); context.fail(e); return false; } catch (SchemaChangeException e) { - incoming.kill(); + incoming.kill(false); logger.error("Error while setting up partitioner", e); context.fail(e); return false; @@ -155,7 +172,7 @@ public class PartitionSenderRootExec extends BaseRootExec { partitioner.partitionBatch(incoming); } catch (IOException e) { context.fail(e); - incoming.kill(); + incoming.kill(false); return false; } for (VectorWrapper<?> v : incoming) { @@ -206,9 +223,9 @@ public class PartitionSenderRootExec extends BaseRootExec { } } - public void updateStats(List<? extends PartitionStatsBatch> outgoing) { + public void updateStats(List<? extends PartitionOutgoingBatch> outgoing) { long records = 0; - for (PartitionStatsBatch o : outgoing) { + for (PartitionOutgoingBatch o : outgoing) { long totalRecords = o.getTotalRecords(); minReceiverRecordCount = Math.min(minReceiverRecordCount, totalRecords); maxReceiverRecordCount = Math.max(maxReceiverRecordCount, totalRecords); @@ -220,6 +237,18 @@ public class PartitionSenderRootExec extends BaseRootExec { stats.setLongStat(Metric.MAX_RECORDS, maxReceiverRecordCount); stats.setLongStat(Metric.N_RECEIVERS, outgoing.size()); } + + @Override + public void receivingFragmentFinished(FragmentHandle handle) { + int id = handle.getMinorFragmentId(); + if (remainingReceivers.compareAndSet(id, 0, 1)) { + partitioner.getOutgoingBatches().get(handle.getMinorFragmentId()).terminate(); + int remaining = remaingReceiverCount.decrementAndGet(); + if (remaining == 0) { + done = true; + } + } + } public void stop() { logger.debug("Partition sender stopping."); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 53528bac8..c5fe154ab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -25,7 +25,6 @@ import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch; import java.io.IOException; import java.util.List; @@ -44,7 +43,7 @@ public interface Partitioner { public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; public abstract void initialize(); public abstract void clear(); - public abstract List<? extends PartitionStatsBatch> getOutgoingBatches(); + public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches(); public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index fcbd95449..3141aed1b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -51,7 +51,6 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -71,7 +70,7 @@ public abstract class PartitionerTemplate implements Partitioner { } @Override - public List<? extends PartitionStatsBatch> getOutgoingBatches() { + public List<? extends PartitionOutgoingBatch> getOutgoingBatches() { return outgoingBatches; } @@ -203,7 +202,7 @@ public abstract class PartitionerTemplate implements Partitioner { public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") OutgoingRecordBatch[] outgoing) throws SchemaChangeException; public abstract int doEval(@Named("inIndex") int inIndex); - public class OutgoingRecordBatch implements PartitionStatsBatch, VectorAccessible { + public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible { private final DataTunnel tunnel; private final HashPartitionSender operator; @@ -214,6 +213,8 @@ public abstract class PartitionerTemplate implements Partitioner { private final int oppositeMinorFragmentId; private boolean isLast = false; + private volatile boolean terminated = false; + private boolean dropAll = false; private BatchSchema outSchema; private int recordCount; private int totalRecords; @@ -247,6 +248,11 @@ public abstract class PartitionerTemplate implements Partitioner { return false; } + @Override + public void terminate() { + terminated = true; + } + @RuntimeOverridden protected void doSetup(@Named("incoming") RecordBatch incoming, @Named("outgoing") VectorAccessible outgoing) {}; @@ -254,9 +260,13 @@ public abstract class PartitionerTemplate implements Partitioner { protected boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex) { return false; }; public void flush() throws IOException { - final ExecProtos.FragmentHandle handle = context.getHandle(); + if (dropAll) { + vectorContainer.zeroVectors(); + return; + } + final FragmentHandle handle = context.getHandle(); - if (recordCount != 0) { + if (recordCount != 0 && !terminated) { for(VectorWrapper<?> w : vectorContainer){ w.getValueVector().getMutator().setValueCount(recordCount); @@ -280,9 +290,9 @@ public abstract class PartitionerTemplate implements Partitioner { this.sendCount.increment(); } else { logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : "")); - if (isLast) { + if (isLast || terminated) { // send final (empty) batch - FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, + FragmentWritableBatch writableBatch = new FragmentWritableBatch(true, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), @@ -296,7 +306,8 @@ public abstract class PartitionerTemplate implements Partitioner { stats.stopWait(); } this.sendCount.increment(); - vectorContainer.clear(); + vectorContainer.zeroVectors(); + dropAll = true; return; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 2dae5023f..f091aa90c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -182,7 +182,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { } @Override - protected void killIncoming() { + protected void killIncoming(boolean sendUpstream) { producer.interrupt(); stop = true; try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index f21673d0e..dbb547dca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -74,11 +74,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } @Override - public void kill() { - incoming.kill(); - } - - @Override public SelectionVector2 getSelectionVector2() { throw new UnsupportedOperationException(); } @@ -148,7 +143,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return IterOutcome.OK_NEW_SCHEMA; }catch(SchemaChangeException | ClassTransformationException | IOException ex){ - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; @@ -209,8 +204,8 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index ddee38af8..1f2f8430d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -59,21 +59,21 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { } @Override - public void kill() { + public void kill(boolean sendUpstream) { if(current != null){ - current.kill(); + current.kill(sendUpstream); current = null; } for(;incomingIterator.hasNext();){ - incomingIterator.next().kill(); + incomingIterator.next().kill(sendUpstream); } } @Override - protected void killIncoming() { + protected void killIncoming(boolean sendUpstream) { for (int i = 0; i < incoming.size(); i++) { RecordBatch in = incoming.get(i); - in.kill(); + in.kill(sendUpstream); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 79669fae6..16a68b809 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.unorderedreceiver; import java.io.IOException; import java.util.Iterator; +import io.netty.buffer.ByteBuf; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; @@ -28,6 +29,9 @@ import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.proto.BitControl.FinishedReceiver; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RawFragmentBatch; @@ -40,6 +44,9 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.control.ControlTunnel.ReceiverFinished; public class UnorderedReceiverBatch implements RecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class); @@ -50,6 +57,7 @@ public class UnorderedReceiverBatch implements RecordBatch { private BatchSchema schema; private OperatorStats stats; private boolean first = true; + private UnorderedReceiver config; public enum Metric implements MetricDef { BYTES_RECEIVED, @@ -70,6 +78,7 @@ public class UnorderedReceiverBatch implements RecordBatch { this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null); this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); + this.config = config; } @Override @@ -88,8 +97,12 @@ public class UnorderedReceiverBatch implements RecordBatch { } @Override - public void kill() { - fragProvider.kill(context); + public void kill(boolean sendUpstream) { + if (sendUpstream) { + informSenders(); + } else { + fragProvider.kill(context); + } } @Override @@ -188,4 +201,34 @@ public class UnorderedReceiverBatch implements RecordBatch { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); } + private void informSenders() { + FragmentHandle handlePrototype = FragmentHandle.newBuilder() + .setMajorFragmentId(config.getOppositeMajorFragmentId()) + .setQueryId(context.getHandle().getQueryId()) + .build(); + for (int i = 0; i < config.getNumSenders(); i++) { + FragmentHandle sender = FragmentHandle.newBuilder(handlePrototype) + .setMinorFragmentId(i) + .build(); + FinishedReceiver finishedReceiver = FinishedReceiver.newBuilder() + .setReceiver(context.getHandle()) + .setSender(sender) + .build(); + context.getControlTunnel(config.getProvidingEndpoints().get(i)).informReceiverFinished(new OutcomeListener(), finishedReceiver); + } + } + + private class OutcomeListener implements RpcOutcomeListener<Ack> { + + @Override + public void failed(RpcException ex) { + logger.warn("Failed to inform upstream that receiver is finished"); + } + + @Override + public void success(Ack value, ByteBuf buffer) { + // Do nothing + } + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 20e4de4fd..14110e374 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -82,8 +82,8 @@ public class IteratorValidatorBatchIterator implements RecordBatch { } @Override - public void kill() { - incoming.kill(); + public void kill(boolean sendUpstream) { + incoming.kill(sendUpstream); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 08219a159..d4b10014f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -132,8 +132,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } @Override - public void kill() { - incoming.kill(); + public void kill(boolean sendUpstream) { + incoming.kill(sendUpstream); } @Override @@ -324,7 +324,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { return IterOutcome.OK_NEW_SCHEMA; }catch(SchemaChangeException | ClassTransformationException | IOException ex){ - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; @@ -577,8 +577,8 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index fd584cb69..931301870 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -44,7 +44,7 @@ public class PlannerSettings implements FrameworkContext{ public static final OptionValidator BROADCAST = new BooleanValidator("planner.enable_broadcast_join", true); public static final OptionValidator BROADCAST_THRESHOLD = new PositiveLongValidator("planner.broadcast_threshold", MAX_BROADCAST_THRESHOLD, 1000000); public static final OptionValidator JOIN_ROW_COUNT_ESTIMATE_FACTOR = new RangeDoubleValidator("planner.join.row_count_estimate_factor", 0, 100, 1.0d); - public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", true); + public static final OptionValidator PRODUCER_CONSUMER = new BooleanValidator("planner.add_producer_consumer", false); public static final OptionValidator PRODUCER_CONSUMER_QUEUE_SIZE = new LongValidator("planner.producer_consumer_queue_size", 10); public static final OptionValidator HASH_SINGLE_KEY = new BooleanValidator("planner.enable_hash_single_key", true); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 088b1204c..e8ad3114c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -106,11 +106,11 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements } @Override - public void kill() { - killIncoming(); + public void kill(boolean sendUpstream) { + killIncoming(sendUpstream); } - protected abstract void killIncoming(); + protected abstract void killIncoming(boolean sendUpstream); public void cleanup(){ container.clear(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index 721755d1c..bea7bbfe9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -36,8 +36,8 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte } @Override - protected void killIncoming() { - incoming.kill(); + protected void killIncoming(boolean sendUpstream) { + incoming.kill(sendUpstream); } @Override @@ -65,7 +65,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte stats.startSetup(); setupNewSchema(); }catch(SchemaChangeException ex){ - kill(); + kill(false); logger.error("Failure during query", ex); context.fail(ex); return IterOutcome.STOP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 7617d9185..9b2817983 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -79,7 +79,7 @@ public interface RecordBatch extends VectorAccessible { * Inform child nodes that this query should be terminated. Child nodes should utilize the QueryContext to determine * what has happened. */ - public void kill(); + public void kill(boolean sendUpstream); public abstract SelectionVector2 getSelectionVector2(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java index b398e473c..9953e5fbd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlRpcConfig.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.rpc.control; import org.apache.drill.exec.proto.BitControl.BitControlHandshake; +import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.BitControl.RpcType; @@ -37,6 +38,7 @@ public class ControlRpcConfig { .add(RpcType.HANDSHAKE, BitControlHandshake.class, RpcType.HANDSHAKE, BitControlHandshake.class) .add(RpcType.REQ_INIATILIZE_FRAGMENT, PlanFragment.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_CANCEL_FRAGMENT, FragmentHandle.class, RpcType.ACK, Ack.class) + .add(RpcType.REQ_RECEIVER_FINISHED, FinishedReceiver.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_FRAGMENT_STATUS, FragmentStatus.class, RpcType.ACK, Ack.class) .add(RpcType.REQ_QUERY_STATUS, QueryId.class, RpcType.RESP_QUERY_STATUS, QueryProfile.class) .build(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java index 9a2603965..d035c10fa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.control; +import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.BitControl.RpcType; @@ -56,6 +57,11 @@ public class ControlTunnel { manager.runCommand(b); } + public void informReceiverFinished(RpcOutcomeListener<Ack> outcomeListener, FinishedReceiver finishedReceiver){ + ReceiverFinished b = new ReceiverFinished(outcomeListener, finishedReceiver); + manager.runCommand(b); + } + public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){ SendFragmentStatus b = new SendFragmentStatus(status); manager.runCommand(b); @@ -84,6 +90,21 @@ public class ControlTunnel { } + + public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection> { + final FinishedReceiver finishedReceiver; + + public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver finishedReceiver) { + super(listener); + this.finishedReceiver = finishedReceiver; + } + + @Override + public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { + connection.send(outcomeListener, RpcType.REQ_RECEIVER_FINISHED, finishedReceiver, Ack.class); + } + } + public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> { final FragmentHandle handle; @@ -127,5 +148,4 @@ public class ControlTunnel { connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, QueryProfile.class); } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java index afd3fa266..893aec8b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.impl.ImplCreator; import org.apache.drill.exec.physical.impl.RootExec; +import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.BitControl.RpcType; @@ -74,6 +75,11 @@ public class ControlHandlerImpl implements ControlMessageHandler { cancelFragment(handle); return DataRpcConfig.OK; + case RpcType.REQ_RECEIVER_FINISHED_VALUE: + FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER); + receivingFragmentFinished(finishedReceiver); + return DataRpcConfig.OK; + case RpcType.REQ_FRAGMENT_STATUS_VALUE: bee.getContext().getWorkBus().status( get(pBody, FragmentStatus.PARSER)); // TODO: Support a type of message that has no response. @@ -159,6 +165,22 @@ public class ControlHandlerImpl implements ControlMessageHandler { return Acks.OK; } + public Ack receivingFragmentFinished(FinishedReceiver finishedReceiver) { + FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender()); + + FragmentExecutor executor; + if(manager != null) { + executor = manager.getRunnable(); + } else { + // then try local cancel. + executor = bee.getFragmentRunner(finishedReceiver.getSender()); + } + if (executor != null) { + executor.receivingFragmentFinished(finishedReceiver.getReceiver()); + } + + return Acks.OK; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 735e66332..c5c08e276 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -25,6 +25,7 @@ import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.impl.ImplCreator; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.proto.BitControl.FragmentStatus; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.FragmentState; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; @@ -71,6 +72,10 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid } } + public void receivingFragmentFinished(FragmentHandle handle) { + root.receivingFragmentFinished(handle); + } + public UserClientConnection getClient(){ return context.getConnection(); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java index 7dce6e035..db8ff8e22 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.TypedFieldId; @@ -85,6 +86,11 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector>{ } @Override + public void receivingFragmentFinished(FragmentHandle handle) { + //no op + } + + @Override public Iterator<ValueVector> iterator() { List<ValueVector> vv = Lists.newArrayList(); for(VectorWrapper<?> vw : incoming){ |