diff options
author | Steven Phillips <smp@apache.org> | 2015-12-01 00:34:41 -0800 |
---|---|---|
committer | Steven Phillips <smp@apache.org> | 2015-12-28 15:43:07 -0800 |
commit | 6dea429949a3d6a68aefbdb3d78de41e0955239b (patch) | |
tree | fdb0c3d018163be7a629363efafa9c83b71ae68d /exec | |
parent | de008810c815e46e6f6e5d13ad0b9a23e705b13a (diff) |
DRILL-4215: Transfer buffer ownership in TransferPair
Diffstat (limited to 'exec')
45 files changed, 159 insertions, 115 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 3b90979da..60355fb6b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -107,7 +107,7 @@ public class ScreenCreator implements RootCreator<Screen> { return false; case OK_NEW_SCHEMA: - materializer = new VectorRecordMaterializer(context, incoming); + materializer = new VectorRecordMaterializer(context, oContext, incoming); //$FALL-THROUGH$ case OK: injector.injectPause(context.getExecutionControls(), "sending-data", logger); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 23e97d04c..2f33193a3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -120,7 +120,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ final FragmentWritableBatch batch = new FragmentWritableBatch( false, handle.getQueryId(), handle.getMajorFragmentId(), handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(), - incoming.getWritableBatch()); + incoming.getWritableBatch().transfer(oContext.getAllocator())); updateStats(batch); stats.startWait(); try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index c287bc312..a6c3269df 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -229,9 +229,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { batchCount++; RecordBatchData batch; if (schemaChanged) { - batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext)); + batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator()); } else { - batch = new RecordBatchData(incoming); + batch = new RecordBatchData(incoming, oContext.getAllocator()); } boolean success = false; try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java index dae9eaedb..9e96727f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.util.Iterator; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; @@ -34,11 +35,11 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ private final SelectionVector2 sv2; private final SelectionVector4 sv4; - public InternalBatch(RecordBatch incoming) { - this(incoming, null); + public InternalBatch(RecordBatch incoming, OperatorContext oContext) { + this(incoming, null, oContext); } - public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers){ + public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers, OperatorContext oContext){ switch(incoming.getSchema().getSelectionVectorMode()){ case FOUR_BYTE: this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent(); @@ -53,7 +54,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ this.sv2 = null; } this.schema = incoming.getSchema(); - this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers); + this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers, oContext); } public BatchSchema getSchema() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index ee9a0ab34..c084e39ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -309,7 +309,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { container.buildSchema(SelectionVectorMode.NONE); StreamingAggregator agg = context.getImplementationClass(cg); - agg.setup(context, incoming, this); + agg.setup(oContext, incoming, this); return agg; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java index 4932b0fad..82e877779 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java @@ -21,6 +21,7 @@ import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.VectorWrapper; @@ -41,12 +42,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { private int outputCount = 0; private RecordBatch incoming; private StreamingAggBatch outgoing; - private FragmentContext context; private boolean done = false; + private OperatorContext context; @Override - public void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException { + public void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException { this.context = context; this.incoming = incoming; this.outgoing = outgoing; @@ -164,7 +165,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator { previousIndex = currentIndex; } - InternalBatch previous = new InternalBatch(incoming); + InternalBatch previous = new InternalBatch(incoming, context); try { while (true) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index 96da00b46..61c82d8e2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; @@ -31,7 +32,7 @@ public interface StreamingAggregator { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR; } - public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException; + public abstract void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException; public abstract IterOutcome getOutcome(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index c88c72d5c..80d774483 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -117,7 +117,7 @@ public class BroadcastSenderRootExec extends BaseRootExec { case OK_NEW_SCHEMA: case OK: - WritableBatch writableBatch = incoming.getWritableBatch(); + WritableBatch writableBatch = incoming.getWritableBatch().transfer(oContext.getAllocator()); if (tunnels.length > 1) { writableBatch.retainBuffers(tunnels.length - 1); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index e435e79b3..c0e894494 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -153,7 +153,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ for (final VectorWrapper<?> vw : incoming) { for (final ValueVector vv : vw.getValueVectors()) { - final TransferPair pair = vv.getTransferPair(); + final TransferPair pair = vv.getTransferPair(oContext.getAllocator()); container.add(pair.getTo()); transfers.add(pair); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index c3b3f4593..dcaa2440e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -265,12 +265,12 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { TransferPair tp = null; if (flattenField instanceof RepeatedMapVector) { - tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference); + tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference, oContext.getAllocator()); } else { final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector(); // vvIn may be null because of fast schema return for repeated list vectors if (vvIn != null) { - tp = vvIn.getTransferPair(reference); + tp = vvIn.getTransferPair(reference, oContext.getAllocator()); } } return tp; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 2d0bd43fb..3ea97c658 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -380,7 +380,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { * to the hyper vector container. Will be used when we want to retrieve * records that have matching keys on the probe side. */ - final RecordBatchData nextBatch = new RecordBatchData(right); + final RecordBatchData nextBatch = new RecordBatchData(right, oContext.getAllocator()); boolean success = false; try { if (hyperContainer == null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index b390b8fb3..f0e53e135 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -340,7 +340,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> } private void addBatchToHyperContainer(RecordBatch inputBatch) { - final RecordBatchData batchCopy = new RecordBatchData(inputBatch); + final RecordBatchData batchCopy = new RecordBatchData(inputBatch, oContext.getAllocator()); boolean success = false; try { rightCounts.addLast(inputBatch.getRecordCount()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java index 3933ddd62..ba8df9260 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java @@ -17,7 +17,9 @@ */ package org.apache.drill.exec.physical.impl.materialize; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryData; import org.apache.drill.exec.record.BatchSchema; @@ -29,10 +31,12 @@ public class VectorRecordMaterializer implements RecordMaterializer{ private QueryId queryId; private RecordBatch batch; + private BufferAllocator allocator; - public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) { + public VectorRecordMaterializer(FragmentContext context, OperatorContext oContext, RecordBatch batch) { this.queryId = context.getHandle().getQueryId(); this.batch = batch; + this.allocator = oContext.getAllocator(); BatchSchema schema = batch.getSchema(); assert schema != null : "Schema must be defined."; @@ -43,7 +47,7 @@ public class VectorRecordMaterializer implements RecordMaterializer{ public QueryWritableBatch convertNext() { //batch.getWritableBatch().getDef().getRecordCount() - WritableBatch w = batch.getWritableBatch(); + WritableBatch w = batch.getWritableBatch().transfer(allocator); QueryData header = QueryData.newBuilder() // .setQueryId(queryId) // diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index c9483ae75..64cfad01c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -590,7 +590,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart OrderedPartitionProjector.TEMPLATE_DEFINITION, context.getFunctionRegistry()); for (VectorWrapper<?> vw : batch) { - TransferPair tp = vw.getValueVector().getTransferPair(); + TransferPair tp = vw.getValueVector().getTransferPair(oContext.getAllocator()); transfers.add(tp); container.add(tp.getTo()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index e3033b43c..38d08b6f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -141,7 +141,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { return; case OK_NEW_SCHEMA: case OK: - wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(incoming)); + wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(incoming, oContext.getAllocator())); queue.put(wrapper); wrapper = null; break; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java index af774db23..0cd55eb18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.sort; import java.util.List; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; @@ -40,7 +41,7 @@ public class RecordBatchData { private int recordCount; VectorContainer container = new VectorContainer(); - public RecordBatchData(VectorAccessible batch) { + public RecordBatchData(VectorAccessible batch, BufferAllocator allocator) { List<ValueVector> vectors = Lists.newArrayList(); recordCount = batch.getRecordCount(); @@ -54,7 +55,7 @@ public class RecordBatchData { if (v.isHyper()) { throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); } - TransferPair tp = v.getValueVector().getTransferPair(); + TransferPair tp = v.getValueVector().getTransferPair(allocator); tp.transfer(); vectors.add(tp.getTo()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index f2302ce88..33338ddd8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -91,7 +91,7 @@ public class SortRecordBatchBuilder implements AutoCloseable { } - RecordBatchData bd = new RecordBatchData(batch); + RecordBatchData bd = new RecordBatchData(batch, allocator); runningBatches++; batches.put(batch.getSchema(), bd); recordCount += bd.getRecordCount(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index ff83cc934..209624bbe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -140,7 +140,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { /* Add all the value vectors in the container */ for (VectorWrapper<?> vv : incoming) { - TransferPair tp = vv.getValueVector().getTransferPair(); + TransferPair tp = vv.getValueVector().getTransferPair(oContext.getAllocator()); container.add(tp.getTo()); } container.buildSchema(incoming.getSchema().getSelectionVectorMode()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java index b2befa330..7abc03cee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java @@ -47,7 +47,7 @@ public class WindowDataBatch implements VectorAccessible { if (v.isHyper()) { throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch."); } - TransferPair tp = v.getValueVector().getTransferPair(); + TransferPair tp = v.getValueVector().getTransferPair(oContext.getAllocator()); tp.transfer(); vectors.add(tp.getTo()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 4dbd92dba..6e79f0182 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -366,7 +366,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { totalCount += count; sorter.setup(context, sv2, convertedBatch); sorter.sort(sv2); - RecordBatchData rbd = new RecordBatchData(convertedBatch); + RecordBatchData rbd = new RecordBatchData(convertedBatch, oContext.getAllocator()); boolean success = false; try { rbd.setSv2(sv2); @@ -446,7 +446,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { builder = new SortRecordBatchBuilder(oContext.getAllocator()); for (BatchGroup group : batchGroups) { - RecordBatchData rbd = new RecordBatchData(group.getContainer()); + RecordBatchData rbd = new RecordBatchData(group.getContainer(), oContext.getAllocator()); rbd.setSv2(group.getSv2()); builder.add(rbd); } @@ -562,7 +562,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // 1 output container is kept in memory, so we want to hold on to it and transferClone // allows keeping ownership - VectorContainer c1 = VectorContainer.getTransferClone(outputContainer); + VectorContainer c1 = VectorContainer.getTransferClone(outputContainer, oContext); c1.buildSchema(BatchSchema.SelectionVectorMode.NONE); c1.setRecordCount(count); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java index 7fc7960a1..322339e6d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java @@ -21,6 +21,7 @@ import java.util.AbstractMap; import org.apache.commons.lang3.ArrayUtils; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.AbstractMapVector; @@ -116,7 +117,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper< @Override @SuppressWarnings("unchecked") - public VectorWrapper<T> cloneAndTransfer() { + public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator) { return new HyperVectorWrapper<T>(f, vectors, false); // T[] newVectors = (T[]) Array.newInstance(vectors.getClass().getComponentType(), vectors.length); // for(int i =0; i < newVectors.length; i++) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java index 77cb9a198..af0a75304 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java @@ -53,6 +53,7 @@ public class RecordIterator implements VectorAccessible { private int inputIndex; // For two way merge join 0:left, 1:right private boolean lastBatchRead; // True if all batches are consumed. private boolean initialized; + private OperatorContext oContext; private final VectorContainer container; // Holds VectorContainer of current record batch private final TreeRangeMap<Long, RecordBatchData> batches = TreeRangeMap.create(); @@ -66,6 +67,7 @@ public class RecordIterator implements VectorAccessible { this.inputIndex = inputIndex; this.lastBatchRead = false; this.container = new VectorContainer(oContext); + this.oContext = oContext; resetIndices(); this.initialized = false; } @@ -181,7 +183,7 @@ public class RecordIterator implements VectorAccessible { nextOuterPosition = 0; } // Transfer vectors from incoming record batch. - final RecordBatchData rbd = new RecordBatchData(incoming); + final RecordBatchData rbd = new RecordBatchData(incoming, oContext.getAllocator()); innerRecordCount = incoming.getRecordCount(); if (!initialized) { for (VectorWrapper<?> w : rbd.getContainer()) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java index 8cf90ab0c..48f0a3623 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java @@ -100,7 +100,7 @@ public class SchemaUtil { int recordCount, OperatorContext context) { if (v != null) { int valueCount = v.getAccessor().getValueCount(); - TransferPair tp = v.getTransferPair(); + TransferPair tp = v.getTransferPair(context.getAllocator()); tp.transfer(); if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) { if (field.getType().getMinorType() == MinorType.UNION) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java index f1b60d498..1e8a52ff7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java @@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MajorTypeOrBuilder; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.drill.exec.vector.complex.AbstractMapVector; @@ -74,8 +75,8 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper @SuppressWarnings("unchecked") @Override - public VectorWrapper<T> cloneAndTransfer() { - TransferPair tp = vector.getTransferPair(); + public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator) { + TransferPair tp = vector.getTransferPair(allocator); tp.transfer(); return new SimpleVectorWrapper<T>((T) tp.getTo()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index c48365020..33351ba9c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.vector.SchemaChangeCallBack; @@ -148,15 +149,15 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess * The RecordBatch iterator the contains the batch we should take over. * @return A cloned vector container. */ - public static VectorContainer getTransferClone(VectorAccessible incoming) { - VectorContainer vc = new VectorContainer(); + public static VectorContainer getTransferClone(VectorAccessible incoming, OperatorContext oContext) { + VectorContainer vc = new VectorContainer(oContext); for (VectorWrapper<?> w : incoming) { vc.cloneAndTransfer(w); } return vc; } - public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper[] ignoreWrappers) { + public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper[] ignoreWrappers, OperatorContext oContext) { Iterable<VectorWrapper<?>> wrappers = incoming; if (ignoreWrappers != null) { final List<VectorWrapper> ignored = Lists.newArrayList(ignoreWrappers); @@ -165,7 +166,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess wrappers = resultant; } - final VectorContainer vc = new VectorContainer(); + final VectorContainer vc = new VectorContainer(oContext); for (VectorWrapper<?> w : wrappers) { vc.cloneAndTransfer(w); } @@ -198,7 +199,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess } private void cloneAndTransfer(VectorWrapper<?> wrapper) { - wrappers.add(wrapper.cloneAndTransfer()); + wrappers.add(wrapper.cloneAndTransfer(oContext.getAllocator())); } public void addCollection(Iterable<ValueVector> vectors) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java index 5250f98a9..65ea4570e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.record; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.vector.ValueVector; @@ -31,7 +32,7 @@ public interface VectorWrapper<T extends ValueVector> { public T[] getValueVectors(); public boolean isHyper(); public void clear(); - public VectorWrapper<T> cloneAndTransfer(); + public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator); public VectorWrapper<?> getChildWrapper(int[] ids); public void transfer(VectorWrapper<?> destination); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index d39ce5e89..bcec920b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -51,6 +51,18 @@ public class WritableBatch implements AutoCloseable { this.buffers = buffers; } + public WritableBatch transfer(BufferAllocator allocator) { + List<DrillBuf> newBuffers = Lists.newArrayList(); + for (DrillBuf buf : buffers) { + int writerIndex = buf.writerIndex(); + DrillBuf newBuf = buf.transferOwnership(allocator).buffer; + newBuf.writerIndex(writerIndex); + newBuffers.add(newBuf); + } + clear(); + return new WritableBatch(def, newBuffers); + } + public RecordBatchDef getDef() { return def; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java index f7843f5ae..66b75710e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java @@ -46,7 +46,7 @@ import static org.junit.Assert.assertNull; * any particular order of execution. We ignore the results. */ public class TestTpchDistributedConcurrent extends BaseTestQuery { - @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000); // Longer timeout than usual. + @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(140000); // Longer timeout than usual. /* * Valid test names taken from TestTpchDistributed. Fuller path prefixes are diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java index b2054e61a..6ac8e976e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java @@ -51,7 +51,7 @@ public class TestSplitAndTransfer { } mutator.setValueCount(valueCount); - final TransferPair tp = varCharVector.getTransferPair(); + final TransferPair tp = varCharVector.getTransferPair(allocator); final NullableVarCharVector newVarCharVector = (NullableVarCharVector) tp.getTo(); final Accessor accessor = newVarCharVector.getAccessor(); final int[][] startLengths = {{0, 201}, {201, 200}, {401, 99}}; diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java index f67614a81..8e77dcc5e 100644 --- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java @@ -198,13 +198,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F data.writerIndex(actualLength); } - public TransferPair getTransferPair(){ - return new TransferImpl(getField()); + public TransferPair getTransferPair(BufferAllocator allocator){ + return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(FieldReference ref){ - return new TransferImpl(getField().withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){ + return new TransferImpl(getField().withPath(ref), allocator); } @Override @@ -214,8 +214,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public void transferTo(${minor.class}Vector target){ target.clear(); - target.data = data; - target.data.retain(1); + target.data = data.transferOwnership(target.allocator).buffer; target.data.writerIndex(data.writerIndex()); clear(); } @@ -224,15 +223,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F final int startPoint = startIndex * ${type.width}; final int sliceLength = length * ${type.width}; target.clear(); - target.data = data.slice(startPoint, sliceLength); - target.data.retain(1); + target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer; target.data.writerIndex(sliceLength); } private class TransferImpl implements TransferPair{ private ${minor.class}Vector to; - public TransferImpl(MaterializedField field){ + public TransferImpl(MaterializedField field, BufferAllocator allocator){ to = new ${minor.class}Vector(field, allocator); } diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java index 13bdd4f97..d2c17ffde 100644 --- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java @@ -243,13 +243,13 @@ public final class ${className} extends BaseDataValueVector implements <#if type } @Override - public TransferPair getTransferPair(){ - return new TransferImpl(getField()); + public TransferPair getTransferPair(BufferAllocator allocator){ + return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(FieldReference ref){ - return new TransferImpl(getField().withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){ + return new TransferImpl(getField().withPath(ref), allocator); } @Override @@ -277,7 +277,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type private class TransferImpl implements TransferPair { Nullable${minor.class}Vector to; - public TransferImpl(MaterializedField field){ + public TransferImpl(MaterializedField field, BufferAllocator allocator){ to = new Nullable${minor.class}Vector(field, allocator); } diff --git a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java index 21f56162f..ca39d7130 100644 --- a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java @@ -79,13 +79,13 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector } @Override - public TransferPair getTransferPair() { - return new TransferImpl(getField()); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(FieldReference ref){ - return new TransferImpl(getField().withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){ + return new TransferImpl(getField().withPath(ref), allocator); } @Override @@ -131,7 +131,7 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector private class TransferImpl implements TransferPair { final Repeated${minor.class}Vector to; - public TransferImpl(MaterializedField field) { + public TransferImpl(MaterializedField field, BufferAllocator allocator) { this.to = new Repeated${minor.class}Vector(field, allocator); } diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java index cc541e599..2e278b14c 100644 --- a/exec/vector/src/main/codegen/templates/UnionVector.java +++ b/exec/vector/src/main/codegen/templates/UnionVector.java @@ -80,6 +80,10 @@ public class UnionVector implements ValueVector { this.callBack = callBack; } + public BufferAllocator getAllocator() { + return allocator; + } + public List<MinorType> getSubTypes() { return majorType.getSubTypeList(); } @@ -198,13 +202,13 @@ public class UnionVector implements ValueVector { } @Override - public TransferPair getTransferPair() { - return new TransferImpl(field); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new TransferImpl(field, allocator); } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new TransferImpl(field.withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new TransferImpl(field.withPath(ref), allocator); } @Override @@ -242,7 +246,7 @@ public class UnionVector implements ValueVector { UnionVector to; - public TransferImpl(MaterializedField field) { + public TransferImpl(MaterializedField field, BufferAllocator allocator) { to = new UnionVector(field, allocator, null); } diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java index f7347343d..56d2d527b 100644 --- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java @@ -174,13 +174,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } @Override - public TransferPair getTransferPair(){ - return new TransferImpl(getField()); + public TransferPair getTransferPair(BufferAllocator allocator){ + return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(FieldReference ref){ - return new TransferImpl(getField().withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){ + return new TransferImpl(getField().withPath(ref), allocator); } @Override @@ -191,8 +191,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V public void transferTo(${minor.class}Vector target){ target.clear(); this.offsetVector.transferTo(target.offsetVector); - target.data = data; - target.data.retain(1); + target.data = data.transferOwnership(target.allocator).buffer; + target.data.writerIndex(data.writerIndex()); clear(); } @@ -207,8 +207,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V for (int i = 0; i < length + 1; i++) { targetOffsetVectorMutator.set(i, offsetVectorAccessor.get(startIndex + i) - startPoint); } - target.data = data.slice(startPoint, sliceLength); - target.data.retain(1); + target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer; target.getMutator().setValueCount(length); } @@ -242,7 +241,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V private class TransferImpl implements TransferPair{ ${minor.class}Vector to; - public TransferImpl(MaterializedField field){ + public TransferImpl(MaterializedField field, BufferAllocator allocator){ to = new ${minor.class}Vector(field, allocator); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java index eb5dbcd9d..23ad77888 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java @@ -71,8 +71,8 @@ public abstract class BaseValueVector implements ValueVector { } @Override - public TransferPair getTransferPair() { - return getTransferPair(new FieldReference(getField().getPath())); + public TransferPair getTransferPair(BufferAllocator allocator) { + return getTransferPair(new FieldReference(getField().getPath()), allocator); } @Override @@ -119,5 +119,10 @@ public abstract class BaseValueVector implements ValueVector { return true; } + + @Override + public BufferAllocator getAllocator() { + return allocator; + } } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java index c1504c60a..3ba11e2b9 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -208,13 +208,13 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } @Override - public TransferPair getTransferPair() { - return new TransferImpl(getField()); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new TransferImpl(getField(), allocator); } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new TransferImpl(getField().withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new TransferImpl(getField().withPath(ref), allocator); } @Override @@ -273,7 +273,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe private class TransferImpl implements TransferPair { BitVector to; - public TransferImpl(MaterializedField field) { + public TransferImpl(MaterializedField field, BufferAllocator allocator) { this.to = new BitVector(field, allocator); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java index 9ca4410d0..494f234bb 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java @@ -147,7 +147,7 @@ public class ObjectVector extends BaseValueVector { } @Override - public TransferPair getTransferPair() { + public TransferPair getTransferPair(BufferAllocator allocator) { throw new UnsupportedOperationException("ObjectVector does not support this"); } @@ -157,7 +157,7 @@ public class ObjectVector extends BaseValueVector { } @Override - public TransferPair getTransferPair(FieldReference ref) { + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { throw new UnsupportedOperationException("ObjectVector does not support this"); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java index b39fcfe16..a4a071ee8 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java @@ -23,6 +23,7 @@ import io.netty.buffer.DrillBuf; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.TransferPair; @@ -68,6 +69,8 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> { */ boolean allocateNewSafe(); + BufferAllocator getAllocator(); + /** * Set the initial record capacity * @param numRecords @@ -99,9 +102,9 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> { * Returns a {@link org.apache.drill.exec.record.TransferPair transfer pair}, creating a new target vector of * the same type. */ - TransferPair getTransferPair(); + TransferPair getTransferPair(BufferAllocator allocator); - TransferPair getTransferPair(FieldReference ref); + TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator); /** * Returns a new {@link org.apache.drill.exec.record.TransferPair transfer pair} that is used to transfer underlying diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java index c5326f663..165fc1408 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java @@ -94,7 +94,7 @@ public class ZeroVector implements ValueVector { } @Override - public TransferPair getTransferPair() { + public TransferPair getTransferPair(BufferAllocator allocator) { return defaultPair; } @@ -138,6 +138,11 @@ public class ZeroVector implements ValueVector { } @Override + public BufferAllocator getAllocator() { + throw new UnsupportedOperationException("Tried to get allocator from ZeroVector"); + } + + @Override public void setInitialCapacity(int numRecords) { } @Override @@ -146,7 +151,7 @@ public class ZeroVector implements ValueVector { } @Override - public TransferPair getTransferPair(FieldReference ref) { + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { return defaultPair; } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java index caedb967c..0ac2417bd 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java @@ -62,6 +62,10 @@ public abstract class AbstractContainerVector implements ValueVector { } } + public BufferAllocator getAllocator() { + return allocator; + } + /** * Returns the field definition of this instance. */ diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java index b780d1ac4..10975f548 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java @@ -103,8 +103,8 @@ public class ListVector extends BaseRepeatedValueVector { } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new TransferImpl(field.withPath(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new TransferImpl(field.withPath(ref), allocator); } @Override @@ -116,7 +116,7 @@ public class ListVector extends BaseRepeatedValueVector { ListVector to; - public TransferImpl(MaterializedField field) { + public TransferImpl(MaterializedField field, BufferAllocator allocator) { to = new ListVector(field, allocator, null); to.addOrGetVector(new VectorDescriptor(vector.getField().getType())); } diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java index 60d74c1fe..6784ed45e 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java @@ -143,8 +143,8 @@ public class MapVector extends AbstractMapVector { } @Override - public TransferPair getTransferPair() { - return new MapTransferPair(this, getField().getPath()); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new MapTransferPair(this, getField().getPath(), allocator); } @Override @@ -153,8 +153,8 @@ public class MapVector extends AbstractMapVector { } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new MapTransferPair(this, ref); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new MapTransferPair(this, ref, allocator); } protected static class MapTransferPair implements TransferPair{ @@ -162,8 +162,8 @@ public class MapVector extends AbstractMapVector { private final MapVector from; private final MapVector to; - public MapTransferPair(MapVector from, SchemaPath path) { - this(from, new MapVector(MaterializedField.create(path, TYPE), from.allocator, from.callBack), false); + public MapTransferPair(MapVector from, SchemaPath path, BufferAllocator allocator) { + this(from, new MapVector(MaterializedField.create(path, TYPE), allocator, from.callBack), false); } public MapTransferPair(MapVector from, MapVector to) { diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java index cbc61f8d2..4706999fc 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java @@ -189,7 +189,7 @@ public class RepeatedListVector extends AbstractContainerVector } @Override - public TransferPair getTransferPair(FieldReference ref) { + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { return makeTransferPair(new DelegateRepeatedVector(ref, allocator)); } @@ -344,13 +344,13 @@ public class RepeatedListVector extends AbstractContainerVector } @Override - public TransferPair getTransferPair() { - return new RepeatedListTransferPair(delegate.getTransferPair()); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new RepeatedListTransferPair(delegate.getTransferPair(allocator)); } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new RepeatedListTransferPair(delegate.getTransferPair(ref)); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new RepeatedListTransferPair(delegate.getTransferPair(ref, allocator)); } @Override diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index cb597beb3..b13de9d78 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -160,8 +160,8 @@ public class RepeatedMapVector extends AbstractMapVector } @Override - public TransferPair getTransferPair() { - return new RepeatedMapTransferPair(this, getField().getPath()); + public TransferPair getTransferPair(BufferAllocator allocator) { + return new RepeatedMapTransferPair(this, getField().getPath(), allocator); } @Override @@ -224,13 +224,13 @@ public class RepeatedMapVector extends AbstractMapVector return super.getFieldIdIfMatches(builder, addToBreadCrumb, seg); } - public TransferPair getTransferPairToSingleMap(FieldReference reference) { - return new SingleMapTransferPair(this, reference); + public TransferPair getTransferPairToSingleMap(FieldReference reference, BufferAllocator allocator) { + return new SingleMapTransferPair(this, reference, allocator); } @Override - public TransferPair getTransferPair(FieldReference ref) { - return new RepeatedMapTransferPair(this, ref); + public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) { + return new RepeatedMapTransferPair(this, ref, allocator); } @Override @@ -261,8 +261,8 @@ public class RepeatedMapVector extends AbstractMapVector private final MapVector to; private static final MajorType MAP_TYPE = Types.required(MinorType.MAP); - public SingleMapTransferPair(RepeatedMapVector from, SchemaPath path) { - this(from, new MapVector(MaterializedField.create(path, MAP_TYPE), from.allocator, from.callBack), false); + public SingleMapTransferPair(RepeatedMapVector from, SchemaPath path, BufferAllocator allocator) { + this(from, new MapVector(MaterializedField.create(path, MAP_TYPE), allocator, from.callBack), false); } public SingleMapTransferPair(RepeatedMapVector from, MapVector to) { @@ -326,8 +326,8 @@ public class RepeatedMapVector extends AbstractMapVector private final RepeatedMapVector to; private final RepeatedMapVector from; - public RepeatedMapTransferPair(RepeatedMapVector from, SchemaPath path) { - this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), from.allocator, from.callBack), false); + public RepeatedMapTransferPair(RepeatedMapVector from, SchemaPath path, BufferAllocator allocator) { + this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator, from.callBack), false); } public RepeatedMapTransferPair(RepeatedMapVector from, RepeatedMapVector to) { diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java index 894e60e84..ee16e9743 100644 --- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java +++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java @@ -151,7 +151,7 @@ public class PromotableWriter extends AbstractPromotableFieldWriter { private FieldWriter promoteToUnion() { String name = vector.getField().getLastName(); - TransferPair tp = vector.getTransferPair(new FieldReference(vector.getField().getType().getMinorType().name().toLowerCase())); + TransferPair tp = vector.getTransferPair(new FieldReference(vector.getField().getType().getMinorType().name().toLowerCase()), vector.getAllocator()); tp.transfer(); if (parentContainer != null) { unionVector = parentContainer.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class); |