aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org
diff options
context:
space:
mode:
authorSteven Phillips <sphillips@maprtech.com>2014-04-11 18:41:03 -0700
committerSteven Phillips <sphillips@maprtech.com>2014-05-04 18:46:37 -0700
commita2355d42dbff51b858fc28540915cf793f1c0fac (patch)
treee5db0d8d9a3d5a0dcb15d1c5e9c24d9a1c47dffb /exec/java-exec/src/main/java/org
parent70dddc54a73183e58f5493b13b1b19e51162f752 (diff)
DRILL-620: Memory consumption fixes
accounting fixes trim buffers switch to using setSafe and copySafe methods only adaptive allocation operator based allocator wip handle OOM Operator Context
Diffstat (limited to 'exec/java-exec/src/main/java/org')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java43
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java54
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java60
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java29
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java22
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java32
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java36
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java37
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java62
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java75
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopier.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java19
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java89
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java92
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java66
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java31
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java83
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java33
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java23
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java67
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java65
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java19
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java21
121 files changed, 1327 insertions, 496 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index baef9b030..9eee08dd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -62,5 +62,6 @@ public interface ExecConstants {
public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
public static final String FILESYSTEM_PARTITION_COLUMN_LABEL = "drill.exec.storage.file.partition.column.label";
public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets";
-
+ public static final String TOP_LEVEL_MAX_ALLOC = "drill.exec.memory.top.max";
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index 9511992e9..f4a6d7da6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -157,7 +157,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE)
{
svCount = sv2.getCount();
- svBuf = sv2.getBuffer();
+ svBuf = sv2.getBuffer(); //this calls retain() internally
}
try
@@ -170,6 +170,7 @@ public class VectorAccessibleSerializable implements DrillSerializable {
{
svBuf.getBytes(0, output, svBuf.readableBytes());
sv2.setBuffer(svBuf);
+ svBuf.release(); // sv2 now owns the buffer
sv2.setRecordCount(svCount);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index bbd3e4233..fc650b96e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -268,6 +268,9 @@ public class DrillClient implements Closeable, ConnectionThrottle{
@Override
public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) {
// logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result);
+ if (result.getHeader().getErrorCount() > 0) {
+ fail(new Exception(result.getHeader().getError(0).getMessage()));
+ }
results.add(result);
if(result.getHeader().getIsLastChunk()){
future.set(results);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 0d19340d3..624042e03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -39,10 +39,12 @@ public class Accountor {
private final long total;
private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap();
private final FragmentHandle handle;
+ private Accountor parent;
public Accountor(FragmentHandle handle, Accountor parent, long max, long preAllocated) {
// TODO: fix preallocation stuff
AtomicRemainder parentRemainder = parent != null ? parent.remainder : null;
+ this.parent = parent;
this.remainder = new AtomicRemainder(parentRemainder, max, preAllocated);
this.total = max;
this.handle = handle;
@@ -53,6 +55,13 @@ public class Accountor {
}
}
+ public long getAvailable() {
+ if (parent != null) {
+ return Math.min(parent.getAvailable(), getCapacity() - getAllocation());
+ }
+ return getCapacity() - getAllocation();
+ }
+
public long getCapacity() {
return total;
}
@@ -62,9 +71,7 @@ public class Accountor {
}
public boolean reserve(long size) {
- //TODO: for now, we won't stop reservation.
- remainder.get(size);
- return true;
+ return remainder.get(size);
}
public void forceAdditionalReservation(long size) {
@@ -89,7 +96,7 @@ public class Accountor {
if(buf != null){
DebugStackTrace dst = buffers.get(buf);
if(dst == null) throw new IllegalStateException("Partially releasing a buffer that has already been released. Buffer: " + buf);
- dst.size =- size;
+ dst.size -= size;
if(dst.size < 0){
throw new IllegalStateException("Partially releasing a buffer that has already been released. Buffer: " + buf);
}
@@ -150,7 +157,7 @@ public class Accountor {
}
- private class DebugStackTrace {
+ public class DebugStackTrace {
private StackTraceElement[] elements;
private long size;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
index 8476b5345..74849c2e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
@@ -17,6 +17,9 @@
*/
package org.apache.drill.exec.memory;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -35,6 +38,7 @@ public class AtomicRemainder {
private final long initTotal;
private final long initShared;
private final long initPrivate;
+ private boolean closed = false;
public AtomicRemainder(AtomicRemainder parent, long max, long pre) {
this.parent = parent;
@@ -43,6 +47,7 @@ public class AtomicRemainder {
this.initTotal = max;
this.initShared = max - pre;
this.initPrivate = pre;
+// logger.info("new AtomicRemainder. a.s. {} a.p. {} hashcode {}", availableShared, availablePrivate, hashCode(), new Exception());
}
public long getRemainder() {
@@ -60,25 +65,36 @@ public class AtomicRemainder {
* @param size
*/
public void forceGet(long size) {
- if (DEBUG)
- logger.info("Force get {}", size);
- availableShared.addAndGet(size);
+ long newAvailableShared = availableShared.addAndGet(size);
+// if (DEBUG)
+// logger.info("Force get {}. a.s. {} a.p. {} hashcode: {}", size, availableShared, availablePrivate, hashCode(), new Exception());
+// assert newAvailableShared <= initShared;
if (parent != null)
parent.forceGet(size);
}
public boolean get(long size) {
- if (DEBUG)
- logger.info("Get {}", size);
if (availablePrivate.get() < 1) {
// if there is no preallocated memory, we can operate normally.
+ // if there is a parent allocator, check it before allocating.
+ if (parent != null && !parent.get(size)) {
+ return false;
+ }
+
// attempt to get shared memory, if fails, return false.
long outcome = availableShared.addAndGet(-size);
+// assert outcome <= initShared;
if (outcome < 0) {
- availableShared.addAndGet(size);
+ long newAvailableShared = availableShared.addAndGet(size);
+ assert newAvailableShared <= initShared;
+ if (parent != null) {
+ parent.returnAllocation(size);
+ }
return false;
} else {
+// if (DEBUG)
+// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
return true;
}
@@ -86,6 +102,8 @@ public class AtomicRemainder {
// if there is preallocated memory, use that first.
long unaccount = availablePrivate.addAndGet(-size);
if (unaccount >= 0) {
+// if (DEBUG)
+// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
return true;
} else {
@@ -102,6 +120,8 @@ public class AtomicRemainder {
if (account >= 0) {
// we were succesful, move private back to zero (since we allocated using shared).
availablePrivate.addAndGet(additionalSpaceNeeded);
+// if (DEBUG)
+// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
return true;
} else {
// we failed to get space from available shared. Return allocations to initial state.
@@ -122,26 +142,31 @@ public class AtomicRemainder {
* @param size
*/
public void returnAllocation(long size) {
- if (DEBUG)
- logger.info("Return allocation {}", size);
long privateSize = availablePrivate.get();
long privateChange = Math.min(size, initPrivate - privateSize);
long sharedChange = size - privateChange;
availablePrivate.addAndGet(privateChange);
availableShared.addAndGet(sharedChange);
+// if (DEBUG)
+// logger.info("Return allocation {}, a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
if (parent != null) {
parent.returnAllocation(sharedChange);
}
+ assert getUsed() <= initTotal;
}
public void close() {
-
+ if (closed) {
+ logger.warn("Tried to close remainder, but it has already been closed", new Exception());
+ return;
+ }
if (availablePrivate.get() != initPrivate || availableShared.get() != initShared)
throw new IllegalStateException(
String
.format(ERROR, initPrivate, availablePrivate.get(), initPrivate - availablePrivate.get(), initShared, availableShared.get(), initShared - availableShared.get()));
if(parent != null) parent.returnAllocation(initPrivate);
+ closed = true;
}
static final String ERROR = "Failure while closing accountor. Expected private and shared pools to be set to initial values. However, one or more were not. Stats are\n\tzone\tinit\tallocated\tdelta \n\tprivate\t%d\t%d\t%d \n\tshared\t%d\t%d\t%d.";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index e71c9c958..0b2add2d6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -22,10 +22,13 @@ import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.PooledByteBufAllocatorL;
import io.netty.buffer.PooledUnsafeDirectByteBufL;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.util.AssertionUtil;
@@ -40,6 +43,10 @@ public class TopLevelAllocator implements BufferAllocator {
public TopLevelAllocator() {
this(DrillConfig.getMaxDirectMemory());
}
+
+ public TopLevelAllocator(DrillConfig config) {
+ this(Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)));
+ }
public TopLevelAllocator(long maximumAllocation) {
this.acct = new Accountor(null, null, maximumAllocation, 0);
@@ -50,7 +57,7 @@ public class TopLevelAllocator implements BufferAllocator {
if(!acct.reserve(min)) return null;
ByteBuf buffer = innerAllocator.directBuffer(min, max);
AccountingByteBuf wrapped = new AccountingByteBuf(acct, (PooledUnsafeDirectByteBufL) buffer);
- acct.reserved(buffer.capacity() - min, wrapped);
+ acct.reserved(min, wrapped);
return wrapped;
}
@@ -74,15 +81,19 @@ public class TopLevelAllocator implements BufferAllocator {
if(!acct.reserve(initialReservation)){
throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
};
- ChildAllocator allocator = new ChildAllocator(handle, acct, initialReservation, maximumReservation);
+ ChildAllocator allocator = new ChildAllocator(handle, acct, maximumReservation, initialReservation);
if(ENABLE_ACCOUNTING) children.add(allocator);
return allocator;
}
@Override
public void close() {
- if(ENABLE_ACCOUNTING && !children.isEmpty()){
- throw new IllegalStateException("Failure while trying to close allocator: Child level allocators not closed.");
+ if (ENABLE_ACCOUNTING) {
+ for (ChildAllocator child : children) {
+ if (!child.isClosed()) {
+ throw new IllegalStateException("Failure while trying to close allocator: Child level allocators not closed.");
+ }
+ }
}
acct.close();
}
@@ -91,14 +102,20 @@ public class TopLevelAllocator implements BufferAllocator {
private class ChildAllocator implements BufferAllocator{
private Accountor childAcct;
-
+ private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>();
+ private boolean closed = false;
+ private FragmentHandle handle;
+
public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre) throws OutOfMemoryException{
+ assert max >= pre;
childAcct = new Accountor(handle, parentAccountor, max, pre);
+ this.handle = handle;
}
@Override
public AccountingByteBuf buffer(int size, int max) {
if(!childAcct.reserve(size)){
+ logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory());
return null;
};
@@ -121,9 +138,11 @@ public class TopLevelAllocator implements BufferAllocator {
public BufferAllocator getChildAllocator(FragmentHandle handle, long initialReservation, long maximumReservation)
throws OutOfMemoryException {
if(!childAcct.reserve(initialReservation)){
- throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getCapacity() - childAcct.getAllocation()));
+ throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getAvailable()));
};
- return new ChildAllocator(handle, childAcct, maximumReservation, initialReservation);
+ ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, maximumReservation, initialReservation);
+ this.children.put(newChildAllocator, Thread.currentThread().getStackTrace());
+ return newChildAllocator;
}
public PreAllocator getNewPreAllocator(){
@@ -132,7 +151,28 @@ public class TopLevelAllocator implements BufferAllocator {
@Override
public void close() {
+ if (ENABLE_ACCOUNTING) {
+ for (ChildAllocator child : children.keySet()) {
+ if (!child.isClosed()) {
+ StringBuilder sb = new StringBuilder();
+ StackTraceElement[] elements = children.get(child);
+ for (int i = 3; i < elements.length; i++) {
+ sb.append("\t\t");
+ sb.append(elements[i]);
+ sb.append("\n");
+ }
+ throw new IllegalStateException(String.format(
+ "Failure while trying to close child allocator: Child level allocators not closed. Fragment %d:%d. Stack trace: \n %s",
+ handle.getMajorFragmentId(), handle.getMinorFragmentId(), sb.toString()));
+ }
+ }
+ }
childAcct.close();
+ closed = true;
+ }
+
+ public boolean isClosed() {
+ return closed;
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 2035aa0c0..f3bcfef6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -26,6 +26,7 @@ import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.tools.Frameworks;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.compile.ClassTransformer;
import org.apache.drill.exec.compile.QueryClassLoader;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -83,6 +84,7 @@ public class FragmentContext implements Closeable {
this.queryStartTime = fragment.getQueryStartTime();
this.rootFragmentTimeZone = fragment.getTimeZone();
logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
+ logger.debug("Fragment max allocation: {}", fragment.getMemMax());
this.allocator = context.getAllocator().getChildAllocator(fragment.getHandle(), fragment.getMemInitial(), fragment.getMemMax());
}
@@ -138,10 +140,15 @@ public class FragmentContext implements Closeable {
* Get this fragment's allocator.
* @return
*/
+ @Deprecated
public BufferAllocator getAllocator() {
return allocator;
}
+ public BufferAllocator getNewChildAllocator(long initialReservation, long maximumReservation) throws OutOfMemoryException {
+ return allocator.getChildAllocator(getHandle(), initialReservation, maximumReservation);
+ }
+
public <T> T getImplementationClass(ClassGenerator<T> cg) throws ClassTransformationException, IOException {
return getImplementationClass(cg.getCodeGenerator());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
new file mode 100644
index 000000000..3b7b4c189
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import org.apache.drill.common.util.Hook.Closeable;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+public class OperatorContext implements Closeable {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContext.class);
+
+ private final BufferAllocator allocator;
+ private boolean closed = false;
+ private PhysicalOperator popConfig;
+
+ public OperatorContext(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException {
+ this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation());
+ this.popConfig = popConfig;
+ }
+
+ public BufferAllocator getAllocator() {
+ if (allocator == null) {
+ throw new UnsupportedOperationException("Operator context does not have an allocator");
+ }
+ return allocator;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null);
+ return;
+ }
+ logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
+ if (allocator != null) {
+ allocator.close();
+ }
+ closed = true;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index 7eced4d94..a79cbc3c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -26,6 +26,8 @@ import com.google.common.base.Preconditions;
public abstract class AbstractBase implements PhysicalOperator{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class);
+ protected long initialAllocation = 1000000L;
+ protected long maxAllocation = 10000000000L;
@Override
@@ -48,5 +50,15 @@ public abstract class AbstractBase implements PhysicalOperator{
public SelectionVectorMode getSVMode() {
return SelectionVectorMode.NONE;
}
+
+ @Override
+ public long getInitialAllocation() {
+ return initialAllocation;
+ }
+
+ @Override
+ public long getMaxAllocation() {
+ return maxAllocation;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 69fc44776..f4cee2ab1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.base;
import java.util.Iterator;
import java.util.List;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.common.expression.SchemaPath;
import com.google.common.collect.Iterators;
@@ -45,6 +46,18 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
@Override
public GroupScan clone(List<SchemaPath> columns) {
- throw new UnsupportedOperationException(String.format("%s does not implmemnt clone(columns) method!", this.getClass().getCanonicalName()));
+ throw new UnsupportedOperationException(String.format("%s does not implement clone(columns) method!", this.getClass().getCanonicalName()));
+ }
+
+ @Override
+ @JsonIgnore
+ public long getInitialAllocation() {
+ return 0;
+ }
+
+ @Override
+ @JsonIgnore
+ public long getMaxAllocation() {
+ return 0;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
index 57b9c1872..97334eaf5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java
@@ -27,7 +27,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import com.google.common.collect.Iterators;
-public abstract class AbstractSubScan implements SubScan{
+public abstract class AbstractSubScan extends AbstractBase implements SubScan{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class);
@Override
@@ -72,5 +72,4 @@ public abstract class AbstractSubScan implements SubScan{
public SelectionVectorMode getSVMode() {
return SelectionVectorMode.NONE;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 66e1b4612..db57922f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -87,4 +87,14 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
@JsonIgnore
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException;
+ /**
+ * @return The memory to preallocate for this operator
+ */
+ public long getInitialAllocation();
+
+ /**
+ * @return The maximum memory this operator can allocate
+ */
+ public long getMaxAllocation();
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
index 79f5f13a2..b55abefc4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.RecordBatch;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index e93fbcc33..73ed72322 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -32,20 +32,19 @@ import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.util.BatchPrinter;
+import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Maps;
@@ -58,12 +57,16 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
public class ScanBatch implements RecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
+ private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+ private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+
final Map<MaterializedField, ValueVector> fieldVectorMap = Maps.newHashMap();
private final VectorContainer container = new VectorContainer();
private int recordCount;
private boolean schemaChanged = true;
private final FragmentContext context;
+ private final OperatorContext oContext;
private Iterator<RecordReader> readers;
private RecordReader currentReader;
private BatchSchema schema;
@@ -74,12 +77,13 @@ public class ScanBatch implements RecordBatch {
List<Integer> selectedPartitionColumns;
private String partitionColumnDesignator;
- public ScanBatch(FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
+ public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException {
this.context = context;
this.readers = readers;
if (!readers.hasNext())
throw new ExecutionSetupException("A scan batch must contain at least one reader.");
this.currentReader = readers.next();
+ this.oContext = new OperatorContext(subScanConfig, context);
this.currentReader.setup(mutator);
this.partitionColumns = partitionColumns.iterator();
this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null;
@@ -89,8 +93,8 @@ public class ScanBatch implements RecordBatch {
addPartitionVectors();
}
- public ScanBatch(FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
- this(context, readers, Collections.EMPTY_LIST, Collections.EMPTY_LIST);
+ public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
+ this(subScanConfig, context, readers, Collections.EMPTY_LIST, Collections.EMPTY_LIST);
}
@Override
@@ -173,7 +177,7 @@ public class ScanBatch implements RecordBatch {
byte[] bytes = val.getBytes();
AllocationHelper.allocate(v, recordCount, val.length());
for (int j = 0; j < recordCount; j++) {
- v.getMutator().set(j, bytes);
+ v.getMutator().setSafe(j, bytes);
}
v.getMutator().setValueCount(recordCount);
} else {
@@ -239,7 +243,7 @@ public class ScanBatch implements RecordBatch {
@SuppressWarnings("unchecked")
@Override
public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException {
- ValueVector v = TypeHelper.getNewVector(field, context.getAllocator());
+ ValueVector v = TypeHelper.getNewVector(field, oContext.getAllocator());
if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName()));
addField(v);
return (T) v;
@@ -259,6 +263,7 @@ public class ScanBatch implements RecordBatch {
public void cleanup(){
container.clear();
+ oContext.close();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 2fc854ae5..a0ff28aec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -70,11 +70,6 @@ public class ScreenCreator implements RootCreator<Screen>{
this.connection = context.getConnection();
}
- private void closeAllocator(){
- sendCount.waitForSendComplete();
- context.getAllocator().close();
- }
-
@Override
public boolean next() {
if(!ok){
@@ -86,7 +81,7 @@ public class ScreenCreator implements RootCreator<Screen>{
// logger.debug("Screen Outcome {}", outcome);
switch(outcome){
case STOP: {
- closeAllocator();
+ sendCount.waitForSendComplete();
QueryResult header = QueryResult.newBuilder() //
.setQueryId(context.getHandle().getQueryId()) //
.setRowCount(0) //
@@ -101,7 +96,7 @@ public class ScreenCreator implements RootCreator<Screen>{
return false;
}
case NONE: {
- closeAllocator();
+ sendCount.waitForSendComplete();
context.getStats().batchesCompleted.inc(1);
QueryResult header = QueryResult.newBuilder() //
.setQueryId(context.getHandle().getQueryId()) //
@@ -133,8 +128,8 @@ public class ScreenCreator implements RootCreator<Screen>{
@Override
public void stop() {
- incoming.cleanup();
sendCount.waitForSendComplete();
+ incoming.cleanup();
}
private SendListener listener = new SendListener();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 17e233a52..7679701fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -97,8 +97,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public void stop() {
ok = false;
- incoming.cleanup();
sendCount.waitForSendComplete();
+ incoming.cleanup();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
index 90d51b610..c58366439 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.TopN;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.VectorContainer;
@@ -26,7 +27,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
public interface PriorityQueue {
public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException;
- public void init(int limit, FragmentContext context, boolean hasSv2) throws SchemaChangeException;
+ public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException;
public void generate() throws SchemaChangeException;
public VectorContainer getHyperBatch();
public SelectionVector4 getHeapSv4();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index d2d8d304a..e0e7e518f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -38,16 +38,18 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
private SelectionVector4 finalSv4;//This is for final sorted output
private ExpandableHyperContainer hyperBatch;
private FragmentContext context;
+ private BufferAllocator allocator;
private int limit;
private int queueSize = 0;
private int batchCount = 0;
private boolean hasSv2;
@Override
- public void init(int limit, FragmentContext context, boolean hasSv2) throws SchemaChangeException {
+ public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException {
this.limit = limit;
this.context = context;
- BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
+ this.allocator = allocator;
+ BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
preAlloc.preAllocate(4 * (limit + 1));
heapSv4 = new SelectionVector4(preAlloc.getAllocation(), limit, Character.MAX_VALUE);
this.hasSv2 = hasSv2;
@@ -64,7 +66,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
this.hyperBatch = new ExpandableHyperContainer(newContainer);
this.batchCount = hyperBatch.iterator().next().getValueVectors().length;
- BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
+ BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
preAlloc.preAllocate(4 * (limit + 1));
this.heapSv4 = new SelectionVector4(preAlloc.getAllocation(), limit, Character.MAX_VALUE);
for (int i = 0; i < v4.getTotalCount(); i++) {
@@ -113,7 +115,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue {
public void generate() throws SchemaChangeException {
Stopwatch watch = new Stopwatch();
watch.start();
- BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
+ BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
preAlloc.preAllocate(4 * queueSize);
finalSv4 = new SelectionVector4(preAlloc.getAllocation(), queueSize, 4000);
for (int i = queueSize - 1; i >= 0; i--) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 7073a6cc9..2a57aaa34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -35,6 +35,8 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.HoldingContainerExpression;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.TopN;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
@@ -57,6 +59,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNBatch.class);
private static final long MAX_SORT_BYTES = 1L * 1024 * 1024 * 1024;
+ public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+ public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
private final int batchPurgeThreshold;
public final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP);
@@ -73,7 +77,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
private int batchCount;
private Copier copier;
- public TopNBatch(TopN popConfig, FragmentContext context, RecordBatch incoming) {
+ public TopNBatch(TopN popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
this.config = popConfig;
@@ -88,7 +92,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
@Override
public void kill() {
incoming.kill();
- cleanup();
}
@Override
@@ -105,13 +108,13 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
@Override
public void cleanup() {
- super.cleanup();
if (sv4 != null) {
sv4.clear();
}
if (priorityQueue != null) {
priorityQueue.cleanup();
}
+ super.cleanup();
incoming.cleanup();
}
@@ -121,7 +124,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
if(getSelectionVector4().next()){
return IterOutcome.OK;
}else{
- cleanup();
return IterOutcome.NONE;
}
}
@@ -139,7 +141,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
case NOT_YET:
throw new UnsupportedOperationException();
case STOP:
- cleanup();
return upstream;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
@@ -198,21 +199,22 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context);
SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context);
if (copier == null) {
- copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch);
+ copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch);
} else {
List<VectorAllocator> allocators = Lists.newArrayList();
for(VectorWrapper<?> i : batch){
- ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+ ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
newContainer.add(v);
allocators.add(RemovingRecordBatch.getAllocator4(v));
}
copier.setupRemover(context, batch, newBatch, allocators.toArray(new VectorAllocator[allocators.size()]));
}
- SortRecordBatchBuilder builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
+ SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
do {
int count = selectionVector4.getCount();
- copier.copyRecords();
+ int copiedRecords = copier.copyRecords(0, count);
+ assert copiedRecords == count;
for(VectorWrapper<?> v : newContainer){
ValueVector.Mutator m = v.getValueVector().getMutator();
m.setValueCount(count);
@@ -264,7 +266,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
g.getEvalBlock()._return(JExpr.lit(0));
PriorityQueue q = context.getImplementationClass(cg);
- q.init(config.getLimit(), context, schema.getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE);
+ q.init(config.getLimit(), context, oContext.getAllocator(), schema.getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE);
return q;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
index 6c6e92c91..65669b12f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java
@@ -22,20 +22,14 @@ import java.util.Iterator;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.record.RawFragmentBatchProvider;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
-public class WireRecordBatch implements RecordBatch{
+public class WireRecordBatch implements RecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class);
private RecordBatchLoader batchLoader;
@@ -44,10 +38,10 @@ public class WireRecordBatch implements RecordBatch{
private BatchSchema schema;
- public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) {
+ public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) throws OutOfMemoryException {
this.fragProvider = fragProvider;
this.context = context;
- this.batchLoader = new RecordBatchLoader(context.getAllocator());
+ this.batchLoader = new RecordBatchLoader(null);
}
@Override
@@ -69,7 +63,7 @@ public class WireRecordBatch implements RecordBatch{
public void kill() {
fragProvider.kill(context);
}
-
+
@Override
public Iterator<VectorWrapper<?>> iterator() {
return batchLoader.iterator();
@@ -101,15 +95,19 @@ public class WireRecordBatch implements RecordBatch{
RawFragmentBatch batch = fragProvider.getNext();
// skip over empty batches. we do this since these are basically control messages.
- while(batch != null && batch.getHeader().getDef().getRecordCount() == 0){
+ while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){
batch = fragProvider.getNext();
}
-
+
if (batch == null){
batchLoader.clear();
return IterOutcome.NONE;
}
-
+
+ if (batch.getHeader().getIsOutOfMemory()) {
+ return IterOutcome.OUT_OF_MEMORY;
+ }
+
// logger.debug("Next received batch {}", batch);
@@ -136,7 +134,7 @@ public class WireRecordBatch implements RecordBatch{
@Override
public void cleanup() {
+ fragProvider.cleanup();
}
-
-
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index a75aac9e7..ee5cfa842 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -42,6 +43,7 @@ import org.apache.drill.exec.expr.HoldingContainerExpression;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -85,7 +87,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
private final MappingSet UpdateAggrValuesMapping = new MappingSet("incomingRowIdx" /* read index */, "outRowIdx" /* write index */, "htRowIdx" /* workspace index */, "incoming" /* read container */, "outgoing" /* write container */, "aggrValuesContainer" /* workspace container */, UPDATE_AGGR_INSIDE, UPDATE_AGGR_OUTSIDE, UPDATE_AGGR_INSIDE);
- public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) {
+ public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
super(popConfig, context);
this.incoming = incoming;
}
@@ -197,7 +199,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
if(expr == null) continue;
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
- ValueVector vv = TypeHelper.getNewVector(outputField, context.getAllocator());
+ ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
keyAllocators.add(VectorAllocator.getAllocator(vv, 50));
// add this group-by vector to the output container
@@ -213,7 +215,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
if(expr == null) continue;
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
- ValueVector vv = TypeHelper.getNewVector(outputField, context.getAllocator());
+ ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
valueAllocators.add(VectorAllocator.getAllocator(vv, 50));
aggrOutFieldIds[i] = container.add(vv);
@@ -227,7 +229,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
container.buildSchema(SelectionVectorMode.NONE);
HashAggregator agg = context.getImplementationClass(top);
- agg.setup(popConfig, context, incoming, this,
+ agg.setup(popConfig, context, oContext.getAllocator(), incoming, this,
aggrExprs,
cgInner.getWorkspaceTypes(),
groupByOutFieldIds,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index b0f81efd1..d7abcd224 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
@@ -58,6 +59,9 @@ import com.google.common.collect.Lists;
public abstract class HashAggTemplate implements HashAggregator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
+
+ private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+ private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
private static final boolean EXTRA_DEBUG_1 = false;
private static final boolean EXTRA_DEBUG_2 = false;
@@ -75,6 +79,7 @@ public abstract class HashAggTemplate implements HashAggregator {
private VectorAllocator[] keyAllocators;
private VectorAllocator[] valueAllocators;
private FragmentContext context;
+ private BufferAllocator allocator;
private HashAggregate hashAggrConfig;
private HashTable htable;
@@ -101,7 +106,7 @@ public abstract class HashAggTemplate implements HashAggregator {
for(int i = 0; i < materializedValueFields.length; i++) {
MaterializedField outputField = materializedValueFields[i];
// Create a type-specific ValueVector for this value
- vector = TypeHelper.getNewVector(outputField, context.getAllocator()) ;
+ vector = TypeHelper.getNewVector(outputField, allocator) ;
VectorAllocator.getAllocator(vector, 50 /* avg. width */).alloc(HashTable.BATCH_SIZE) ;
aggrValuesContainer.add(vector) ;
@@ -149,7 +154,7 @@ public abstract class HashAggTemplate implements HashAggregator {
@Override
- public void setup(HashAggregate hashAggrConfig, FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
+ public void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, RecordBatch outgoing,
LogicalExpression[] valueExprs,
List<TypedFieldId> valueFieldIds,
TypedFieldId[] groupByOutFieldIds,
@@ -164,6 +169,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
this.context = context;
+ this.allocator = allocator;
this.incoming = incoming;
this.schema = incoming.getSchema();
this.keyAllocators = keyAllocators;
@@ -193,7 +199,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
- ChainedHashTable ht = new ChainedHashTable(hashAggrConfig.getHtConfig(), context, incoming, null /* no incoming probe */, outgoing) ;
+ ChainedHashTable ht = new ChainedHashTable(hashAggrConfig.getHtConfig(), context, allocator, incoming, null /* no incoming probe */, outgoing) ;
this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ;
batchHolders = new ArrayList<BatchHolder>();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index b23dbeec6..9032f2a51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -25,6 +25,7 @@ import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.record.RecordBatch;
@@ -40,7 +41,7 @@ public interface HashAggregator {
RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR
}
- public abstract void setup(HashAggregate hashAggrConfig, FragmentContext context, RecordBatch incoming,
+ public abstract void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming,
RecordBatch outgoing, LogicalExpression[] valueExprs,
List<TypedFieldId> valueFieldIds,
TypedFieldId[] keyFieldIds,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index c942dc67d..88bada5ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -39,6 +39,8 @@ import org.apache.drill.exec.expr.HoldingContainerExpression;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator.AggOutcome;
@@ -64,7 +66,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private final RecordBatch incoming;
private boolean done = false;
- public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) {
+ public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
}
@@ -105,7 +107,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount());
switch(out){
case CLEANUP_AND_RETURN:
- incoming.cleanup();
container.clear();
done = true;
return aggregator.getOutcome();
@@ -165,7 +166,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
if(expr == null) continue;
keyExprs[i] = expr;
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
- ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
allocators.add(VectorAllocator.getAllocator(vector, 50));
keyOutputIds[i] = container.add(vector);
}
@@ -176,7 +177,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
if(expr == null) continue;
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
- ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
allocators.add(VectorAllocator.getAllocator(vector, 50));
TypedFieldId id = container.add(vector);
valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
@@ -315,7 +316,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
-
+ @Override
+ public void cleanup() {
+ super.cleanup();
+ incoming.cleanup();
+ }
@Override
protected void killIncoming() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 4b9e3adda..0a0158368 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -128,6 +128,6 @@ public class BroadcastSenderRootExec implements RootExec {
@Override
public void stop() {
ok = false;
- incoming.kill();
+ incoming.cleanup();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index ec579fc5b..e1179d05a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.expr.fn.impl.BitFunctions;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
@@ -93,18 +94,21 @@ public class ChainedHashTable {
private HashTableConfig htConfig;
private final FragmentContext context;
+ private final BufferAllocator allocator;
private final RecordBatch incomingBuild;
private final RecordBatch incomingProbe;
private final RecordBatch outgoing;
public ChainedHashTable(HashTableConfig htConfig,
FragmentContext context,
+ BufferAllocator allocator,
RecordBatch incomingBuild,
RecordBatch incomingProbe,
RecordBatch outgoing) {
this.htConfig = htConfig;
this.context = context;
+ this.allocator = allocator;
this.incomingBuild = incomingBuild;
this.incomingProbe = incomingProbe;
this.outgoing = outgoing;
@@ -136,7 +140,7 @@ public class ChainedHashTable {
final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
// create a type-specific ValueVector for this key
- ValueVector vv = TypeHelper.getNewVector(outputField, context.getAllocator());
+ ValueVector vv = TypeHelper.getNewVector(outputField, allocator);
VectorAllocator.getAllocator(vv, 50 /* avg width */).alloc(HashTable.BATCH_SIZE);
htKeyFieldIds[i] = htContainerOrig.add(vv);
@@ -171,7 +175,7 @@ public class ChainedHashTable {
setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, keyExprsProbe);
HashTable ht = context.getImplementationClass(top);
- ht.setup(htConfig, context, incomingBuild, incomingProbe, outgoing, htContainerOrig);
+ ht.setup(htConfig, context, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig);
return ht;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 2f1172a6c..e5959f2ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.common;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
@@ -43,7 +44,7 @@ public interface HashTable {
static final public int BATCH_SIZE = Character.MAX_VALUE+1;
static final public int BATCH_MASK = 0x0000FFFF;
- public void setup(HashTableConfig htConfig, FragmentContext context,
+ public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
RecordBatch incomingBuild, RecordBatch incomingProbe,
RecordBatch outgoing, VectorContainer htContainerOrig);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index f67939e14..23a0cf5b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -29,6 +29,7 @@ import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.Types;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
@@ -76,6 +77,8 @@ public abstract class HashTableTemplate implements HashTable {
private FragmentContext context;
+ private BufferAllocator allocator;
+
// The incoming build side record batch
private RecordBatch incomingBuild;
@@ -119,7 +122,7 @@ public abstract class HashTableTemplate implements HashTable {
} else { // otherwise create a new one using the original's fields
htContainer = new VectorContainer();
for (VectorWrapper<?> w : htContainerOrig) {
- ValueVector vv = TypeHelper.getNewVector(w.getField(), context.getAllocator());
+ ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator);
VectorAllocator.getAllocator(vv, 50 /* avg width */).alloc(HashTable.BATCH_SIZE);
htContainer.add(vv);
}
@@ -131,10 +134,10 @@ public abstract class HashTableTemplate implements HashTable {
private void init(IntVector links, IntVector hashValues, int size) {
for (int i=0; i < size; i++) {
- links.getMutator().set(i, EMPTY_SLOT);
+ links.getMutator().setSafe(i, EMPTY_SLOT);
}
for (int i=0; i < size; i++) {
- hashValues.getMutator().set(i, 0);
+ hashValues.getMutator().setSafe(i, 0);
}
links.getMutator().setValueCount(size);
hashValues.getMutator().setValueCount(size);
@@ -181,8 +184,8 @@ public abstract class HashTableTemplate implements HashTable {
// since this is the last entry in the hash chain, the links array at position currentIdx
// will point to a null (empty) slot
- links.getMutator().set(currentIdxWithinBatch, EMPTY_SLOT);
- hashValues.getMutator().set(currentIdxWithinBatch, hashValue);
+ links.getMutator().setSafe(currentIdxWithinBatch, EMPTY_SLOT);
+ hashValues.getMutator().setSafe(currentIdxWithinBatch, hashValue);
maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch);
@@ -192,7 +195,7 @@ public abstract class HashTableTemplate implements HashTable {
}
private void updateLinks(int lastEntryIdxWithinBatch, int currentIdx) {
- links.getMutator().set(lastEntryIdxWithinBatch, currentIdx);
+ links.getMutator().setSafe(lastEntryIdxWithinBatch, currentIdx);
}
private void rehash(int numbuckets, IntVector newStartIndices, int batchStartIdx) {
@@ -211,9 +214,9 @@ public abstract class HashTableTemplate implements HashTable {
int newStartIdx = newStartIndices.getAccessor().get(bucketIdx);
if (newStartIdx == EMPTY_SLOT) { // new bucket was empty
- newStartIndices.getMutator().set(bucketIdx, entryIdx); // update the start index to point to entry
- newLinks.getMutator().set(entryIdxWithinBatch, EMPTY_SLOT);
- newHashValues.getMutator().set(entryIdxWithinBatch, hash);
+ newStartIndices.getMutator().setSafe(bucketIdx, entryIdx); // update the start index to point to entry
+ newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
+ newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
if (EXTRA_DEBUG) logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
@@ -224,9 +227,9 @@ public abstract class HashTableTemplate implements HashTable {
while (true) {
idxWithinBatch = idx & BATCH_MASK;
if (newLinks.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) {
- newLinks.getMutator().set(idxWithinBatch, entryIdx);
- newLinks.getMutator().set(entryIdxWithinBatch, EMPTY_SLOT);
- newHashValues.getMutator().set(entryIdxWithinBatch, hash);
+ newLinks.getMutator().setSafe(idxWithinBatch, entryIdx);
+ newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT);
+ newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash);
if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch));
@@ -319,7 +322,7 @@ public abstract class HashTableTemplate implements HashTable {
@Override
- public void setup(HashTableConfig htConfig, FragmentContext context,
+ public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
RecordBatch incomingBuild, RecordBatch incomingProbe,
RecordBatch outgoing, VectorContainer htContainerOrig) {
float loadf = htConfig.getLoadFactor();
@@ -333,6 +336,7 @@ public abstract class HashTableTemplate implements HashTable {
this.htConfig = htConfig;
this.context = context;
+ this.allocator = allocator;
this.incomingBuild = incomingBuild;
this.incomingProbe = incomingProbe;
this.outgoing = outgoing;
@@ -419,7 +423,7 @@ public abstract class HashTableTemplate implements HashTable {
if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) {
// update the start index array
- startIndices.getMutator().set(getBucketIndex(hash, numBuckets()), currentIdx);
+ startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx);
htIdxHolder.value = currentIdx;
return PutStatus.KEY_ADDED;
}
@@ -600,10 +604,10 @@ public abstract class HashTableTemplate implements HashTable {
}
private IntVector allocMetadataVector(int size, int initialValue) {
- IntVector vector = (IntVector) TypeHelper.getNewVector(dummyIntField, context.getAllocator());
+ IntVector vector = (IntVector) TypeHelper.getNewVector(dummyIntField, allocator);
vector.allocateNew(size);
for (int i=0; i < size; i++) {
- vector.getMutator().set(i, initialValue);
+ vector.getMutator().setSafe(i, initialValue);
}
vector.getMutator().setValueCount(size);
return vector;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
index 5f2bc4dbd..c5c81c6e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.filter;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.impl.BatchCreator;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 1cd418c2d..566dfe0aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.record.*;
@@ -50,7 +51,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
private BufferAllocator.PreAllocator svAllocator;
private Filterer filter;
- public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) {
+ public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
}
@@ -78,17 +79,18 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
protected void doWork() {
int recordCount = incoming.getRecordCount();
filter.filterBatch(recordCount);
- for(VectorWrapper<?> v : container){
- ValueVector.Mutator m = v.getValueVector().getMutator();
- m.setValueCount(recordCount);
- }
+// for(VectorWrapper<?> v : container){
+// ValueVector.Mutator m = v.getValueVector().getMutator();
+// m.setValueCount(recordCount);
+// }
}
@Override
public void cleanup() {
- super.cleanup();
if(sv2 != null) sv2.clear();
+ if(sv4 != null) sv4.clear();
+ super.cleanup();
}
@Override
@@ -97,16 +99,16 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
switch(incoming.getSchema().getSelectionVectorMode()){
case NONE:
- sv2 = new SelectionVector2(context.getAllocator());
+ sv2 = new SelectionVector2(oContext.getAllocator());
this.filter = generateSV2Filterer();
break;
case TWO_BYTE:
- sv2 = new SelectionVector2(context.getAllocator());
+ sv2 = new SelectionVector2(oContext.getAllocator());
this.filter = generateSV2Filterer();
break;
case FOUR_BYTE:
// set up the multi-batch selection vector
- this.svAllocator = context.getAllocator().getNewPreAllocator();
+ this.svAllocator = oContext.getAllocator().getNewPreAllocator();
if (!svAllocator.preAllocate(incoming.getRecordCount()*4))
throw new SchemaChangeException("Attempted to filter an SV4 which exceeds allowed memory (" +
incoming.getRecordCount() * 4 + " bytes)");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 5f0cc94d7..b624b3082 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.physical.impl.join;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.record.*;
import org.eigenbase.rel.JoinRelType;
@@ -50,6 +52,10 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
+
+ public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+ public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+
// Probe side record batch
private final RecordBatch left;
@@ -136,7 +142,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
if (hashJoinProbe == null) {
// Initialize the hash join helper context
- hjHelper = new HashJoinHelper(context);
+ hjHelper = new HashJoinHelper(context, oContext.getAllocator());
/* Build phase requires setting up the hash table. Hash table will
* materialize both the build and probe side expressions while
@@ -185,13 +191,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
// No more output records, clean up and return
- cleanup();
return IterOutcome.NONE;
} catch (ClassTransformationException | SchemaChangeException | IOException e) {
context.fail(e);
killIncoming();
- cleanup();
return IterOutcome.STOP;
}
}
@@ -222,7 +226,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
// Create the chained hash table
- ChainedHashTable ht = new ChainedHashTable(htConfig, context, this.right, this.left, null);
+ ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
hashTable = ht.createAndSetupHashTable(null);
}
@@ -322,15 +326,16 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true));
JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false));
-
- g.getEvalBlock().add(outVV.invoke("copyFrom")
- .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
- .arg(outIndex)
- .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))));
+ g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
+ .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
+ .arg(outIndex)
+ .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);
fieldId++;
}
}
+ g.rotateBlock();
+ g.getEvalBlock()._return(JExpr.TRUE);
// Generate the code to project probe side records
g.setMappingSet(projectProbeMapping);
@@ -350,14 +355,18 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false));
JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false));
- g.getEvalBlock().add(outVV.invoke("copyFrom").arg(probeIndex).arg(outIndex).arg(inVV));
+ g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
fieldId++;
outputFieldId++;
}
+ g.rotateBlock();
+ g.getEvalBlock()._return(JExpr.TRUE);
+
recordCount = left.getRecordCount();
}
+
HashJoinProbe hj = context.getImplementationClass(cg);
hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
@@ -370,7 +379,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
}
- public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) {
+ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
super(popConfig, context);
this.left = left;
this.right = right;
@@ -382,13 +391,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
public void killIncoming() {
this.left.kill();
this.right.kill();
- cleanup();
}
@Override
public void cleanup() {
- left.cleanup();
- right.cleanup();
+ hyperContainer.clear();
hjHelper.clear();
container.clear();
@@ -398,5 +405,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
hashTable.clear();
}
super.cleanup();
+ left.cleanup();
+ right.cleanup();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
index 0728ac95e..b1ed07ee1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java
@@ -25,6 +25,7 @@ import java.util.List;
import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.physical.impl.common.HashTable;
@@ -60,6 +61,7 @@ public class HashJoinHelper {
// Fragment context
FragmentContext context;
+ BufferAllocator allocator;
// Constant to indicate index is empty.
static final int INDEX_EMPTY = -1;
@@ -67,8 +69,9 @@ public class HashJoinHelper {
// bits to shift while obtaining batch index from SV4
static final int SHIFT_SIZE = 16;
- public HashJoinHelper(FragmentContext context) {
+ public HashJoinHelper(FragmentContext context, BufferAllocator allocator) {
this.context = context;
+ this.allocator = allocator;
}
public void addStartIndexBatch() throws SchemaChangeException {
@@ -102,7 +105,7 @@ public class HashJoinHelper {
public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException {
- ByteBuf vector = context.getAllocator().buffer((recordCount * 4));
+ ByteBuf vector = allocator.buffer((recordCount * 4));
SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index 0ffdf52a1..160d35285 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -52,6 +52,6 @@ public interface HashJoinProbe {
JoinRelType joinRelType);
public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
- public abstract void projectBuildRecord(int buildIndex, int outIndex);
- public abstract void projectProbeRecord(int probeIndex, int outIndex);
+ public abstract boolean projectBuildRecord(int buildIndex, int outIndex);
+ public abstract boolean projectProbeRecord(int probeIndex, int outIndex);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index 3d6f4d6e6..0abf678a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -94,7 +94,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
public void executeProjectRightPhase() {
while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) {
- projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
+ boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++);
+ assert success;
}
}
@@ -146,8 +147,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
*/
hjHelper.setRecordMatched(currentCompositeIdx);
- projectBuildRecord(currentCompositeIdx, outputRecords);
- projectProbeRecord(recordsProcessed, outputRecords);
+ boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
+ assert success;
+ success = projectProbeRecord(recordsProcessed, outputRecords);
+ assert success;
outputRecords++;
/* Projected single row from the build side with matching key but there
@@ -179,7 +182,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
}
else {
hjHelper.setRecordMatched(currentCompositeIdx);
- projectBuildRecord(currentCompositeIdx, outputRecords);
+ boolean success = projectBuildRecord(currentCompositeIdx, outputRecords);
+ assert success;
projectProbeRecord(recordsProcessed, outputRecords);
outputRecords++;
@@ -221,6 +225,6 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch,
@Named("outgoing") RecordBatch outgoing);
- public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
- public abstract void projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
+ public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex);
+ public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index bbdfbe5db..db90085de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -38,6 +38,8 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome;
@@ -64,6 +66,9 @@ import com.sun.codemodel.JVar;
public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class);
+
+ public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+ public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
public final MappingSet setupMapping =
new MappingSet("null", "null",
@@ -103,9 +108,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private JoinWorker worker;
public MergeJoinBatchBuilder batchBuilder;
- protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) {
+ protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
super(popConfig, context);
-
+
if (popConfig.getConditions().size() == 0) {
throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions");
}
@@ -113,7 +118,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
this.right = right;
this.joinType = popConfig.getJoinType();
this.status = new JoinStatus(left, right, this);
- this.batchBuilder = new MergeJoinBatchBuilder(context, status);
+ this.batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status);
this.conditions = popConfig.getConditions();
}
@@ -204,7 +209,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
}
public void resetBatchBuilder() {
- batchBuilder = new MergeJoinBatchBuilder(context, status);
+ batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status);
}
public void addRightToBatchBuilder() {
@@ -384,7 +389,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
// add fields from both batches
if (leftCount > 0) {
for (VectorWrapper<?> w : left) {
- ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
+ ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), oContext.getAllocator());
VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(joinBatchSize);
container.add(outgoingVector);
}
@@ -392,7 +397,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
if (rightCount > 0) {
for (VectorWrapper<?> w : right) {
- ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator());
+ ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), oContext.getAllocator());
VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / right.getRecordCount())).alloc(joinBatchSize);
container.add(outgoingVector);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
index f665c1f61..a75437ce3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.join;
import com.google.common.collect.ArrayListMultimap;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.BufferAllocator.PreAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
@@ -38,10 +39,10 @@ public class MergeJoinBatchBuilder {
private PreAllocator svAllocator;
private JoinStatus status;
- public MergeJoinBatchBuilder(FragmentContext context, JoinStatus status) {
+ public MergeJoinBatchBuilder(BufferAllocator allocator, JoinStatus status) {
this.container = new VectorContainer();
this.status = status;
- this.svAllocator = context.getAllocator().getNewPreAllocator();
+ this.svAllocator = allocator.getNewPreAllocator();
}
public boolean add(RecordBatch batch) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index dcd452eb8..3f2ec27fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.limit;
import com.beust.jcommander.internal.Lists;
import com.google.common.base.Objects;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.record.*;
@@ -38,9 +39,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
private boolean skipBatch;
List<TransferPair> transfers = Lists.newArrayList();
- public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) {
+ public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context, incoming);
- outgoingSv = new SelectionVector2(context.getAllocator());
+ outgoingSv = new SelectionVector2(oContext.getAllocator());
recordsToSkip = popConfig.getFirst();
noEndLimit = popConfig.getLast() == null;
if(!noEndLimit) {
@@ -79,8 +80,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
@Override
public IterOutcome next() {
if(!noEndLimit && recordsLeft <= 0) {
- killIncoming();
- cleanup();
+ // don't kill incoming batches or call cleanup yet, as this could close allocators before the buffers have been cleared
return IterOutcome.NONE;
}
@@ -160,7 +160,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
@Override
public void cleanup(){
- super.cleanup();
outgoingSv.clear();
+ super.cleanup();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
index 9096018ad..46a156f97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java
@@ -35,7 +35,7 @@ public interface MergingReceiverGeneratorBase {
public abstract int doCompare(MergingRecordBatch.Node left,
MergingRecordBatch.Node right);
- public abstract void doCopy(int inBatch, int inIndex, int outIndex);
+ public abstract boolean doCopy(int inBatch, int inIndex, int outIndex);
public static TemplateClassDefinition<MergingReceiverGeneratorBase> TEMPLATE_DEFINITION =
new TemplateClassDefinition<>(MergingReceiverGeneratorBase.class, MergingReceiverTemplate.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
index 6945b4dac..197e96039 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java
@@ -55,7 +55,7 @@ public abstract class MergingReceiverTemplate implements MergingReceiverGenerato
* @param inIndex incoming record position to copy from
* @param outIndex outgoing record position to copy to
*/
- public abstract void doCopy(@Named("inBatch") int inBatch, @Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract boolean doCopy(@Named("inBatch") int inBatch, @Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
// public abstract void doEval(@Named("inBatch") int inBatch, @Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index dcfe02f09..a2c424fe2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -41,19 +41,12 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.record.RawFragmentBatchProvider;
-import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.record.RecordBatchLoader;
-import org.apache.drill.exec.record.SchemaBuilder;
-import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
-import org.apache.drill.exec.record.VectorWrapper;
-import org.apache.drill.exec.record.WritableBatch;
+import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.ValueVector;
@@ -74,9 +67,12 @@ import com.sun.codemodel.JVar;
/**
* The MergingRecordBatch merges pre-sorted record batches from remote senders.
*/
-public class MergingRecordBatch implements RecordBatch {
+public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class);
+ private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+ private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+
private RecordBatchLoader[] batchLoaders;
private RawFragmentBatchProvider[] fragProviders;
private FragmentContext context;
@@ -98,8 +94,9 @@ public class MergingRecordBatch implements RecordBatch {
public MergingRecordBatch(FragmentContext context,
MergingReceiverPOP config,
- RawFragmentBatchProvider[] fragProviders) {
+ RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
+ super(config, context);
this.fragProviders = fragProviders;
this.context = context;
this.config = config;
@@ -154,7 +151,7 @@ public class MergingRecordBatch implements RecordBatch {
batchLoaders = new RecordBatchLoader[senderCount];
for (int i = 0; i < senderCount; ++i) {
incomingBatches[i] = rawBatches.get(i);
- batchLoaders[i] = new RecordBatchLoader(context.getAllocator());
+ batchLoaders[i] = new RecordBatchLoader(oContext.getAllocator());
}
int i = 0;
@@ -182,7 +179,7 @@ public class MergingRecordBatch implements RecordBatch {
bldr.addField(v.getField());
// allocate a new value vector
- ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator());
+ ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), oContext.getAllocator());
VectorAllocator allocator = VectorAllocator.getAllocator(outgoingVector, 50);
allocator.alloc(DEFAULT_ALLOC_RECORD_COUNT);
allocators.add(allocator);
@@ -226,7 +223,10 @@ public class MergingRecordBatch implements RecordBatch {
while (!pqueue.isEmpty()) {
// pop next value from pq and copy to outgoing batch
Node node = pqueue.peek();
- copyRecordToOutgoingBatch(pqueue.poll());
+ if (!copyRecordToOutgoingBatch(node)) {
+ break;
+ }
+ pqueue.poll();
if (isOutgoingFull()) {
// set a flag so that we reallocate on the next iteration
@@ -320,10 +320,15 @@ public class MergingRecordBatch implements RecordBatch {
@Override
public void kill() {
+ cleanup();
for (RawFragmentBatchProvider provider : fragProviders) {
provider.kill(context);
}
-
+ }
+
+ @Override
+ protected void killIncoming() {
+ //No op
}
@Override
@@ -593,17 +598,19 @@ public class MergingRecordBatch implements RecordBatch {
// ((IntVector) outgoingVectors[i]).copyFrom(inIndex,
// outgoingBatch.getRecordCount(),
// (IntVector) vv1);
- cg.getEvalBlock().add(
+ cg.getEvalBlock()._if(
((JExpression) JExpr.cast(vvClass, outgoingVectors.component(JExpr.lit(fieldIdx))))
- .invoke("copyFrom")
+ .invoke("copyFromSafe")
.arg(inIndex)
.arg(outIndex)
.arg(JExpr.cast(vvClass,
((JExpression) incomingVectors.component(JExpr.direct("inBatch")))
- .component(JExpr.lit(fieldIdx)))));
+ .component(JExpr.lit(fieldIdx)))).not())._then()._return(JExpr.FALSE);
++fieldIdx;
}
+ cg.rotateBlock();
+ cg.getEvalBlock()._return(JExpr.TRUE);
// compile generated code and call the generated setup method
MergingReceiverGeneratorBase newMerger;
@@ -618,12 +625,17 @@ public class MergingRecordBatch implements RecordBatch {
/**
* Copy the record referenced by the supplied node to the next output position.
- * Side Effect: increments outgoing position
+ * Side Effect: increments outgoing position if successful
*
* @param node Reference to the next record to copy from the incoming batches
*/
- private void copyRecordToOutgoingBatch(Node node) {
- merger.doCopy(node.batchId, node.valueIndex, outgoingPosition++);
+ private boolean copyRecordToOutgoingBatch(Node node) {
+ if (!merger.doCopy(node.batchId, node.valueIndex, outgoingPosition)) {
+ return false;
+ } else {
+ outgoingPosition++;
+ return true;
+ }
}
/**
@@ -647,6 +659,12 @@ public class MergingRecordBatch implements RecordBatch {
rbl.clear();
}
}
+ oContext.close();
+ if (fragProviders != null) {
+ for (RawFragmentBatchProvider f : fragProviders) {
+ f.cleanup();
+ }
+ }
}
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
index 8563d1c94..dd7011afb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java
@@ -64,7 +64,9 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit
int counter = 0;
for (int i = 0; i < countN; i++, firstOutputIndex++) {
int partition = getPartition(i);
- partitionValues.getMutator().set(i, partition);
+ if (!partitionValues.getMutator().setSafe(i, partition)) {
+ throw new RuntimeException();
+ }
counter++;
}
for(TransferPair t : transfers){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 36428cea9..4641de6e1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -41,12 +41,15 @@ import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.OrderedPartitionSender;
import org.apache.drill.exec.physical.impl.sort.SortBatch;
@@ -84,6 +87,9 @@ import com.sun.codemodel.JExpr;
public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionRecordBatch.class);
+ private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+ private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+
public final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP,
ClassGenerator.DEFAULT_SCALAR_MAP);
public final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null,
@@ -115,7 +121,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
private final String mapKey;
private List<VectorContainer> sampledIncomingBatches;
- public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context) {
+ public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context);
this.incoming = incoming;
this.partitions = pop.getDestinations().size();
@@ -134,13 +140,14 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
SchemaPath outputPath = popConfig.getRef();
MaterializedField outputField = MaterializedField.create(outputPath, Types.required(TypeProtos.MinorType.INT));
- this.partitionKeyVector = (IntVector) TypeHelper.getNewVector(outputField, context.getAllocator());
+ this.partitionKeyVector = (IntVector) TypeHelper.getNewVector(outputField, oContext.getAllocator());
}
@Override
public void cleanup() {
+ incoming.cleanup();
super.cleanup();
this.partitionVectors.clear();
this.partitionKeyVector.clear();
@@ -153,7 +160,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Start collecting batches until recordsToSample records have been collected
- SortRecordBatchBuilder builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
+ SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
builder.add(incoming);
recordsSampled += incoming.getRecordCount();
@@ -190,9 +197,20 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// popConfig.orderings.
VectorContainer containerToCache = new VectorContainer();
- SampleCopier copier = getCopier(sv4, sortedSamples, containerToCache, popConfig.getOrderings());
- copier.copyRecords(recordsSampled / (samplingFactor * partitions), 0, samplingFactor * partitions);
-
+ List<ValueVector> localAllocationVectors = Lists.newArrayList();
+ SampleCopier copier = getCopier(sv4, sortedSamples, containerToCache, popConfig.getOrderings(), localAllocationVectors);
+ int allocationSize = 50;
+ while (true) {
+ for (ValueVector vv : localAllocationVectors) {
+ AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
+ }
+ if (copier.copyRecords(recordsSampled / (samplingFactor * partitions), 0, samplingFactor * partitions)) {
+ break;
+ } else {
+ containerToCache.zeroVectors();
+ allocationSize *= 2;
+ }
+ }
for (VectorWrapper<?> vw : containerToCache) {
vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
}
@@ -202,7 +220,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// into a serializable wrapper object, and then add to distributed map
WritableBatch batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false);
- VectorAccessibleSerializable sampleToSave = new VectorAccessibleSerializable(batch, context.getAllocator());
+ VectorAccessibleSerializable sampleToSave = new VectorAccessibleSerializable(batch, oContext.getAllocator());
mmap.put(mapKey, sampleToSave);
this.sampledIncomingBatches = builder.getHeldRecordBatches();
@@ -283,7 +301,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Get all samples from distributed map
- SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
+ SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
for (VectorAccessibleSerializable w : mmap.get(mapKey)) {
containerBuilder.add(w.get());
}
@@ -306,12 +324,25 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// Copy every Nth record from the samples into a candidate partition table, where N = totalSampledRecords/partitions
// Attempt to push this to the distributed map. Only the first candidate to get pushed will be used.
VectorContainer candidatePartitionTable = new VectorContainer();
- SampleCopier copier = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs);
- int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
- copier.copyRecords(skipRecords, skipRecords, partitions - 1);
- assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions);
- for (VectorWrapper<?> vw : candidatePartitionTable) {
- vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
+ SampleCopier copier = null;
+ List<ValueVector> localAllocationVectors = Lists.newArrayList();
+ copier = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs, localAllocationVectors);
+ int allocationSize = 50;
+ while (true) {
+ for (ValueVector vv : localAllocationVectors) {
+ AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize);
+ }
+ int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions;
+ if (copier.copyRecords(skipRecords, skipRecords, partitions - 1)) {
+ assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions);
+ for (VectorWrapper<?> vw : candidatePartitionTable) {
+ vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords());
+ }
+ break;
+ } else {
+ candidatePartitionTable.zeroVectors();
+ allocationSize *= 2;
+ }
}
candidatePartitionTable.setRecordCount(copier.getOutputRecords());
WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
@@ -339,8 +370,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
* @throws SchemaChangeException
*/
private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing,
- List<Ordering> orderings) throws SchemaChangeException {
- List<ValueVector> localAllocationVectors = Lists.newArrayList();
+ List<Ordering> orderings, List<ValueVector> localAllocationVectors) throws SchemaChangeException {
final ErrorCollector collector = new ErrorCollectorImpl();
final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION,
context.getFunctionRegistry());
@@ -358,16 +388,15 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
"Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
- ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
localAllocationVectors.add(vector);
TypedFieldId fid = outgoing.add(vector);
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
- cg.addExpr(write);
- logger.debug("Added eval.");
- }
- for (ValueVector vv : localAllocationVectors) {
- AllocationHelper.allocate(vv, samplingFactor * partitions, 50);
+ ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
+ HoldingContainer hc = cg.addExpr(write);
+ cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
}
+ cg.rotateBlock();
+ cg.getEvalBlock()._return(JExpr.TRUE);
outgoing.buildSchema(BatchSchema.SelectionVectorMode.NONE);
try {
SampleCopier sampleCopier = context.getImplementationClass(cg);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopier.java
index ddb605bde..3af35723f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopier.java
@@ -27,7 +27,7 @@ public interface SampleCopier {
public static TemplateClassDefinition<SampleCopier> TEMPLATE_DEFINITION = new TemplateClassDefinition<SampleCopier>(SampleCopier.class, SampleCopierTemplate.class);
public void setupCopier(FragmentContext context, SelectionVector4 sv4, VectorAccessible incoming, VectorAccessible outgoing) throws SchemaChangeException;
- public abstract void copyRecords(int skip, int start, int total);
+ public abstract boolean copyRecords(int skip, int start, int total);
public int getOutputRecords();
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java
index ddb56c1f7..73fcd1fd7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java
@@ -44,19 +44,22 @@ public abstract class SampleCopierTemplate implements SampleCopier {
@Override
- public void copyRecords(int skip, int start, int total) {
+ public boolean copyRecords(int skip, int start, int total) {
final int recordCount = sv4.getCount();
int outgoingPosition = 0;
int increment = skip > 0 ? skip : 1;
for(int svIndex = start; svIndex < sv4.getCount() && outputRecords < total; svIndex += increment, outgoingPosition++){
int deRefIndex = sv4.get(svIndex);
- doEval(deRefIndex, outgoingPosition);
+ if (!doEval(deRefIndex, outgoingPosition)) {
+ return false;
+ }
outputRecords++;
}
+ return true;
}
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
- public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
index 94fd38531..6e115a7fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.SendingAccountor;
@@ -42,6 +43,7 @@ import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
import org.apache.drill.exec.work.ErrorHelper;
@@ -60,6 +62,7 @@ public class OutgoingRecordBatch implements VectorAccessible {
private final HashPartitionSender operator;
private final RecordBatch incoming;
private final FragmentContext context;
+ private final BufferAllocator allocator;
private final VectorContainer vectorContainer = new VectorContainer();
private final SendingAccountor sendCount;
private final int oppositeMinorFragmentId;
@@ -72,9 +75,11 @@ public class OutgoingRecordBatch implements VectorAccessible {
private static int DEFAULT_ALLOC_SIZE = 20000;
private static int DEFAULT_VARIABLE_WIDTH_SIZE = 2048;
- public OutgoingRecordBatch(SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, RecordBatch incoming, FragmentContext context, int oppositeMinorFragmentId) {
+ public OutgoingRecordBatch(SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, RecordBatch incoming,
+ FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
this.incoming = incoming;
this.context = context;
+ this.allocator = allocator;
this.operator = operator;
this.tunnel = tunnel;
this.sendCount = sendCount;
@@ -111,6 +116,7 @@ public class OutgoingRecordBatch implements VectorAccessible {
w.getValueVector().getMutator().setValueCount(recordCount);
}
+
// BatchPrinter.printBatch(vectorContainer);
FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast,
@@ -170,7 +176,7 @@ public class OutgoingRecordBatch implements VectorAccessible {
bldr.addField(v.getField());
// allocate a new value vector
- ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator());
+ ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator);
VectorAllocator.getAllocator(outgoingVector, 100).alloc(recordCapacity);
vectorContainer.add(outgoingVector);
// logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 3e3157b48..604808547 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -29,13 +29,17 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
@@ -57,6 +61,7 @@ public class PartitionSenderRootExec implements RootExec {
private OutgoingRecordBatch[] outgoing;
private Partitioner partitioner;
private FragmentContext context;
+ private OperatorContext oContext;
private boolean ok = true;
private AtomicLong batchesSent = new AtomicLong(0);
private final SendingAccountor sendCount = new SendingAccountor();
@@ -64,11 +69,12 @@ public class PartitionSenderRootExec implements RootExec {
public PartitionSenderRootExec(FragmentContext context,
RecordBatch incoming,
- HashPartitionSender operator) {
+ HashPartitionSender operator) throws OutOfMemoryException {
this.incoming = incoming;
this.operator = operator;
this.context = context;
+ this.oContext = new OperatorContext(operator, context);
this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()];
int fieldId = 0;
for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) {
@@ -77,6 +83,7 @@ public class PartitionSenderRootExec implements RootExec {
context.getDataTunnel(endpoint, opposite),
incoming,
context,
+ oContext.getAllocator(),
fieldId);
fieldId++;
}
@@ -252,16 +259,17 @@ public class PartitionSenderRootExec implements RootExec {
// ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex,
// outgoingBatches[bucket].getRecordCount(),
// vv1);
- cg.getEvalBlock().add(
+ cg.getEvalBlock()._if(
((JExpression) JExpr.cast(vvClass,
((JExpression)
outgoingVectors
.component(bucket))
.component(JExpr.lit(fieldId))))
- .invoke("copyFrom")
+ .invoke("copyFromSafe")
.arg(inIndex)
.arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount"))
- .arg(incomingVV));
+ .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush"))
+ ._return();
++fieldId;
}
@@ -306,7 +314,8 @@ public class PartitionSenderRootExec implements RootExec {
for(OutgoingRecordBatch b : outgoing){
b.clear();
}
- incoming.cleanup();
sendCount.waitForSendComplete();
+ oContext.close();
+ incoming.cleanup();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index e8ee3ccb8..347092a36 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import com.sun.codemodel.JExpr;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.FieldReference;
@@ -34,13 +35,16 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
@@ -48,6 +52,7 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.util.VectorUtil;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
@@ -59,26 +64,81 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private Projector projector;
private List<ValueVector> allocationVectors;
+ private boolean hasRemainder = false;
+ private int remainderIndex = 0;
+ private int recordCount;
- public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context){
+ public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
}
@Override
public int getRecordCount() {
- return incoming.getRecordCount();
+ return recordCount;
+ }
+
+ @Override
+ public IterOutcome next() {
+ if (hasRemainder) {
+ handleRemainder();
+ return IterOutcome.OK;
+ }
+ return super.next();
}
@Override
protected void doWork() {
- int recordCount = incoming.getRecordCount();
+// VectorUtil.showVectorAccessibleContent(incoming, ",");
+ int incomingRecordCount = incoming.getRecordCount();
for(ValueVector v : this.allocationVectors){
- AllocationHelper.allocate(v, recordCount, 250);
+ AllocationHelper.allocate(v, incomingRecordCount, 250);
}
- projector.projectRecords(recordCount, 0);
- for(VectorWrapper<?> v : container){
- ValueVector.Mutator m = v.getValueVector().getMutator();
- m.setValueCount(recordCount);
+ int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
+ if (outputRecords < incomingRecordCount) {
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(outputRecords);
+ }
+ hasRemainder = true;
+ remainderIndex = outputRecords;
+ this.recordCount = remainderIndex;
+ } else {
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(incomingRecordCount);
+ }
+ for(VectorWrapper<?> v: incoming) {
+ v.clear();
+ }
+ this.recordCount = outputRecords;
+ }
+ }
+
+ private void handleRemainder() {
+ int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
+ for(ValueVector v : this.allocationVectors){
+ AllocationHelper.allocate(v, remainingRecordCount, 250);
+ }
+ int outputIndex = projector.projectRecords(remainderIndex, remainingRecordCount, 0);
+ if (outputIndex < incoming.getRecordCount()) {
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(outputIndex - remainderIndex);
+ }
+ hasRemainder = true;
+ this.recordCount = outputIndex - remainderIndex;
+ remainderIndex = outputIndex;
+ } else {
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(remainingRecordCount);
+ }
+ hasRemainder = false;
+ remainderIndex = 0;
+ for(VectorWrapper<?> v: incoming) {
+ v.clear();
+ }
+ this.recordCount = remainingRecordCount;
}
}
@@ -156,17 +216,20 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
// logger.debug("Added transfer.");
}else{
// need to do evaluation.
- ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator());
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
allocationVectors.add(vector);
TypedFieldId fid = container.add(vector);
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr);
- cg.addExpr(write);
-// logger.debug("Added eval.");
+ ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
+ HoldingContainer hc = cg.addExpr(write);
+ cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ logger.debug("Added eval.");
}
- }
+ }
}
+ cg.rotateBlock();
+ cg.getEvalBlock()._return(JExpr.TRUE);
container.buildSchema(incoming.getSchema().getSelectionVectorMode());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
index 2857fe19a..ebfce4100 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java
@@ -28,7 +28,7 @@ import org.apache.drill.exec.record.TransferPair;
public interface Projector {
public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException;
- public abstract int projectRecords(int recordCount, int firstOutputIndex);
+ public abstract int projectRecords(int startIndex, int recordCount, int firstOutputIndex);
public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>(Projector.class, ProjectorTemplate.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
index bd26ce48b..60e599384 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java
@@ -43,7 +43,7 @@ public abstract class ProjectorTemplate implements Projector {
}
@Override
- public final int projectRecords(final int recordCount, int firstOutputIndex) {
+ public final int projectRecords(int startIndex, final int recordCount, int firstOutputIndex) {
switch(svMode){
case FOUR_BYTE:
throw new UnsupportedOperationException();
@@ -60,8 +60,17 @@ public abstract class ProjectorTemplate implements Projector {
case NONE:
final int countN = recordCount;
- for (int i = 0; i < countN; i++, firstOutputIndex++) {
- doEval(i, firstOutputIndex);
+ int i;
+ for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) {
+ if (!doEval(i, firstOutputIndex)) {
+ break;
+ }
+ }
+ if (i < recordCount || startIndex > 0) {
+ for(TransferPair t : transfers){
+ t.splitAndTransfer(startIndex, i - startIndex);
+ }
+ return i;
}
for(TransferPair t : transfers){
t.transfer();
@@ -91,7 +100,7 @@ public abstract class ProjectorTemplate implements Projector {
}
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
- public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 29e629aa6..375276e0b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -32,6 +32,8 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -60,10 +62,10 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
private Sorter sorter;
private BatchSchema schema;
- public SortBatch(Sort popConfig, FragmentContext context, RecordBatch incoming) {
+ public SortBatch(Sort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
- this.builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
+ this.builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
}
@Override
@@ -74,7 +76,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
@Override
public void kill() {
incoming.kill();
- cleanup();
}
@Override
@@ -91,9 +92,9 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
@Override
public void cleanup() {
+ builder.clear();
super.cleanup();
incoming.cleanup();
- builder.clear();
}
@Override
@@ -116,7 +117,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
case NOT_YET:
throw new UnsupportedOperationException();
case STOP:
- cleanup();
return upstream;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index dad885802..0aab7b2ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -28,7 +28,7 @@ public interface Copier {
public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate4.class);
public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException;
- public abstract void copyRecords();
+ public abstract int copyRecords(int index, int recordCount);
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
index 6d1273139..2f589a515 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -48,22 +48,20 @@ public abstract class CopierTemplate2 implements Copier{
}
@Override
- public void copyRecords(){
- final int recordCount = sv2.getCount();
+ public int copyRecords(int index, int recordCount){
allocateVectors(recordCount);
int outgoingPosition = 0;
- for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){
- doEval(sv2.getIndex(svIndex), outgoingPosition);
- }
-// logger.debug("This: {}, Incoming: {}", System.identityHashCode(this), incoming);
- for(VectorWrapper<?> v : incoming){
- v.clear();
+ for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){
+ if (!doEval(sv2.getIndex(svIndex), outgoingPosition)) {
+ break;
+ }
}
+ return outgoingPosition;
}
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
- public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
index 9f4ae7ede..a7aba6e73 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -50,24 +50,21 @@ public abstract class CopierTemplate4 implements Copier{
@Override
- public void copyRecords(){
+ public int copyRecords(int index, int recordCount){
// logger.debug("Copying records.");
- final int recordCount = sv4.getCount();
allocateVectors(recordCount);
int outgoingPosition = 0;
- for(int svIndex = 0; svIndex < sv4.getCount(); svIndex++, outgoingPosition++){
+ for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){
int deRefIndex = sv4.get(svIndex);
- doEval(deRefIndex, outgoingPosition);
+ if (!doEval(deRefIndex, outgoingPosition)) {
+ break;
+ }
}
-
-// for(VectorWrapper<?> v : incoming){
-// v.clear();
-// }
-
+ return outgoingPosition;
}
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
- public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 499f4d1eb..4018991b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -25,6 +25,8 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.record.*;
@@ -54,8 +56,10 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
private Copier copier;
private int recordCount;
-
- public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) {
+ private boolean hasRemainder;
+ private int remainderIndex;
+
+ public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context, incoming);
logger.debug("Created.");
}
@@ -88,12 +92,64 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}
@Override
+ public IterOutcome next() {
+ if (hasRemainder) {
+ handleRemainder();
+ return IterOutcome.OK;
+ }
+ return super.next();
+ }
+
+ @Override
protected void doWork() {
recordCount = incoming.getRecordCount();
- copier.copyRecords();
- for(VectorWrapper<?> v : container){
- ValueVector.Mutator m = v.getValueVector().getMutator();
- m.setValueCount(recordCount);
+ int copiedRecords = copier.copyRecords(0, recordCount);
+ if (copiedRecords < recordCount) {
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(copiedRecords);
+ }
+ hasRemainder = true;
+ remainderIndex = copiedRecords;
+ this.recordCount = remainderIndex;
+ } else {
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(recordCount);
+ }
+ if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) {
+ for(VectorWrapper<?> v: incoming) {
+ v.clear();
+ }
+ if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) {
+ incoming.getSelectionVector2().clear();
+ }
+ }
+ }
+ }
+
+ private void handleRemainder() {
+ int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
+ int copiedRecords = copier.copyRecords(0, recordCount);
+ if (copiedRecords < remainingRecordCount) {
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(copiedRecords);
+ }
+ remainderIndex += copiedRecords;
+ this.recordCount = copiedRecords;
+ } else {
+ for(VectorWrapper<?> v : container){
+ ValueVector.Mutator m = v.getValueVector().getMutator();
+ m.setValueCount(remainingRecordCount);
+ }
+ if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) {
+ for(VectorWrapper<?> v: incoming) {
+ v.clear();
+ }
+ }
+ remainderIndex = 0;
+ hasRemainder = false;
}
}
@@ -116,10 +172,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}
@Override
- public void copyRecords() {
+ public int copyRecords(int index, int recordCount) {
+ assert index == 0 && recordCount == incoming.getRecordCount() : "Straight copier cannot split batch";
for(TransferPair tp : pairs){
tp.transfer();
}
+ return recordCount;
}
public List<ValueVector> getOut() {
@@ -140,7 +198,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
List<VectorAllocator> allocators = Lists.newArrayList();
for(VectorWrapper<?> i : incoming){
- ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+ ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator());
container.add(v);
allocators.add(VectorAllocator.getAllocator(i.getValueVector(), v));
}
@@ -158,15 +216,15 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
private Copier getGenerated4Copier() throws SchemaChangeException {
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
- return getGenerated4Copier(incoming, context, container, this);
+ return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this);
}
- public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{
+ public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{
List<VectorAllocator> allocators = Lists.newArrayList();
for(VectorWrapper<?> i : batch){
- ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+ ValueVector v = TypeHelper.getNewVector(i.getField(), allocator);
container.add(v);
allocators.add(getAllocator4(v));
}
@@ -195,23 +253,27 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
if(hyper){
- g.getEvalBlock().add(
+ g.getEvalBlock()._if(
outVV
- .invoke("copyFrom")
+ .invoke("copyFromSafe")
.arg(
inIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
.arg(outIndex)
.arg(
inVV.component(inIndex.shrz(JExpr.lit(16)))
)
- );
+ .not()
+ )
+ ._then()._return(JExpr.FALSE);
}else{
- g.getEvalBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV));
+ g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);
}
fieldId++;
}
+ g.rotateBlock();
+ g.getEvalBlock()._return(JExpr.TRUE);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 00c3d2fe2..b012cec1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -111,7 +111,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
}
WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, incomingHasSv2 ? true
: false);
- VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator());
+ VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, oContext.getAllocator());
try {
wrap.writeToStreamAndRetain(fos);
@@ -119,6 +119,9 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
throw new RuntimeException(e);
}
batch.reconstructContainer(container);
+ if (incomingHasSv2) {
+ sv = wrap.getSv2();
+ }
}
@Override
@@ -161,6 +164,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
logger.error("Unable to close file descriptors for file: " + getFileName());
}
super.cleanup();
+ incoming.cleanup();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
index 0fc5e0f67..c27b3c8c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java
@@ -18,7 +18,9 @@
package org.apache.drill.exec.physical.impl.union;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.Union;
import org.apache.drill.exec.record.*;
import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -39,7 +41,7 @@ public class UnionRecordBatch extends AbstractRecordBatch<Union> {
private ArrayList<TransferPair> transfers;
private int outRecordCount;
- public UnionRecordBatch(Union config, List<RecordBatch> children, FragmentContext context) {
+ public UnionRecordBatch(Union config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException {
super(config, context);
this.incoming = children;
this.incomingIterator = incoming.iterator();
@@ -47,7 +49,6 @@ public class UnionRecordBatch extends AbstractRecordBatch<Union> {
sv = null;
}
-
@Override
public int getRecordCount() {
return outRecordCount;
@@ -119,10 +120,10 @@ public class UnionRecordBatch extends AbstractRecordBatch<Union> {
transfer.transfer();
}
- for (VectorWrapper<?> vw : this.container) {
- ValueVector.Mutator m = vw.getValueVector().getMutator();
- m.setValueCount(outRecordCount);
- }
+// for (VectorWrapper<?> vw : this.container) {
+// ValueVector.Mutator m = vw.getValueVector().getMutator();
+// m.setValueCount(outRecordCount);
+// }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 20540dd3b..d87a9f58c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -88,7 +88,7 @@ public class BatchGroup implements VectorAccessible {
watch.start();
outputBatch.writeToStream(outputStream);
newContainer.zeroVectors();
- logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount);
+// logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount);
spilledBatches++;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 8bb3d43d1..930f851bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
@@ -82,6 +83,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private MSorter mSorter;
private PriorityQueueSelector selector;
private PriorityQueueCopier copier;
+ private BufferAllocator copierAllocator;
private LinkedList<BatchGroup> batchGroups = Lists.newLinkedList();
private SelectionVector4 sv4;
private FileSystem fs;
@@ -89,7 +91,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private int batchesSinceLastSpill = 0;
private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files
- public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) {
+ public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
DrillConfig config = context.getConfig();
@@ -107,6 +109,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS);
dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES));
uid = System.nanoTime();
+ copierAllocator = oContext.getAllocator().getChildAllocator(context.getHandle(), 10000000, 20000000);
}
@Override
@@ -117,7 +120,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
@Override
public void kill() {
incoming.kill();
- cleanup();
}
@Override
@@ -134,8 +136,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
@Override
public void cleanup() {
- super.cleanup();
- incoming.cleanup();
if (batchGroups != null) {
for (BatchGroup group: batchGroups) {
try {
@@ -151,6 +151,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
if (sv4 != null) {
sv4.clear();
}
+ copierAllocator.close();
+ super.cleanup();
+ incoming.cleanup();
}
@Override
@@ -170,11 +173,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
int count = selector.next();
if(count > 0){
long t = w.elapsed(TimeUnit.MICROSECONDS);
-// logger.debug("Took {} us to merge {} records", t, count);
+ logger.debug("Took {} us to merge {} records", t, count);
container.setRecordCount(count);
return IterOutcome.OK;
}else{
- cleanup();
+ logger.debug("selector returned 0 records");
return IterOutcome.NONE;
}
}
@@ -192,7 +195,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
case NOT_YET:
throw new UnsupportedOperationException();
case STOP:
- cleanup();
return upstream;
case OK_NEW_SCHEMA:
// only change in the case that the schema truly changes. Artificial schema changes are ignored.
@@ -207,7 +209,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
sv2 = incoming.getSelectionVector2();
} else {
- sv2 = newSV2();
+ try {
+ sv2 = newSV2();
+ } catch (OutOfMemoryException e) {
+ throw new RuntimeException();
+ }
}
int count = sv2.getCount();
assert sv2.getCount() > 0;
@@ -225,6 +231,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
long t = w.elapsed(TimeUnit.MICROSECONDS);
// logger.debug("Took {} us to sort {} records", t, count);
break;
+ case OUT_OF_MEMORY:
+ mergeAndSpill();
+ batchesSinceLastSpill = 0;
+ break;
default:
throw new UnsupportedOperationException();
}
@@ -243,7 +253,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return IterOutcome.NONE;
}
- builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES);
+ builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
for (BatchGroup group : batchGroups) {
RecordBatchData rbd = new RecordBatchData(group.getFirstContainer());
@@ -254,7 +264,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
builder.build(context, container);
sv4 = builder.getSv4();
mSorter = createNewMSorter();
- mSorter.setup(context, getSelectionVector4(), this.container);
+ mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container);
mSorter.sort(this.container);
sv4 = mSorter.getSV4();
@@ -265,7 +275,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
constructHyperBatch(batchGroups, this.container);
constructSV4();
selector = createSelector();
- selector.setup(context, this, sv4, batchGroups);
+ selector.setup(context, oContext.getAllocator(), this, sv4, batchGroups);
selector.next();
}
@@ -284,12 +294,22 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
public void mergeAndSpill() throws SchemaChangeException {
+ logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
VectorContainer hyperBatch = new VectorContainer();
VectorContainer outputContainer = new VectorContainer();
List<BatchGroup> batchGroupList = Lists.newArrayList();
for (int i = 0; i < SPILL_BATCH_GROUP_SIZE; i++) {
+ if (batchGroups.size() == 0) {
+ break;
+ }
+ if (batchGroups.peekLast().getSecondContainer() != null) {
+ break;
+ }
batchGroupList.add(batchGroups.pollLast());
}
+ if (batchGroupList.size() == 0) {
+ return;
+ }
constructHyperBatch(batchGroupList, hyperBatch);
createCopier(hyperBatch, batchGroupList, outputContainer);
@@ -309,7 +329,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
c2.setRecordCount(count);
String outputFile = String.format(Utilities.getFileNameForQueryFragment(context, dirs.next(), "spill" + uid + "_" + spillCount++));
- BatchGroup newGroup = new BatchGroup(c1, c2, fs, outputFile, context.getAllocator());
+ BatchGroup newGroup = new BatchGroup(c1, c2, fs, outputFile, oContext.getAllocator());
try {
while ((count = copier.next()) > 0) {
@@ -328,9 +348,19 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
}
- private SelectionVector2 newSV2() {
- SelectionVector2 sv2 = new SelectionVector2(context.getAllocator());
- sv2.allocateNew(incoming.getRecordCount());
+ private SelectionVector2 newSV2() throws OutOfMemoryException {
+ SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator());
+ if (!sv2.allocateNew(incoming.getRecordCount())) {
+ try {
+ mergeAndSpill();
+ } catch (SchemaChangeException e) {
+ throw new RuntimeException();
+ }
+ batchesSinceLastSpill = 0;
+ if (!sv2.allocateNew(incoming.getRecordCount())) {
+ throw new OutOfMemoryException();
+ }
+ }
for (int i = 0; i < incoming.getRecordCount(); i++) {
sv2.setIndex(i, (char) i);
}
@@ -360,7 +390,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
private void constructSV4() throws SchemaChangeException {
- BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
+ BufferAllocator.PreAllocator preAlloc = oContext.getAllocator().getNewPreAllocator();
preAlloc.preAllocate(4 * TARGET_RECORD_COUNT);
sv4 = new SelectionVector4(preAlloc.getAllocation(), TARGET_RECORD_COUNT, TARGET_RECORD_COUNT);
}
@@ -476,11 +506,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
List<VectorAllocator> allocators = Lists.newArrayList();
for(VectorWrapper<?> i : batch){
- ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator());
+ ValueVector v = TypeHelper.getNewVector(i.getField(), copierAllocator);
outputContainer.add(v);
allocators.add(VectorAllocator.getAllocator(v, 110));
}
- copier.setup(context, batch, batchGroupList, outputContainer, allocators);
+ copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators);
} catch (ClassTransformationException e) {
throw new RuntimeException(e);
} catch (IOException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index e87774b39..c54b2b7df 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -33,15 +33,18 @@ import java.util.Queue;
public abstract class MSortTemplate implements MSorter, IndexedSortable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class);
-
+
+ private BufferAllocator allocator;
private SelectionVector4 vector4;
private SelectionVector4 aux;
private long compares;
private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
private Queue<Integer> newRunStarts;
-
- public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{
+
+ @Override
+ public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{
+ this.allocator = allocator;
// we pass in the local hyperBatch since that is where we'll be reading data.
Preconditions.checkNotNull(vector4);
this.vector4 = vector4.createNewWrapperCurrent();
@@ -60,7 +63,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
throw new UnsupportedOperationException("Missing batch");
}
}
- BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
+ BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
preAlloc.preAllocate(4 * this.vector4.getTotalCount());
aux = new SelectionVector4(preAlloc.getAllocation(), this.vector4.getTotalCount(), Character.MAX_VALUE);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
index 6ad4e3ddb..1300830f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
@@ -19,12 +19,13 @@ package org.apache.drill.exec.physical.impl.xsort;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
public interface MSorter {
- public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException;
+ public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException;
public void sort(VectorContainer container);
public SelectionVector4 getSV4();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index b31e28721..712296352 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.xsort;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.svremover.Copier;
@@ -31,7 +32,7 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
import java.util.List;
public interface PriorityQueueCopier {
- public void setup(FragmentContext context, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException;
+ public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException;
public int next();
public List<VectorAllocator> getAllocators();
public void cleanup();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index bc2c19c60..4221ae284 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -42,6 +42,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
private List<BatchGroup> batchGroups;
private VectorAccessible hyperBatch;
private FragmentContext context;
+ private BufferAllocator allocator;
private VectorAccessible outgoing;
private List<VectorAllocator> allocators;
private int size;
@@ -49,19 +50,21 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT;
@Override
- public void setup(FragmentContext context, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException {
+ public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException {
this.context = context;
+ this.allocator = allocator;
this.hyperBatch = hyperBatch;
this.batchGroups = batchGroups;
this.outgoing = outgoing;
this.allocators = allocators;
this.size = batchGroups.size();
- BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
+ BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
preAlloc.preAllocate(4 * size);
vector4 = new SelectionVector4(preAlloc.getAllocation(), size, Character.MAX_VALUE);
doSetup(context, hyperBatch, outgoing);
+ queueSize = 0;
for (int i = 0; i < size; i++) {
vector4.set(i, i * 2, batchGroups.get(i).getNextIndex());
siftUp();
@@ -79,10 +82,14 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
}
int compoundIndex = vector4.get(0);
int batch = compoundIndex >>> 16;
- assert batch < batchGroups.size() * 2;
+ assert batch < batchGroups.size() * 2 : String.format("batch: %d batchGroups: %d", batch, batchGroups.size());
int batchGroup = batch / 2;
- doCopy(compoundIndex, outgoingIndex);
+ if (!doCopy(compoundIndex, outgoingIndex)) {
+ setValueCount(outgoingIndex);
+ return outgoingIndex;
+ }
int nextIndex = batchGroups.get(batchGroup).getNextIndex();
+ batch = batch & 0xFFFE;
batch += batchGroups.get(batchGroup).getBatchPointer();
if (nextIndex < 0) {
vector4.set(0, vector4.get(--queueSize));
@@ -172,6 +179,6 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
- public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract boolean doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java
index 963b8b03c..786667a8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.xsort;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -27,7 +28,7 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
import java.util.List;
public interface PriorityQueueSelector {
- public void setup(FragmentContext context, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException;
+ public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException;
public int next();
public void cleanup();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java
index ccb26cfee..65a072ba7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java
@@ -35,20 +35,22 @@ public abstract class PriorityQueueSelectorTemplate implements PriorityQueueSele
private SelectionVector4 vector4;
private List<BatchGroup> batchGroups;
private FragmentContext context;
+ private BufferAllocator allocator;
private int size;
private int queueSize = 0;
private int targetRecordCount = ExternalSortBatch.TARGET_RECORD_COUNT;
private VectorAccessible hyperBatch;
@Override
- public void setup(FragmentContext context, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException {
+ public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException {
this.context = context;
+ this.allocator = allocator;
this.sv4 = sv4;
this.batchGroups = batchGroups;
this.size = batchGroups.size();
this.hyperBatch = hyperBatch;
- BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator();
+ BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
preAlloc.preAllocate(4 * size);
vector4 = new SelectionVector4(preAlloc.getAllocation(), size, Character.MAX_VALUE);
doSetup(context, hyperBatch, null);
@@ -78,6 +80,7 @@ public abstract class PriorityQueueSelectorTemplate implements PriorityQueueSele
} else if (nextIndex == -2) {
vector4.set(0, batch - 1, 0);
sv4.setCount(outgoingIndex);
+ assert outgoingIndex != 0;
return outgoingIndex;
} else {
vector4.set(0, batch, nextIndex);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index d518f04bf..0ba84f9fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.physical.impl.xsort;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.sort.Sorter;
@@ -29,6 +30,7 @@ import org.apache.hadoop.util.IndexedSortable;
import org.apache.hadoop.util.QuickSort;
import javax.inject.Named;
+import java.util.concurrent.TimeUnit;
public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, IndexedSortable{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleBatchSorterTemplate.class);
@@ -44,7 +46,10 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
@Override
public void sort(SelectionVector2 vector2){
QuickSort qs = new QuickSort();
+ Stopwatch watch = new Stopwatch();
+ watch.start();
qs.sort(this, 0, vector2.getCount());
+ logger.debug("Took {} us to sort {} records", watch.elapsed(TimeUnit.MICROSECONDS), vector2.getCount());
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index 70200a9a1..87078a2d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -33,6 +33,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
@Override
public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws ExecutionSetupException {
+ iNode.addAllocation(exchange);
if(exchange == iNode.getNode().getSendingExchange()){
// this is a sending exchange.
@@ -56,6 +57,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
@Override
public PhysicalOperator visitSubScan(SubScan subScan, IndexedFragmentNode value) throws ExecutionSetupException {
+ value.addAllocation(subScan);
// TODO - implement this
return super.visitOp(subScan, value);
}
@@ -63,6 +65,8 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
@Override
public PhysicalOperator visitStore(Store store, IndexedFragmentNode iNode) throws ExecutionSetupException {
PhysicalOperator child = store.getChild().accept(this, iNode);
+
+ iNode.addAllocation(store);
try {
PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId());
@@ -75,6 +79,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
@Override
public PhysicalOperator visitOp(PhysicalOperator op, IndexedFragmentNode iNode) throws ExecutionSetupException {
+ iNode.addAllocation(op);
// logger.debug("Visiting catch all: {}", op);
List<PhysicalOperator> children = Lists.newArrayList();
for(PhysicalOperator child : op){
@@ -104,6 +109,10 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
public Wrapper getInfo() {
return info;
}
+
+ public void addAllocation(PhysicalOperator pop) {
+ info.addAllocation(pop);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 581499d1e..6e951df0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -132,7 +132,9 @@ public class SimpleParallelizer {
.setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) //
.setLeafFragment(isLeafFragment) //
.setQueryStartTime(queryStartTime)
- .setTimeZone(timeZone)
+ .setTimeZone(timeZone)//
+ .setMemInitial(wrapper.getInitialAllocation())//
+ .setMemMax(wrapper.getMaxAllocation())
.build();
if (isRootNode) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index ca933c6ab..6d720a7d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -80,9 +80,9 @@ public class StatsCollector {
}
@Override
- public Void visitSubScan(SubScan subScan, Wrapper value) throws RuntimeException {
+ public Void visitSubScan(SubScan subScan, Wrapper wrapper) throws RuntimeException {
// TODO - implement this
- return super.visitOp(subScan, value);
+ return super.visitOp(subScan, wrapper);
}
@Override
@@ -93,9 +93,9 @@ public class StatsCollector {
}
@Override
- public Void visitLimit(Limit limit, Wrapper value) throws RuntimeException {
+ public Void visitLimit(Limit limit, Wrapper wrapper) throws RuntimeException {
// TODO: Implement this
- return visitOp(limit, value);
+ return visitOp(limit, wrapper);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 94fcac51c..8602bf002 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -48,6 +48,8 @@ public class Wrapper {
private final Stats stats;
private boolean endpointsAssigned;
private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinity = Maps.newHashMap();
+ private long initialAllocation = 0;
+ private long maxAllocation = 0;
// a list of assigned endpoints. Technically, there could repeated endpoints in this list if we'd like to assign the
// same fragment multiple times to the same endpoint.
@@ -99,6 +101,19 @@ public class Wrapper {
return node;
}
+ public long getInitialAllocation() {
+ return initialAllocation;
+ }
+
+ public long getMaxAllocation() {
+ return maxAllocation;
+ }
+
+ public void addAllocation(PhysicalOperator pop) {
+ initialAllocation += pop.getInitialAllocation();
+ maxAllocation += pop.getMaxAllocation();
+ }
+
private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 7898937dc..5521c4e08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -21,15 +21,7 @@ import java.util.Iterator;
import net.hydromatic.optiq.tools.RuleSet;
-import org.apache.drill.exec.planner.physical.FilterPrule;
-import org.apache.drill.exec.planner.physical.LimitPrule;
-import org.apache.drill.exec.planner.physical.MergeJoinPrule;
-import org.apache.drill.exec.planner.physical.ProjectPrule;
-import org.apache.drill.exec.planner.physical.ScanPrule;
-import org.apache.drill.exec.planner.physical.ScreenPrule;
-import org.apache.drill.exec.planner.physical.SortConvertPrule;
-import org.apache.drill.exec.planner.physical.SortPrule;
-import org.apache.drill.exec.planner.physical.StreamAggPrule;
+import org.apache.drill.exec.planner.physical.*;
import org.eigenbase.rel.RelFactories;
import org.eigenbase.rel.rules.MergeProjectRule;
import org.eigenbase.rel.rules.PushFilterPastJoinRule;
@@ -116,9 +108,9 @@ public class DrillRuleSets {
StreamAggPrule.INSTANCE,
MergeJoinPrule.INSTANCE,
FilterPrule.INSTANCE,
- LimitPrule.INSTANCE
+ LimitPrule.INSTANCE,
-// PushLimitToTopN.INSTANCE
+ PushLimitToTopN.INSTANCE
// ExpandConversionRule.INSTANCE,
// SwapJoinRule.INSTANCE,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
index 344be4ebc..998ed0a8b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java
@@ -51,11 +51,11 @@ public class SortPrel extends SortRel implements Prel {
PhysicalOperator childPOP = child.getPhysicalOperator(creator);
- childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE);
- Sort g = new Sort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false);
+// childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE);
+// Sort g = new Sort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false);
-// childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE);
-// Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false);
+ childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE);
+ Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false);
return g;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index d4f458f33..214f81c95 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -20,7 +20,11 @@ package org.apache.drill.exec.record;
import java.util.Iterator;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -31,11 +35,13 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
protected final VectorContainer container = new VectorContainer();
protected final T popConfig;
protected final FragmentContext context;
-
- protected AbstractRecordBatch(T popConfig, FragmentContext context) {
+ protected final OperatorContext oContext;
+
+ protected AbstractRecordBatch(T popConfig, FragmentContext context) throws OutOfMemoryException {
super();
this.context = context;
this.popConfig = popConfig;
+ this.oContext = new OperatorContext(popConfig, context);
}
@Override
@@ -60,13 +66,13 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
@Override
public void kill() {
killIncoming();
- cleanup();
}
protected abstract void killIncoming();
public void cleanup(){
container.clear();
+ oContext.close();
}
@Override
@@ -97,6 +103,4 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
return batch;
}
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 7b832e43a..dd2cfe059 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -18,16 +18,19 @@
package org.apache.drill.exec.record;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> {
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
-
+
protected final RecordBatch incoming;
private boolean first = true;
+ protected boolean outOfMemory = false;
- public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) {
+ public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
this.incoming = incoming;
}
@@ -46,7 +49,8 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
case NONE:
case NOT_YET:
case STOP:
- cleanup();
+ return upstream;
+ case OUT_OF_MEMORY:
return upstream;
case OK_NEW_SCHEMA:
try{
@@ -60,6 +64,10 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
// fall through.
case OK:
doWork();
+ if (outOfMemory) {
+ outOfMemory = false;
+ return IterOutcome.OUT_OF_MEMORY;
+ }
return upstream; // change if upstream changed, otherwise normal.
default:
throw new UnsupportedOperationException();
@@ -69,8 +77,8 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
@Override
public void cleanup() {
// logger.debug("Cleaning up.");
- incoming.cleanup();
super.cleanup();
+ incoming.cleanup();
}
protected abstract void setupNewSchema() throws SchemaChangeException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
index 6f5f7a720..acbd8bd81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java
@@ -25,5 +25,5 @@ public interface RawFragmentBatchProvider {
public RawFragmentBatch getNext() throws IOException;
public void kill(FragmentContext context);
-
+ public void cleanup();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index b77a6a8fc..31283c61b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -45,7 +45,8 @@ public interface RecordBatch extends VectorAccessible {
OK_NEW_SCHEMA, // A full collection of records
STOP, // Informs parent nodes that the query has terminated. In this case, a consumer can consume their QueryContext
// to understand the current state of things.
- NOT_YET // used by batches that haven't received incoming data yet.
+ NOT_YET, // used by batches that haven't received incoming data yet.
+ OUT_OF_MEMORY // an upstream operator was unable to allocate memory. A batch receiving this should release memory if it can
}
public static enum SetupOutcome {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
index 96a1c227b..ba2c7b2b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.vector.ValueVector;
public interface TransferPair {
public void transfer();
+ public void splitAndTransfer(int startIndex, int length);
public ValueVector getTo();
public void copyValue(int from, int to);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 288aa7f77..396834c49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -71,6 +71,7 @@ public class WritableBatch {
cbb.addComponent(buf);
}
+
List<FieldMetadata> fields = def.getFieldList();
int bufferOffset = 0;
@@ -83,7 +84,10 @@ public class WritableBatch {
for (VectorWrapper<?> vv : container) {
FieldMetadata fmd = fields.get(vectorIndex);
ValueVector v = vv.getValueVector();
- v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+ ByteBuf bb = cbb.slice(bufferOffset, fmd.getBufferLength());
+// v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
+ v.load(fmd, bb);
+ bb.release();
vectorIndex++;
bufferOffset += fmd.getBufferLength();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index f7b5155b6..af1e2b6fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -77,10 +77,14 @@ public class SelectionVector2 implements Closeable{
public void setIndex(int index, char value){
buffer.setChar(index * RECORD_SIZE, value);
}
-
- public void allocateNew(int size){
+
+ public boolean allocateNew(int size){
clear();
buffer = allocator.buffer(size * RECORD_SIZE);
+ if (buffer == null) {
+ return false;
+ }
+ return true;
}
public SelectionVector2 clone(){
@@ -98,7 +102,7 @@ public class SelectionVector2 implements Closeable{
}
public void clear() {
- if (buffer != DeadBuf.DEAD_BUFFER) {
+ if (buffer != null && buffer != DeadBuf.DEAD_BUFFER) {
buffer.release();
buffer = DeadBuf.DEAD_BUFFER;
recordCount = 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
index c665949a5..d50a64e64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java
@@ -46,7 +46,7 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE
protected GenericFutureListener<ChannelFuture> getCloseHandler(ServerConnection clientConnection) {
return getCloseHandler(clientConnection.getChannel());
}
-
+
@Override
protected Response handle(ServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
return handleReponse( (ConnectionThrottle) connection, rpcType, pBody, dBody);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index f5e77f10c..8f533e3cf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -73,7 +73,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
ch.closeFuture().addListener(getCloseHandler(connection));
ch.pipeline().addLast( //
- getDecoder(connection.getAllocator()), //
+ getDecoder(connection.getAllocator(), getOutOfMemoryHandler()), //
new RpcDecoder("s-" + rpcConfig.getName()), //
new RpcEncoder("s-" + rpcConfig.getName()), //
getHandshakeHandler(connection), new InboundHandler(connection), //
@@ -85,7 +85,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
});
}
- public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator);
+ public OutOfMemoryHandler getOutOfMemoryHandler() {
+ return OutOfMemoryHandler.DEFAULT_INSTANCE;
+ }
+
+ public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler);
@Override
public boolean isClient() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
index 1527e7942..8fc446fd4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java
@@ -41,7 +41,7 @@ public class InboundRpcMessage extends RpcMessage{
}
void release(){
- pBody.release();
+ if (pBody != null) pBody.release();
if(dBody != null) dBody.release();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java
new file mode 100644
index 000000000..5d7db478e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+public interface OutOfMemoryHandler {
+
+ public static OutOfMemoryHandler DEFAULT_INSTANCE = new OutOfMemoryHandler() {
+ @Override
+ public void handle() {
+ throw new UnsupportedOperationException();
+ }
+ };
+
+ public void handle();
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
index 23fa46d06..473e3e6c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.rpc;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.SwappedByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
@@ -27,6 +29,7 @@ import java.util.List;
import org.apache.drill.exec.memory.BufferAllocator;
import com.google.protobuf.CodedInputStream;
+import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
/**
* Modified version of ProtobufVarint32FrameDecoder that avoids bytebuf copy.
@@ -34,12 +37,13 @@ import com.google.protobuf.CodedInputStream;
public class ProtobufLengthDecoder extends ByteToMessageDecoder {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtobufLengthDecoder.class);
-
private BufferAllocator allocator;
+ private OutOfMemoryHandler outOfMemoryHandler;
- public ProtobufLengthDecoder(BufferAllocator allocator) {
+ public ProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
super();
this.allocator = allocator;
+ this.outOfMemoryHandler = outOfMemoryHandler;
}
@@ -82,6 +86,7 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder {
if(outBuf == null){
logger.warn("Failure allocating buffer on incoming stream due to memory limits. Current Allocation: {}.", allocator.getAllocatedMemory());
in.resetReaderIndex();
+ outOfMemoryHandler.handle();
return;
}
outBuf.writeBytes(in, in.readerIndex(), length);
@@ -104,4 +109,9 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder {
}
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ ctx.fireChannelReadComplete();
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 1a422ee63..30101b2f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -149,7 +149,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
if (!ctx.channel().isOpen()) return;
if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Received message {}", msg);
switch (msg.mode) {
- case REQUEST:
+ case REQUEST: {
// handle message and ack.
Response r = handle(connection, msg.rpcType, msg.pBody, msg.dBody);
msg.release(); // we release our ownership. Handle could have taken over ownership.
@@ -159,6 +159,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp
if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Adding message to outbound buffer. {}", outMessage);
ctx.writeAndFlush(outMessage);
break;
+ }
case RESPONSE:
try{
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
index af2b58bf5..178ac43af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java
@@ -26,12 +26,7 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
import org.apache.drill.exec.proto.BitControl.RpcType;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.rpc.BasicClient;
-import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcBus;
-import org.apache.drill.exec.rpc.RpcConnectionHandler;
-import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.*;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
@@ -99,7 +94,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo
@Override
public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
- return new ControlProtobufLengthDecoder(allocator);
+ return new ControlProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java
index c00dc5483..7edfe204e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java
@@ -23,14 +23,15 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
/**
* Purely to simplify memory debugging.
*/
public class ControlProtobufLengthDecoder extends ProtobufLengthDecoder{
- public ControlProtobufLengthDecoder(BufferAllocator allocator) {
- super(allocator);
+ public ControlProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+ super(allocator, outOfMemoryHandler);
}
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
index 4b6a85d4e..3e1a2a4c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java
@@ -25,10 +25,7 @@ import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitControl.BitControlHandshake;
import org.apache.drill.exec.proto.BitControl.RpcType;
-import org.apache.drill.exec.rpc.BasicServer;
-import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.*;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
@@ -98,8 +95,8 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{
}
@Override
- public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
- return new ControlProtobufLengthDecoder(allocator);
+ public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+ return new ControlProtobufLengthDecoder(allocator, outOfMemoryHandler);
}
private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
index af4da41fe..58fa403fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
@@ -17,25 +17,33 @@
*/
package org.apache.drill.exec.rpc.data;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
+import org.apache.drill.exec.memory.AccountingByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.work.fragment.FragmentManager;
-class BitServerConnection extends RemoteConnection{
+public class BitServerConnection extends RemoteConnection{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServerConnection.class);
- private final BufferAllocator initialAllocator;
+ private AllocatorProxy proxy = new AllocatorProxy();
private volatile FragmentManager manager;
public BitServerConnection(Channel channel, BufferAllocator initialAllocator) {
super(channel);
- this.initialAllocator = initialAllocator;
+ proxy.setAllocator(initialAllocator);
}
void setManager(FragmentManager manager){
this.manager = manager;
+ if (manager != null) { // Do this check for TestBitRpc test
+ this.proxy.setAllocator(manager.getFragmentContext().getAllocator());
+ manager.addConnection(this);
+ }
}
@Override
@@ -43,13 +51,76 @@ class BitServerConnection extends RemoteConnection{
if(manager != null){
return manager.getFragmentContext().getAllocator();
}
-
- return initialAllocator;
-
+ return proxy;
}
public FragmentManager getFragmentManager(){
return manager;
}
+
+ final static String ERROR_MESSAGE = "Attempted to access AllocatorProxy";
+
+ private static class AllocatorProxy implements BufferAllocator {
+ private BufferAllocator allocator;
+
+ public void setAllocator(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ @Override
+ public AccountingByteBuf buffer(int size) {
+ if (allocator == null) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ return allocator.buffer(size);
+ }
+
+ @Override
+ public AccountingByteBuf buffer(int minSize, int maxSize) {
+ if (allocator == null) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ return allocator.buffer(minSize, maxSize);
+ }
+
+ @Override
+ public ByteBufAllocator getUnderlyingAllocator() {
+ if (allocator == null) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ return allocator.getUnderlyingAllocator();
+ }
+
+ @Override
+ public BufferAllocator getChildAllocator(FragmentHandle handle, long initialReservation, long maximumReservation) throws OutOfMemoryException {
+ if (allocator == null) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ return allocator.getChildAllocator(handle, initialReservation, maximumReservation);
+ }
+
+ @Override
+ public PreAllocator getNewPreAllocator() {
+ if (allocator == null) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ return allocator.getNewPreAllocator();
+ }
+
+ @Override
+ public void close() {
+ if (allocator != null) {
+ allocator.close();
+ }
+ }
+
+ @Override
+ public long getAllocatedMemory() {
+ if (allocator == null) {
+ throw new UnsupportedOperationException(ERROR_MESSAGE);
+ }
+ return allocator.getAllocatedMemory();
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index 6c57f22f7..e22df7c71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -28,11 +28,7 @@ import org.apache.drill.exec.proto.BitData.BitServerHandshake;
import org.apache.drill.exec.proto.BitData.RpcType;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.RpcChannel;
-import org.apache.drill.exec.rpc.BasicClient;
-import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcConnectionHandler;
-import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.*;
import org.apache.drill.exec.rpc.control.ControlProtobufLengthDecoder;
import org.apache.drill.exec.server.BootStrapContext;
@@ -95,6 +91,6 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
@Override
public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
- return new DataProtobufLengthDecoder(allocator);
+ return new DataProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java
index d4391480b..b648c72de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java
@@ -23,12 +23,13 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
public class DataProtobufLengthDecoder extends ProtobufLengthDecoder{
- public DataProtobufLengthDecoder(BufferAllocator allocator) {
- super(allocator);
+ public DataProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+ super(allocator, outOfMemoryHandler);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 3dd7912cd..7354d72f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -22,16 +22,15 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData.BitClientHandshake;
import org.apache.drill.exec.proto.BitData.BitServerHandshake;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.proto.BitData.RpcType;
import org.apache.drill.exec.proto.UserBitShared.RpcChannel;
-import org.apache.drill.exec.rpc.BasicServer;
-import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.*;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.server.BootStrapContext;
@@ -44,6 +43,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
private final BootStrapContext context;
private final WorkEventBus workBus;
private final DataResponseHandler dataHandler;
+ private BitServerConnection connection;
public DataServer(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) {
super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup());
@@ -65,7 +65,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
@Override
public BitServerConnection initRemoteConnection(Channel channel) {
- return new BitServerConnection(channel, context.getAllocator());
+ return connection = new BitServerConnection(channel, context.getAllocator());
}
@Override
@@ -89,6 +89,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
};
}
+
@Override
protected Response handle(BitServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf body) throws RpcException {
assert rpcType == RpcType.REQ_RECORD_BATCH_VALUE;
@@ -121,8 +122,26 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
+ private final static FragmentRecordBatch OOM_FRAGMENT = FragmentRecordBatch.newBuilder().setIsOutOfMemory(true).build();
+
+ @Override
+ public OutOfMemoryHandler getOutOfMemoryHandler() {
+ return new OutOfMemoryHandler() {
+ @Override
+ public void handle() {
+ try {
+ logger.debug("Setting autoRead false");
+ connection.getFragmentManager().setAutoRead(false);
+ connection.getFragmentManager().handle(new RawFragmentBatch(connection, OOM_FRAGMENT, null));
+ } catch (FragmentSetupException e) {
+ throw new RuntimeException();
+ }
+ }
+ };
+ }
+
@Override
- public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
- return new DataProtobufLengthDecoder(allocator);
+ public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+ return new DataProtobufLengthDecoder(allocator, outOfMemoryHandler);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 5345b31a8..37d8d676a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -67,23 +67,26 @@ public class QueryResultHandler {
failAll();
}
}
-
+
if(failed){
l.submissionFailed(new RpcException("Remote failure while running query." + batch.getHeader().getErrorList()));
resultsListener.remove(result.getQueryId(), l);
}else{
+ try {
l.resultArrived(batch, throttle);
+ } catch (Exception e) {
+ batch.release();
+ l.submissionFailed(new RpcException(e));
+ }
}
if (
(failed || result.getIsLastChunk())
- &&
+ &&
(!(l instanceof BufferingListener) || ((BufferingListener)l).output != null)
) {
resultsListener.remove(result.getQueryId(), l);
}
-
-
}
private void failAll() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 50d456df0..f497d39b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -30,11 +30,7 @@ import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
-import org.apache.drill.exec.rpc.BasicClientWithConnection;
-import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcConnectionHandler;
-import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.*;
import com.google.protobuf.MessageLite;
@@ -104,7 +100,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
@Override
public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
- return new UserProtobufLengthDecoder(allocator);
+ return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java
index 680a07d49..99e77774c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java
@@ -23,12 +23,13 @@ import io.netty.channel.ChannelHandlerContext;
import java.util.List;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
public class UserProtobufLengthDecoder extends ProtobufLengthDecoder{
- public UserProtobufLengthDecoder(BufferAllocator allocator) {
- super(allocator);
+ public UserProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+ super(allocator, outOfMemoryHandler);
}
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index ae4b01a53..acd841202 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -34,13 +34,7 @@ import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
-import org.apache.drill.exec.rpc.Acks;
-import org.apache.drill.exec.rpc.BasicServer;
-import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.RemoteConnection;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.*;
import org.apache.drill.exec.work.user.UserWorker;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -150,7 +144,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
@Override
- public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
- return new UserProtobufLengthDecoder(allocator);
+ public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+ return new UserProtobufLengthDecoder(allocator, outOfMemoryHandler);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 102201678..fb8a014b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -22,6 +22,9 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
public interface RecordReader {
+ public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+ public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+
/**
* Configure the RecordReader with the provided schema and the record batch that should be written to.
*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
index 2c5ef42c7..1d6aa4d15 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java
@@ -25,16 +25,27 @@ public class VectorHolder {
private int length;
private ValueVector vector;
private int currentLength;
+ private boolean repeated;
public VectorHolder(int length, ValueVector vector) {
this.length = length;
this.vector = vector;
+ if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
+ repeated = true;
+ }
}
public VectorHolder(ValueVector vector) {
this.length = vector.getValueCapacity();
this.vector = vector;
+ if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) {
+ repeated = true;
+ }
+ }
+
+ public boolean isRepeated() {
+ return repeated;
}
public ValueVector getValueVector() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index 7ae10f814..67502ef9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -153,7 +153,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
}
}
- return new ScanBatch(context, readers.iterator(), partitionColumns, selectedPartitionColumns);
+ return new ScanBatch(scan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
index fb16edfa6..84587a946 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
@@ -32,6 +32,6 @@ public class DirectBatchCreator implements BatchCreator<DirectSubScan>{
@Override
public RecordBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
- return new ScanBatch(context, Collections.singleton(config.getReader()).iterator());
+ return new ScanBatch(config, context, Collections.singleton(config.getReader()).iterator());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 2e8cd2ede..1c8539cc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -40,6 +40,7 @@ import org.apache.drill.exec.expr.holders.NullableBitHolder;
import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
import org.apache.drill.exec.expr.holders.NullableIntHolder;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
@@ -65,7 +66,7 @@ import com.google.common.collect.Maps;
public class JSONRecordReader implements RecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class);
- private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb
+ private static final int DEFAULT_LENGTH = 4000;
public static final Charset UTF_8 = Charset.forName("UTF-8");
private final Map<String, VectorHolder> valueVectorMap;
@@ -78,22 +79,20 @@ public class JSONRecordReader implements RecordReader {
private RecordSchema currentSchema;
private List<Field> removedFields;
private OutputMutator outputMutator;
- private BufferAllocator allocator;
private int batchSize;
private final List<SchemaPath> columns;
public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize,
- List<SchemaPath> columns) {
+ List<SchemaPath> columns) throws OutOfMemoryException {
this.hadoopPath = new Path(inputPath);
this.fileSystem = fileSystem;
- this.allocator = fragmentContext.getAllocator();
this.batchSize = batchSize;
valueVectorMap = Maps.newHashMap();
this.columns = columns;
}
public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem,
- List<SchemaPath> columns) {
+ List<SchemaPath> columns) throws OutOfMemoryException {
this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH, columns);
}
@@ -162,7 +161,10 @@ public class JSONRecordReader implements RecordReader {
}
for (VectorHolder holder : valueVectorMap.values()) {
- holder.populateVectorLength();
+ if (holder.isRepeated()) {
+ holder.setGroupCount(nextRowIndex);
+ }
+ holder.getValueVector().getMutator().setValueCount(nextRowIndex);
}
return nextRowIndex;
@@ -200,10 +202,6 @@ public class JSONRecordReader implements RecordReader {
return removedFields;
}
- public BufferAllocator getAllocator() {
- return allocator;
- }
-
private boolean fieldSelected(String field){
SchemaPath sp = SchemaPath.getCompoundPath(field.split("\\."));
@@ -523,11 +521,10 @@ public class JSONRecordReader implements RecordReader {
MaterializedField f = MaterializedField.create(SchemaPath.getCompoundPath(fullFieldName.split("\\.")), type);
- ValueVector v = TypeHelper.getNewVector(f, allocator);
+ ValueVector v = outputMutator.addField(f, TypeHelper.getValueVectorClass(minorType, type.getMode()));
AllocationHelper.allocate(v, batchSize, 50);
holder = new VectorHolder(v);
valueVectorMap.put(fullFieldName, holder);
- outputMutator.addField(v);
return holder;
}
return holder;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index 07e0cbe91..2544d2b6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -23,11 +23,14 @@ import java.util.Properties;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
@@ -209,10 +212,10 @@ public class HiveRecordReader implements RecordReader {
try {
for (int i = 0; i < columnNames.size(); i++) {
PrimitiveCategory pCat = primitiveCategories.get(i);
- MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), getMajorType(pCat));
- ValueVector vv = TypeHelper.getNewVector(field, context.getAllocator());
+ MajorType type = getMajorType(pCat);
+ MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), type);
+ ValueVector vv = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
vectors.add(vv);
- output.addField(vv);
}
for (int i = 0; i < selectedPartitionNames.size(); i++) {
String type = selectedPartitionTypes.get(i);
@@ -249,7 +252,7 @@ public class HiveRecordReader implements RecordReader {
TinyIntVector v = (TinyIntVector) vector;
byte value = (byte) val;
for (int j = 0; j < recordCount; j++) {
- v.getMutator().set(j, value);
+ v.getMutator().setSafe(j, value);
}
break;
}
@@ -257,7 +260,7 @@ public class HiveRecordReader implements RecordReader {
Float8Vector v = (Float8Vector) vector;
double value = (double) val;
for (int j = 0; j < recordCount; j++) {
- v.getMutator().set(j, value);
+ v.getMutator().setSafe(j, value);
}
break;
}
@@ -265,7 +268,7 @@ public class HiveRecordReader implements RecordReader {
Float4Vector v = (Float4Vector) vector;
float value = (float) val;
for (int j = 0; j < recordCount; j++) {
- v.getMutator().set(j, value);
+ v.getMutator().setSafe(j, value);
}
break;
}
@@ -273,7 +276,7 @@ public class HiveRecordReader implements RecordReader {
IntVector v = (IntVector) vector;
int value = (int) val;
for (int j = 0; j < recordCount; j++) {
- v.getMutator().set(j, value);
+ v.getMutator().setSafe(j, value);
}
break;
}
@@ -281,7 +284,7 @@ public class HiveRecordReader implements RecordReader {
BigIntVector v = (BigIntVector) vector;
long value = (long) val;
for (int j = 0; j < recordCount; j++) {
- v.getMutator().set(j, value);
+ v.getMutator().setSafe(j, value);
}
break;
}
@@ -289,7 +292,7 @@ public class HiveRecordReader implements RecordReader {
SmallIntVector v = (SmallIntVector) vector;
short value = (short) val;
for (int j = 0; j < recordCount; j++) {
- v.getMutator().set(j, value);
+ v.getMutator().setSafe(j, value);
}
break;
}
@@ -297,7 +300,7 @@ public class HiveRecordReader implements RecordReader {
VarCharVector v = (VarCharVector) vector;
byte[] value = (byte[]) val;
for (int j = 0; j < recordCount; j++) {
- v.getMutator().set(j, value);
+ v.getMutator().setSafe(j, value);
}
break;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
index b155661ed..62f2ec736 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java
@@ -66,6 +66,6 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> {
}
}
}
- return new ScanBatch(context, readers.iterator());
+ return new ScanBatch(config, context, readers.iterator());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
index f1821965e..125ee13a6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
@@ -80,8 +80,7 @@ public class HiveTextRecordReader extends HiveRecordReader {
for (int i = start; (b = bytes[i]) != delimiter; i++) {
value = (value * 10) + b - 48;
}
- ((NullableIntVector) vv).getMutator().set(index, value); // No need to use setSafe for fixed length vectors
- return true;
+ return ((NullableIntVector) vv).getMutator().setSafe(index, value);
}
case LONG: {
long value = 0;
@@ -89,8 +88,7 @@ public class HiveTextRecordReader extends HiveRecordReader {
for (int i = start; (b = bytes[i]) != delimiter; i++) {
value = (value * 10) + b - 48;
}
- ((NullableBigIntVector) vv).getMutator().set(index, value); // No need to use setSafe for fixed length vectors
- return true;
+ return ((NullableBigIntVector) vv).getMutator().setSafe(index, value);
}
case SHORT:
throw new UnsupportedOperationException();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index a40024529..a7e814638 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -33,6 +33,6 @@ public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{
@Override
public RecordBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
RecordReader rr = new RowRecordReader(context, config.getTable(), context.getRootSchema());
- return new ScanBatch(context, Collections.singleton(rr).iterator());
+ return new ScanBatch(config, context, Collections.singleton(rr).iterator());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
index 5d723dc99..ac601d43f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java
@@ -23,6 +23,8 @@ import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.RecordReader;
@@ -38,9 +40,13 @@ import org.apache.drill.exec.vector.ValueVector;
public class RowRecordReader implements RecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RowRecordReader.class);
+ public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+ public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
+
protected final VectorSet batch;
protected final RowProvider provider;
protected final FragmentContext context;
+ protected final BufferAllocator allocator;
protected OutputMutator output;
private int bufSize = 256*1024;
@@ -50,14 +56,16 @@ public class RowRecordReader implements RecordReader {
* @param context
* @param vectors
*/
- public RowRecordReader(FragmentContext context, VectorSet batch, RowProvider provider) {
+ public RowRecordReader(FragmentContext context, VectorSet batch, RowProvider provider) throws OutOfMemoryException {
this.context = context;
+ this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION);
this.provider = provider;
this.batch = batch;
}
- public RowRecordReader(FragmentContext context, SelectedTable table, SchemaPlus rootSchema){
+ public RowRecordReader(FragmentContext context, SelectedTable table, SchemaPlus rootSchema) throws OutOfMemoryException {
this.context = context;
+ this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION);
this.provider = table.getProvider(rootSchema);
this.batch = table.getFixedTable();
}
@@ -68,7 +76,7 @@ public class RowRecordReader implements RecordReader {
@Override
public void setup(OutputMutator output) throws ExecutionSetupException {
this.output = output;
- batch.createVectors(context.getAllocator());
+ batch.createVectors(allocator);
// Inform drill of the output columns. They were set up when the vector handler was created.
// Note we are currently working with fixed tables.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 7a2ed1b9d..eb9e7a693 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -23,6 +23,8 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
@@ -38,11 +40,12 @@ public class MockRecordReader implements RecordReader {
private OutputMutator output;
private MockScanEntry config;
private FragmentContext context;
+ private BufferAllocator alcator;
private ValueVector[] valueVectors;
private int recordsRead;
private int batchRecordCount;
- public MockRecordReader(FragmentContext context, MockScanEntry config) {
+ public MockRecordReader(FragmentContext context, MockScanEntry config) throws OutOfMemoryException {
this.context = context;
this.config = config;
}
@@ -55,14 +58,11 @@ public class MockRecordReader implements RecordReader {
return x;
}
- private ValueVector getVector(String name, MajorType type, int length) {
+ private MaterializedField getVector(String name, MajorType type, int length) {
assert context != null : "Context shouldn't be null.";
MaterializedField f = MaterializedField.create(SchemaPath.getSimplePath(name), type);
- ValueVector v;
- v = TypeHelper.getNewVector(f, context.getAllocator());
- AllocationHelper.allocate(v, length, 50, 4);
- return v;
+ return f;
}
@@ -75,8 +75,8 @@ public class MockRecordReader implements RecordReader {
batchRecordCount = 250000 / estimateRowSize;
for (int i = 0; i < config.getTypes().length; i++) {
- valueVectors[i] = getVector(config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount);
- output.addField(valueVectors[i]);
+ MajorType type = config.getTypes()[i].getMajorType();
+ valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
}
output.setNewSchema();
} catch (SchemaChangeException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 5c51a5af6..0bfd03886 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -41,6 +41,6 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
for(MockScanEntry e : entries){
readers.add(new MockRecordReader(context, e));
}
- return new ScanBatch(context, readers.iterator());
+ return new ScanBatch(config, context, readers.iterator());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
index 4c060f2b7..16c27159c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java
@@ -52,8 +52,10 @@ final class NullableBitReader extends ColumnReader {
defLevel = pageReadStatus.definitionLevels.readInteger();
// if the value is defined
if (defLevel == columnDescriptor.getMaxDefinitionLevel()){
- ((NullableBitVector)valueVecHolder.getValueVector()).getMutator().set(i + valuesReadInCurrentPass,
- pageReadStatus.valueReader.readBoolean() ? 1 : 0 );
+ if (!((NullableBitVector)valueVecHolder.getValueVector()).getMutator().setSafe(i + valuesReadInCurrentPass,
+ pageReadStatus.valueReader.readBoolean() ? 1 : 0 )) {
+ throw new RuntimeException();
+ }
}
// otherwise the value is skipped, because the bit vector indicating nullability is zero filled
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
index 6e17fbae9..9acb55745 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java
@@ -29,6 +29,7 @@ import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
@@ -77,7 +78,6 @@ class ParquetRecordReader implements RecordReader {
private List<ColumnReader> columnStatuses;
FileSystem fileSystem;
- BufferAllocator allocator;
private long batchSize;
Path hadoopPath;
private VarLenBinaryReader varLengthReader;
@@ -107,7 +107,6 @@ class ParquetRecordReader implements RecordReader {
String path, int rowGroupIndex, FileSystem fs,
CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
List<SchemaPath> columns) throws ExecutionSetupException {
- this.allocator = fragmentContext.getAllocator();
hadoopPath = new Path(path);
fileSystem = fs;
this.codecFactoryExposer = codecFactoryExposer;
@@ -214,13 +213,13 @@ class ParquetRecordReader implements RecordReader {
for (int i = 0; i < columns.size(); ++i) {
column = columns.get(i);
columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i);
- field = MaterializedField.create(toFieldName(column.getPath()),
- toMajorType(column.getType(), getDataMode(column)));
+ MajorType type = toMajorType(column.getType(), getDataMode(column));
+ field = MaterializedField.create(toFieldName(column.getPath()), type);
// the field was not requested to be read
if ( ! fieldSelected(field)) continue;
fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
- ValueVector v = TypeHelper.getNewVector(field, allocator);
+ ValueVector v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v);
} else {
@@ -237,17 +236,8 @@ class ParquetRecordReader implements RecordReader {
throw new ExecutionSetupException(e);
}
- output.removeAllFields();
+// output.removeAllFields();
try {
- for (ColumnReader crs : columnStatuses) {
- output.addField(crs.valueVecHolder.getValueVector());
- }
- for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) {
- output.addField(r.valueVecHolder.getValueVector());
- }
- for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns) {
- output.addField(r.valueVecHolder.getValueVector());
- }
output.setNewSchema();
}catch(SchemaChangeException e) {
throw new ExecutionSetupException("Error setting up output mutator.", e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 6278a7974..df6581fb8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -123,6 +123,6 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
}
}
- return new ScanBatch(context, readers.iterator(), partitionColumns, selectedPartitionColumns);
+ return new ScanBatch(rowGroupScan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
index e9e54f00f..86aec44c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java
@@ -32,7 +32,7 @@ public class AllocationHelper {
}else if(v instanceof RepeatedFixedWidthVector){
((RepeatedFixedWidthVector) v).allocateNew(valueCount, valueCount * repeatedPerTop);
}else if(v instanceof RepeatedVariableWidthVector){
- ((RepeatedVariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount, valueCount * repeatedPerTop);
+ ((RepeatedVariableWidthVector) v).allocateNew(valueCount * bytesPerValue * repeatedPerTop, valueCount, valueCount * repeatedPerTop);
}else{
throw new UnsupportedOperationException();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 619fdad38..155d7d6cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.memory.AccountingByteBuf;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.FieldMetadata;
import org.apache.drill.exec.record.MaterializedField;
@@ -38,6 +39,9 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
private final Accessor accessor = new Accessor();
private final Mutator mutator = new Mutator();
+ private int allocationValueCount = 4000;
+ private int allocationMonitor = 0;
+
private int valueCapacity;
public BitVector(MaterializedField field, BufferAllocator allocator) {
@@ -57,6 +61,19 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
return (int) Math.ceil((float)valueCount / 8.0);
}
+ private int getByteIndex(int index) {
+ return (int) Math.floor((float) index / 8.0);
+ }
+
+ public void allocateNew() {
+ clear();
+ if (allocationMonitor > 5) {
+ allocationValueCount = Math.min(1, (int)(allocationValueCount * 0.9));
+ } else if (allocationMonitor < -5) {
+ allocationValueCount = (int) (allocationValueCount * 1.1);
+ }
+ }
+
/**
* Allocate a new memory space for this vector. Must be called prior to using the ValueVector.
*
@@ -132,6 +149,37 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
clear();
}
+ public void splitAndTransferTo(int startIndex, int length, BitVector target) {
+ assert startIndex + length <= valueCount;
+ int firstByte = getByteIndex(startIndex);
+ int lastByte = getSizeFromCount(startIndex + length) - 1;
+ int offset = startIndex % 8;
+ if (offset == 0) {
+ // slice
+ target.data = this.data.slice(firstByte, lastByte - firstByte + 1);
+ target.data.retain();
+ } else {
+ // Copy data
+ target.clear();
+ target.allocateNew(length);
+ if ((startIndex + length) % 8 == 0) {
+ lastByte++;
+ }
+ int i = firstByte;
+ // TODO maybe do this one word at a time, rather than byte?
+ for (; i <= lastByte - 1; i++) {
+ target.data.setByte(i - firstByte, (((this.data.getByte(i) & 0xFF) >>> offset) + (this.data.getByte(i + 1) << (8 - offset))));
+ }
+ if (startIndex + length == this.valueCount) {
+ target.data.setByte(i - firstByte, ((this.data.getByte(lastByte) & 0xFF) >>> offset));
+ }
+ }
+ }
+
+ private void copyTo(int startIndex, int length, BitVector target) {
+
+ }
+
private class TransferImpl implements TransferPair {
BitVector to;
@@ -151,6 +199,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
transferTo(to);
}
+ public void splitAndTransfer(int startIndex, int length) {
+ splitAndTransferTo(startIndex, length, to);
+ }
+
@Override
public void copyValue(int fromIndex, int toIndex) {
to.copyFrom(fromIndex, toIndex, BitVector.this);
@@ -237,7 +289,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
}
public boolean setSafe(int index, int value) {
- if(index >= getValueCapacity()) return false;
+ if(index >= getValueCapacity()) {
+ allocationMonitor--;
+ return false;
+ }
set(index, value);
return true;
}
@@ -256,7 +311,15 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
public final void setValueCount(int valueCount) {
BitVector.this.valueCount = valueCount;
- data.writerIndex(getSizeFromCount(valueCount));
+ int idx = getSizeFromCount(valueCount);
+ if (((float) data.capacity()) / idx > 1.1) {
+ allocationMonitor++;
+ }
+ data.writerIndex(idx);
+ if (data instanceof AccountingByteBuf) {
+ data.capacity(idx);
+ data.writerIndex(idx);
+ }
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index 24e3473ef..258b354b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -33,6 +33,11 @@ import org.apache.drill.exec.record.TransferPair;
*/
public interface ValueVector extends Closeable {
+ /**
+ * Allocate new buffers. ValueVector implements logic to determine how much to allocate.
+ */
+ public void allocateNew();
+
public int getBufferSize();
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index b6d441cff..27f82216c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -22,11 +22,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.cache.DistributedCache;
@@ -71,7 +67,7 @@ public class WorkManager implements Closeable{
private final UserWorker userWorker;
private final WorkerBee bee;
private final WorkEventBus workBus;
- private Executor executor;
+ private ExecutorService executor;
private final EventThread eventThread;
public WorkManager(BootStrapContext context){
@@ -108,6 +104,11 @@ public class WorkManager implements Closeable{
@Override
public void close() throws IOException {
+ try {
+ executor.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ logger.warn("Executor interrupted while awaiting termination");
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index 30e6df285..76db1ed4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -81,6 +81,11 @@ public abstract class AbstractDataCollector implements DataCollector{
public abstract void streamFinished(int minorFragmentId);
public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException {
+ if (batch.getHeader().getIsOutOfMemory()) {
+ for (RawBatchBuffer buffer : buffers) {
+ buffer.enqueue(batch);
+ }
+ }
boolean decremented = false;
if (remainders.compareAndSet(minorFragmentId, 0, 1)) {
int rem = remainingRequired.decrementAndGet();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
index 3cb18b631..9b3b870d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java
@@ -64,6 +64,16 @@ public class IncomingBuffers {
public boolean batchArrived(RawFragmentBatch batch) throws FragmentSetupException {
// no need to do anything if we've already enabled running.
// logger.debug("New Batch Arrived {}", batch);
+ if (batch.getHeader().getIsOutOfMemory()) {
+ for (DataCollector fSet : fragCounts.values()) {
+ try {
+ fSet.batchArrived(0, batch);
+ } catch (IOException e) {
+ throw new RuntimeException();
+ }
+ }
+ return false;
+ }
if(batch.getHeader().getIsLastBatch()){
streamsRemaining.decrementAndGet();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index afac86fbf..c8a15258d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -29,13 +29,17 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.data.BitServerConnection;
import org.apache.drill.exec.store.LocalSyncableFileSystem;
+import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -55,20 +59,27 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
private static String DRILL_LOCAL_IMPL_STRING = "fs.drill-local.impl";
private static final float STOP_SPOOLING_FRACTION = (float) 0.5;
+ public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
+ public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer = Queues.newLinkedBlockingDeque();
private volatile boolean finished = false;
private volatile long queueSize = 0;
private long threshold;
private FragmentContext context;
+ private BufferAllocator allocator;
private volatile AtomicBoolean spooling = new AtomicBoolean(false);
private FileSystem fs;
private Path path;
private FSDataOutputStream outputStream;
private FSDataInputStream inputStream;
+ private boolean outOfMemory = false;
+ private boolean closed = false;
+ private FragmentManager fragmentManager;
- public SpoolingRawBatchBuffer(FragmentContext context) throws IOException {
+ public SpoolingRawBatchBuffer(FragmentContext context) throws IOException, OutOfMemoryException {
this.context = context;
+ this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION);
this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM));
@@ -86,6 +97,20 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
@Override
public synchronized void enqueue(RawFragmentBatch batch) throws IOException {
+ if (batch.getHeader().getIsOutOfMemory()) {
+ if (fragmentManager == null) {
+ fragmentManager = ((BitServerConnection) batch.getConnection()).getFragmentManager();
+ }
+// fragmentManager.setAutoRead(false);
+// logger.debug("Setting autoRead false");
+ if (!outOfMemory && !buffer.peekFirst().isOutOfMemory()) {
+ logger.debug("Adding OOM message to front of queue. Current queue size: {}", buffer.size());
+ buffer.addFirst(new RawFragmentBatchWrapper(batch, true));
+ } else {
+ logger.debug("ignoring duplicate OOM message");
+ }
+ return;
+ }
RawFragmentBatchWrapper wrapper;
boolean spool = spooling.get();
wrapper = new RawFragmentBatchWrapper(batch, !spool);
@@ -105,7 +130,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
@Override
public void kill(FragmentContext context) {
- cleanup();
+ allocator.close();
}
@@ -116,6 +141,11 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
@Override
public RawFragmentBatch getNext() throws IOException {
+ if (outOfMemory && buffer.size() < 10) {
+ outOfMemory = false;
+ fragmentManager.setAutoRead(true);
+ logger.debug("Setting autoRead true");
+ }
boolean spool = spooling.get();
RawFragmentBatchWrapper w = buffer.poll();
RawFragmentBatch batch;
@@ -123,21 +153,27 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
try {
w = buffer.take();
batch = w.get();
+ if (batch.getHeader().getIsOutOfMemory()) {
+ outOfMemory = true;
+ return batch;
+ }
queueSize -= w.getBodySize();
return batch;
} catch (InterruptedException e) {
- cleanup();
return null;
}
}
if (w == null) {
- cleanup();
return null;
}
batch = w.get();
+ if (batch.getHeader().getIsOutOfMemory()) {
+ outOfMemory = true;
+ return batch;
+ }
queueSize -= w.getBodySize();
- assert queueSize >= 0;
+// assert queueSize >= 0;
if (spool && queueSize < threshold * STOP_SPOOLING_FRACTION) {
logger.debug("buffer size {} less than {}x threshold. Stop spooling.", queueSize, STOP_SPOOLING_FRACTION);
spooling.set(false);
@@ -145,7 +181,13 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
return batch;
}
- private void cleanup() {
+ public void cleanup() {
+ if (closed) {
+ logger.warn("Tried cleanup twice");
+ return;
+ }
+ closed = true;
+ allocator.close();
try {
if (outputStream != null) {
outputStream.close();
@@ -171,6 +213,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
private boolean available;
private CountDownLatch latch = new CountDownLatch(1);
private int bodyLength;
+ private boolean outOfMemory = false;
public RawFragmentBatchWrapper(RawFragmentBatch batch, boolean available) {
Preconditions.checkNotNull(batch);
@@ -225,7 +268,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
Stopwatch watch = new Stopwatch();
watch.start();
BitData.FragmentRecordBatch header = BitData.FragmentRecordBatch.parseDelimitedFrom(stream);
- ByteBuf buf = context.getAllocator().buffer(bodyLength);
+ ByteBuf buf = allocator.buffer(bodyLength);
buf.writeBytes(stream, bodyLength);
batch = new RawFragmentBatch(null, header, buf);
available = true;
@@ -233,6 +276,14 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
long t = watch.elapsed(TimeUnit.MICROSECONDS);
logger.debug("Took {} us to read {} from disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
}
+
+ private boolean isOutOfMemory() {
+ return outOfMemory;
+ }
+
+ private void setOutOfMemory(boolean outOfMemory) {
+ this.outOfMemory = outOfMemory;
+ }
}
private String getFileName() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 97d8d34ce..4853d329a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -52,6 +52,11 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
}
@Override
+ public void cleanup() {
+
+ }
+
+ @Override
public void kill(FragmentContext context) {
while(!buffer.isEmpty()){
RawFragmentBatch batch = buffer.poll();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
index 7d92c9a64..0a4b2350f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection;
/**
* The Fragment Manager is responsible managing incoming data and executing a fragment. Once enough data and resources
@@ -51,4 +52,12 @@ public interface FragmentManager {
public abstract FragmentHandle getHandle();
public abstract FragmentContext getFragmentContext();
+
+ public abstract void addConnection(RemoteConnection connection);
+
+ /**
+ * Sets autoRead property on all connections
+ * @param autoRead
+ */
+ public abstract void setAutoRead(boolean autoRead);
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index d82c1c001..c8f20212e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -18,6 +18,8 @@
package org.apache.drill.exec.work.fragment;
import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.FragmentSetupException;
@@ -31,6 +33,7 @@ import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.batch.IncomingBuffers;
@@ -46,6 +49,7 @@ public class NonRootFragmentManager implements FragmentManager {
private volatile boolean cancel = false;
private final FragmentContext context;
private final PhysicalPlanReader reader;
+ private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
public NonRootFragmentManager(PlanFragment fragment, DrillbitContext context) throws FragmentSetupException{
try{
@@ -118,8 +122,17 @@ public class NonRootFragmentManager implements FragmentManager {
public FragmentContext getFragmentContext() {
return context;
}
-
-
-
+ @Override
+ public void addConnection(RemoteConnection connection) {
+ connections.add(connection);
+ }
+
+ @Override
+ public void setAutoRead(boolean autoRead) {
+ for (RemoteConnection c : connections) {
+ c.setAutoRead(autoRead);
+ }
+ }
+
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 40f4d2b30..c763d5568 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -21,8 +21,12 @@ import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RawFragmentBatch;
+import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.work.batch.IncomingBuffers;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
public class RootFragmentManager implements FragmentManager{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);
@@ -30,6 +34,7 @@ public class RootFragmentManager implements FragmentManager{
private final FragmentExecutor runner;
private final FragmentHandle handle;
private volatile boolean cancel = false;
+ private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
public RootFragmentManager(FragmentHandle handle, IncomingBuffers buffers, FragmentExecutor runner) {
super();
@@ -66,7 +71,17 @@ public class RootFragmentManager implements FragmentManager{
public FragmentContext getFragmentContext() {
return runner.getContext();
}
-
-
-
+
+ @Override
+ public void addConnection(RemoteConnection connection) {
+ connections.add(connection);
+ }
+
+ @Override
+ public void setAutoRead(boolean autoRead) {
+ for (RemoteConnection c : connections) {
+ c.setAutoRead(autoRead);
+ }
+ }
+
}