diff options
author | Timothy Farkas <timothyfarkas@apache.org> | 2017-12-14 10:48:27 -0800 |
---|---|---|
committer | Aman Sinha <asinha@maprtech.com> | 2018-01-21 08:32:38 -0800 |
commit | 81f926b5ea5c8537c7c65db397efc0fdf0a1d589 (patch) | |
tree | e02e8ee708958e7b9fd4fc251a3db6f0754d188a | |
parent | 9e944c97ee6f6c0d1705f09d531af35deed2e310 (diff) |
DRILL-5967: Fixed memory leak in OrderedPartitionSender
closes apache/drill#1073
2 files changed, 44 insertions, 5 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java index 5c953b1f3..d2e07e7b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java @@ -29,8 +29,34 @@ import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootEx import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +/** + * <br/> + * <h1>Known Issues:</h1> + * <h2>Creation of batches</h2> + * <p> + * The {@link org.apache.drill.exec.work.fragment.FragmentExecutor} is only aware of the operators in the tree that it has a reference to. In the case of the + * {@link OrderedPartitionSenderCreator}, an upstream {@link org.apache.drill.exec.record.RecordBatch} is wrapped in an + * {@link org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch}. Since the + * {@link org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch} is instantiated in the creator, the + * {@link org.apache.drill.exec.work.fragment.FragmentExecutor} does not have a reference to it. So when the {@link org.apache.drill.exec.work.fragment.FragmentExecutor} + * closes the operators it closes the original operator, but not the wrapping {@link org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch}. This is an issue since the + * {@link org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch} allocates {@link org.apache.drill.exec.record.VectorContainer}s which are consequently never released. 
+ * <br/> + * <ol> + * <li> + * We change the Creators in some way to communicate to the FragmentExecutor that they have wrapped an operator, so the FragmentExecutor can close the wrapped operator + * instead of the original operator. + * </li> + * <li> + * Or we take a less invasive approach and simply tell the {@link org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec} whether to close the wrapped + * operator. + * </li> + * </ol> + * <br/> + * For now we've taken approach 2. In the future we should implement approach 1. + * </p> + */ public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartitionSender> { @SuppressWarnings("resource") @@ -39,10 +65,9 @@ public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartiti List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); - List<RecordBatch> ordered_children = Lists.newArrayList(); - ordered_children.add(new OrderedPartitionRecordBatch(config, children.iterator().next(), context)); - HashPartitionSender hpc = new HashPartitionSender(config.getOppositeMajorFragmentId(), config, config.getRef(), config.getDestinations()); - return new PartitionSenderRootExec(context, ordered_children.iterator().next(), hpc); + final OrderedPartitionRecordBatch recordBatch = new OrderedPartitionRecordBatch(config, children.iterator().next(), context); + final HashPartitionSender hpc = new HashPartitionSender(config.getOppositeMajorFragmentId(), config, config.getRef(), config.getDestinations()); + return new PartitionSenderRootExec(context, recordBatch, hpc, true); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 7684e94de..108d539f8 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -43,6 +43,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.FragmentWritableBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; @@ -72,6 +73,7 @@ public class PartitionSenderRootExec extends BaseRootExec { private final AtomicInteger remaingReceiverCount; private volatile boolean done = false; private boolean first = true; + private boolean closeIncoming; long minReceiverRecordCount = Long.MAX_VALUE; long maxReceiverRecordCount = Long.MIN_VALUE; @@ -99,9 +101,17 @@ public class PartitionSenderRootExec extends BaseRootExec { public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { + this(context, incoming, operator, false); + } + + public PartitionSenderRootExec(FragmentContext context, + RecordBatch incoming, + HashPartitionSender operator, + boolean closeIncoming) throws OutOfMemoryException { super(context, context.newOperatorContext(operator, null), operator); this.incoming = incoming; this.operator = operator; + this.closeIncoming = closeIncoming; this.context = context; outGoingBatchCount = operator.getDestinations().size(); popConfig = operator; @@ -341,6 +351,10 @@ public class PartitionSenderRootExec extends BaseRootExec { updateAggregateStats(); partitioner.clear(); } + + if (closeIncoming) { + ((CloseableRecordBatch) incoming).close(); + } } public void sendEmptyBatch(boolean isLast) { |