aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
diff options
context:
space:
mode:
authorSteven Phillips <sphillips@maprtech.com>2015-04-07 16:59:52 -0700
committerSteven Phillips <sphillips@maprtech.com>2015-04-10 23:46:34 -0700
commit712b57b981ec2ace8b16b1c34654ad91e1c2cf31 (patch)
tree3e5072e357eb0694f80be6ecccca2af556c2de81 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
parent08236fa59ae7bc6326409d373ef91bca3b2328f1 (diff)
DRILL-2718: Move counting and tracking of sent batches to FragmentContext
Creates wrapper classes FragmentDataTunnel and FragmentUserDataTunnel which wrap the DataTunnel and UserClientConnection, respectively, allowing us to use DataTunnels and UserClientConnections from a global pool, but track pending batches and send status at the FragmentContext level. Consolidates the various StatusListener implementations used by the various senders and instead uses just one implementation.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java28
1 files changed, 7 insertions, 21 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 33d6f9560..440af59a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -28,12 +28,12 @@ import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.HashPartitionSender;
-import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.BatchSchema;
@@ -47,7 +47,6 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Lists;
@@ -89,9 +88,7 @@ public abstract class PartitionerTemplate implements Partitioner {
RecordBatch incoming,
HashPartitionSender popConfig,
OperatorStats stats,
- SendingAccountor sendingAccountor,
OperatorContext oContext,
- StatusHandler statusHandler,
int start, int end) throws SchemaChangeException {
this.incoming = incoming;
@@ -112,8 +109,8 @@ public abstract class PartitionerTemplate implements Partitioner {
// create outgoingBatches only for subset of Destination Points
if ( fieldId >= start && fieldId < end ) {
logger.debug("start: {}, count: {}, fieldId: {}", start, end, fieldId);
- outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig,
- context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId(), statusHandler));
+ outgoingBatches.add(new OutgoingRecordBatch(stats, popConfig,
+ context.getDataTunnel(destination.getEndpoint()), context, oContext.getAllocator(), destination.getId()));
}
fieldId++;
}
@@ -202,7 +199,6 @@ public abstract class PartitionerTemplate implements Partitioner {
/**
* Helper method to copy data based on partition
* @param svIndex
- * @param incoming
* @throws IOException
*/
private void doCopy(int svIndex) throws IOException {
@@ -225,14 +221,12 @@ public abstract class PartitionerTemplate implements Partitioner {
public class OutgoingRecordBatch implements PartitionOutgoingBatch, VectorAccessible {
- private final DataTunnel tunnel;
+ private final AccountingDataTunnel tunnel;
private final HashPartitionSender operator;
private final FragmentContext context;
private final BufferAllocator allocator;
private final VectorContainer vectorContainer = new VectorContainer();
- private final SendingAccountor sendCount;
private final int oppositeMinorFragmentId;
- private final StatusHandler statusHandler;
private final OperatorStats stats;
private boolean isLast = false;
@@ -241,17 +235,14 @@ public abstract class PartitionerTemplate implements Partitioner {
private int recordCount;
private int totalRecords;
- public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel,
- FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId,
- StatusHandler statusHandler) {
+ public OutgoingRecordBatch(OperatorStats stats, HashPartitionSender operator, AccountingDataTunnel tunnel,
+ FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
this.context = context;
this.allocator = allocator;
this.operator = operator;
this.tunnel = tunnel;
- this.sendCount = sendCount;
this.stats = stats;
this.oppositeMinorFragmentId = oppositeMinorFragmentId;
- this.statusHandler = statusHandler;
}
protected void copy(int inIndex) throws IOException {
@@ -308,11 +299,10 @@ public abstract class PartitionerTemplate implements Partitioner {
updateStats(writableBatch);
stats.startWait();
try {
- tunnel.sendRecordBatch(statusHandler, writableBatch);
+ tunnel.sendRecordBatch(writableBatch);
} finally {
stats.stopWait();
}
- sendCount.increment();
// If the current batch is the last batch, then set a flag to ignore any requests to flush the data
// This is possible when the receiver is terminated, but we still get data from input operator
@@ -329,10 +319,6 @@ public abstract class PartitionerTemplate implements Partitioner {
vectorContainer.zeroVectors();
allocateOutgoingRecordBatch();
}
-
- if (!statusHandler.isOk()) {
- throw new IOException(statusHandler.getException());
- }
}
private void allocateOutgoingRecordBatch() {