aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java10
8 files changed, 49 insertions, 7 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index b192850b1..503ebdded 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -449,13 +449,16 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
public void close() {
waitForSendComplete();
+ // Close the buffers before closing the operators; this is needed as buffer ownership
+ // is attached to the receive operators.
+ suppressingClose(buffers);
+
// close operator context
for (OperatorContextImpl opContext : contexts) {
suppressingClose(opContext);
}
suppressingClose(bufferManager);
- suppressingClose(buffers);
suppressingClose(allocator);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
index 0ef84b960..66a0cc2c2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
@@ -42,7 +42,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP>
IncomingBuffers bufHolder = context.getBuffers();
assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared.";
- RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
+ RawBatchBuffer[] buffers = bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers();
return new MergingRecordBatch(context, receiver, buffers);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 7e5ff2126..9087757b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -136,6 +136,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
this.config = config;
this.inputCounts = new long[config.getNumSenders()];
this.outputCounts = new long[config.getNumSenders()];
+
+ // Register this operator's buffer allocator so that incoming buffers are owned by this allocator
+ context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
}
@SuppressWarnings("resource")
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 433e0c8b8..424a733bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -86,6 +86,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
this.stats = oContext.getStats();
this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders());
this.config = config;
+
+ // Register this operator's buffer allocator so that incoming buffers are owned by this allocator
+ context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator());
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
index 01a458890..3dcdfc4ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
@@ -37,7 +37,7 @@ public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver>
IncomingBuffers bufHolder = context.getBuffers();
assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared.";
- RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId());
+ RawBatchBuffer[] buffers = bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers();
assert buffers.length == 1;
RawBatchBuffer buffer = buffers[0];
return new UnorderedReceiverBatch(context, buffer, receiver);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index b6b4183e7..bb3a5a266 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.Collector;
import org.apache.drill.exec.record.RawFragmentBatch;
@@ -37,6 +38,8 @@ public abstract class AbstractDataCollector implements DataCollector {
private final int incomingStreams;
protected final RawBatchBuffer[] buffers;
protected final ArrayWrappedIntIntMap fragmentMap;
+ /** Allocator which owns incoming batches */
+ protected BufferAllocator ownerAllocator;
/**
* @param parentAccounter
@@ -53,6 +56,7 @@ public abstract class AbstractDataCollector implements DataCollector {
this.parentAccounter = parentAccounter;
this.remainders = new AtomicIntegerArray(incomingStreams);
this.oppositeMajorFragmentId = collector.getOppositeMajorFragmentId();
+ this.ownerAllocator = context.getAllocator();
// Create fragmentId to index that is within the range [0, incoming.size()-1]
// We use this mapping to find objects belonging to the fragment in buffers and remainders arrays.
fragmentMap = new ArrayWrappedIntIntMap();
@@ -116,4 +120,17 @@ public abstract class AbstractDataCollector implements DataCollector {
AutoCloseables.close(buffers);
}
+ /** {@inheritDoc} */
+ @Override
+ public BufferAllocator getAllocator() {
+ return this.ownerAllocator;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void setAllocator(BufferAllocator allocator) {
+ Preconditions.checkArgument(allocator != null, "buffer allocator cannot be null");
+ this.ownerAllocator = allocator;
+ }
+
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
index 026fc81e1..fa746770b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java
@@ -19,13 +19,25 @@ package org.apache.drill.exec.work.batch;
import java.io.IOException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.RawFragmentBatch;
-interface DataCollector extends AutoCloseable {
+public interface DataCollector extends AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataCollector.class);
public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException ;
public int getOppositeMajorFragmentId();
public RawBatchBuffer[] getBuffers();
public int getTotalIncomingFragments();
public void close() throws Exception;
+ /**
+ * Enables caller (e.g., receiver) to attach its buffer allocator to this Data Collector in order
+ * to claim ownership of incoming batches; by default, the fragment allocator owns these batches.
+ *
+ * @param allocator operator buffer allocator
+ */
+ void setAllocator(BufferAllocator allocator);
+ /**
+ * @return allocator
+ */
+ BufferAllocator getAllocator();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 876c8b5b5..2d1b4f2f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.Collector;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -103,8 +104,11 @@ public class IncomingBuffers implements AutoCloseable {
Arrays.toString(collectorMap.values().toArray())));
}
+ // Use the Data Collector's buffer allocator if set, otherwise the fragment's one
+ BufferAllocator ownerAllocator = collector.getAllocator();
+
synchronized (collector) {
- final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(context.getAllocator());
+ final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(ownerAllocator);
boolean decrementedToZero = collector
.batchArrived(incomingBatch.getHeader().getSendingMinorFragmentId(), newRawFragmentBatch);
newRawFragmentBatch.release();
@@ -125,8 +129,8 @@ public class IncomingBuffers implements AutoCloseable {
return rem;
}
- public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) {
- return collectorMap.get(senderMajorFragmentId).getBuffers();
+ public DataCollector getCollector(int senderMajorFragmentId) {
+ return collectorMap.get(senderMajorFragmentId);
}
public boolean isDone() {