diff options
author:    Steven Phillips <sphillips@maprtech.com>  2014-06-13 13:14:12 -0700
committer: Jacques Nadeau <jacques@apache.org>       2014-06-16 08:04:43 -0700
commit:    fc1a7778e2af3b07117f99070530dd5a296ebc6d (patch)
tree:      436be4d0f01b7c5a68ee21f6f0d48f2d1038b09d /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl
parent:    49a9ff27f283cbc1c8749989ff408440a0275e7d (diff)
Fix and improve runtime stats profiles
- Stop stats processing while waiting for next.
- Fix stats collection in PartitionSender and ScanBatch
- Add stats to all senders
- Add wait time to operator profile.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl')
10 files changed, 191 insertions, 73 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index 256c1063a..452052b47 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -17,11 +17,25 @@ */ package org.apache.drill.exec.physical.impl; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.ops.SenderStats; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.RecordBatch.IterOutcome; public abstract class BaseRootExec implements RootExec { - protected OperatorStats stats = null; + protected SenderStats stats = null; + protected OperatorContext oContext = null; + + public BaseRootExec(FragmentContext context, PhysicalOperator config) throws OutOfMemoryException { + this.stats = new SenderStats(config); + context.getStats().addOperatorStats(this.stats); + this.oContext = new OperatorContext(config, context, stats); + } @Override public final boolean next() { @@ -35,8 +49,24 @@ public abstract class BaseRootExec implements RootExec { } } - public void setStats(OperatorStats stats) { - this.stats = stats; + public final IterOutcome next(RecordBatch b){ + stats.stopProcessing(); + IterOutcome next; + try { + next = b.next(); + } finally { + stats.startProcessing(); + } + + switch(next){ + case OK_NEW_SCHEMA: + stats.batchReceived(0, b.getRecordCount(), true); + break; + case OK: + stats.batchReceived(0, b.getRecordCount(), false); + break; + } + return next; } public abstract boolean innerNext(); diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java index 966c22102..4ff583181 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java @@ -39,7 +39,7 @@ public class RandomReceiverCreator implements BatchCreator<RandomReceiver>{ RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId()); assert buffers.length == 1; RawBatchBuffer buffer = buffers[0]; - return new WireRecordBatch(context, buffer); + return new WireRecordBatch(context, buffer, receiver); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index d142ff847..55d3f625a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -127,34 +127,44 @@ public class ScanBatch implements RecordBatch { @Override public IterOutcome next() { - mutator.allocate(MAX_RECORD_CNT); - while ((recordCount = currentReader.next()) == 0) { - try { - if (!readers.hasNext()) { - currentReader.cleanup(); + oContext.getStats().startProcessing(); + try { + mutator.allocate(MAX_RECORD_CNT); + while ((recordCount = currentReader.next()) == 0) { + try { + if (!readers.hasNext()) { + currentReader.cleanup(); + releaseAssets(); + return IterOutcome.NONE; + } + oContext.getStats().startSetup(); + try { + currentReader.cleanup(); + currentReader = readers.next(); + partitionValues = partitionColumns.hasNext() ? 
partitionColumns.next() : null; + currentReader.setup(mutator); + mutator.allocate(MAX_RECORD_CNT); + addPartitionVectors(); + } finally { + oContext.getStats().stopSetup(); + } + } catch (ExecutionSetupException e) { + this.context.fail(e); releaseAssets(); - return IterOutcome.NONE; + return IterOutcome.STOP; } - currentReader.cleanup(); - currentReader = readers.next(); - partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null; - currentReader.setup(mutator); - mutator.allocate(MAX_RECORD_CNT); - addPartitionVectors(); - } catch (ExecutionSetupException e) { - this.context.fail(e); - releaseAssets(); - return IterOutcome.STOP; } - } - populatePartitionVectors(); - if (mutator.isNewSchema()) { - container.buildSchema(SelectionVectorMode.NONE); - schema = container.getSchema(); - return IterOutcome.OK_NEW_SCHEMA; - } else { - return IterOutcome.OK; + populatePartitionVectors(); + if (mutator.isNewSchema()) { + container.buildSchema(SelectionVectorMode.NONE); + schema = container.getSchema(); + return IterOutcome.OK_NEW_SCHEMA; + } else { + return IterOutcome.OK; + } + } finally { + oContext.getStats().stopProcessing(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 643552b0a..86e77d8b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -21,7 +21,10 @@ import io.netty.buffer.ByteBuf; import java.util.List; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import 
org.apache.drill.exec.physical.impl.materialize.RecordMaterializer; @@ -44,14 +47,14 @@ public class ScreenCreator implements RootCreator<Screen>{ @Override - public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) { + public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkNotNull(children); Preconditions.checkArgument(children.size() == 1); - return new ScreenRoot(context, children.iterator().next()); + return new ScreenRoot(context, children.iterator().next(), config); } - static class ScreenRoot implements RootExec{ + static class ScreenRoot extends BaseRootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class); volatile boolean ok = true; @@ -62,9 +65,9 @@ public class ScreenCreator implements RootCreator<Screen>{ final UserClientConnection connection; private RecordMaterializer materializer; - public ScreenRoot(FragmentContext context, RecordBatch incoming){ + public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException { + super(context, config); assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client. 
As such, this should always be true."; - this.context = context; this.incoming = incoming; this.connection = context.getConnection(); @@ -72,14 +75,14 @@ public class ScreenCreator implements RootCreator<Screen>{ @Override - public boolean next() { + public boolean innerNext() { if(!ok){ stop(); context.fail(this.listener.ex); return false; } - IterOutcome outcome = incoming.next(); + IterOutcome outcome = next(incoming); // logger.debug("Screen Outcome {}", outcome); switch(outcome){ case STOP: { @@ -92,7 +95,12 @@ public class ScreenCreator implements RootCreator<Screen>{ .setIsLastChunk(true) // .build(); QueryWritableBatch batch = new QueryWritableBatch(header); - connection.sendResult(listener, batch); + stats.startWait(); + try { + connection.sendResult(listener, batch); + } finally { + stats.stopWait(); + } sendCount.increment(); return false; @@ -107,7 +115,12 @@ public class ScreenCreator implements RootCreator<Screen>{ .setIsLastChunk(true) // .build(); QueryWritableBatch batch = new QueryWritableBatch(header); - connection.sendResult(listener, batch); + stats.startWait(); + try { + connection.sendResult(listener, batch); + } finally { + stats.stopWait(); + } sendCount.increment(); return false; @@ -119,7 +132,12 @@ public class ScreenCreator implements RootCreator<Screen>{ // context.getStats().batchesCompleted.inc(1); // context.getStats().recordsCompleted.inc(incoming.getRecordCount()); QueryWritableBatch batch = materializer.convertNext(false); - connection.sendResult(listener, batch); + stats.startWait(); + try { + connection.sendResult(listener, batch); + } finally { + stats.stopWait(); + } sendCount.increment(); return true; @@ -131,6 +149,7 @@ public class ScreenCreator implements RootCreator<Screen>{ @Override public void stop() { sendCount.waitForSendComplete(); + oContext.close(); incoming.cleanup(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 7679701fe..9e9146820 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -22,7 +22,10 @@ import io.netty.buffer.ByteBuf; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.SenderStats; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -44,7 +47,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ - private static class SingleSenderRootExec implements RootExec{ + private static class SingleSenderRootExec extends BaseRootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class); private RecordBatch incoming; private DataTunnel tunnel; @@ -53,8 +56,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ private FragmentContext context; private volatile boolean ok = true; private final SendingAccountor sendCount = new SendingAccountor(); - - public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){ + + public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException { + super(context, config); this.incoming = batch; assert(incoming != null); this.handle = context.getHandle(); @@ -65,27 +69,37 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ } @Override - public boolean next() { + public boolean innerNext() { if(!ok){ incoming.kill(); return false; } - IterOutcome 
out = incoming.next(); + IterOutcome out = next(incoming); // logger.debug("Outcome of sender next {}", out); switch(out){ case STOP: case NONE: FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0); sendCount.increment(); - tunnel.sendRecordBatch(new RecordSendFailure(), b2); + stats.startWait(); + try { + tunnel.sendRecordBatch(new RecordSendFailure(), b2); + } finally { + stats.stopWait(); + } return false; case OK_NEW_SCHEMA: case OK: FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch()); sendCount.increment(); - tunnel.sendRecordBatch(new RecordSendFailure(), batch); + stats.startWait(); + try { + tunnel.sendRecordBatch(new RecordSendFailure(), batch); + } finally { + stats.stopWait(); + } return true; case NOT_YET: @@ -98,6 +112,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ public void stop() { ok = false; sendCount.waitForSendComplete(); + oContext.close(); incoming.cleanup(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java index c7fc8135e..bc2cdb5a5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java @@ -24,6 +24,9 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OpProfileDef; +import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.physical.config.RandomReceiver; import 
org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RawFragmentBatch; @@ -43,14 +46,16 @@ public class WireRecordBatch implements RecordBatch { private RawFragmentBatchProvider fragProvider; private FragmentContext context; private BatchSchema schema; + private OperatorStats stats; - public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) throws OutOfMemoryException { + public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, RandomReceiver config) throws OutOfMemoryException { this.fragProvider = fragProvider; this.context = context; // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector, // we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader. this.batchLoader = new RecordBatchLoader(context.getAllocator()); + this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0)); } @Override @@ -100,14 +105,22 @@ public class WireRecordBatch implements RecordBatch { @Override public IterOutcome next() { + stats.startProcessing(); try{ - RawFragmentBatch batch = fragProvider.getNext(); - - // skip over empty batches. we do this since these are basically control messages. - while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){ + RawFragmentBatch batch; + try { + stats.startWait(); batch = fragProvider.getNext(); + + // skip over empty batches. we do this since these are basically control messages. 
+ while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){ + batch = fragProvider.getNext(); + } + } finally { + stats.stopWait(); } + if (batch == null){ batchLoader.clear(); return IterOutcome.NONE; @@ -133,6 +146,8 @@ public class WireRecordBatch implements RecordBatch { }catch(SchemaChangeException | IOException ex){ context.fail(ex); return IterOutcome.STOP; + } finally { + stats.stopProcessing(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index 9c5582523..a70cd5023 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -21,8 +21,12 @@ import io.netty.buffer.ByteBuf; import java.util.List; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.ops.SenderStats; import org.apache.drill.exec.physical.config.BroadcastSender; +import org.apache.drill.exec.physical.impl.BaseRootExec; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -43,7 +47,7 @@ import org.apache.drill.exec.work.ErrorHelper; * This is useful in cases such as broadcast join where sending the entire table to join * to all nodes is cheaper than merging and computing all the joins in the same node. 
*/ -public class BroadcastSenderRootExec implements RootExec { +public class BroadcastSenderRootExec extends BaseRootExec { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class); private final FragmentContext context; private final BroadcastSender config; @@ -54,7 +58,8 @@ public class BroadcastSenderRootExec implements RootExec { public BroadcastSenderRootExec(FragmentContext context, RecordBatch incoming, - BroadcastSender config) { + BroadcastSender config) throws OutOfMemoryException { + super(context, config); this.ok = true; this.context = context; this.incoming = incoming; @@ -69,20 +74,25 @@ public class BroadcastSenderRootExec implements RootExec { } @Override - public boolean next() { + public boolean innerNext() { if(!ok) { context.fail(statusHandler.ex); return false; } - RecordBatch.IterOutcome out = incoming.next(); + RecordBatch.IterOutcome out = next(incoming); logger.debug("Outcome of sender next {}", out); switch(out){ case STOP: case NONE: for (int i = 0; i < tunnels.length; ++i) { FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i); - tunnels[i].sendRecordBatch(this.statusHandler, b2); + stats.startWait(); + try { + tunnels[i].sendRecordBatch(this.statusHandler, b2); + } finally { + stats.stopWait(); + } statusHandler.sendCount.increment(); } @@ -96,7 +106,12 @@ public class BroadcastSenderRootExec implements RootExec { } for (int i = 0; i < tunnels.length; ++i) { FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch); - tunnels[i].sendRecordBatch(this.statusHandler, batch); + stats.startWait(); + try { + tunnels[i].sendRecordBatch(this.statusHandler, batch); + } finally { + stats.stopWait(); + } 
statusHandler.sendCount.increment(); } @@ -135,6 +150,7 @@ public class BroadcastSenderRootExec implements RootExec { public void stop() { ok = false; statusHandler.sendCount.waitForSendComplete(); + oContext.close(); incoming.cleanup(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 25ee667cc..a5d80b0c5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -119,13 +119,16 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{ - long startNext = System.nanoTime(); - RawFragmentBatch b = provider.getNext(); - if(b != null){ - stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false); + stats.startWait(); + try { + RawFragmentBatch b = provider.getNext(); + if(b != null){ + stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false); + } + return b; + } finally { + stats.stopWait(); } - stats.addLongStat(Metric.NEXT_WAIT_NANOS, System.nanoTime() - startNext); - return b; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index bb640b467..7535dcc3f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -56,28 +56,22 @@ public class PartitionSenderRootExec extends BaseRootExec { private 
HashPartitionSender operator; private Partitioner partitioner; private FragmentContext context; - private OperatorContext oContext; private boolean ok = true; private final SendingAccountor sendCount = new SendingAccountor(); private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final StatusHandler statusHandler; - private final SenderStats stats; public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { - + super(context, operator); this.incoming = incoming; this.operator = operator; this.context = context; this.outGoingBatchCount = operator.getDestinations().size(); this.popConfig = operator; this.statusHandler = new StatusHandler(sendCount, context); - this.stats = new SenderStats(operator); - context.getStats().addOperatorStats(this.stats); - setStats(stats); - this.oContext = new OperatorContext(operator, context, stats); } @Override @@ -90,7 +84,8 @@ public class PartitionSenderRootExec extends BaseRootExec { return false; } - RecordBatch.IterOutcome out = incoming.next(); + RecordBatch.IterOutcome out = next(incoming); + logger.debug("Partitioner.next(): got next record batch with status {}", out); switch(out){ case NONE: @@ -122,7 +117,6 @@ public class PartitionSenderRootExec extends BaseRootExec { partitioner.flushOutgoingBatches(false, true); partitioner.clear(); } - // update DeprecatedOutgoingRecordBatch's schema and generate partitioning code createPartitioner(); } catch (IOException e) { incoming.kill(); @@ -227,6 +221,12 @@ public class PartitionSenderRootExec extends BaseRootExec { fieldId, WritableBatch.getBatchNoHVWrap(0, container, false)); tunnel.sendRecordBatch(statusHandler, writableBatch); + stats.startWait(); + try { + tunnel.sendRecordBatch(statusHandler, writableBatch); + } finally { + stats.stopWait(); + } this.sendCount.increment(); fieldId++; } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 9bb24d439..6a26d301f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -265,7 +265,12 @@ public abstract class PartitionerTemplate implements Partitioner { oppositeMinorFragmentId, getWritableBatch()); - tunnel.sendRecordBatch(statusHandler, writableBatch); + stats.startWait(); + try { + tunnel.sendRecordBatch(statusHandler, writableBatch); + } finally { + stats.stopWait(); + } this.sendCount.increment(); } else { logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : "")); @@ -278,7 +283,12 @@ public abstract class PartitionerTemplate implements Partitioner { operator.getOppositeMajorFragmentId(), oppositeMinorFragmentId, getWritableBatch()); - tunnel.sendRecordBatch(statusHandler, writableBatch); + stats.startWait(); + try { + tunnel.sendRecordBatch(statusHandler, writableBatch); + } finally { + stats.stopWait(); + } this.sendCount.increment(); vectorContainer.clear(); return; |