|  |  |  |
|---|---|---|
| author | Steven Phillips <sphillips@maprtech.com> | 2014-06-13 13:14:12 -0700 |
| committer | Jacques Nadeau <jacques@apache.org> | 2014-06-16 08:04:43 -0700 |
| commit | fc1a7778e2af3b07117f99070530dd5a296ebc6d (patch) |  |
| tree | 436be4d0f01b7c5a68ee21f6f0d48f2d1038b09d /exec/java-exec/src/main/java/org/apache/drill |  |
| parent | 49a9ff27f283cbc1c8749989ff408440a0275e7d (diff) |  |
Fix and improve runtime stats profiles
- Stop stats processing while waiting for next.
- Fix stats collection in PartitionSender and ScanBatch
- Add stats to all senders
- Add wait time to operator profile.
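
The bullets above all hang off one new piece of state in `OperatorStats`: a wait clock that is started around any blocking call (pulling the next batch from an upstream provider, or pushing a batch into an RPC tunnel) while the processing clock is paused. The sketch below condenses that pattern for illustration only; it mirrors the `startWait()`/`stopWait()` methods added in this patch but leaves out the setup clock, metrics, and batch bookkeeping of the real class.

```java
// Condensed, illustrative sketch of the wait-tracking pattern this patch adds to
// OperatorStats. Not the full Drill class: setup timing, metrics and batch counts
// are omitted here.
public class WaitTrackingStats {
  private boolean inProcessing = false;
  private boolean inWait = false;
  private long processingNanos, waitNanos;
  private long processingMark, waitMark;

  public void startProcessing() {
    assert !inProcessing;
    processingMark = System.nanoTime();
    inProcessing = true;
  }

  public void stopProcessing() {
    assert inProcessing;
    processingNanos += System.nanoTime() - processingMark;
    inProcessing = false;
  }

  // Entering a blocking call: pause the processing clock, start the wait clock.
  public void startWait() {
    assert !inWait;
    stopProcessing();
    inWait = true;
    waitMark = System.nanoTime();
  }

  // Leaving the blocking call: accumulate wait time, resume the processing clock.
  public void stopWait() {
    assert inWait;
    startProcessing();
    waitNanos += System.nanoTime() - waitMark;
    inWait = false;
  }
}
```

Callers then bracket each blocking operation in a `try`/`finally`, e.g. `stats.startWait(); try { tunnel.sendRecordBatch(handler, batch); } finally { stats.stopWait(); }`, which is exactly the shape the senders and receivers in the diff below adopt.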
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill')
13 files changed, 326 insertions, 137 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index 4ac8f74c5..4afea7b63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -41,12 +41,15 @@ public class OperatorStats {
   private boolean inProcessing = false;
   private boolean inSetup = false;
+  private boolean inWait = false;
 
   protected long processingNanos;
   protected long setupNanos;
+  protected long waitNanos;
 
   private long processingMark;
   private long setupMark;
+  private long waitMark;
 
   private long schemas;
 
@@ -89,6 +92,20 @@ public class OperatorStats {
     inProcessing = false;
   }
 
+  public void startWait() {
+    assert !inWait;
+    stopProcessing();
+    inWait = true;
+    waitMark = System.nanoTime();
+  }
+
+  public void stopWait() {
+    assert inWait;
+    startProcessing();
+    waitNanos += System.nanoTime() - waitMark;
+    inWait = false;
+  }
+
   public void batchReceived(int inputIndex, long records, boolean newSchema) {
     recordsReceivedByInput[inputIndex] += records;
     batchesReceivedByInput[inputIndex]++;
@@ -103,7 +120,8 @@ public class OperatorStats {
         .setOperatorType(operatorType) //
         .setOperatorId(operatorId) //
         .setSetupNanos(setupNanos) //
-        .setProcessNanos(processingNanos);
+        .setProcessNanos(processingNanos)
+        .setWaitNanos(waitNanos);
 
     addAllMetrics(b);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 256c1063a..452052b47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,11 +17,25 @@
  */
 package org.apache.drill.exec.physical.impl;
 
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.SenderStats;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 
 public abstract class BaseRootExec implements RootExec {
 
-  protected OperatorStats stats = null;
+  protected SenderStats stats = null;
+  protected OperatorContext oContext = null;
+
+  public BaseRootExec(FragmentContext context, PhysicalOperator config) throws OutOfMemoryException {
+    this.stats = new SenderStats(config);
+    context.getStats().addOperatorStats(this.stats);
+    this.oContext = new OperatorContext(config, context, stats);
+  }
 
   @Override
   public final boolean next() {
@@ -35,8 +49,24 @@ public abstract class BaseRootExec implements RootExec {
     }
   }
 
-  public void setStats(OperatorStats stats) {
-    this.stats = stats;
+  public final IterOutcome next(RecordBatch b){
+    stats.stopProcessing();
+    IterOutcome next;
+    try {
+      next = b.next();
+    } finally {
+      stats.startProcessing();
+    }
+
+    switch(next){
+    case OK_NEW_SCHEMA:
+      stats.batchReceived(0, b.getRecordCount(), true);
+      break;
+    case OK:
+      stats.batchReceived(0, b.getRecordCount(), false);
+      break;
+    }
+    return next;
   }
 
   public abstract boolean innerNext();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
index 966c22102..4ff583181 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
@@ -39,7 +39,7 @@ public class RandomReceiverCreator implements BatchCreator<RandomReceiver>{
     RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
     assert buffers.length == 1;
     RawBatchBuffer buffer = buffers[0];
 
-    return new WireRecordBatch(context, buffer);
+    return new WireRecordBatch(context, buffer, receiver);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index d142ff847..55d3f625a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -127,34 +127,44 @@ public class ScanBatch implements RecordBatch {
   @Override
   public IterOutcome next() {
-    mutator.allocate(MAX_RECORD_CNT);
-    while ((recordCount = currentReader.next()) == 0) {
-      try {
-        if (!readers.hasNext()) {
-          currentReader.cleanup();
+    oContext.getStats().startProcessing();
+    try {
+      mutator.allocate(MAX_RECORD_CNT);
+      while ((recordCount = currentReader.next()) == 0) {
+        try {
+          if (!readers.hasNext()) {
+            currentReader.cleanup();
+            releaseAssets();
+            return IterOutcome.NONE;
+          }
+          oContext.getStats().startSetup();
+          try {
+            currentReader.cleanup();
+            currentReader = readers.next();
+            partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
+            currentReader.setup(mutator);
+            mutator.allocate(MAX_RECORD_CNT);
+            addPartitionVectors();
+          } finally {
+            oContext.getStats().stopSetup();
+          }
+        } catch (ExecutionSetupException e) {
+          this.context.fail(e);
           releaseAssets();
-          return IterOutcome.NONE;
+          return IterOutcome.STOP;
         }
-        currentReader.cleanup();
-        currentReader = readers.next();
-        partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
-        currentReader.setup(mutator);
-        mutator.allocate(MAX_RECORD_CNT);
-        addPartitionVectors();
-      } catch (ExecutionSetupException e) {
-        this.context.fail(e);
-        releaseAssets();
-        return IterOutcome.STOP;
       }
-    }
-    populatePartitionVectors();
-    if (mutator.isNewSchema()) {
-      container.buildSchema(SelectionVectorMode.NONE);
-      schema = container.getSchema();
-      return IterOutcome.OK_NEW_SCHEMA;
-    } else {
-      return IterOutcome.OK;
+      populatePartitionVectors();
+      if (mutator.isNewSchema()) {
+        container.buildSchema(SelectionVectorMode.NONE);
+        schema = container.getSchema();
+        return IterOutcome.OK_NEW_SCHEMA;
+      } else {
+        return IterOutcome.OK;
+      }
+    } finally {
+      oContext.getStats().stopProcessing();
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 643552b0a..86e77d8b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -21,7 +21,10 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.List;
 
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.config.Screen;
 import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
 import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
@@ -44,14 +47,14 @@ public class ScreenCreator implements RootCreator<Screen>{
 
   @Override
-  public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
+  public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkNotNull(children);
     Preconditions.checkArgument(children.size() == 1);
-    return new ScreenRoot(context, children.iterator().next());
+    return new ScreenRoot(context, children.iterator().next(), config);
   }
 
-  static class ScreenRoot implements RootExec{
+  static class ScreenRoot extends BaseRootExec {
     static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
     volatile boolean ok = true;
 
@@ -62,9 +65,9 @@ public class ScreenCreator implements RootCreator<Screen>{
     final UserClientConnection connection;
     private RecordMaterializer materializer;
 
-    public ScreenRoot(FragmentContext context, RecordBatch incoming){
+    public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
+      super(context, config);
       assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client.  As such, this should always be true.";
-      this.context = context;
       this.incoming = incoming;
       this.connection = context.getConnection();
 
@@ -72,14 +75,14 @@ public class ScreenCreator implements RootCreator<Screen>{
 
     @Override
-    public boolean next() {
+    public boolean innerNext() {
       if(!ok){
         stop();
        context.fail(this.listener.ex);
         return false;
       }
 
-      IterOutcome outcome = incoming.next();
+      IterOutcome outcome = next(incoming);
       // logger.debug("Screen Outcome {}", outcome);
       switch(outcome){
       case STOP: {
@@ -92,7 +95,12 @@ public class ScreenCreator implements RootCreator<Screen>{
               .setIsLastChunk(true) //
               .build();
           QueryWritableBatch batch = new QueryWritableBatch(header);
-          connection.sendResult(listener, batch);
+          stats.startWait();
+          try {
+            connection.sendResult(listener, batch);
+          } finally {
+            stats.stopWait();
+          }
           sendCount.increment();
 
           return false;
@@ -107,7 +115,12 @@ public class ScreenCreator implements RootCreator<Screen>{
               .setIsLastChunk(true) //
               .build();
           QueryWritableBatch batch = new QueryWritableBatch(header);
-          connection.sendResult(listener, batch);
+          stats.startWait();
+          try {
+            connection.sendResult(listener, batch);
+          } finally {
+            stats.stopWait();
+          }
           sendCount.increment();
 
           return false;
@@ -119,7 +132,12 @@ public class ScreenCreator implements RootCreator<Screen>{
//        context.getStats().batchesCompleted.inc(1);
//        context.getStats().recordsCompleted.inc(incoming.getRecordCount());
         QueryWritableBatch batch = materializer.convertNext(false);
-        connection.sendResult(listener, batch);
+        stats.startWait();
+        try {
+          connection.sendResult(listener, batch);
+        } finally {
+          stats.stopWait();
+        }
         sendCount.increment();
 
         return true;
@@ -131,6 +149,7 @@ public class ScreenCreator implements RootCreator<Screen>{
     @Override
     public void stop() {
       sendCount.waitForSendComplete();
+      oContext.close();
       incoming.cleanup();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 7679701fe..9e9146820 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -22,7 +22,10 @@ import io.netty.buffer.ByteBuf;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.SenderStats;
 import org.apache.drill.exec.physical.config.SingleSender;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -44,7 +47,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
 
-  private static class SingleSenderRootExec implements RootExec{
+  private static class SingleSenderRootExec extends BaseRootExec {
     static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
     private RecordBatch incoming;
     private DataTunnel tunnel;
@@ -53,8 +56,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     private FragmentContext context;
     private volatile boolean ok = true;
     private final SendingAccountor sendCount = new SendingAccountor();
-
-    public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
+
+    public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
+      super(context, config);
       this.incoming = batch;
       assert(incoming != null);
       this.handle = context.getHandle();
@@ -65,27 +69,37 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     }
 
     @Override
-    public boolean next() {
+    public boolean innerNext() {
       if(!ok){
         incoming.kill();
         return false;
       }
 
-      IterOutcome out = incoming.next();
+      IterOutcome out = next(incoming);
       // logger.debug("Outcome of sender next {}", out);
       switch(out){
       case STOP:
       case NONE:
         FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0);
         sendCount.increment();
-        tunnel.sendRecordBatch(new RecordSendFailure(), b2);
+        stats.startWait();
+        try {
+          tunnel.sendRecordBatch(new RecordSendFailure(), b2);
+        } finally {
+          stats.stopWait();
+        }
         return false;
 
       case OK_NEW_SCHEMA:
       case OK:
         FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
         sendCount.increment();
-        tunnel.sendRecordBatch(new RecordSendFailure(), batch);
+        stats.startWait();
+        try {
+          tunnel.sendRecordBatch(new RecordSendFailure(), batch);
+        } finally {
+          stats.stopWait();
+        }
         return true;
 
       case NOT_YET:
@@ -98,6 +112,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
     public void stop() {
       ok = false;
       sendCount.waitForSendComplete();
+      oContext.close();
       incoming.cleanup();
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index c7fc8135e..bc2cdb5a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -24,6 +24,9 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.config.RandomReceiver;
 import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RawFragmentBatch;
@@ -43,14 +46,16 @@ public class WireRecordBatch implements RecordBatch {
   private RawFragmentBatchProvider fragProvider;
   private FragmentContext context;
   private BatchSchema schema;
+  private OperatorStats stats;
 
-  public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) throws OutOfMemoryException {
+  public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, RandomReceiver config) throws OutOfMemoryException {
     this.fragProvider = fragProvider;
     this.context = context;
     // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
     // we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader.
     this.batchLoader = new RecordBatchLoader(context.getAllocator());
+    this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0));
   }
 
   @Override
@@ -100,14 +105,22 @@
   @Override
   public IterOutcome next() {
+    stats.startProcessing();
     try{
-      RawFragmentBatch batch = fragProvider.getNext();
-
-      // skip over empty batches. we do this since these are basically control messages.
-      while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
+      RawFragmentBatch batch;
+      try {
+        stats.startWait();
         batch = fragProvider.getNext();
+
+        // skip over empty batches. we do this since these are basically control messages.
+        while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
+          batch = fragProvider.getNext();
+        }
+      } finally {
+        stats.stopWait();
       }
+
       if (batch == null){
         batchLoader.clear();
         return IterOutcome.NONE;
@@ -133,6 +146,8 @@
     }catch(SchemaChangeException | IOException ex){
       context.fail(ex);
       return IterOutcome.STOP;
+    } finally {
+      stats.stopProcessing();
     }
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 9c5582523..a70cd5023 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -21,8 +21,12 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.List;
 
+import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.SenderStats;
 import org.apache.drill.exec.physical.config.BroadcastSender;
+import org.apache.drill.exec.physical.impl.BaseRootExec;
 import org.apache.drill.exec.physical.impl.RootExec;
 import org.apache.drill.exec.physical.impl.SendingAccountor;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -43,7 +47,7 @@ import org.apache.drill.exec.work.ErrorHelper;
  * This is useful in cases such as broadcast join where sending the entire table to join
  * to all nodes is cheaper than merging and computing all the joins in the same node.
  */
-public class BroadcastSenderRootExec implements RootExec {
+public class BroadcastSenderRootExec extends BaseRootExec {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class);
   private final FragmentContext context;
   private final BroadcastSender config;
@@ -54,7 +58,8 @@ public class BroadcastSenderRootExec implements RootExec {
   public BroadcastSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
-                                 BroadcastSender config) {
+                                 BroadcastSender config) throws OutOfMemoryException {
+    super(context, config);
     this.ok = true;
     this.context = context;
     this.incoming = incoming;
@@ -69,20 +74,25 @@ public class BroadcastSenderRootExec implements RootExec {
   }
 
   @Override
-  public boolean next() {
+  public boolean innerNext() {
     if(!ok) {
       context.fail(statusHandler.ex);
       return false;
     }
 
-    RecordBatch.IterOutcome out = incoming.next();
+    RecordBatch.IterOutcome out = next(incoming);
     logger.debug("Outcome of sender next {}", out);
     switch(out){
       case STOP:
      case NONE:
        for (int i = 0; i < tunnels.length; ++i) {
          FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i);
-          tunnels[i].sendRecordBatch(this.statusHandler, b2);
+          stats.startWait();
+          try {
+            tunnels[i].sendRecordBatch(this.statusHandler, b2);
+          } finally {
+            stats.stopWait();
+          }
          statusHandler.sendCount.increment();
        }
 
@@ -96,7 +106,12 @@ public class BroadcastSenderRootExec implements RootExec {
        }
        for (int i = 0; i < tunnels.length; ++i) {
          FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
-          tunnels[i].sendRecordBatch(this.statusHandler, batch);
+          stats.startWait();
+          try {
+            tunnels[i].sendRecordBatch(this.statusHandler, batch);
+          } finally {
+            stats.stopWait();
+          }
          statusHandler.sendCount.increment();
        }
 
@@ -135,6 +150,7 @@ public class BroadcastSenderRootExec implements RootExec {
   public void stop() {
       ok = false;
       statusHandler.sendCount.waitForSendComplete();
+      oContext.close();
       incoming.cleanup();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 25ee667cc..a5d80b0c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -119,13 +119,16 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
-    long startNext = System.nanoTime();
-    RawFragmentBatch b = provider.getNext();
-    if(b != null){
-      stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+    stats.startWait();
+    try {
+      RawFragmentBatch b = provider.getNext();
+      if(b != null){
+        stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+      }
+      return b;
+    } finally {
+      stats.stopWait();
     }
-    stats.addLongStat(Metric.NEXT_WAIT_NANOS, System.nanoTime() - startNext);
-    return b;
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index bb640b467..7535dcc3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -56,28 +56,22 @@ public class PartitionSenderRootExec extends BaseRootExec {
   private HashPartitionSender operator;
   private Partitioner partitioner;
   private FragmentContext context;
-  private OperatorContext oContext;
   private boolean ok = true;
   private final SendingAccountor sendCount = new SendingAccountor();
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
   private final StatusHandler statusHandler;
-  private final SenderStats stats;
 
   public PartitionSenderRootExec(FragmentContext context,
                                  RecordBatch incoming,
                                  HashPartitionSender operator) throws OutOfMemoryException {
-
+    super(context, operator);
     this.incoming = incoming;
     this.operator = operator;
     this.context = context;
     this.outGoingBatchCount = operator.getDestinations().size();
     this.popConfig = operator;
     this.statusHandler = new StatusHandler(sendCount, context);
-    this.stats = new SenderStats(operator);
-    context.getStats().addOperatorStats(this.stats);
-    setStats(stats);
-    this.oContext = new OperatorContext(operator, context, stats);
   }
 
   @Override
@@ -90,7 +84,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
       return false;
     }
 
-    RecordBatch.IterOutcome out = incoming.next();
+    RecordBatch.IterOutcome out = next(incoming);
+
     logger.debug("Partitioner.next(): got next record batch with status {}", out);
     switch(out){
     case NONE:
@@ -122,7 +117,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
           partitioner.flushOutgoingBatches(false, true);
           partitioner.clear();
         }
-        // update DeprecatedOutgoingRecordBatch's schema and generate partitioning code
         createPartitioner();
       } catch (IOException e) {
         incoming.kill();
@@ -227,6 +221,12 @@ public class PartitionSenderRootExec extends BaseRootExec {
           fieldId, WritableBatch.getBatchNoHVWrap(0, container, false));
       tunnel.sendRecordBatch(statusHandler, writableBatch);
+      stats.startWait();
+      try {
+        tunnel.sendRecordBatch(statusHandler, writableBatch);
+      } finally {
+        stats.stopWait();
+      }
       this.sendCount.increment();
       fieldId++;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 9bb24d439..6a26d301f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -265,7 +265,12 @@ public abstract class PartitionerTemplate implements Partitioner {
             oppositeMinorFragmentId,
             getWritableBatch());
-        tunnel.sendRecordBatch(statusHandler, writableBatch);
+        stats.startWait();
+        try {
+          tunnel.sendRecordBatch(statusHandler, writableBatch);
+        } finally {
+          stats.stopWait();
+        }
         this.sendCount.increment();
       } else {
         logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
@@ -278,7 +283,12 @@ public abstract class PartitionerTemplate implements Partitioner {
             operator.getOppositeMajorFragmentId(),
             oppositeMinorFragmentId,
             getWritableBatch());
-        tunnel.sendRecordBatch(statusHandler, writableBatch);
+        stats.startWait();
+        try {
+          tunnel.sendRecordBatch(statusHandler, writableBatch);
+        } finally {
+          stats.stopWait();
+        }
         this.sendCount.increment();
         vectorContainer.clear();
         return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index d71b8112f..72a7d3bf7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -66,7 +66,9 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
   }
 
   public final IterOutcome next(int inputIndex, RecordBatch b){
+    stats.stopProcessing();
     IterOutcome next = b.next();
+    stats.startProcessing();
 
     switch(next){
     case OK_NEW_SCHEMA:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
index a1d4df953..2952c4172 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
@@ -23,15 +23,20 @@ import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
 
+import java.text.DateFormat;
 import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Locale;
 
 public class ProfileWrapper {
   NumberFormat format = NumberFormat.getInstance(Locale.US);
+  DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
 
   public QueryProfile profile;
 
@@ -46,10 +51,14 @@ public class ProfileWrapper {
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
-    builder.append("MAJOR FRAGMENTS\nid\tmin\tavg\tmax\t(time in ms)\n\n" + listMajorFragments());
+    builder.append("MAJOR FRAGMENTS\nid\tfirst start\tlast start\tfirst end\tlast end\tmin\tavg\tmax\t(time in ms)\n\n" + listMajorFragments());
     builder.append("\n");
     for (MajorFragmentProfile majorProfile : profile.getFragmentProfileList()) {
-      builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), new MajorFragmentWrapper(majorProfile).toString()));
+      builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), printOperatorsInMajor(majorProfile)));
+    }
+    builder.append("\n");
+    for (MajorFragmentProfile majorProfile : profile.getFragmentProfileList()) {
+      builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), printMinorFragmentsInMajor(majorProfile)));
     }
     return builder.toString();
   }
@@ -58,8 +67,12 @@ public class ProfileWrapper {
     StringBuilder builder = new StringBuilder();
     for (MajorFragmentProfile m : profile.getFragmentProfileList()) {
       List<Long> totalTimes = Lists.newArrayList();
+      List<Long> startTimes = Lists.newArrayList();
+      List<Long> endTimes = Lists.newArrayList();
       for (MinorFragmentProfile minorFragmentProfile : m.getMinorFragmentProfileList()) {
         totalTimes.add(minorFragmentProfile.getEndTime() - minorFragmentProfile.getStartTime());
+        startTimes.add(minorFragmentProfile.getStartTime());
+        endTimes.add(minorFragmentProfile.getEndTime());
       }
       long min = Collections.min(totalTimes);
       long max = Collections.max(totalTimes);
@@ -67,82 +80,120 @@ public class ProfileWrapper {
       for (Long l : totalTimes) {
         sum += l;
       }
+      long firstStart = Collections.min(startTimes);
+      long lastStart = Collections.max(startTimes);
+      long firstEnd = Collections.min(endTimes);
+      long lastEnd = Collections.max(endTimes);
       long avg = sum / totalTimes.size();
-      builder.append(String.format("%d\t%s\t%s\t%s\n", m.getMajorFragmentId(), format.format(min), format.format(avg), format.format(max)));
+      builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", m.getMajorFragmentId(), dateFormat.format(new Date(firstStart)),
+          dateFormat.format(new Date(lastStart)), dateFormat.format(new Date(firstEnd)), dateFormat.format(new Date(lastEnd)),
+          format.format(min), format.format(avg), format.format(max)));
     }
     return builder.toString();
   }
 
-  public class MajorFragmentWrapper {
-    MajorFragmentProfile majorFragmentProfile;
+  public String printMinorFragmentsInMajor(MajorFragmentProfile majorFragmentProfile) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("id\tstart\tend\ttotal time (ms)\tmax records\tbatches\n");
+    for (MinorFragmentProfile m : majorFragmentProfile.getMinorFragmentProfileList()) {
+      long startTime = m.getStartTime();
+      long endTime = m.getEndTime();
+
+      List<OperatorProfile> operators = m.getOperatorProfileList();
+      OperatorProfile biggest = null;
+      int biggestIncomingRecords = 0;
+      for (OperatorProfile oProfile : operators) {
+        if (biggest == null) {
+          biggest = oProfile;
+          int incomingRecordCount = 0;
+          for (StreamProfile streamProfile : oProfile.getInputProfileList()) {
+            incomingRecordCount += streamProfile.getRecords();
+          }
+          biggestIncomingRecords = incomingRecordCount;
+        } else {
+          int incomingRecordCount = 0;
+          for (StreamProfile streamProfile : oProfile.getInputProfileList()) {
+            incomingRecordCount += streamProfile.getRecords();
+          }
+          if (incomingRecordCount > biggestIncomingRecords) {
+            biggest = oProfile;
+            biggestIncomingRecords = incomingRecordCount;
+          }
+        }
+      }
 
-    public MajorFragmentWrapper(MajorFragmentProfile majorFragmentProfile) {
-      this.majorFragmentProfile = majorFragmentProfile;
-    }
+      int biggestBatches = 0;
+      for (StreamProfile sProfile : biggest.getInputProfileList()) {
+        biggestBatches += sProfile.getBatches();
+      }
 
-    @Override
-    public String toString() {
-      return String.format("Minor Fragments\nid\ttotal time (ms)\n%s\nOperators\nid\ttype\tmin\tavg\tmax\t(time in ns)\n%s\n", new MinorFragmentsInMajor().toString(), new OperatorsInMajor().toString());
+      builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\n", m.getMinorFragmentId(), dateFormat.format(new Date(startTime)),
+          dateFormat.format(new Date(endTime)), format.format(endTime - startTime), biggestIncomingRecords, biggestBatches));
     }
+    return builder.toString();
+  }
 
-    public class MinorFragmentsInMajor {
-
-      @Override
-      public String toString() {
-        StringBuilder builder = new StringBuilder();
-        for (MinorFragmentProfile minorFragmentProfile: majorFragmentProfile.getMinorFragmentProfileList()) {
-          builder.append(String.format("%d\t%s\n", minorFragmentProfile.getMinorFragmentId(), format.format(minorFragmentProfile.getEndTime() - minorFragmentProfile.getStartTime())));
+  public String printOperatorsInMajor(MajorFragmentProfile majorFragmentProfile) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("id\ttype\tp min\tp avg\tp max\ts min\ts avg\ts max\tw min\tw avg\tw max\n");
+    int numOperators = majorFragmentProfile.getMinorFragmentProfile(0).getOperatorProfileCount();
+    int numFragments = majorFragmentProfile.getMinorFragmentProfileCount();
+    long[][] processing = new long[numOperators + 1][numFragments];
+    long[][] setup = new long[numOperators + 1][numFragments];
+    long[][] wait = new long[numOperators + 1][numFragments];
+    CoreOperatorType[] operatorTypes = new CoreOperatorType[numOperators + 1];
+
+    for (int i = 0; i < numFragments; i++) {
+      MinorFragmentProfile minorProfile = majorFragmentProfile.getMinorFragmentProfile(i);
+      for (int j = 0; j < numOperators; j++) {
+        OperatorProfile operatorProfile = minorProfile.getOperatorProfile(j);
+        int operatorId = operatorProfile.getOperatorId();
+        processing[operatorId][i] = operatorProfile.getProcessNanos();
+        setup[operatorId][i] = operatorProfile.getSetupNanos();
+        wait[operatorId][i] = operatorProfile.getWaitNanos();
+        if (i == 0) {
+          operatorTypes[operatorId] = CoreOperatorType.valueOf(operatorProfile.getOperatorType());
         }
-        return builder.toString();
       }
     }
 
-    public class OperatorsInMajor {
-
-      @Override
-      public String toString() {
-        StringBuilder builder = new StringBuilder();
-        int numOperators = majorFragmentProfile.getMinorFragmentProfile(0).getOperatorProfileCount();
-        int numFragments = majorFragmentProfile.getMinorFragmentProfileCount();
-        long[][] values = new long[numOperators + 1][numFragments];
-        CoreOperatorType[] operatorTypes = new CoreOperatorType[numOperators + 1];
-
-        for (int i = 0; i < numFragments; i++) {
-          MinorFragmentProfile minorProfile = majorFragmentProfile.getMinorFragmentProfile(i);
-          for (int j = 0; j < numOperators; j++) {
-            OperatorProfile operatorProfile = minorProfile.getOperatorProfile(j);
-            int operatorId = operatorProfile.getOperatorId();
-            values[operatorId][i] = operatorProfile.getProcessNanos() + operatorProfile.getSetupNanos();
-            if (i == 0) {
-              operatorTypes[operatorId] = CoreOperatorType.valueOf(operatorProfile.getOperatorType());
-            }
-          }
-        }
-
-        for (int j = 0; j < numOperators + 1; j++) {
-          if (operatorTypes[j] == null) {
-            continue;
-          }
-          long min = Long.MAX_VALUE;
-          long max = Long.MIN_VALUE;
-          long sum = 0;
-
-          for (int i = 0; i < numFragments; i++) {
-            min = Math.min(min, values[j][i]);
-            max = Math.max(max, values[j][i]);
-            sum += values[j][i];
-          }
+    for (int j = 0; j < numOperators + 1; j++) {
+      if (operatorTypes[j] == null) {
+        continue;
+      }
+      long processingMin = Long.MAX_VALUE;
+      long processingMax = Long.MIN_VALUE;
+      long processingSum = 0;
+      long setupMin = Long.MAX_VALUE;
+      long setupMax = Long.MIN_VALUE;
+      long setupSum = 0;
+      long waitMin = Long.MAX_VALUE;
+      long waitMax = Long.MIN_VALUE;
+      long waitSum = 0;
+
+      for (int i = 0; i < numFragments; i++) {
+        processingMin = Math.min(processingMin, processing[j][i]);
+        processingMax = Math.max(processingMax, processing[j][i]);
+        processingSum += processing[j][i];
+
+        setupMin = Math.min(setupMin, setup[j][i]);
+        setupMax = Math.max(setupMax, setup[j][i]);
+        setupSum += setup[j][i];
+
+        waitMin = Math.min(waitMin, wait[j][i]);
+        waitMax = Math.max(waitMax, wait[j][i]);
+        waitSum += wait[j][i];
+      }
 
-          long avg = sum / numFragments;
+      long processingAvg = processingSum / numFragments;
+      long setupAvg = setupSum / numFragments;
+      long waitAvg = waitSum / numFragments;
 
-          builder.append(String.format("%d\t%s\t%s\t%s\t%s\n", j, operatorTypes[j].toString(), format.format(min), format.format(avg), format.format(max)));
-        }
-        return builder.toString();
-      }
+      builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", j, operatorTypes[j].toString(),
+          format.format(processingMin/1000/1000), format.format(processingAvg/1000/1000), format.format(processingMax/1000/1000),
+          format.format(setupMin/1000/1000), format.format(setupAvg/1000/1000), format.format(setupMax/1000/1000),
+          format.format(waitMin/1000/1000), format.format(waitAvg/1000/1000), format.format(waitMax/1000/1000)));
     }
+    return builder.toString();
   }
-
-
-
-
 }