aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
diff options
context:
space:
mode:
authorSteven Phillips <sphillips@maprtech.com>2014-06-13 13:14:12 -0700
committerJacques Nadeau <jacques@apache.org>2014-06-16 08:04:43 -0700
commitfc1a7778e2af3b07117f99070530dd5a296ebc6d (patch)
tree436be4d0f01b7c5a68ee21f6f0d48f2d1038b09d /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
parent49a9ff27f283cbc1c8749989ff408440a0275e7d (diff)
Fix and improve runtime stats profiles
- Stop stats processing while waiting for next. - Fix stats collection in PartitionSender and ScanBatch - Add stats to all senders - Add wait time to operator profile.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java18
1 files changed, 9 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index bb640b467..7535dcc3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -56,28 +56,22 @@ public class PartitionSenderRootExec extends BaseRootExec {
private HashPartitionSender operator;
private Partitioner partitioner;
private FragmentContext context;
- private OperatorContext oContext;
private boolean ok = true;
private final SendingAccountor sendCount = new SendingAccountor();
private final int outGoingBatchCount;
private final HashPartitionSender popConfig;
private final StatusHandler statusHandler;
- private final SenderStats stats;
public PartitionSenderRootExec(FragmentContext context,
RecordBatch incoming,
HashPartitionSender operator) throws OutOfMemoryException {
-
+ super(context, operator);
this.incoming = incoming;
this.operator = operator;
this.context = context;
this.outGoingBatchCount = operator.getDestinations().size();
this.popConfig = operator;
this.statusHandler = new StatusHandler(sendCount, context);
- this.stats = new SenderStats(operator);
- context.getStats().addOperatorStats(this.stats);
- setStats(stats);
- this.oContext = new OperatorContext(operator, context, stats);
}
@Override
@@ -90,7 +84,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
return false;
}
- RecordBatch.IterOutcome out = incoming.next();
+ RecordBatch.IterOutcome out = next(incoming);
+
logger.debug("Partitioner.next(): got next record batch with status {}", out);
switch(out){
case NONE:
@@ -122,7 +117,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
partitioner.flushOutgoingBatches(false, true);
partitioner.clear();
}
- // update DeprecatedOutgoingRecordBatch's schema and generate partitioning code
createPartitioner();
} catch (IOException e) {
incoming.kill();
@@ -227,6 +221,12 @@ public class PartitionSenderRootExec extends BaseRootExec {
fieldId,
WritableBatch.getBatchNoHVWrap(0, container, false));
tunnel.sendRecordBatch(statusHandler, writableBatch);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(statusHandler, writableBatch);
+ } finally {
+ stats.stopWait();
+ }
this.sendCount.increment();
fieldId++;
}