diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index bb640b467..7535dcc3f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -56,28 +56,22 @@ public class PartitionSenderRootExec extends BaseRootExec { private HashPartitionSender operator; private Partitioner partitioner; private FragmentContext context; - private OperatorContext oContext; private boolean ok = true; private final SendingAccountor sendCount = new SendingAccountor(); private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final StatusHandler statusHandler; - private final SenderStats stats; public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { - + super(context, operator); this.incoming = incoming; this.operator = operator; this.context = context; this.outGoingBatchCount = operator.getDestinations().size(); this.popConfig = operator; this.statusHandler = new StatusHandler(sendCount, context); - this.stats = new SenderStats(operator); - context.getStats().addOperatorStats(this.stats); - setStats(stats); - this.oContext = new OperatorContext(operator, context, stats); } @Override @@ -90,7 +84,8 @@ public class PartitionSenderRootExec extends BaseRootExec { return false; } - RecordBatch.IterOutcome out = incoming.next(); + RecordBatch.IterOutcome out = next(incoming); + logger.debug("Partitioner.next(): got next record batch with status {}", out); switch(out){ case NONE: @@ -122,7 +117,6 @@ public class PartitionSenderRootExec extends BaseRootExec { partitioner.flushOutgoingBatches(false, true); partitioner.clear(); } - // update DeprecatedOutgoingRecordBatch's schema and generate partitioning code createPartitioner(); } catch (IOException e) { incoming.kill(); @@ -227,6 +221,12 @@ public class PartitionSenderRootExec extends BaseRootExec { fieldId, WritableBatch.getBatchNoHVWrap(0, container, false)); tunnel.sendRecordBatch(statusHandler, writableBatch); + stats.startWait(); + try { + tunnel.sendRecordBatch(statusHandler, writableBatch); + } finally { + stats.stopWait(); + } this.sendCount.increment(); fieldId++; } |