diff options
8 files changed, 49 insertions, 7 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java index b192850b1..503ebdded 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java @@ -449,13 +449,16 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor public void close() { waitForSendComplete(); + // Close the buffers before closing the operators; this is needed as buffer ownership + // is attached to the receive operators. + suppressingClose(buffers); + // close operator context for (OperatorContextImpl opContext : contexts) { suppressingClose(opContext); } suppressingClose(bufferManager); - suppressingClose(buffers); suppressingClose(allocator); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java index 0ef84b960..66a0cc2c2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java @@ -42,7 +42,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP> IncomingBuffers bufHolder = context.getBuffers(); assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared."; - RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId()); + RawBatchBuffer[] buffers = bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers(); return new MergingRecordBatch(context, receiver, buffers); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 7e5ff2126..9087757b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -136,6 +136,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> this.config = config; this.inputCounts = new long[config.getNumSenders()]; this.outputCounts = new long[config.getNumSenders()]; + + // Register this operator's buffer allocator so that incoming buffers are owned by this allocator + context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator()); } @SuppressWarnings("resource") diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 433e0c8b8..424a733bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -86,6 +86,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { this.stats = oContext.getStats(); this.stats.setLongStat(Metric.NUM_SENDERS, config.getNumSenders()); this.config = config; + + // Register this operator's buffer allocator so that incoming buffers are owned by this allocator + context.getBuffers().getCollector(config.getOppositeMajorFragmentId()).setAllocator(oContext.getAllocator()); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java index 01a458890..3dcdfc4ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java @@ -37,7 +37,7 @@ public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver> IncomingBuffers bufHolder = context.getBuffers(); assert bufHolder != null : "IncomingBuffers must be defined for any place a receiver is declared."; - RawBatchBuffer[] buffers = bufHolder.getBuffers(receiver.getOppositeMajorFragmentId()); + RawBatchBuffer[] buffers = bufHolder.getCollector(receiver.getOppositeMajorFragmentId()).getBuffers(); assert buffers.length == 1; RawBatchBuffer buffer = buffers[0]; return new UnorderedReceiverBatch(context, buffer, receiver); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java index b6b4183e7..bb3a5a266 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java @@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.Collector; import org.apache.drill.exec.record.RawFragmentBatch; @@ -37,6 +38,8 @@ public abstract class AbstractDataCollector implements DataCollector { private final int incomingStreams; protected final RawBatchBuffer[] buffers; protected final ArrayWrappedIntIntMap fragmentMap; + /** Allocator which owns incoming batches */ + protected BufferAllocator ownerAllocator; /** * @param parentAccounter @@ -53,6 +56,7 @@ public abstract class AbstractDataCollector implements DataCollector { this.parentAccounter = parentAccounter; this.remainders = new AtomicIntegerArray(incomingStreams); this.oppositeMajorFragmentId = collector.getOppositeMajorFragmentId(); + this.ownerAllocator = context.getAllocator(); // Create fragmentId to index that is within the range [0, incoming.size()-1] // We use this mapping to find objects belonging to the fragment in buffers and remainders arrays. fragmentMap = new ArrayWrappedIntIntMap(); @@ -116,4 +120,17 @@ public abstract class AbstractDataCollector implements DataCollector { AutoCloseables.close(buffers); } + /** {@inheritDoc} */ + @Override + public BufferAllocator getAllocator() { + return this.ownerAllocator; + } + + /** {@inheritDoc} */ + @Override + public void setAllocator(BufferAllocator allocator) { + Preconditions.checkArgument(allocator != null, "buffer allocator cannot be null"); + this.ownerAllocator = allocator; + } + }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java index 026fc81e1..fa746770b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/DataCollector.java @@ -19,13 +19,25 @@ package org.apache.drill.exec.work.batch; import java.io.IOException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.RawFragmentBatch; -interface DataCollector extends AutoCloseable { +public interface DataCollector extends AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataCollector.class); public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException ; public int getOppositeMajorFragmentId(); public RawBatchBuffer[] getBuffers(); public int getTotalIncomingFragments(); public void close() throws Exception; + /** + * Enables caller (e.g., receiver) to attach its buffer allocator to this Data Collector in order + * to claim ownership of incoming batches; by default, the fragment allocator owns these batches. + * + * @param allocator operator buffer allocator + */ + void setAllocator(BufferAllocator allocator); + /** + * @return allocator + */ + BufferAllocator getAllocator(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java index 876c8b5b5..2d1b4f2f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java @@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.concurrent.AutoCloseableLock; import org.apache.drill.exec.exception.FragmentSetupException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.Collector; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -103,8 +104,11 @@ public class IncomingBuffers implements AutoCloseable { Arrays.toString(collectorMap.values().toArray()))); } + // Use the Data Collector's buffer allocator if set, otherwise the fragment's one + BufferAllocator ownerAllocator = collector.getAllocator(); + synchronized (collector) { - final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(context.getAllocator()); + final RawFragmentBatch newRawFragmentBatch = incomingBatch.newRawFragmentBatch(ownerAllocator); boolean decrementedToZero = collector .batchArrived(incomingBatch.getHeader().getSendingMinorFragmentId(), newRawFragmentBatch); newRawFragmentBatch.release(); @@ -125,8 +129,8 @@ public class IncomingBuffers implements AutoCloseable { return rem; } - public RawBatchBuffer[] getBuffers(int senderMajorFragmentId) { - return collectorMap.get(senderMajorFragmentId).getBuffers(); + public DataCollector getCollector(int senderMajorFragmentId) { + return collectorMap.get(senderMajorFragmentId); } public boolean isDone() { |