aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
diff options
context:
space:
mode:
authorSteven Phillips <sphillips@maprtech.com>2014-04-11 18:41:03 -0700
committerSteven Phillips <sphillips@maprtech.com>2014-05-04 18:46:37 -0700
commita2355d42dbff51b858fc28540915cf793f1c0fac (patch)
treee5db0d8d9a3d5a0dcb15d1c5e9c24d9a1c47dffb /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
parent70dddc54a73183e58f5493b13b1b19e51162f752 (diff)
DRILL-620: Memory consumption fixes
accounting fixes trim buffers switch to using setSafe and copySafe methods only adaptive allocation operator based allocator wip handle OOM Operator Context
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java19
1 files changed, 14 insertions, 5 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 3e3157b48..604808547 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -29,13 +29,17 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
@@ -57,6 +61,7 @@ public class PartitionSenderRootExec implements RootExec {
private OutgoingRecordBatch[] outgoing;
private Partitioner partitioner;
private FragmentContext context;
+ private OperatorContext oContext;
private boolean ok = true;
private AtomicLong batchesSent = new AtomicLong(0);
private final SendingAccountor sendCount = new SendingAccountor();
@@ -64,11 +69,12 @@ public class PartitionSenderRootExec implements RootExec {
public PartitionSenderRootExec(FragmentContext context,
RecordBatch incoming,
- HashPartitionSender operator) {
+ HashPartitionSender operator) throws OutOfMemoryException {
this.incoming = incoming;
this.operator = operator;
this.context = context;
+ this.oContext = new OperatorContext(operator, context);
this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()];
int fieldId = 0;
for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) {
@@ -77,6 +83,7 @@ public class PartitionSenderRootExec implements RootExec {
context.getDataTunnel(endpoint, opposite),
incoming,
context,
+ oContext.getAllocator(),
fieldId);
fieldId++;
}
@@ -252,16 +259,17 @@ public class PartitionSenderRootExec implements RootExec {
// ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
// outgoingBatches[bucket].getRecordCount(),
// vv1);
- cg.getEvalBlock().add(
+ cg.getEvalBlock()._if(
((JExpression) JExpr.cast(vvClass,
((JExpression)
outgoingVectors
.component(bucket))
.component(JExpr.lit(fieldId))))
- .invoke("copyFrom")
+ .invoke("copyFromSafe")
.arg(inIndex)
.arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
- .arg(incomingVV));
+ .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
+ ._return();
++fieldId;
}
@@ -306,7 +314,8 @@ public class PartitionSenderRootExec implements RootExec {
for(OutgoingRecordBatch b : outgoing){
b.clear();
}
- incoming.cleanup();
sendCount.waitForSendComplete();
+ oContext.close();
+ incoming.cleanup();
}
}