diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-06-13 13:14:12 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-06-16 08:04:43 -0700 |
commit | fc1a7778e2af3b07117f99070530dd5a296ebc6d (patch) | |
tree | 436be4d0f01b7c5a68ee21f6f0d48f2d1038b09d /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | |
parent | 49a9ff27f283cbc1c8749989ff408440a0275e7d (diff) |
Fix and improve runtime stats profiles
- Stop stats processing while waiting for next.
- Fix stats collection in PartitionSender and ScanBatch
- Add stats to all senders
- Add wait time to operator profile.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index bb640b467..7535dcc3f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -56,28 +56,22 @@ public class PartitionSenderRootExec extends BaseRootExec { private HashPartitionSender operator; private Partitioner partitioner; private FragmentContext context; - private OperatorContext oContext; private boolean ok = true; private final SendingAccountor sendCount = new SendingAccountor(); private final int outGoingBatchCount; private final HashPartitionSender popConfig; private final StatusHandler statusHandler; - private final SenderStats stats; public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { - + super(context, operator); this.incoming = incoming; this.operator = operator; this.context = context; this.outGoingBatchCount = operator.getDestinations().size(); this.popConfig = operator; this.statusHandler = new StatusHandler(sendCount, context); - this.stats = new SenderStats(operator); - context.getStats().addOperatorStats(this.stats); - setStats(stats); - this.oContext = new OperatorContext(operator, context, stats); } @Override @@ -90,7 +84,8 @@ public class PartitionSenderRootExec extends BaseRootExec { return false; } - RecordBatch.IterOutcome out = incoming.next(); + RecordBatch.IterOutcome out = next(incoming); + logger.debug("Partitioner.next(): got next record batch with status {}", out); switch(out){ case NONE: @@ -122,7 +117,6 @@ public class PartitionSenderRootExec extends BaseRootExec { partitioner.flushOutgoingBatches(false, true); partitioner.clear(); } - // update DeprecatedOutgoingRecordBatch's schema and generate partitioning code createPartitioner(); } catch (IOException e) { incoming.kill(); @@ -227,6 +221,12 @@ public class PartitionSenderRootExec extends BaseRootExec { fieldId, WritableBatch.getBatchNoHVWrap(0, container, false)); tunnel.sendRecordBatch(statusHandler, writableBatch); + stats.startWait(); + try { + tunnel.sendRecordBatch(statusHandler, writableBatch); + } finally { + stats.stopWait(); + } this.sendCount.increment(); fieldId++; } |