aboutsummaryrefslogtreecommitdiff
path: root/exec
diff options
context:
space:
mode:
authorSteven Phillips <smp@apache.org>2015-12-01 00:34:41 -0800
committerSteven Phillips <smp@apache.org>2015-12-28 15:43:07 -0800
commit6dea429949a3d6a68aefbdb3d78de41e0955239b (patch)
treefdb0c3d018163be7a629363efafa9c83b71ae68d /exec
parentde008810c815e46e6f6e5d13ad0b9a23e705b13a (diff)
DRILL-4215: Transfer buffer ownership in TransferPair
Diffstat (limited to 'exec')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java12
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java2
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java2
-rw-r--r--exec/vector/src/main/codegen/templates/FixedValueVectors.java16
-rw-r--r--exec/vector/src/main/codegen/templates/NullableValueVectors.java10
-rw-r--r--exec/vector/src/main/codegen/templates/RepeatedValueVectors.java10
-rw-r--r--exec/vector/src/main/codegen/templates/UnionVector.java14
-rw-r--r--exec/vector/src/main/codegen/templates/VariableLengthVectors.java17
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java9
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java10
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java4
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java7
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java9
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java4
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java6
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java12
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java10
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java20
-rw-r--r--exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java2
45 files changed, 159 insertions, 115 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 3b90979da..60355fb6b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -107,7 +107,7 @@ public class ScreenCreator implements RootCreator<Screen> {
return false;
case OK_NEW_SCHEMA:
- materializer = new VectorRecordMaterializer(context, incoming);
+ materializer = new VectorRecordMaterializer(context, oContext, incoming);
//$FALL-THROUGH$
case OK:
injector.injectPause(context.getExecutionControls(), "sending-data", logger);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 23e97d04c..2f33193a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -120,7 +120,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
final FragmentWritableBatch batch = new FragmentWritableBatch(
false, handle.getQueryId(), handle.getMajorFragmentId(),
handle.getMinorFragmentId(), recMajor, oppositeHandle.getMinorFragmentId(),
- incoming.getWritableBatch());
+ incoming.getWritableBatch().transfer(oContext.getAllocator()));
updateStats(batch);
stats.startWait();
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index c287bc312..a6c3269df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -229,9 +229,9 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
batchCount++;
RecordBatchData batch;
if (schemaChanged) {
- batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext));
+ batch = new RecordBatchData(SchemaUtil.coerceContainer(incoming, this.schema, oContext), oContext.getAllocator());
} else {
- batch = new RecordBatchData(incoming);
+ batch = new RecordBatchData(incoming, oContext.getAllocator());
}
boolean success = false;
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index dae9eaedb..9e96727f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
import java.util.Iterator;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
@@ -34,11 +35,11 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
private final SelectionVector2 sv2;
private final SelectionVector4 sv4;
- public InternalBatch(RecordBatch incoming) {
- this(incoming, null);
+ public InternalBatch(RecordBatch incoming, OperatorContext oContext) {
+ this(incoming, null, oContext);
}
- public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers){
+ public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers, OperatorContext oContext){
switch(incoming.getSchema().getSelectionVectorMode()){
case FOUR_BYTE:
this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
@@ -53,7 +54,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
this.sv2 = null;
}
this.schema = incoming.getSchema();
- this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers);
+ this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers, oContext);
}
public BatchSchema getSchema() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ee9a0ab34..c084e39ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -309,7 +309,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
container.buildSchema(SelectionVectorMode.NONE);
StreamingAggregator agg = context.getImplementationClass(cg);
- agg.setup(context, incoming, this);
+ agg.setup(oContext, incoming, this);
return agg;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 4932b0fad..82e877779 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -21,6 +21,7 @@ import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.VectorWrapper;
@@ -41,12 +42,12 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
private int outputCount = 0;
private RecordBatch incoming;
private StreamingAggBatch outgoing;
- private FragmentContext context;
private boolean done = false;
+ private OperatorContext context;
@Override
- public void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException {
+ public void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException {
this.context = context;
this.incoming = incoming;
this.outgoing = outgoing;
@@ -164,7 +165,7 @@ public abstract class StreamingAggTemplate implements StreamingAggregator {
previousIndex = currentIndex;
}
- InternalBatch previous = new InternalBatch(incoming);
+ InternalBatch previous = new InternalBatch(incoming, context);
try {
while (true) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 96da00b46..61c82d8e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
@@ -31,7 +32,7 @@ public interface StreamingAggregator {
RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
}
- public abstract void setup(FragmentContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
+ public abstract void setup(OperatorContext context, RecordBatch incoming, StreamingAggBatch outgoing) throws SchemaChangeException;
public abstract IterOutcome getOutcome();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index c88c72d5c..80d774483 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -117,7 +117,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
case OK_NEW_SCHEMA:
case OK:
- WritableBatch writableBatch = incoming.getWritableBatch();
+ WritableBatch writableBatch = incoming.getWritableBatch().transfer(oContext.getAllocator());
if (tunnels.length > 1) {
writableBatch.retainBuffers(tunnels.length - 1);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index e435e79b3..c0e894494 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -153,7 +153,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
for (final VectorWrapper<?> vw : incoming) {
for (final ValueVector vv : vw.getValueVectors()) {
- final TransferPair pair = vv.getTransferPair();
+ final TransferPair pair = vv.getTransferPair(oContext.getAllocator());
container.add(pair.getTo());
transfers.add(pair);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index c3b3f4593..dcaa2440e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -265,12 +265,12 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
TransferPair tp = null;
if (flattenField instanceof RepeatedMapVector) {
- tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference);
+ tp = ((RepeatedMapVector)flattenField).getTransferPairToSingleMap(reference, oContext.getAllocator());
} else {
final ValueVector vvIn = RepeatedValueVector.class.cast(flattenField).getDataVector();
// vvIn may be null because of fast schema return for repeated list vectors
if (vvIn != null) {
- tp = vvIn.getTransferPair(reference);
+ tp = vvIn.getTransferPair(reference, oContext.getAllocator());
}
}
return tp;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 2d0bd43fb..3ea97c658 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -380,7 +380,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
* to the hyper vector container. Will be used when we want to retrieve
* records that have matching keys on the probe side.
*/
- final RecordBatchData nextBatch = new RecordBatchData(right);
+ final RecordBatchData nextBatch = new RecordBatchData(right, oContext.getAllocator());
boolean success = false;
try {
if (hyperContainer == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index b390b8fb3..f0e53e135 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -340,7 +340,7 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
}
private void addBatchToHyperContainer(RecordBatch inputBatch) {
- final RecordBatchData batchCopy = new RecordBatchData(inputBatch);
+ final RecordBatchData batchCopy = new RecordBatchData(inputBatch, oContext.getAllocator());
boolean success = false;
try {
rightCounts.addLast(inputBatch.getRecordCount());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
index 3933ddd62..ba8df9260 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/VectorRecordMaterializer.java
@@ -17,7 +17,9 @@
*/
package org.apache.drill.exec.physical.impl.materialize;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryData;
import org.apache.drill.exec.record.BatchSchema;
@@ -29,10 +31,12 @@ public class VectorRecordMaterializer implements RecordMaterializer{
private QueryId queryId;
private RecordBatch batch;
+ private BufferAllocator allocator;
- public VectorRecordMaterializer(FragmentContext context, RecordBatch batch) {
+ public VectorRecordMaterializer(FragmentContext context, OperatorContext oContext, RecordBatch batch) {
this.queryId = context.getHandle().getQueryId();
this.batch = batch;
+ this.allocator = oContext.getAllocator();
BatchSchema schema = batch.getSchema();
assert schema != null : "Schema must be defined.";
@@ -43,7 +47,7 @@ public class VectorRecordMaterializer implements RecordMaterializer{
public QueryWritableBatch convertNext() {
//batch.getWritableBatch().getDef().getRecordCount()
- WritableBatch w = batch.getWritableBatch();
+ WritableBatch w = batch.getWritableBatch().transfer(allocator);
QueryData header = QueryData.newBuilder() //
.setQueryId(queryId) //
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index c9483ae75..64cfad01c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -590,7 +590,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
OrderedPartitionProjector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
for (VectorWrapper<?> vw : batch) {
- TransferPair tp = vw.getValueVector().getTransferPair();
+ TransferPair tp = vw.getValueVector().getTransferPair(oContext.getAllocator());
transfers.add(tp);
container.add(tp.getTo());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index e3033b43c..38d08b6f8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -141,7 +141,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
return;
case OK_NEW_SCHEMA:
case OK:
- wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(incoming));
+ wrapper = RecordBatchDataWrapper.batch(new RecordBatchData(incoming, oContext.getAllocator()));
queue.put(wrapper);
wrapper = null;
break;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
index af774db23..0cd55eb18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/RecordBatchData.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.sort;
import java.util.List;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
@@ -40,7 +41,7 @@ public class RecordBatchData {
private int recordCount;
VectorContainer container = new VectorContainer();
- public RecordBatchData(VectorAccessible batch) {
+ public RecordBatchData(VectorAccessible batch, BufferAllocator allocator) {
List<ValueVector> vectors = Lists.newArrayList();
recordCount = batch.getRecordCount();
@@ -54,7 +55,7 @@ public class RecordBatchData {
if (v.isHyper()) {
throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
}
- TransferPair tp = v.getValueVector().getTransferPair();
+ TransferPair tp = v.getValueVector().getTransferPair(allocator);
tp.transfer();
vectors.add(tp.getTo());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index f2302ce88..33338ddd8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -91,7 +91,7 @@ public class SortRecordBatchBuilder implements AutoCloseable {
}
- RecordBatchData bd = new RecordBatchData(batch);
+ RecordBatchData bd = new RecordBatchData(batch, allocator);
runningBatches++;
batches.put(batch.getSchema(), bd);
recordCount += bd.getRecordCount();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index ff83cc934..209624bbe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -140,7 +140,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
/* Add all the value vectors in the container */
for (VectorWrapper<?> vv : incoming) {
- TransferPair tp = vv.getValueVector().getTransferPair();
+ TransferPair tp = vv.getValueVector().getTransferPair(oContext.getAllocator());
container.add(tp.getTo());
}
container.buildSchema(incoming.getSchema().getSelectionVectorMode());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
index b2befa330..7abc03cee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowDataBatch.java
@@ -47,7 +47,7 @@ public class WindowDataBatch implements VectorAccessible {
if (v.isHyper()) {
throw new UnsupportedOperationException("Record batch data can't be created based on a hyper batch.");
}
- TransferPair tp = v.getValueVector().getTransferPair();
+ TransferPair tp = v.getValueVector().getTransferPair(oContext.getAllocator());
tp.transfer();
vectors.add(tp.getTo());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 4dbd92dba..6e79f0182 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -366,7 +366,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
totalCount += count;
sorter.setup(context, sv2, convertedBatch);
sorter.sort(sv2);
- RecordBatchData rbd = new RecordBatchData(convertedBatch);
+ RecordBatchData rbd = new RecordBatchData(convertedBatch, oContext.getAllocator());
boolean success = false;
try {
rbd.setSv2(sv2);
@@ -446,7 +446,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
builder = new SortRecordBatchBuilder(oContext.getAllocator());
for (BatchGroup group : batchGroups) {
- RecordBatchData rbd = new RecordBatchData(group.getContainer());
+ RecordBatchData rbd = new RecordBatchData(group.getContainer(), oContext.getAllocator());
rbd.setSv2(group.getSv2());
builder.add(rbd);
}
@@ -562,7 +562,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// 1 output container is kept in memory, so we want to hold on to it and transferClone
// allows keeping ownership
- VectorContainer c1 = VectorContainer.getTransferClone(outputContainer);
+ VectorContainer c1 = VectorContainer.getTransferClone(outputContainer, oContext);
c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
c1.setRecordCount(count);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
index 7fc7960a1..322339e6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/HyperVectorWrapper.java
@@ -21,6 +21,7 @@ import java.util.AbstractMap;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
@@ -116,7 +117,7 @@ public class HyperVectorWrapper<T extends ValueVector> implements VectorWrapper<
@Override
@SuppressWarnings("unchecked")
- public VectorWrapper<T> cloneAndTransfer() {
+ public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator) {
return new HyperVectorWrapper<T>(f, vectors, false);
// T[] newVectors = (T[]) Array.newInstance(vectors.getClass().getComponentType(), vectors.length);
// for(int i =0; i < newVectors.length; i++) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
index 77cb9a198..af0a75304 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordIterator.java
@@ -53,6 +53,7 @@ public class RecordIterator implements VectorAccessible {
private int inputIndex; // For two way merge join 0:left, 1:right
private boolean lastBatchRead; // True if all batches are consumed.
private boolean initialized;
+ private OperatorContext oContext;
private final VectorContainer container; // Holds VectorContainer of current record batch
private final TreeRangeMap<Long, RecordBatchData> batches = TreeRangeMap.create();
@@ -66,6 +67,7 @@ public class RecordIterator implements VectorAccessible {
this.inputIndex = inputIndex;
this.lastBatchRead = false;
this.container = new VectorContainer(oContext);
+ this.oContext = oContext;
resetIndices();
this.initialized = false;
}
@@ -181,7 +183,7 @@ public class RecordIterator implements VectorAccessible {
nextOuterPosition = 0;
}
// Transfer vectors from incoming record batch.
- final RecordBatchData rbd = new RecordBatchData(incoming);
+ final RecordBatchData rbd = new RecordBatchData(incoming, oContext.getAllocator());
innerRecordCount = incoming.getRecordCount();
if (!initialized) {
for (VectorWrapper<?> w : rbd.getContainer()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
index 8cf90ab0c..48f0a3623 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SchemaUtil.java
@@ -100,7 +100,7 @@ public class SchemaUtil {
int recordCount, OperatorContext context) {
if (v != null) {
int valueCount = v.getAccessor().getValueCount();
- TransferPair tp = v.getTransferPair();
+ TransferPair tp = v.getTransferPair(context.getAllocator());
tp.transfer();
if (v.getField().getType().getMinorType().equals(field.getType().getMinorType())) {
if (field.getType().getMinorType() == MinorType.UNION) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
index f1b60d498..1e8a52ff7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/SimpleVectorWrapper.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MajorTypeOrBuilder;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.drill.exec.vector.complex.AbstractMapVector;
@@ -74,8 +75,8 @@ public class SimpleVectorWrapper<T extends ValueVector> implements VectorWrapper
@SuppressWarnings("unchecked")
@Override
- public VectorWrapper<T> cloneAndTransfer() {
- TransferPair tp = vector.getTransferPair();
+ public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator) {
+ TransferPair tp = vector.getTransferPair(allocator);
tp.transfer();
return new SimpleVectorWrapper<T>((T) tp.getTo());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index c48365020..33351ba9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -28,6 +28,7 @@ import java.util.Set;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -148,15 +149,15 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
* The RecordBatch iterator the contains the batch we should take over.
* @return A cloned vector container.
*/
- public static VectorContainer getTransferClone(VectorAccessible incoming) {
- VectorContainer vc = new VectorContainer();
+ public static VectorContainer getTransferClone(VectorAccessible incoming, OperatorContext oContext) {
+ VectorContainer vc = new VectorContainer(oContext);
for (VectorWrapper<?> w : incoming) {
vc.cloneAndTransfer(w);
}
return vc;
}
- public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper[] ignoreWrappers) {
+ public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper[] ignoreWrappers, OperatorContext oContext) {
Iterable<VectorWrapper<?>> wrappers = incoming;
if (ignoreWrappers != null) {
final List<VectorWrapper> ignored = Lists.newArrayList(ignoreWrappers);
@@ -165,7 +166,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
wrappers = resultant;
}
- final VectorContainer vc = new VectorContainer();
+ final VectorContainer vc = new VectorContainer(oContext);
for (VectorWrapper<?> w : wrappers) {
vc.cloneAndTransfer(w);
}
@@ -198,7 +199,7 @@ public class VectorContainer implements Iterable<VectorWrapper<?>>, VectorAccess
}
private void cloneAndTransfer(VectorWrapper<?> wrapper) {
- wrappers.add(wrapper.cloneAndTransfer());
+ wrappers.add(wrapper.cloneAndTransfer(oContext.getAllocator()));
}
public void addCollection(Iterable<ValueVector> vectors) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
index 5250f98a9..65ea4570e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorWrapper.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.record;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.vector.ValueVector;
@@ -31,7 +32,7 @@ public interface VectorWrapper<T extends ValueVector> {
public T[] getValueVectors();
public boolean isHyper();
public void clear();
- public VectorWrapper<T> cloneAndTransfer();
+ public VectorWrapper<T> cloneAndTransfer(BufferAllocator allocator);
public VectorWrapper<?> getChildWrapper(int[] ids);
public void transfer(VectorWrapper<?> destination);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index d39ce5e89..bcec920b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -51,6 +51,18 @@ public class WritableBatch implements AutoCloseable {
this.buffers = buffers;
}
+ public WritableBatch transfer(BufferAllocator allocator) {
+ List<DrillBuf> newBuffers = Lists.newArrayList();
+ for (DrillBuf buf : buffers) {
+ int writerIndex = buf.writerIndex();
+ DrillBuf newBuf = buf.transferOwnership(allocator).buffer;
+ newBuf.writerIndex(writerIndex);
+ newBuffers.add(newBuf);
+ }
+ clear();
+ return new WritableBatch(def, newBuffers);
+ }
+
public RecordBatchDef getDef() {
return def;
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
index f7843f5ae..66b75710e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributedConcurrent.java
@@ -46,7 +46,7 @@ import static org.junit.Assert.assertNull;
* any particular order of execution. We ignore the results.
*/
public class TestTpchDistributedConcurrent extends BaseTestQuery {
- @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(120000); // Longer timeout than usual.
+ @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(140000); // Longer timeout than usual.
/*
* Valid test names taken from TestTpchDistributed. Fuller path prefixes are
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java
index b2054e61a..6ac8e976e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestSplitAndTransfer.java
@@ -51,7 +51,7 @@ public class TestSplitAndTransfer {
}
mutator.setValueCount(valueCount);
- final TransferPair tp = varCharVector.getTransferPair();
+ final TransferPair tp = varCharVector.getTransferPair(allocator);
final NullableVarCharVector newVarCharVector = (NullableVarCharVector) tp.getTo();
final Accessor accessor = newVarCharVector.getAccessor();
final int[][] startLengths = {{0, 201}, {201, 200}, {401, 99}};
diff --git a/exec/vector/src/main/codegen/templates/FixedValueVectors.java b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
index f67614a81..8e77dcc5e 100644
--- a/exec/vector/src/main/codegen/templates/FixedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/FixedValueVectors.java
@@ -198,13 +198,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
data.writerIndex(actualLength);
}
- public TransferPair getTransferPair(){
- return new TransferImpl(getField());
+ public TransferPair getTransferPair(BufferAllocator allocator){
+ return new TransferImpl(getField(), allocator);
}
@Override
- public TransferPair getTransferPair(FieldReference ref){
- return new TransferImpl(getField().withPath(ref));
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){
+ return new TransferImpl(getField().withPath(ref), allocator);
}
@Override
@@ -214,8 +214,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
public void transferTo(${minor.class}Vector target){
target.clear();
- target.data = data;
- target.data.retain(1);
+ target.data = data.transferOwnership(target.allocator).buffer;
target.data.writerIndex(data.writerIndex());
clear();
}
@@ -224,15 +223,14 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F
final int startPoint = startIndex * ${type.width};
final int sliceLength = length * ${type.width};
target.clear();
- target.data = data.slice(startPoint, sliceLength);
- target.data.retain(1);
+ target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer;
target.data.writerIndex(sliceLength);
}
private class TransferImpl implements TransferPair{
private ${minor.class}Vector to;
- public TransferImpl(MaterializedField field){
+ public TransferImpl(MaterializedField field, BufferAllocator allocator){
to = new ${minor.class}Vector(field, allocator);
}
diff --git a/exec/vector/src/main/codegen/templates/NullableValueVectors.java b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
index 13bdd4f97..d2c17ffde 100644
--- a/exec/vector/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/NullableValueVectors.java
@@ -243,13 +243,13 @@ public final class ${className} extends BaseDataValueVector implements <#if type
}
@Override
- public TransferPair getTransferPair(){
- return new TransferImpl(getField());
+ public TransferPair getTransferPair(BufferAllocator allocator){
+ return new TransferImpl(getField(), allocator);
}
@Override
- public TransferPair getTransferPair(FieldReference ref){
- return new TransferImpl(getField().withPath(ref));
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){
+ return new TransferImpl(getField().withPath(ref), allocator);
}
@Override
@@ -277,7 +277,7 @@ public final class ${className} extends BaseDataValueVector implements <#if type
private class TransferImpl implements TransferPair {
Nullable${minor.class}Vector to;
- public TransferImpl(MaterializedField field){
+ public TransferImpl(MaterializedField field, BufferAllocator allocator){
to = new Nullable${minor.class}Vector(field, allocator);
}
diff --git a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
index 21f56162f..ca39d7130 100644
--- a/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/vector/src/main/codegen/templates/RepeatedValueVectors.java
@@ -79,13 +79,13 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
}
@Override
- public TransferPair getTransferPair() {
- return new TransferImpl(getField());
+ public TransferPair getTransferPair(BufferAllocator allocator) {
+ return new TransferImpl(getField(), allocator);
}
@Override
- public TransferPair getTransferPair(FieldReference ref){
- return new TransferImpl(getField().withPath(ref));
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){
+ return new TransferImpl(getField().withPath(ref), allocator);
}
@Override
@@ -131,7 +131,7 @@ public final class Repeated${minor.class}Vector extends BaseRepeatedValueVector
private class TransferImpl implements TransferPair {
final Repeated${minor.class}Vector to;
- public TransferImpl(MaterializedField field) {
+ public TransferImpl(MaterializedField field, BufferAllocator allocator) {
this.to = new Repeated${minor.class}Vector(field, allocator);
}
diff --git a/exec/vector/src/main/codegen/templates/UnionVector.java b/exec/vector/src/main/codegen/templates/UnionVector.java
index cc541e599..2e278b14c 100644
--- a/exec/vector/src/main/codegen/templates/UnionVector.java
+++ b/exec/vector/src/main/codegen/templates/UnionVector.java
@@ -80,6 +80,10 @@ public class UnionVector implements ValueVector {
this.callBack = callBack;
}
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
+
public List<MinorType> getSubTypes() {
return majorType.getSubTypeList();
}
@@ -198,13 +202,13 @@ public class UnionVector implements ValueVector {
}
@Override
- public TransferPair getTransferPair() {
- return new TransferImpl(field);
+ public TransferPair getTransferPair(BufferAllocator allocator) {
+ return new TransferImpl(field, allocator);
}
@Override
- public TransferPair getTransferPair(FieldReference ref) {
- return new TransferImpl(field.withPath(ref));
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+ return new TransferImpl(field.withPath(ref), allocator);
}
@Override
@@ -242,7 +246,7 @@ public class UnionVector implements ValueVector {
UnionVector to;
- public TransferImpl(MaterializedField field) {
+ public TransferImpl(MaterializedField field, BufferAllocator allocator) {
to = new UnionVector(field, allocator, null);
}
diff --git a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
index f7347343d..56d2d527b 100644
--- a/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/vector/src/main/codegen/templates/VariableLengthVectors.java
@@ -174,13 +174,13 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
}
@Override
- public TransferPair getTransferPair(){
- return new TransferImpl(getField());
+ public TransferPair getTransferPair(BufferAllocator allocator){
+ return new TransferImpl(getField(), allocator);
}
@Override
- public TransferPair getTransferPair(FieldReference ref){
- return new TransferImpl(getField().withPath(ref));
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator){
+ return new TransferImpl(getField().withPath(ref), allocator);
}
@Override
@@ -191,8 +191,8 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
public void transferTo(${minor.class}Vector target){
target.clear();
this.offsetVector.transferTo(target.offsetVector);
- target.data = data;
- target.data.retain(1);
+ target.data = data.transferOwnership(target.allocator).buffer;
+ target.data.writerIndex(data.writerIndex());
clear();
}
@@ -207,8 +207,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
for (int i = 0; i < length + 1; i++) {
targetOffsetVectorMutator.set(i, offsetVectorAccessor.get(startIndex + i) - startPoint);
}
- target.data = data.slice(startPoint, sliceLength);
- target.data.retain(1);
+ target.data = data.slice(startPoint, sliceLength).transferOwnership(target.allocator).buffer;
target.getMutator().setValueCount(length);
}
@@ -242,7 +241,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
private class TransferImpl implements TransferPair{
${minor.class}Vector to;
- public TransferImpl(MaterializedField field){
+ public TransferImpl(MaterializedField field, BufferAllocator allocator){
to = new ${minor.class}Vector(field, allocator);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index eb5dbcd9d..23ad77888 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -71,8 +71,8 @@ public abstract class BaseValueVector implements ValueVector {
}
@Override
- public TransferPair getTransferPair() {
- return getTransferPair(new FieldReference(getField().getPath()));
+ public TransferPair getTransferPair(BufferAllocator allocator) {
+ return getTransferPair(new FieldReference(getField().getPath()), allocator);
}
@Override
@@ -119,5 +119,10 @@ public abstract class BaseValueVector implements ValueVector {
return true;
}
+
+ @Override
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
index c1504c60a..3ba11e2b9 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -208,13 +208,13 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
@Override
- public TransferPair getTransferPair() {
- return new TransferImpl(getField());
+ public TransferPair getTransferPair(BufferAllocator allocator) {
+ return new TransferImpl(getField(), allocator);
}
@Override
- public TransferPair getTransferPair(FieldReference ref) {
- return new TransferImpl(getField().withPath(ref));
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+ return new TransferImpl(getField().withPath(ref), allocator);
}
@Override
@@ -273,7 +273,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
private class TransferImpl implements TransferPair {
BitVector to;
- public TransferImpl(MaterializedField field) {
+ public TransferImpl(MaterializedField field, BufferAllocator allocator) {
this.to = new BitVector(field, allocator);
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index 9ca4410d0..494f234bb 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -147,7 +147,7 @@ public class ObjectVector extends BaseValueVector {
}
@Override
- public TransferPair getTransferPair() {
+ public TransferPair getTransferPair(BufferAllocator allocator) {
throw new UnsupportedOperationException("ObjectVector does not support this");
}
@@ -157,7 +157,7 @@ public class ObjectVector extends BaseValueVector {
}
@Override
- public TransferPair getTransferPair(FieldReference ref) {
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
throw new UnsupportedOperationException("ObjectVector does not support this");
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index b39fcfe16..a4a071ee8 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -23,6 +23,7 @@ import io.netty.buffer.DrillBuf;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TransferPair;
@@ -68,6 +69,8 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
*/
boolean allocateNewSafe();
+ BufferAllocator getAllocator();
+
/**
* Set the initial record capacity
* @param numRecords
@@ -99,9 +102,9 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
* Returns a {@link org.apache.drill.exec.record.TransferPair transfer pair}, creating a new target vector of
* the same type.
*/
- TransferPair getTransferPair();
+ TransferPair getTransferPair(BufferAllocator allocator);
- TransferPair getTransferPair(FieldReference ref);
+ TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator);
/**
* Returns a new {@link org.apache.drill.exec.record.TransferPair transfer pair} that is used to transfer underlying
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
index c5326f663..165fc1408 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/ZeroVector.java
@@ -94,7 +94,7 @@ public class ZeroVector implements ValueVector {
}
@Override
- public TransferPair getTransferPair() {
+ public TransferPair getTransferPair(BufferAllocator allocator) {
return defaultPair;
}
@@ -138,6 +138,11 @@ public class ZeroVector implements ValueVector {
}
@Override
+ public BufferAllocator getAllocator() {
+ throw new UnsupportedOperationException("Tried to get allocator from ZeroVector");
+ }
+
+ @Override
public void setInitialCapacity(int numRecords) { }
@Override
@@ -146,7 +151,7 @@ public class ZeroVector implements ValueVector {
}
@Override
- public TransferPair getTransferPair(FieldReference ref) {
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
return defaultPair;
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
index caedb967c..0ac2417bd 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
@@ -62,6 +62,10 @@ public abstract class AbstractContainerVector implements ValueVector {
}
}
+ public BufferAllocator getAllocator() {
+ return allocator;
+ }
+
/**
* Returns the field definition of this instance.
*/
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
index b780d1ac4..10975f548 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/ListVector.java
@@ -103,8 +103,8 @@ public class ListVector extends BaseRepeatedValueVector {
}
@Override
- public TransferPair getTransferPair(FieldReference ref) {
- return new TransferImpl(field.withPath(ref));
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+ return new TransferImpl(field.withPath(ref), allocator);
}
@Override
@@ -116,7 +116,7 @@ public class ListVector extends BaseRepeatedValueVector {
ListVector to;
- public TransferImpl(MaterializedField field) {
+ public TransferImpl(MaterializedField field, BufferAllocator allocator) {
to = new ListVector(field, allocator, null);
to.addOrGetVector(new VectorDescriptor(vector.getField().getType()));
}
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index 60d74c1fe..6784ed45e 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -143,8 +143,8 @@ public class MapVector extends AbstractMapVector {
}
@Override
- public TransferPair getTransferPair() {
- return new MapTransferPair(this, getField().getPath());
+ public TransferPair getTransferPair(BufferAllocator allocator) {
+ return new MapTransferPair(this, getField().getPath(), allocator);
}
@Override
@@ -153,8 +153,8 @@ public class MapVector extends AbstractMapVector {
}
@Override
- public TransferPair getTransferPair(FieldReference ref) {
- return new MapTransferPair(this, ref);
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+ return new MapTransferPair(this, ref, allocator);
}
protected static class MapTransferPair implements TransferPair{
@@ -162,8 +162,8 @@ public class MapVector extends AbstractMapVector {
private final MapVector from;
private final MapVector to;
- public MapTransferPair(MapVector from, SchemaPath path) {
- this(from, new MapVector(MaterializedField.create(path, TYPE), from.allocator, from.callBack), false);
+ public MapTransferPair(MapVector from, SchemaPath path, BufferAllocator allocator) {
+ this(from, new MapVector(MaterializedField.create(path, TYPE), allocator, from.callBack), false);
}
public MapTransferPair(MapVector from, MapVector to) {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index cbc61f8d2..4706999fc 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -189,7 +189,7 @@ public class RepeatedListVector extends AbstractContainerVector
}
@Override
- public TransferPair getTransferPair(FieldReference ref) {
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
return makeTransferPair(new DelegateRepeatedVector(ref, allocator));
}
@@ -344,13 +344,13 @@ public class RepeatedListVector extends AbstractContainerVector
}
@Override
- public TransferPair getTransferPair() {
- return new RepeatedListTransferPair(delegate.getTransferPair());
+ public TransferPair getTransferPair(BufferAllocator allocator) {
+ return new RepeatedListTransferPair(delegate.getTransferPair(allocator));
}
@Override
- public TransferPair getTransferPair(FieldReference ref) {
- return new RepeatedListTransferPair(delegate.getTransferPair(ref));
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+ return new RepeatedListTransferPair(delegate.getTransferPair(ref, allocator));
}
@Override
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index cb597beb3..b13de9d78 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -160,8 +160,8 @@ public class RepeatedMapVector extends AbstractMapVector
}
@Override
- public TransferPair getTransferPair() {
- return new RepeatedMapTransferPair(this, getField().getPath());
+ public TransferPair getTransferPair(BufferAllocator allocator) {
+ return new RepeatedMapTransferPair(this, getField().getPath(), allocator);
}
@Override
@@ -224,13 +224,13 @@ public class RepeatedMapVector extends AbstractMapVector
return super.getFieldIdIfMatches(builder, addToBreadCrumb, seg);
}
- public TransferPair getTransferPairToSingleMap(FieldReference reference) {
- return new SingleMapTransferPair(this, reference);
+ public TransferPair getTransferPairToSingleMap(FieldReference reference, BufferAllocator allocator) {
+ return new SingleMapTransferPair(this, reference, allocator);
}
@Override
- public TransferPair getTransferPair(FieldReference ref) {
- return new RepeatedMapTransferPair(this, ref);
+ public TransferPair getTransferPair(FieldReference ref, BufferAllocator allocator) {
+ return new RepeatedMapTransferPair(this, ref, allocator);
}
@Override
@@ -261,8 +261,8 @@ public class RepeatedMapVector extends AbstractMapVector
private final MapVector to;
private static final MajorType MAP_TYPE = Types.required(MinorType.MAP);
- public SingleMapTransferPair(RepeatedMapVector from, SchemaPath path) {
- this(from, new MapVector(MaterializedField.create(path, MAP_TYPE), from.allocator, from.callBack), false);
+ public SingleMapTransferPair(RepeatedMapVector from, SchemaPath path, BufferAllocator allocator) {
+ this(from, new MapVector(MaterializedField.create(path, MAP_TYPE), allocator, from.callBack), false);
}
public SingleMapTransferPair(RepeatedMapVector from, MapVector to) {
@@ -326,8 +326,8 @@ public class RepeatedMapVector extends AbstractMapVector
private final RepeatedMapVector to;
private final RepeatedMapVector from;
- public RepeatedMapTransferPair(RepeatedMapVector from, SchemaPath path) {
- this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), from.allocator, from.callBack), false);
+ public RepeatedMapTransferPair(RepeatedMapVector from, SchemaPath path, BufferAllocator allocator) {
+ this(from, new RepeatedMapVector(MaterializedField.create(path, TYPE), allocator, from.callBack), false);
}
public RepeatedMapTransferPair(RepeatedMapVector from, RepeatedMapVector to) {
diff --git a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java
index 894e60e84..ee16e9743 100644
--- a/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java
+++ b/exec/vector/src/main/java/org/apache/drill/exec/vector/complex/impl/PromotableWriter.java
@@ -151,7 +151,7 @@ public class PromotableWriter extends AbstractPromotableFieldWriter {
private FieldWriter promoteToUnion() {
String name = vector.getField().getLastName();
- TransferPair tp = vector.getTransferPair(new FieldReference(vector.getField().getType().getMinorType().name().toLowerCase()));
+ TransferPair tp = vector.getTransferPair(new FieldReference(vector.getField().getType().getMinorType().name().toLowerCase()), vector.getAllocator());
tp.transfer();
if (parentContainer != null) {
unionVector = parentContainer.addOrGet(name, Types.optional(MinorType.UNION), UnionVector.class);