diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-04-11 18:41:03 -0700 |
---|---|---|
committer | Steven Phillips <sphillips@maprtech.com> | 2014-05-04 18:46:37 -0700 |
commit | a2355d42dbff51b858fc28540915cf793f1c0fac (patch) | |
tree | e5db0d8d9a3d5a0dcb15d1c5e9c24d9a1c47dffb /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | |
parent | 70dddc54a73183e58f5493b13b1b19e51162f752 (diff) |
DRILL-620: Memory consumption fixes
accounting fixes
trim buffers
switch to using setSafe and copySafe methods only
adaptive allocation
operator based allocator wip
handle OOM
Operator Context
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | 19 |
1 files changed, 14 insertions, 5 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 3e3157b48..604808547 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -29,13 +29,17 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; @@ -57,6 +61,7 @@ public class PartitionSenderRootExec implements RootExec { private OutgoingRecordBatch[] outgoing; private Partitioner partitioner; private FragmentContext context; + private OperatorContext oContext; private boolean ok = true; private AtomicLong batchesSent = new AtomicLong(0); private final SendingAccountor sendCount = new SendingAccountor(); @@ -64,11 +69,12 @@ public class PartitionSenderRootExec implements RootExec { public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, - HashPartitionSender operator) { + HashPartitionSender operator) throws OutOfMemoryException { this.incoming = incoming; this.operator = operator; this.context = context; + this.oContext = new OperatorContext(operator, context); this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()]; int fieldId = 0; for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) { @@ -77,6 +83,7 @@ public class PartitionSenderRootExec implements RootExec { context.getDataTunnel(endpoint, opposite), incoming, context, + oContext.getAllocator(), fieldId); fieldId++; } @@ -252,16 +259,17 @@ public class PartitionSenderRootExec implements RootExec { // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex, // outgoingBatches[bucket].getRecordCount(), // vv1); - cg.getEvalBlock().add( + cg.getEvalBlock()._if( ((JExpression) JExpr.cast(vvClass, ((JExpression) outgoingVectors .component(bucket)) .component(JExpr.lit(fieldId)))) - .invoke("copyFrom") + .invoke("copyFromSafe") .arg(inIndex) .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) - .arg(incomingVV)); + .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) + ._return(); ++fieldId; } @@ -306,7 +314,8 @@ public class PartitionSenderRootExec implements RootExec { for(OutgoingRecordBatch b : outgoing){ b.clear(); } - incoming.cleanup(); sendCount.waitForSendComplete(); + oContext.close(); + incoming.cleanup(); } } |