diff options
author | Steven Phillips <sphillips@maprtech.com> | 2015-04-07 16:59:52 -0700 |
---|---|---|
committer | Steven Phillips <sphillips@maprtech.com> | 2015-04-10 23:46:34 -0700 |
commit | 712b57b981ec2ace8b16b1c34654ad91e1c2cf31 (patch) | |
tree | 3e5072e357eb0694f80be6ecccca2af556c2de81 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java | |
parent | 08236fa59ae7bc6326409d373ef91bca3b2328f1 (diff) |
DRILL-2718: Move counting and tracking of sent batches to FragmentContext
Creates wrapper classes FragmentDataTunnel and FragmentUserDataTunnel which wrap
the DataTunnel and UserClientConnection, respectively, allowing us to use DataTunnels
and UserClientConnections from a global pool, but track pending batches and send status
at the FragmentContext level.
Consolidates the various StatusListener implementations used by the various senders and
instead uses just one implementation.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java | 28 |
1 files changed, 7 insertions, 21 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index 33d6f9560..440af59a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -28,12 +28,12 @@ import org.apache.drill.exec.compile.sig.RuntimeOverridden; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.ops.AccountingDataTunnel; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.HashPartitionSender; -import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; @@ -47,7 +47,6 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Lists; @@ -89,9 +88,7 @@ public abstract class PartitionerTemplate implements Partitioner { RecordBatch incoming, HashPartitionSender popConfig, OperatorStats stats, - SendingAccountor sendingAccountor, OperatorContext oContext, - StatusHandler statusHandler, int start, int end) throws SchemaChangeException { this.incoming = incoming; @@ -112,8 +109,8 @@ public abstract class PartitionerTemplate implements Partitioner { // create outgoingBatches only for subset of Destination Points if ( fieldId >= start && fieldId < end ) { logger.debug("start: {}, count: {}, fieldId: {}", start, end, fieldId); - outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, - context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId(), statusHandler)); + outgoingBatches.add(new OutgoingRecordBatch(stats, popConfig, + context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId())); } fieldId++; } @@ -202,7 +199,6 @@ public abstract class PartitionerTemplate implements Partitioner { /** * Helper method to copy data based on partition * @param svIndex - * @param incoming * @throws IOException */ private void doCopy(int svIndex) throws IOException { @@ -225,14 +221,12 @@ public abstract class PartitionerTemplate implements Partitioner { public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible { - private final DataTunnel tunnel; + private final AccountingDataTunnel tunnel; private final HashPartitionSender operator; private final FragmentContext context; private final BufferAllocator allocator; private final VectorContainer vectorContainer = new VectorContainer(); - private final SendingAccountor sendCount; private final int oppositeMinorFragmentId; - private final StatusHandler statusHandler; private final OperatorStats stats; private boolean isLast = false; @@ -241,17 +235,14 @@ public abstract class PartitionerTemplate implements Partitioner { private int recordCount; private int totalRecords; - public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, - FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId, - StatusHandler statusHandler) { + public OutgoingRecordBatch(OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel, + FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) { this.context = context; this.allocator = allocator; this.operator = operator; this.tunnel = tunnel; - this.sendCount = sendCount; this.stats = stats; this.oppositeMinorFragmentId = oppositeMinorFragmentId; - this.statusHandler = statusHandler; } protected void copy(int inIndex) throws IOException { @@ -308,11 +299,10 @@ public abstract class PartitionerTemplate implements Partitioner { updateStats(writableBatch); stats.startWait(); try { - tunnel.sendRecordBatch(statusHandler, writableBatch); + tunnel.sendRecordBatch(writableBatch); } finally { stats.stopWait(); } - sendCount.increment(); // If the current batch is the last batch, then set a flag to ignore any requests to flush the data // This is possible when the receiver is terminated, but we still get data from input operator @@ -329,10 +319,6 @@ public abstract class PartitionerTemplate implements Partitioner { vectorContainer.zeroVectors(); allocateOutgoingRecordBatch(); } - - if (!statusHandler.isOk()) { - throw new IOException(statusHandler.getException()); - } } private void allocateOutgoingRecordBatch() { |