aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTimothy Farkas <timothyfarkas@apache.org>2017-12-14 10:48:27 -0800
committerAman Sinha <asinha@maprtech.com>2018-01-21 08:32:38 -0800
commit81f926b5ea5c8537c7c65db397efc0fdf0a1d589 (patch)
treee02e8ee708958e7b9fd4fc251a3db6f0754d188a
parent9e944c97ee6f6c0d1705f09d531af35deed2e310 (diff)
DRILL-5967: Fixed memory leak in OrderedPartitionSender
closes apache/drill#1073
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java35
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java14
2 files changed, 44 insertions, 5 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
index 5c953b1f3..d2e07e7b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
@@ -29,8 +29,34 @@ import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootEx
import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+/**
+ * <br/>
+ * <h1>Known Issues:</h1>
+ * <h2>Creation of batches</h2>
+ * <p>
+ * The {@link org.apache.drill.exec.work.fragment.FragmentExecutor} is only aware of the operators in the tree that it has a reference too. In the case of the
+ * {@link OrderedPartitionSenderCreator}, an upstream {@link org.apache.drill.exec.record.RecordBatch} is wrapped in an
+ * {@link org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch}. Since the
+ * {@link org.apache.drill.exec.physical.impl.orderedpartitioner.OrderedPartitionRecordBatch} is instantiated in the creator the
+ * {@link org.apache.drill.exec.work.fragment.FragmentExecutor} does not have a reference to it. So when the {@link org.apache.drill.exec.work.fragment.FragmentExecutor}
+ * closes the operators it closes the original operator, but not not the wrapping {@link OrderedPartitionSenderCreator}. This is an issue since the
+ * {@link OrderedPartitionSenderCreator} allocates {@link org.apache.drill.exec.record.VectorContainer}s which are consequentially never released.
+ * <br/>
+ * <ol>
+ * <li>
+ * We change the Creators in some way to communicate to the FragmentExecutor that they have wrapped an operator, so the FragmentExecutor can close the wrapped operator
+ * instead of the original operator.
+ * </li>
+ * <li>
+ * Or we take a less invasive approach and simply tell the {@link org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec} whether to close the wrapped
+ * operator.
+ * </li>
+ * </ol>
+ * <br/>
+ * For now we've taken approach 2. In the future we should we should implement approach 1.
+ * </p>
+ */
public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartitionSender> {
@SuppressWarnings("resource")
@@ -39,10 +65,9 @@ public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartiti
List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
- List<RecordBatch> ordered_children = Lists.newArrayList();
- ordered_children.add(new OrderedPartitionRecordBatch(config, children.iterator().next(), context));
- HashPartitionSender hpc = new HashPartitionSender(config.getOppositeMajorFragmentId(), config, config.getRef(), config.getDestinations());
- return new PartitionSenderRootExec(context, ordered_children.iterator().next(), hpc);
+ final OrderedPartitionRecordBatch recordBatch = new OrderedPartitionRecordBatch(config, children.iterator().next(), context);
+ final HashPartitionSender hpc = new HashPartitionSender(config.getOppositeMajorFragmentId(), config, config.getRef(), config.getDestinations());
+ return new PartitionSenderRootExec(context, recordBatch, hpc, true);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 7684e94de..108d539f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
@@ -72,6 +73,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
private final AtomicInteger remaingReceiverCount;
private volatile boolean done = false;
private boolean first = true;
+ private boolean closeIncoming;
long minReceiverRecordCount = Long.MAX_VALUE;
long maxReceiverRecordCount = Long.MIN_VALUE;
@@ -99,9 +101,17 @@ public class PartitionSenderRootExec extends BaseRootExec {
public PartitionSenderRootExec(FragmentContext context,
RecordBatch incoming,
HashPartitionSender operator) throws OutOfMemoryException {
+ this(context, incoming, operator, false);
+ }
+
+ public PartitionSenderRootExec(FragmentContext context,
+ RecordBatch incoming,
+ HashPartitionSender operator,
+ boolean closeIncoming) throws OutOfMemoryException {
super(context, context.newOperatorContext(operator, null), operator);
this.incoming = incoming;
this.operator = operator;
+ this.closeIncoming = closeIncoming;
this.context = context;
outGoingBatchCount = operator.getDestinations().size();
popConfig = operator;
@@ -341,6 +351,10 @@ public class PartitionSenderRootExec extends BaseRootExec {
updateAggregateStats();
partitioner.clear();
}
+
+ if (closeIncoming) {
+ ((CloseableRecordBatch) incoming).close();
+ }
}
public void sendEmptyBatch(boolean isLast) {