aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java31
1 files changed, 23 insertions, 8 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 409a9c37f..d88e5b58e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.partitionsender;
import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -30,8 +31,10 @@ import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.RootExec;
+import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
@@ -54,6 +57,9 @@ public class PartitionSenderRootExec implements RootExec {
private Partitioner partitioner;
private FragmentContext context;
private boolean ok = true;
+ private AtomicLong batchesSent = new AtomicLong(0);
+ private final SendingAccountor sendCount = new SendingAccountor();
+
public PartitionSenderRootExec(FragmentContext context,
RecordBatch incoming,
@@ -65,8 +71,9 @@ public class PartitionSenderRootExec implements RootExec {
this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()];
int fieldId = 0;
for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) {
- outgoing[fieldId] = new OutgoingRecordBatch(operator,
- context.getCommunicator().getTunnel(endpoint),
+ FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(operator.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
+ outgoing[fieldId] = new OutgoingRecordBatch(sendCount, operator,
+ context.getDataTunnel(endpoint, opposite),
incoming,
context,
fieldId);
@@ -79,6 +86,7 @@ public class PartitionSenderRootExec implements RootExec {
if (!ok) {
stop();
+
return false;
}
@@ -119,8 +127,8 @@ public class PartitionSenderRootExec implements RootExec {
}
case OK:
partitioner.partitionBatch(incoming);
- context.batchesCompleted.inc(1);
- context.recordsCompleted.inc(incoming.getRecordCount());
+ context.getStats().batchesCompleted.inc(1);
+ context.getStats().recordsCompleted.inc(incoming.getRecordCount());
return true;
case NOT_YET:
default:
@@ -128,10 +136,7 @@ public class PartitionSenderRootExec implements RootExec {
}
}
- public void stop() {
- ok = false;
- incoming.kill();
- }
+
private void generatePartitionFunction() throws SchemaChangeException {
@@ -285,4 +290,14 @@ public class PartitionSenderRootExec implements RootExec {
}
}
}
+
+ public void stop() {
+ logger.debug("Partition sender stopping.");
+ ok = false;
+ for(OutgoingRecordBatch b : outgoing){
+ b.clear();
+ }
+ incoming.cleanup();
+ sendCount.waitForSendComplete();
+ }
}