diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | 31 |
1 files changed, 23 insertions, 8 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 409a9c37f..d88e5b58e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; @@ -30,8 +31,10 @@ import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.RootExec; +import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression; import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; @@ -54,6 +57,9 @@ public class PartitionSenderRootExec implements RootExec { private Partitioner partitioner; private FragmentContext context; private boolean ok = true; + private AtomicLong batchesSent = new AtomicLong(0); + private final SendingAccountor sendCount = new SendingAccountor(); + public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, @@ -65,8 +71,9 @@ public class PartitionSenderRootExec implements RootExec { this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()]; int fieldId = 0; for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) { - outgoing[fieldId] = new OutgoingRecordBatch(operator, - context.getCommunicator().getTunnel(endpoint), + FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(operator.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build(); + outgoing[fieldId] = new OutgoingRecordBatch(sendCount, operator, + context.getDataTunnel(endpoint, opposite), incoming, context, fieldId); @@ -79,6 +86,7 @@ public class PartitionSenderRootExec implements RootExec { if (!ok) { stop(); + return false; } @@ -119,8 +127,8 @@ public class PartitionSenderRootExec implements RootExec { } case OK: partitioner.partitionBatch(incoming); - context.batchesCompleted.inc(1); - context.recordsCompleted.inc(incoming.getRecordCount()); + context.getStats().batchesCompleted.inc(1); + context.getStats().recordsCompleted.inc(incoming.getRecordCount()); return true; case NOT_YET: default: @@ -128,10 +136,7 @@ public class PartitionSenderRootExec implements RootExec { } } - public void stop() { - ok = false; - incoming.kill(); - } + private void generatePartitionFunction() throws SchemaChangeException { @@ -285,4 +290,14 @@ public class PartitionSenderRootExec implements RootExec { } } } + + public void stop() { + logger.debug("Partition sender stopping."); + ok = false; + for(OutgoingRecordBatch b : outgoing){ + b.clear(); + } + incoming.cleanup(); + sendCount.waitForSendComplete(); + } } |