author     Steven Phillips <sphillips@maprtech.com>  2014-06-13 13:14:12 -0700
committer  Jacques Nadeau <jacques@apache.org>  2014-06-16 08:04:43 -0700
commit     fc1a7778e2af3b07117f99070530dd5a296ebc6d (patch)
tree       436be4d0f01b7c5a68ee21f6f0d48f2d1038b09d /exec/java-exec/src/main/java/org/apache/drill
parent     49a9ff27f283cbc1c8749989ff408440a0275e7d (diff)
Fix and improve runtime stats profiles
- Stop stats processing while waiting for next.
- Fix stats collection in PartitionSender and ScanBatch.
- Add stats to all senders.
- Add wait time to operator profile.
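
The recurring pattern in this commit is to bracket every blocking send or receive with startWait()/stopWait() so that blocked time is charged to a dedicated wait counter rather than to processing time. A minimal, self-contained sketch of that accounting (field and method names mirror the OperatorStats changes below; the standalone class and main() are illustrative only):

    public class WaitAccountingSketch {
      static class Stats {
        long processingNanos, waitNanos;
        private long processingMark, waitMark;
        private boolean inProcessing, inWait;

        void startProcessing() { assert !inProcessing; processingMark = System.nanoTime(); inProcessing = true; }
        void stopProcessing()  { assert inProcessing; processingNanos += System.nanoTime() - processingMark; inProcessing = false; }
        // startWait() suspends the processing clock so blocked time is not
        // double-counted as processing time; stopWait() resumes it.
        void startWait() { assert !inWait; stopProcessing(); inWait = true; waitMark = System.nanoTime(); }
        void stopWait()  { assert inWait; startProcessing(); waitNanos += System.nanoTime() - waitMark; inWait = false; }
      }

      public static void main(String[] args) throws InterruptedException {
        Stats stats = new Stats();
        stats.startProcessing();
        stats.startWait();
        Thread.sleep(5);                       // stands in for a blocking send/receive
        stats.stopWait();
        stats.stopProcessing();
        System.out.printf("processing=%d ns, wait=%d ns%n", stats.processingNanos, stats.waitNanos);
      }
    }

Because startWait() internally stops the processing clock and stopWait() restarts it, the two counters partition wall-clock time instead of overlapping.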
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java | 20
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java | 36
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java | 58
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java | 39
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java | 29
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java | 25
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java | 28
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java | 15
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | 18
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java | 14
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java | 177
13 files changed, 326 insertions(+), 137 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index 4ac8f74c5..4afea7b63 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -41,12 +41,15 @@ public class OperatorStats {
private boolean inProcessing = false;
private boolean inSetup = false;
+ private boolean inWait = false;
protected long processingNanos;
protected long setupNanos;
+ protected long waitNanos;
private long processingMark;
private long setupMark;
+ private long waitMark;
private long schemas;
@@ -89,6 +92,20 @@ public class OperatorStats {
inProcessing = false;
}
+ public void startWait() {
+ assert !inWait;
+ stopProcessing();
+ inWait = true;
+ waitMark = System.nanoTime();
+ }
+
+ public void stopWait() {
+ assert inWait;
+ startProcessing();
+ waitNanos += System.nanoTime() - waitMark;
+ inWait = false;
+ }
+
public void batchReceived(int inputIndex, long records, boolean newSchema) {
recordsReceivedByInput[inputIndex] += records;
batchesReceivedByInput[inputIndex]++;
@@ -103,7 +120,8 @@ public class OperatorStats {
.setOperatorType(operatorType) //
.setOperatorId(operatorId) //
.setSetupNanos(setupNanos) //
- .setProcessNanos(processingNanos);
+ .setProcessNanos(processingNanos)
+ .setWaitNanos(waitNanos);
addAllMetrics(b);
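
Note the coupling in the new methods: startWait() calls stopProcessing() and stopWait() calls startProcessing(), so a wait window may only be opened while the operator's processing window is open. Call sites throughout this commit therefore bracket just the blocking call, in the shape below (tunnel, listener, and batch stand in for whatever the surrounding operator uses):

    stats.startWait();
    try {
      tunnel.sendRecordBatch(listener, batch);  // the blocking call
    } finally {
      stats.stopWait();                         // restores the processing clock even on failure
    }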
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 256c1063a..452052b47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,11 +17,25 @@
*/
package org.apache.drill.exec.physical.impl;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.SenderStats;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatch.IterOutcome;
public abstract class BaseRootExec implements RootExec {
- protected OperatorStats stats = null;
+ protected SenderStats stats = null;
+ protected OperatorContext oContext = null;
+
+ public BaseRootExec(FragmentContext context, PhysicalOperator config) throws OutOfMemoryException {
+ this.stats = new SenderStats(config);
+ context.getStats().addOperatorStats(this.stats);
+ this.oContext = new OperatorContext(config, context, stats);
+ }
@Override
public final boolean next() {
@@ -35,8 +49,24 @@ public abstract class BaseRootExec implements RootExec {
}
}
- public void setStats(OperatorStats stats) {
- this.stats = stats;
+ public final IterOutcome next(RecordBatch b){
+ stats.stopProcessing();
+ IterOutcome next;
+ try {
+ next = b.next();
+ } finally {
+ stats.startProcessing();
+ }
+
+ switch(next){
+ case OK_NEW_SCHEMA:
+ stats.batchReceived(0, b.getRecordCount(), true);
+ break;
+ case OK:
+ stats.batchReceived(0, b.getRecordCount(), false);
+ break;
+ }
+ return next;
}
public abstract boolean innerNext();
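
Subclasses now call the wrapped next(RecordBatch) instead of incoming.next() directly, so time spent in the upstream operator is excluded from this operator's processing time and incoming record counts are tallied automatically. A minimal subclass sketch against the new API (hypothetical class, not part of the commit):

    import org.apache.drill.exec.memory.OutOfMemoryException;
    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.physical.base.PhysicalOperator;
    import org.apache.drill.exec.record.RecordBatch;

    // Hypothetical sender, for illustration only.
    public class ExampleSenderRootExec extends BaseRootExec {
      private final RecordBatch incoming;

      public ExampleSenderRootExec(FragmentContext context, RecordBatch incoming,
                                   PhysicalOperator config) throws OutOfMemoryException {
        super(context, config);      // creates SenderStats and registers it with the fragment
        this.incoming = incoming;
      }

      @Override
      public boolean innerNext() {
        // next(incoming) pauses this operator's processing clock while the
        // upstream batch is produced, then records the received batch.
        switch (next(incoming)) {
          case OK:
          case OK_NEW_SCHEMA:
            return true;             // a real sender would transmit the batch here
          default:
            return false;
        }
      }

      @Override
      public void stop() {
        oContext.close();            // root operators now own an OperatorContext
        incoming.cleanup();
      }
    }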
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
index 966c22102..4ff583181 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RandomReceiverCreator.java
@@ -39,7 +39,7 @@ public class RandomReceiverCreator implements BatchCreator<RandomReceiver>{
RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
assert buffers.length == 1;
RawBatchBuffer buffer = buffers[0];
- return new WireRecordBatch(context, buffer);
+ return new WireRecordBatch(context, buffer, receiver);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index d142ff847..55d3f625a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -127,34 +127,44 @@ public class ScanBatch implements RecordBatch {
@Override
public IterOutcome next() {
- mutator.allocate(MAX_RECORD_CNT);
- while ((recordCount = currentReader.next()) == 0) {
- try {
- if (!readers.hasNext()) {
- currentReader.cleanup();
+ oContext.getStats().startProcessing();
+ try {
+ mutator.allocate(MAX_RECORD_CNT);
+ while ((recordCount = currentReader.next()) == 0) {
+ try {
+ if (!readers.hasNext()) {
+ currentReader.cleanup();
+ releaseAssets();
+ return IterOutcome.NONE;
+ }
+ oContext.getStats().startSetup();
+ try {
+ currentReader.cleanup();
+ currentReader = readers.next();
+ partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
+ currentReader.setup(mutator);
+ mutator.allocate(MAX_RECORD_CNT);
+ addPartitionVectors();
+ } finally {
+ oContext.getStats().stopSetup();
+ }
+ } catch (ExecutionSetupException e) {
+ this.context.fail(e);
releaseAssets();
- return IterOutcome.NONE;
+ return IterOutcome.STOP;
}
- currentReader.cleanup();
- currentReader = readers.next();
- partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
- currentReader.setup(mutator);
- mutator.allocate(MAX_RECORD_CNT);
- addPartitionVectors();
- } catch (ExecutionSetupException e) {
- this.context.fail(e);
- releaseAssets();
- return IterOutcome.STOP;
}
- }
- populatePartitionVectors();
- if (mutator.isNewSchema()) {
- container.buildSchema(SelectionVectorMode.NONE);
- schema = container.getSchema();
- return IterOutcome.OK_NEW_SCHEMA;
- } else {
- return IterOutcome.OK;
+ populatePartitionVectors();
+ if (mutator.isNewSchema()) {
+ container.buildSchema(SelectionVectorMode.NONE);
+ schema = container.getSchema();
+ return IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ return IterOutcome.OK;
+ }
+ } finally {
+ oContext.getStats().stopProcessing();
}
}
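
Stripped of the reader loop, the new ScanBatch.next() is two nested timing brackets: the whole call is processing time, and swapping in the next reader is additionally counted as setup time. The skeleton, reduced from the hunk above:

    oContext.getStats().startProcessing();
    try {
      // ... read loop ...
      oContext.getStats().startSetup();        // reader switch-over counted as setup
      try {
        // cleanup old reader, set up the next one
      } finally {
        oContext.getStats().stopSetup();
      }
      // ... build schema, return OK / OK_NEW_SCHEMA ...
    } finally {
      oContext.getStats().stopProcessing();    // bracket closes on every return path
    }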
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 643552b0a..86e77d8b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -21,7 +21,10 @@ import io.netty.buffer.ByteBuf;
import java.util.List;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
@@ -44,14 +47,14 @@ public class ScreenCreator implements RootCreator<Screen>{
@Override
- public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) {
+ public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkNotNull(children);
Preconditions.checkArgument(children.size() == 1);
- return new ScreenRoot(context, children.iterator().next());
+ return new ScreenRoot(context, children.iterator().next(), config);
}
- static class ScreenRoot implements RootExec{
+ static class ScreenRoot extends BaseRootExec {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
volatile boolean ok = true;
@@ -62,9 +65,9 @@ public class ScreenCreator implements RootCreator<Screen>{
final UserClientConnection connection;
private RecordMaterializer materializer;
- public ScreenRoot(FragmentContext context, RecordBatch incoming){
+ public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
+ super(context, config);
assert context.getConnection() != null : "A screen root should only be run on the driving node which is connected directly to the client. As such, this should always be true.";
-
this.context = context;
this.incoming = incoming;
this.connection = context.getConnection();
@@ -72,14 +75,14 @@ public class ScreenCreator implements RootCreator<Screen>{
@Override
- public boolean next() {
+ public boolean innerNext() {
if(!ok){
stop();
context.fail(this.listener.ex);
return false;
}
- IterOutcome outcome = incoming.next();
+ IterOutcome outcome = next(incoming);
// logger.debug("Screen Outcome {}", outcome);
switch(outcome){
case STOP: {
@@ -92,7 +95,12 @@ public class ScreenCreator implements RootCreator<Screen>{
.setIsLastChunk(true) //
.build();
QueryWritableBatch batch = new QueryWritableBatch(header);
- connection.sendResult(listener, batch);
+ stats.startWait();
+ try {
+ connection.sendResult(listener, batch);
+ } finally {
+ stats.stopWait();
+ }
sendCount.increment();
return false;
@@ -107,7 +115,12 @@ public class ScreenCreator implements RootCreator<Screen>{
.setIsLastChunk(true) //
.build();
QueryWritableBatch batch = new QueryWritableBatch(header);
- connection.sendResult(listener, batch);
+ stats.startWait();
+ try {
+ connection.sendResult(listener, batch);
+ } finally {
+ stats.stopWait();
+ }
sendCount.increment();
return false;
@@ -119,7 +132,12 @@ public class ScreenCreator implements RootCreator<Screen>{
// context.getStats().batchesCompleted.inc(1);
// context.getStats().recordsCompleted.inc(incoming.getRecordCount());
QueryWritableBatch batch = materializer.convertNext(false);
- connection.sendResult(listener, batch);
+ stats.startWait();
+ try {
+ connection.sendResult(listener, batch);
+ } finally {
+ stats.stopWait();
+ }
sendCount.increment();
return true;
@@ -131,6 +149,7 @@ public class ScreenCreator implements RootCreator<Screen>{
@Override
public void stop() {
sendCount.waitForSendComplete();
+ oContext.close();
incoming.cleanup();
}
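
The five-line wait bracket around connection.sendResult() is inlined three times in this file. A small private helper on ScreenRoot (hypothetical; the commit does not add one) would keep the accounting in a single place:

    // Hypothetical refactor, not part of this commit.
    private void sendWithWaitStats(QueryWritableBatch batch) {
      stats.startWait();
      try {
        connection.sendResult(listener, batch);
      } finally {
        stats.stopWait();
      }
      sendCount.increment();
    }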
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 7679701fe..9e9146820 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -22,7 +22,10 @@ import io.netty.buffer.ByteBuf;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.SenderStats;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -44,7 +47,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
- private static class SingleSenderRootExec implements RootExec{
+ private static class SingleSenderRootExec extends BaseRootExec {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleSenderRootExec.class);
private RecordBatch incoming;
private DataTunnel tunnel;
@@ -53,8 +56,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
private FragmentContext context;
private volatile boolean ok = true;
private final SendingAccountor sendCount = new SendingAccountor();
-
- public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config){
+
+ public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
+ super(context, config);
this.incoming = batch;
assert(incoming != null);
this.handle = context.getHandle();
@@ -65,27 +69,37 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
}
@Override
- public boolean next() {
+ public boolean innerNext() {
if(!ok){
incoming.kill();
return false;
}
- IterOutcome out = incoming.next();
+ IterOutcome out = next(incoming);
// logger.debug("Outcome of sender next {}", out);
switch(out){
case STOP:
case NONE:
FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0);
sendCount.increment();
- tunnel.sendRecordBatch(new RecordSendFailure(), b2);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(new RecordSendFailure(), b2);
+ } finally {
+ stats.stopWait();
+ }
return false;
case OK_NEW_SCHEMA:
case OK:
FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, 0, incoming.getWritableBatch());
sendCount.increment();
- tunnel.sendRecordBatch(new RecordSendFailure(), batch);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(new RecordSendFailure(), batch);
+ } finally {
+ stats.stopWait();
+ }
return true;
case NOT_YET:
@@ -98,6 +112,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
public void stop() {
ok = false;
sendCount.waitForSendComplete();
+ oContext.close();
incoming.cleanup();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index c7fc8135e..bc2cdb5a5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -24,6 +24,9 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OpProfileDef;
+import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.physical.config.RandomReceiver;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -43,14 +46,16 @@ public class WireRecordBatch implements RecordBatch {
private RawFragmentBatchProvider fragProvider;
private FragmentContext context;
private BatchSchema schema;
+ private OperatorStats stats;
- public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) throws OutOfMemoryException {
+ public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider, RandomReceiver config) throws OutOfMemoryException {
this.fragProvider = fragProvider;
this.context = context;
// In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
// we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader.
this.batchLoader = new RecordBatchLoader(context.getAllocator());
+ this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 0));
}
@Override
@@ -100,14 +105,22 @@ public class WireRecordBatch implements RecordBatch {
@Override
public IterOutcome next() {
+ stats.startProcessing();
try{
- RawFragmentBatch batch = fragProvider.getNext();
-
- // skip over empty batches. we do this since these are basically control messages.
- while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
+ RawFragmentBatch batch;
+ try {
+ stats.startWait();
batch = fragProvider.getNext();
+
+ // skip over empty batches. we do this since these are basically control messages.
+ while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
+ batch = fragProvider.getNext();
+ }
+ } finally {
+ stats.stopWait();
}
+
if (batch == null){
batchLoader.clear();
return IterOutcome.NONE;
@@ -133,6 +146,8 @@ public class WireRecordBatch implements RecordBatch {
}catch(SchemaChangeException | IOException ex){
context.fail(ex);
return IterOutcome.STOP;
+ } finally {
+ stats.stopProcessing();
}
}
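
On the receive side the wait bracket nests inside the processing bracket: time blocked in fragProvider.getNext() (including the skip over empty control-message batches) lands in waitNanos, while loading the batch afterwards remains processing time. The control flow of the new next() reduces to:

    stats.startProcessing();
    try {
      stats.startWait();                  // processing clock pauses here
      try {
        batch = fragProvider.getNext();   // may block on the network
      } finally {
        stats.stopWait();                 // processing clock resumes
      }
      // ... load the batch and return: counted as processing time
    } finally {
      stats.stopProcessing();
    }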
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 9c5582523..a70cd5023 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -21,8 +21,12 @@ import io.netty.buffer.ByteBuf;
import java.util.List;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.ops.SenderStats;
import org.apache.drill.exec.physical.config.BroadcastSender;
+import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -43,7 +47,7 @@ import org.apache.drill.exec.work.ErrorHelper;
* This is useful in cases such as broadcast join where sending the entire table to join
* to all nodes is cheaper than merging and computing all the joins in the same node.
*/
-public class BroadcastSenderRootExec implements RootExec {
+public class BroadcastSenderRootExec extends BaseRootExec {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BroadcastSenderRootExec.class);
private final FragmentContext context;
private final BroadcastSender config;
@@ -54,7 +58,8 @@ public class BroadcastSenderRootExec implements RootExec {
public BroadcastSenderRootExec(FragmentContext context,
RecordBatch incoming,
- BroadcastSender config) {
+ BroadcastSender config) throws OutOfMemoryException {
+ super(context, config);
this.ok = true;
this.context = context;
this.incoming = incoming;
@@ -69,20 +74,25 @@ public class BroadcastSenderRootExec implements RootExec {
}
@Override
- public boolean next() {
+ public boolean innerNext() {
if(!ok) {
context.fail(statusHandler.ex);
return false;
}
- RecordBatch.IterOutcome out = incoming.next();
+ RecordBatch.IterOutcome out = next(incoming);
logger.debug("Outcome of sender next {}", out);
switch(out){
case STOP:
case NONE:
for (int i = 0; i < tunnels.length; ++i) {
FragmentWritableBatch b2 = FragmentWritableBatch.getEmptyLast(handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i);
- tunnels[i].sendRecordBatch(this.statusHandler, b2);
+ stats.startWait();
+ try {
+ tunnels[i].sendRecordBatch(this.statusHandler, b2);
+ } finally {
+ stats.stopWait();
+ }
statusHandler.sendCount.increment();
}
@@ -96,7 +106,12 @@ public class BroadcastSenderRootExec implements RootExec {
}
for (int i = 0; i < tunnels.length; ++i) {
FragmentWritableBatch batch = new FragmentWritableBatch(false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), config.getOppositeMajorFragmentId(), i, writableBatch);
- tunnels[i].sendRecordBatch(this.statusHandler, batch);
+ stats.startWait();
+ try {
+ tunnels[i].sendRecordBatch(this.statusHandler, batch);
+ } finally {
+ stats.stopWait();
+ }
statusHandler.sendCount.increment();
}
@@ -135,6 +150,7 @@ public class BroadcastSenderRootExec implements RootExec {
public void stop() {
ok = false;
statusHandler.sendCount.waitForSendComplete();
+ oContext.close();
incoming.cleanup();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 25ee667cc..a5d80b0c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -119,13 +119,16 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
private RawFragmentBatch getNext(RawFragmentBatchProvider provider) throws IOException{
- long startNext = System.nanoTime();
- RawFragmentBatch b = provider.getNext();
- if(b != null){
- stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+ stats.startWait();
+ try {
+ RawFragmentBatch b = provider.getNext();
+ if(b != null){
+ stats.batchReceived(0, b.getHeader().getDef().getRecordCount(), false);
+ }
+ return b;
+ } finally {
+ stats.stopWait();
}
- stats.addLongStat(Metric.NEXT_WAIT_NANOS, System.nanoTime() - startNext);
- return b;
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index bb640b467..7535dcc3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -56,28 +56,22 @@ public class PartitionSenderRootExec extends BaseRootExec {
private HashPartitionSender operator;
private Partitioner partitioner;
private FragmentContext context;
- private OperatorContext oContext;
private boolean ok = true;
private final SendingAccountor sendCount = new SendingAccountor();
private final int outGoingBatchCount;
private final HashPartitionSender popConfig;
private final StatusHandler statusHandler;
- private final SenderStats stats;
public PartitionSenderRootExec(FragmentContext context,
RecordBatch incoming,
HashPartitionSender operator) throws OutOfMemoryException {
-
+ super(context, operator);
this.incoming = incoming;
this.operator = operator;
this.context = context;
this.outGoingBatchCount = operator.getDestinations().size();
this.popConfig = operator;
this.statusHandler = new StatusHandler(sendCount, context);
- this.stats = new SenderStats(operator);
- context.getStats().addOperatorStats(this.stats);
- setStats(stats);
- this.oContext = new OperatorContext(operator, context, stats);
}
@Override
@@ -90,7 +84,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
return false;
}
- RecordBatch.IterOutcome out = incoming.next();
+ RecordBatch.IterOutcome out = next(incoming);
+
logger.debug("Partitioner.next(): got next record batch with status {}", out);
switch(out){
case NONE:
@@ -122,7 +117,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
partitioner.flushOutgoingBatches(false, true);
partitioner.clear();
}
- // update DeprecatedOutgoingRecordBatch's schema and generate partitioning code
createPartitioner();
} catch (IOException e) {
incoming.kill();
@@ -227,6 +221,12 @@ public class PartitionSenderRootExec extends BaseRootExec {
fieldId,
WritableBatch.getBatchNoHVWrap(0, container, false));
tunnel.sendRecordBatch(statusHandler, writableBatch);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(statusHandler, writableBatch);
+ } finally {
+ stats.stopWait();
+ }
this.sendCount.increment();
fieldId++;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 9bb24d439..6a26d301f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -265,7 +265,12 @@ public abstract class PartitionerTemplate implements Partitioner {
oppositeMinorFragmentId,
getWritableBatch());
- tunnel.sendRecordBatch(statusHandler, writableBatch);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(statusHandler, writableBatch);
+ } finally {
+ stats.stopWait();
+ }
this.sendCount.increment();
} else {
logger.debug("Flush requested on an empty outgoing record batch" + (isLast ? " (last batch)" : ""));
@@ -278,7 +283,12 @@ public abstract class PartitionerTemplate implements Partitioner {
operator.getOppositeMajorFragmentId(),
oppositeMinorFragmentId,
getWritableBatch());
- tunnel.sendRecordBatch(statusHandler, writableBatch);
+ stats.startWait();
+ try {
+ tunnel.sendRecordBatch(statusHandler, writableBatch);
+ } finally {
+ stats.stopWait();
+ }
this.sendCount.increment();
vectorContainer.clear();
return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index d71b8112f..72a7d3bf7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -66,7 +66,9 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
}
public final IterOutcome next(int inputIndex, RecordBatch b){
+ stats.stopProcessing();
IterOutcome next = b.next();
+ stats.startProcessing();
switch(next){
case OK_NEW_SCHEMA:
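
Unlike BaseRootExec.next(RecordBatch) earlier in this commit, this wrapper does not restart the processing clock in a finally block, so an exception thrown from b.next() would leave the stats in a stopped state. The more defensive shape, matching the BaseRootExec version (an editorial observation, not a change the commit makes):

    stats.stopProcessing();
    IterOutcome next;
    try {
      next = b.next();
    } finally {
      stats.startProcessing();   // restore the clock even if next() throws
    }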
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
index a1d4df953..2952c4172 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
@@ -23,15 +23,20 @@ import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
+import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
+import java.text.DateFormat;
import java.text.NumberFormat;
+import java.text.SimpleDateFormat;
import java.util.Collections;
+import java.util.Date;
import java.util.List;
import java.util.Locale;
public class ProfileWrapper {
NumberFormat format = NumberFormat.getInstance(Locale.US);
+ DateFormat dateFormat = new SimpleDateFormat("HH:mm:ss.SSS");
public QueryProfile profile;
@@ -46,10 +51,14 @@ public class ProfileWrapper {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("MAJOR FRAGMENTS\nid\tmin\tavg\tmax\t(time in ms)\n\n" + listMajorFragments());
+ builder.append("MAJOR FRAGMENTS\nid\tfirst start\tlast start\tfirst end\tlast end\tmin\tavg\tmax\t(time in ms)\n\n" + listMajorFragments());
builder.append("\n");
for (MajorFragmentProfile majorProfile : profile.getFragmentProfileList()) {
- builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), new MajorFragmentWrapper(majorProfile).toString()));
+ builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), printOperatorsInMajor(majorProfile)));
+ }
+ builder.append("\n");
+ for (MajorFragmentProfile majorProfile : profile.getFragmentProfileList()) {
+ builder.append(String.format("Major Fragment: %d\n%s\n", majorProfile.getMajorFragmentId(), printMinorFragmentsInMajor(majorProfile)));
}
return builder.toString();
}
@@ -58,8 +67,12 @@ public class ProfileWrapper {
StringBuilder builder = new StringBuilder();
for (MajorFragmentProfile m : profile.getFragmentProfileList()) {
List<Long> totalTimes = Lists.newArrayList();
+ List<Long> startTimes = Lists.newArrayList();
+ List<Long> endTimes = Lists.newArrayList();
for (MinorFragmentProfile minorFragmentProfile : m.getMinorFragmentProfileList()) {
totalTimes.add(minorFragmentProfile.getEndTime() - minorFragmentProfile.getStartTime());
+ startTimes.add(minorFragmentProfile.getStartTime());
+ endTimes.add(minorFragmentProfile.getEndTime());
}
long min = Collections.min(totalTimes);
long max = Collections.max(totalTimes);
@@ -67,82 +80,120 @@ public class ProfileWrapper {
for (Long l : totalTimes) {
sum += l;
}
+ long firstStart = Collections.min(startTimes);
+ long lastStart = Collections.max(startTimes);
+ long firstEnd = Collections.min(endTimes);
+ long lastEnd = Collections.max(endTimes);
long avg = sum / totalTimes.size();
- builder.append(String.format("%d\t%s\t%s\t%s\n", m.getMajorFragmentId(), format.format(min), format.format(avg), format.format(max)));
+ builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", m.getMajorFragmentId(), dateFormat.format(new Date(firstStart)),
+ dateFormat.format(new Date(lastStart)), dateFormat.format(new Date(firstEnd)), dateFormat.format(new Date(lastEnd)),
+ format.format(min), format.format(avg), format.format(max)));
}
return builder.toString();
}
- public class MajorFragmentWrapper {
- MajorFragmentProfile majorFragmentProfile;
+ public String printMinorFragmentsInMajor(MajorFragmentProfile majorFragmentProfile) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("id\tstart\tend\ttotal time (ms)\tmax records\tbatches\n");
+ for (MinorFragmentProfile m : majorFragmentProfile.getMinorFragmentProfileList()) {
+ long startTime = m.getStartTime();
+ long endTime = m.getEndTime();
+
+ List<OperatorProfile> operators = m.getOperatorProfileList();
+ OperatorProfile biggest = null;
+ int biggestIncomingRecords = 0;
+ for (OperatorProfile oProfile : operators) {
+ if (biggest == null) {
+ biggest = oProfile;
+ int incomingRecordCount = 0;
+ for (StreamProfile streamProfile : oProfile.getInputProfileList()) {
+ incomingRecordCount += streamProfile.getRecords();
+ }
+ biggestIncomingRecords = incomingRecordCount;
+ } else {
+ int incomingRecordCount = 0;
+ for (StreamProfile streamProfile : oProfile.getInputProfileList()) {
+ incomingRecordCount += streamProfile.getRecords();
+ }
+ if (incomingRecordCount > biggestIncomingRecords) {
+ biggest = oProfile;
+ biggestIncomingRecords = incomingRecordCount;
+ }
+ }
+ }
- public MajorFragmentWrapper(MajorFragmentProfile majorFragmentProfile) {
- this.majorFragmentProfile = majorFragmentProfile;
- }
+ int biggestBatches = 0;
+ for (StreamProfile sProfile : biggest.getInputProfileList()) {
+ biggestBatches += sProfile.getBatches();
+ }
- @Override
- public String toString() {
- return String.format("Minor Fragments\nid\ttotal time (ms)\n%s\nOperators\nid\ttype\tmin\tavg\tmax\t(time in ns)\n%s\n", new MinorFragmentsInMajor().toString(), new OperatorsInMajor().toString());
+ builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\n", m.getMinorFragmentId(), dateFormat.format(new Date(startTime)),
+ dateFormat.format(new Date(endTime)), format.format(endTime - startTime), biggestIncomingRecords, biggestBatches));
}
+ return builder.toString();
+ }
- public class MinorFragmentsInMajor {
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- for (MinorFragmentProfile minorFragmentProfile: majorFragmentProfile.getMinorFragmentProfileList()) {
- builder.append(String.format("%d\t%s\n", minorFragmentProfile.getMinorFragmentId(), format.format(minorFragmentProfile.getEndTime() - minorFragmentProfile.getStartTime())));
+ public String printOperatorsInMajor(MajorFragmentProfile majorFragmentProfile) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("id\ttype\tp min\tp avg\tp max\ts min\ts avg\ts max\tw min\tw avg\tw max\n");
+ int numOperators = majorFragmentProfile.getMinorFragmentProfile(0).getOperatorProfileCount();
+ int numFragments = majorFragmentProfile.getMinorFragmentProfileCount();
+ long[][] processing = new long[numOperators + 1][numFragments];
+ long[][] setup = new long[numOperators + 1][numFragments];
+ long[][] wait = new long[numOperators + 1][numFragments];
+ CoreOperatorType[] operatorTypes = new CoreOperatorType[numOperators + 1];
+
+ for (int i = 0; i < numFragments; i++) {
+ MinorFragmentProfile minorProfile = majorFragmentProfile.getMinorFragmentProfile(i);
+ for (int j = 0; j < numOperators; j++) {
+ OperatorProfile operatorProfile = minorProfile.getOperatorProfile(j);
+ int operatorId = operatorProfile.getOperatorId();
+ processing[operatorId][i] = operatorProfile.getProcessNanos();
+ setup[operatorId][i] = operatorProfile.getSetupNanos();
+ wait[operatorId][i] = operatorProfile.getWaitNanos();
+ if (i == 0) {
+ operatorTypes[operatorId] = CoreOperatorType.valueOf(operatorProfile.getOperatorType());
}
- return builder.toString();
}
}
- public class OperatorsInMajor {
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- int numOperators = majorFragmentProfile.getMinorFragmentProfile(0).getOperatorProfileCount();
- int numFragments = majorFragmentProfile.getMinorFragmentProfileCount();
- long[][] values = new long[numOperators + 1][numFragments];
- CoreOperatorType[] operatorTypes = new CoreOperatorType[numOperators + 1];
-
- for (int i = 0; i < numFragments; i++) {
- MinorFragmentProfile minorProfile = majorFragmentProfile.getMinorFragmentProfile(i);
- for (int j = 0; j < numOperators; j++) {
- OperatorProfile operatorProfile = minorProfile.getOperatorProfile(j);
- int operatorId = operatorProfile.getOperatorId();
- values[operatorId][i] = operatorProfile.getProcessNanos() + operatorProfile.getSetupNanos();
- if (i == 0) {
- operatorTypes[operatorId] = CoreOperatorType.valueOf(operatorProfile.getOperatorType());
- }
- }
- }
-
- for (int j = 0; j < numOperators + 1; j++) {
- if (operatorTypes[j] == null) {
- continue;
- }
- long min = Long.MAX_VALUE;
- long max = Long.MIN_VALUE;
- long sum = 0;
-
- for (int i = 0; i < numFragments; i++) {
- min = Math.min(min, values[j][i]);
- max = Math.max(max, values[j][i]);
- sum += values[j][i];
- }
+ for (int j = 0; j < numOperators + 1; j++) {
+ if (operatorTypes[j] == null) {
+ continue;
+ }
+ long processingMin = Long.MAX_VALUE;
+ long processingMax = Long.MIN_VALUE;
+ long processingSum = 0;
+ long setupMin = Long.MAX_VALUE;
+ long setupMax = Long.MIN_VALUE;
+ long setupSum = 0;
+ long waitMin = Long.MAX_VALUE;
+ long waitMax = Long.MIN_VALUE;
+ long waitSum = 0;
+
+ for (int i = 0; i < numFragments; i++) {
+ processingMin = Math.min(processingMin, processing[j][i]);
+ processingMax = Math.max(processingMax, processing[j][i]);
+ processingSum += processing[j][i];
+
+ setupMin = Math.min(setupMin, setup[j][i]);
+ setupMax = Math.max(setupMax, setup[j][i]);
+ setupSum += setup[j][i];
+
+ waitMin = Math.min(waitMin, wait[j][i]);
+ waitMax = Math.max(waitMax, wait[j][i]);
+ waitSum += wait[j][i];
+ }
- long avg = sum / numFragments;
+ long processingAvg = processingSum / numFragments;
+ long setupAvg = setupSum / numFragments;
+ long waitAvg = waitSum / numFragments;
- builder.append(String.format("%d\t%s\t%s\t%s\t%s\n", j, operatorTypes[j].toString(), format.format(min), format.format(avg), format.format(max)));
- }
- return builder.toString();
- }
+ builder.append(String.format("%d\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n", j, operatorTypes[j].toString(),
+ format.format(processingMin/1000/1000), format.format(processingAvg/1000/1000), format.format(processingMax/1000/1000),
+ format.format(setupMin/1000/1000), format.format(setupAvg/1000/1000), format.format(setupMax/1000/1000),
+ format.format(waitMin/1000/1000), format.format(waitAvg/1000/1000), format.format(waitMax/1000/1000)));
}
+ return builder.toString();
}
-
-
-
-
}
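
For reference, the profile printer converts nanoseconds to milliseconds with repeated integer division (x/1000/1000), which truncates toward zero; java.util.concurrent.TimeUnit expresses the same conversion. A minimal check (illustrative only):

    import java.util.concurrent.TimeUnit;

    public class NanosToMillis {
      public static void main(String[] args) {
        long nanos = 1_234_567_890L;
        // Both lines print 1234: whole milliseconds, fraction truncated.
        System.out.println(nanos / 1000 / 1000);
        System.out.println(TimeUnit.NANOSECONDS.toMillis(nanos));
      }
    }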