author     Steven Phillips <sphillips@maprtech.com>    2014-08-18 17:23:23 -0700
committer  Jacques Nadeau <jacques@apache.org>         2014-08-29 17:51:51 -0700
commit     437366f03e5de4c4692703007bff2f5a134720dd (patch)
tree       637752098e129163b40ac3946986e56f68d12c86 /exec/java-exec/src
parent     fa3c8d52a9da0913cc734ff8d3f3968d0e25fab8 (diff)
DRILL-1329: External sort memory fixes
Diffstat (limited to 'exec/java-exec/src')
-rw-r--r--  exec/java-exec/src/main/codegen/templates/NullableValueVectors.java | 8
-rw-r--r--  exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java | 4
-rw-r--r--  exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java | 10
-rw-r--r--  exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java | 8
-rw-r--r--  exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java | 10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java | 24
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java | 12
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java | 161
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java | 206
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java | 5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java | 26
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java | 37
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java | 161
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java | 10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java | 19
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java | 18
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java | 17
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java | 7
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java | 6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java | 5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java | 19
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java | 2
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java | 3
38 files changed, 392 insertions, 490 deletions
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index cb5284132..d9eae0fb8 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -73,9 +73,11 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
}
@Override
- public DrillBuf[] getBuffers() {
- DrillBuf[] buffers = ObjectArrays.concat(bits.getBuffers(), values.getBuffers(), DrillBuf.class);
- clear();
+ public DrillBuf[] getBuffers(boolean clear) {
+ DrillBuf[] buffers = ObjectArrays.concat(bits.getBuffers(clear), values.getBuffers(clear), DrillBuf.class);
+ if (clear) {
+ clear();
+ }
return buffers;
}
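
Note: the same clear-flag threading recurs in RepeatedValueVectors, VariableLengthVectors, SelectionVector2, and the ValueVector interface below. A minimal caller-side sketch of the two modes, assuming only the new getBuffers(boolean) signature introduced by this commit; the class and method names here are illustrative, not part of the patch:

import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.vector.ValueVector;

class GetBuffersSketch {
  // Peek mode: account for memory while the vector keeps ownership of its buffers.
  static long bufferSize(ValueVector v) {
    long size = 0;
    for (DrillBuf buf : v.getBuffers(false)) {
      size += buf.capacity();
    }
    return size;
  }

  // Detach mode: the previous behavior, now opt-in. Buffers are handed off
  // to the caller and the vector is cleared.
  static DrillBuf[] detach(ValueVector v) {
    return v.getBuffers(true);
  }
}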
diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
index cb37a1b52..2fbbc6fb1 100644
--- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
+++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java
@@ -41,7 +41,7 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader;
import parquet.io.api.RecordConsumer;
import parquet.schema.MessageType;
import parquet.io.api.Binary;
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
@@ -220,7 +220,7 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp
//consumer.endField(fieldName, fieldId);
<#else>
reader.read(holder);
- ByteBuf buf = holder.buffer;
+ DrillBuf buf = holder.buffer;
consumer.startField(fieldName, fieldId);
consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start)));
consumer.endField(fieldName, fieldId);
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index 97117a4d3..195f182aa 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -71,7 +71,7 @@ package org.apache.drill.exec.vector;
}
public void setCurrentValueCount(int count) {
- values.setCurrentValueCount(count);
+ values.setCurrentValueCount(offsets.getAccessor().get(count));
}
public int getBufferSize(){
@@ -259,9 +259,11 @@ package org.apache.drill.exec.vector;
</#if>
@Override
- public DrillBuf[] getBuffers() {
- DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(), values.getBuffers(), DrillBuf.class);
- clear();
+ public DrillBuf[] getBuffers(boolean clear) {
+ DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(clear), values.getBuffers(clear), DrillBuf.class);
+ if (clear) {
+ clear();
+ }
return buffers;
}
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 95cd3cc3c..fe255a844 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -134,9 +134,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
@Override
- public DrillBuf[] getBuffers() {
- DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(), super.getBuffers(), DrillBuf.class);
- clear();
+ public DrillBuf[] getBuffers(boolean clear) {
+ DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(clear), super.getBuffers(clear), DrillBuf.class);
+ if (clear) {
+ clear();
+ }
return buffers;
}
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
index 6ad5da8ff..5399239b9 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -663,6 +663,10 @@ public final class DrillBuf extends AbstractByteBuf {
return new DrillBuf(allocator, a);
}
+ public boolean isRootBuffer(){
+ return rootBuffer;
+ }
+
public static DrillBuf wrapByteBuffer(ByteBuffer b){
if(!b.isDirect()){
throw new IllegalStateException("DrillBufs can only refer to direct memory.");
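
Note: isRootBuffer() distinguishes a buffer accounted directly against an allocator from a slice of some other buffer. A sketch of the gating pattern this enables, mirroring the takeOwnership() calls added to ExternalSortBatch later in this diff (vector and allocator are assumed to be in scope):

// Only root buffers carry their own accounting, so only they are re-assigned;
// slices of other buffers are skipped.
for (DrillBuf buf : vector.getBuffers(false)) {
  if (buf.isRootBuffer()) {
    allocator.takeOwnership(buf); // re-account the memory to this allocator
  }
}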
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
index e007bcc69..83b3d5a8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java
@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
@@ -114,6 +115,9 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
int dataLength = metaData.getBufferLength();
MaterializedField field = MaterializedField.create(metaData);
DrillBuf buf = allocator.buffer(dataLength);
+ if (buf == null) {
+ throw new IOException(new OutOfMemoryException());
+ }
buf.writeBytes(input, dataLength);
ValueVector vector = TypeHelper.getNewVector(field, allocator);
vector.load(metaData, buf);
@@ -143,10 +147,10 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
Preconditions.checkNotNull(output);
final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
- ByteBuf[] incomingBuffers = batch.getBuffers();
+ DrillBuf[] incomingBuffers = batch.getBuffers();
UserBitShared.RecordBatchDef batchDef = batch.getDef();
- /* ByteBuf associated with the selection vector */
+ /* DrillBuf associated with the selection vector */
DrillBuf svBuf = null;
Integer svCount = null;
@@ -171,7 +175,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable {
}
/* Dump the array of ByteBuf's associated with the value vectors */
- for (ByteBuf buf : incomingBuffers)
+ for (DrillBuf buf : incomingBuffers)
{
/* dump the buffer into the OutputStream */
int bufLength = buf.readableBytes();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 36c162abe..2d39b5e60 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState;
import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL;
import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
import java.io.Closeable;
import java.io.IOException;
@@ -338,7 +339,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
}
@Override
- public ByteBuf getBuffer() {
+ public DrillBuf getBuffer() {
return null;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 55f11a463..ae80f7b45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -162,7 +162,7 @@ public class TopLevelAllocator implements BufferAllocator {
public DrillBuf buffer(int size, int max) {
if(size == 0) return empty;
if(!childAcct.reserve(size)){
- logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory());
+ logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory(), new Exception());
return null;
};
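
Note: appending new Exception() as a final argument is a standard SLF4J idiom. A trailing Throwable is not consumed by a {} placeholder; it is rendered as a stack trace, so a failed reservation now logs the call site that requested the allocation, at the cost of constructing an Exception on the (already failing) path.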
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
index 8af69fa67..c196a962c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java
@@ -32,6 +32,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
public class ExternalSort extends Sort {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSort.class);
+ private long initialAllocation = 20000000;
+
@JsonCreator
public ExternalSort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("reverse") boolean reverse) {
super(child, orderings, reverse);
@@ -52,7 +54,9 @@ public class ExternalSort extends Sort {
@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
- return new ExternalSort(child, orderings, reverse);
+ ExternalSort newSort = new ExternalSort(child, orderings, reverse);
+ newSort.setMaxAllocation(getMaxAllocation());
+ return newSort;
}
@Override
@@ -60,5 +64,12 @@ public class ExternalSort extends Sort {
return CoreOperatorType.EXTERNAL_SORT_VALUE;
}
+ public void setMaxAllocation(long maxAllocation) {
+ this.maxAllocation = Math.max(initialAllocation, maxAllocation);
+ }
+
+ public long getInitialAllocation() {
+ return initialAllocation;
+ }
}
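
Note: a hedged usage sketch of the clamp above; the child and orderings variables are illustrative. setMaxAllocation() floors the operator budget at the 20 MB initialAllocation, and getNewWithChild() now propagates that budget when the plan is rewritten:

ExternalSort sort = new ExternalSort(child, orderings, false);
sort.setMaxAllocation(5_000_000L); // request 5 MB...
// ...the effective budget is max(20_000_000, 5_000_000) = 20 MB, so the spill
// thresholds ExternalSortBatch derives from popConfig.getMaxAllocation()
// never shrink below the floor.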
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 62163052e..e56d883ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -60,9 +61,6 @@ import com.google.common.collect.Maps;
public class ScanBatch implements RecordBatch {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
- private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
- private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
-
private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap();
@@ -147,7 +145,15 @@ public class ScanBatch implements RecordBatch {
long t1 = System.nanoTime();
oContext.getStats().startProcessing();
try {
- mutator.allocate(MAX_RECORD_CNT);
+ try {
+ currentReader.allocate(fieldVectorMap);
+ } catch (OutOfMemoryException e) {
+ logger.debug("Caught OutOfMemoryException");
+ for (ValueVector v : fieldVectorMap.values()) {
+ v.clear();
+ }
+ return IterOutcome.OUT_OF_MEMORY;
+ }
while ((recordCount = currentReader.next()) == 0) {
try {
if (!readers.hasNext()) {
@@ -169,7 +175,15 @@ public class ScanBatch implements RecordBatch {
currentReader = readers.next();
partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null;
currentReader.setup(mutator);
- mutator.allocate(MAX_RECORD_CNT);
+ try {
+ currentReader.allocate(fieldVectorMap);
+ } catch (OutOfMemoryException e) {
+ logger.debug("Caught OutOfMemoryException");
+ for (ValueVector v : fieldVectorMap.values()) {
+ v.clear();
+ }
+ return IterOutcome.OUT_OF_MEMORY;
+ }
addPartitionVectors();
} finally {
oContext.getStats().stopSetup();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index c8929d1c5..734088e12 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -80,7 +80,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private boolean hasRemainder = false;
private int remainderIndex = 0;
private int recordCount;
-
+
private static final String EMPTY_STRING = "";
private class ClassifierResult {
@@ -137,7 +137,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
// VectorUtil.showVectorAccessibleContent(incoming, ",");
int incomingRecordCount = incoming.getRecordCount();
- doAlloc();
+ if (!doAlloc()) {
+ outOfMemory = true;
+ return;
+ }
int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
if (outputRecords < incomingRecordCount) {
@@ -161,7 +164,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private void handleRemainder() {
int remainingRecordCount = incoming.getRecordCount() - remainderIndex;
- doAlloc();
+ if (!doAlloc()) {
+ outOfMemory = true;
+ return;
+ }
int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0);
if (projRecords < remainingRecordCount) {
setValueCount(projRecords);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 5c2ab22eb..237007046 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.xsort;
import java.io.IOException;
import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.cache.VectorAccessibleSerializable;
@@ -41,36 +42,23 @@ import com.google.common.base.Stopwatch;
public class BatchGroup implements VectorAccessible {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
- private final VectorContainer firstContainer;
- private final VectorContainer secondContainer;
private VectorContainer currentContainer;
private SelectionVector2 sv2;
private int pointer = 0;
- private int batchPointer = 0;
- private boolean hasSecond = false;
private FSDataInputStream inputStream;
private FSDataOutputStream outputStream;
private Path path;
private FileSystem fs;
private BufferAllocator allocator;
private int spilledBatches = 0;
- private boolean done = false;
public BatchGroup(VectorContainer container, SelectionVector2 sv2) {
- this.firstContainer = container;
- this.secondContainer = null;
this.sv2 = sv2;
- this.currentContainer = firstContainer;
+ this.currentContainer = container;
}
- public BatchGroup(VectorContainer firstContainer, VectorContainer secondContainer, FileSystem fs, String path, BufferAllocator allocator) {
- assert firstContainer.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.NONE;
- assert secondContainer.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.NONE;
-
- this.firstContainer = firstContainer;
- this.secondContainer = secondContainer;
- currentContainer = firstContainer;
- this.hasSecond = true;
+ public BatchGroup(VectorContainer container, FileSystem fs, String path, BufferAllocator allocator) {
+ currentContainer = container;
this.fs = fs;
this.path = new Path(path);
this.allocator = allocator;
@@ -93,7 +81,7 @@ public class BatchGroup implements VectorAccessible {
watch.start();
outputBatch.writeToStream(outputStream);
newContainer.zeroVectors();
-// logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount);
+ logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount);
spilledBatches++;
}
@@ -107,132 +95,51 @@ public class BatchGroup implements VectorAccessible {
Stopwatch watch = new Stopwatch();
watch.start();
vas.readFromStream(inputStream);
- VectorContainer c = (VectorContainer) vas.get();
+ VectorContainer c = vas.get();
// logger.debug("Took {} us to read {} records", watch.elapsed(TimeUnit.MICROSECONDS), c.getRecordCount());
spilledBatches--;
+ currentContainer.zeroVectors();
+ Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
+ for (VectorWrapper w : currentContainer) {
+ TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
+ pair.transfer();
+ }
+ currentContainer.setRecordCount(c.getRecordCount());
+ c.zeroVectors();
return c;
}
public int getNextIndex() {
- if (pointer == currentContainer.getRecordCount()) {
- if (!hasSecond || (batchPointer == 1 && spilledBatches == 0)) {
+ int val;
+ if (pointer == getRecordCount()) {
+ if (spilledBatches == 0) {
return -1;
- } else if (batchPointer == 1 && spilledBatches > 0) {
- return -2;
}
- batchPointer++;
- currentContainer = secondContainer;
- pointer = 0;
+ try {
+ currentContainer.zeroVectors();
+ getBatch();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ pointer = 1;
+ return 0;
}
if (sv2 == null) {
- int val = pointer;
+ val = pointer;
pointer++;
assert val < currentContainer.getRecordCount();
- return val;
} else {
- int val = pointer;
+ val = pointer;
pointer++;
assert val < currentContainer.getRecordCount();
- return sv2.getIndex(val);
- }
- }
-
- public VectorContainer getFirstContainer() {
- return firstContainer;
- }
-
- public VectorContainer getSecondContainer() {
- return secondContainer;
- }
-
- public boolean hasSecond() {
- return hasSecond;
- }
-
- public void rotate() {
- if (batchPointer == 0) {
- return;
- }
- if (pointer == secondContainer.getRecordCount()) {
- try {
- getTwoBatches();
- return;
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- firstContainer.zeroVectors();
- Iterator<VectorWrapper<?>> wrapperIterator = secondContainer.iterator();
- for (VectorWrapper w : firstContainer) {
- TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
- pair.transfer();
+ val = sv2.getIndex(val);
}
- firstContainer.setRecordCount(secondContainer.getRecordCount());
- if (spilledBatches > 0) {
- VectorContainer c = null;
- try {
- c = getBatch();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- secondContainer.zeroVectors();
- wrapperIterator = c.iterator();
- for (VectorWrapper w : secondContainer) {
- TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
- pair.transfer();
- }
- secondContainer.setRecordCount(c.getRecordCount());
- c.zeroVectors();
- } else {
- secondContainer.zeroVectors();
- hasSecond = false;
- }
- batchPointer = 0;
- currentContainer = firstContainer;
+ return val;
}
- private void getTwoBatches() throws IOException {
- firstContainer.zeroVectors();
- if (spilledBatches > 0) {
- VectorContainer c = getBatch();
- Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
- for (VectorWrapper w : firstContainer) {
- TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
- pair.transfer();
- }
- firstContainer.setRecordCount(c.getRecordCount());
- c.zeroVectors();
- } else {
- batchPointer = -1;
- pointer = -1;
- firstContainer.zeroVectors();
- secondContainer.zeroVectors();
- }
- if (spilledBatches > 0) {
- VectorContainer c = getBatch();
- Iterator<VectorWrapper<?>> wrapperIterator = c.iterator();
- for (VectorWrapper w : secondContainer) {
- TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector());
- pair.transfer();
- }
- secondContainer.setRecordCount(c.getRecordCount());
- c.zeroVectors();
- } else {
- secondContainer.zeroVectors();
- hasSecond = false;
- }
- batchPointer = 0;
- currentContainer = firstContainer;
- pointer = 0;
-// BatchPrinter.printBatch(firstContainer);
-// BatchPrinter.printBatch(secondContainer);
- return;
- }
-
- public int getBatchPointer() {
- assert batchPointer < 2;
- return batchPointer;
+ public VectorContainer getContainer() {
+ return currentContainer;
}
public void cleanup() throws IOException {
@@ -263,7 +170,11 @@ public class BatchGroup implements VectorAccessible {
@Override
public int getRecordCount() {
- return currentContainer.getRecordCount();
+ if (sv2 != null) {
+ return sv2.getCount();
+ } else {
+ return currentContainer.getRecordCount();
+ }
}
@Override
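
Note: the refill path above relies on Drill's TransferPair idiom: ownership of the deserialized buffers moves into the long-lived container without copying bytes. A minimal sketch, assuming source and target are ValueVectors of the same type:

// After transfer(), the source vector no longer owns any memory, so nothing
// is copied and nothing is double-accounted against the allocator.
TransferPair pair = source.makeTransferPair(target);
pair.transfer();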
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index d4b10014f..aa6527210 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.impl.xsort;
+import io.netty.buffer.DrillBuf;
+
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
@@ -93,10 +95,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private SingleBatchSorter sorter;
private SortRecordBatchBuilder builder;
private MSorter mSorter;
- private PriorityQueueSelector selector;
private PriorityQueueCopier copier;
private BufferAllocator copierAllocator;
private LinkedList<BatchGroup> batchGroups = Lists.newLinkedList();
+ private LinkedList<BatchGroup> spilledBatchGroups = Lists.newLinkedList();
private SelectionVector4 sv4;
private FileSystem fs;
private int spillCount = 0;
@@ -104,6 +106,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files
private boolean useIncomingSchema = false;
private boolean first = true;
+ private long totalSizeInMemory = 0;
+ private long highWaterMark = Long.MAX_VALUE;
+ private int targetRecordCount;
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context);
@@ -128,7 +133,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
@Override
public int getRecordCount() {
- return sv4.getCount();
+ if (sv4 != null) {
+ return sv4.getCount();
+ } else {
+ return container.getRecordCount();
+ }
}
@Override
@@ -195,21 +204,22 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
} else {
Stopwatch w = new Stopwatch();
w.start();
- int count = selector.next();
+// int count = selector.next();
+ int count = copier.next(targetRecordCount);
if(count > 0){
long t = w.elapsed(TimeUnit.MICROSECONDS);
logger.debug("Took {} us to merge {} records", t, count);
container.setRecordCount(count);
return IterOutcome.OK;
}else{
- logger.debug("selector returned 0 records");
+ logger.debug("copier returned 0 records");
return IterOutcome.NONE;
}
}
}
long totalcount = 0;
-
+
try{
outer: while (true) {
Stopwatch watch = new Stopwatch();
@@ -234,17 +244,21 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
// fall through.
case OK:
+ totalSizeInMemory += getBufferSize(incoming);
SelectionVector2 sv2;
// if (incoming.getRecordCount() == 0) {
// break outer;
// }
if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) {
sv2 = incoming.getSelectionVector2();
+ if (sv2.getBuffer(false).isRootBuffer()) {
+ oContext.getAllocator().takeOwnership(sv2.getBuffer(false));
+ }
} else {
try {
sv2 = newSV2();
} catch (OutOfMemoryException e) {
- throw new RuntimeException();
+ throw new RuntimeException(e);
}
}
int count = sv2.getCount();
@@ -263,7 +277,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2()));
batchesSinceLastSpill++;
- if (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE) {
+ if ((spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) ||
+ (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) ||
+ (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) {
mergeAndSpill();
batchesSinceLastSpill = 0;
}
@@ -271,6 +287,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// logger.debug("Took {} us to sort {} records", t, count);
break;
case OUT_OF_MEMORY:
+ highWaterMark = totalSizeInMemory;
if (batchesSinceLastSpill > 2) mergeAndSpill();
batchesSinceLastSpill = 0;
break;
@@ -279,25 +296,14 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
}
-// if (schema == null || totalcount == 0){
-// builder may be null at this point if the first incoming batch is empty
-// useIncomingSchema = true;
-// return IterOutcome.NONE;
-// }
-
if (spillCount == 0) {
Stopwatch watch = new Stopwatch();
watch.start();
-// if (schema == null){
- // builder may be null at this point if the first incoming batch is empty
-// useIncomingSchema = true;
-// return IterOutcome.NONE;
-// }
builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
for (BatchGroup group : batchGroups) {
- RecordBatchData rbd = new RecordBatchData(group.getFirstContainer());
+ RecordBatchData rbd = new RecordBatchData(group.getContainer());
rbd.setSv2(group.getSv2());
builder.add(rbd);
}
@@ -311,16 +317,32 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
sv4 = mSorter.getSV4();
long t = watch.elapsed(TimeUnit.MICROSECONDS);
- logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
+// logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount());
+ container.buildSchema(SelectionVectorMode.FOUR_BYTE);
} else {
- constructHyperBatch(batchGroups, this.container);
- constructSV4();
- selector = createSelector();
- selector.setup(context, oContext.getAllocator(), this, sv4, batchGroups);
- selector.next();
+ mergeAndSpill();
+ batchGroups.addAll(spilledBatchGroups);
+ logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory());
+ VectorContainer hyperBatch = constructHyperBatch(batchGroups);
+ createCopier(hyperBatch, batchGroups, container);
+ int inMemoryRecordCount = 0;
+ for (BatchGroup g : batchGroups) {
+ inMemoryRecordCount += g.getRecordCount();
+ }
+ int estimatedRecordSize = 0;
+ for (VectorWrapper w : batchGroups.get(0)) {
+ try {
+ estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
+ } catch (UnsupportedOperationException e) {
+ estimatedRecordSize += 50;
+ }
+ }
+ targetRecordCount = (int) Math.max(1, 250 * 1000 / estimatedRecordSize);
+ int count = copier.next(targetRecordCount);
+ container.buildSchema(SelectionVectorMode.NONE);
+ container.setRecordCount(count);
}
- container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
return IterOutcome.OK_NEW_SCHEMA;
}catch(SchemaChangeException | ClassTransformationException | IOException ex){
@@ -329,67 +351,93 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
context.fail(ex);
return IterOutcome.STOP;
} catch (UnsupportedOperationException e) {
- logger.error(e.getMessage());
throw new RuntimeException(e);
}
}
public void mergeAndSpill() throws SchemaChangeException {
logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory());
- VectorContainer hyperBatch = new VectorContainer();
VectorContainer outputContainer = new VectorContainer();
List<BatchGroup> batchGroupList = Lists.newArrayList();
- int recordCount = 0;
- for (int i = 0; i < SPILL_BATCH_GROUP_SIZE; i++) {
+ int batchCount = batchGroups.size();
+ for (int i = 0; i < batchCount / 2; i++) {
if (batchGroups.size() == 0) {
break;
}
- if (batchGroups.peekLast().getSecondContainer() != null) {
+ if (batchGroups.peekLast().getSv2() == null) {
break;
}
BatchGroup batch = batchGroups.pollLast();
- recordCount += batch.getSv2().getCount();
batchGroupList.add(batch);
+ long bufferSize = getBufferSize(batch);
+ totalSizeInMemory -= bufferSize;
}
if (batchGroupList.size() == 0) {
return;
}
- constructHyperBatch(batchGroupList, hyperBatch);
- createCopier(hyperBatch, batchGroupList, outputContainer, recordCount);
+ int estimatedRecordSize = 0;
+ for (VectorWrapper w : batchGroups.get(0)) {
+ try {
+ estimatedRecordSize += TypeHelper.getSize(w.getField().getType());
+ } catch (UnsupportedOperationException e) {
+ estimatedRecordSize += 50;
+ }
+ }
+ int targetRecordCount = Math.max(1, 250 * 1000 / estimatedRecordSize);
+ VectorContainer hyperBatch = constructHyperBatch(batchGroupList);
+ createCopier(hyperBatch, batchGroupList, outputContainer);
- int count = copier.next();
+ int count = copier.next(targetRecordCount);
assert count > 0;
VectorContainer c1 = VectorContainer.getTransferClone(outputContainer);
c1.buildSchema(BatchSchema.SelectionVectorMode.NONE);
c1.setRecordCount(count);
- count = copier.next();
- assert count > 0;
-
-
- VectorContainer c2 = VectorContainer.getTransferClone(outputContainer);
- c2.buildSchema(BatchSchema.SelectionVectorMode.NONE);
- c2.setRecordCount(count);
-
String outputFile = String.format(Utilities.getFileNameForQueryFragment(context, dirs.next(), "spill" + uid + "_" + spillCount++));
- BatchGroup newGroup = new BatchGroup(c1, c2, fs, outputFile, oContext.getAllocator());
+ BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());
try {
- while ((count = copier.next()) > 0) {
+ while ((count = copier.next(targetRecordCount)) > 0) {
outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE);
outputContainer.setRecordCount(count);
newGroup.addBatch(outputContainer);
}
newGroup.closeOutputStream();
- batchGroups.add(newGroup);
+ spilledBatchGroups.add(newGroup);
for (BatchGroup group : batchGroupList) {
- group.cleanup();
+ group.cleanup();
}
hyperBatch.clear();
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new RuntimeException(e);
+ }
+ takeOwnership(c1);
+ totalSizeInMemory += getBufferSize(c1);
+ }
+
+ private void takeOwnership(VectorAccessible batch) {
+ for (VectorWrapper w : batch) {
+ DrillBuf[] bufs = w.getValueVector().getBuffers(false);
+ for (DrillBuf buf : bufs) {
+ if (buf.isRootBuffer()) {
+ oContext.getAllocator().takeOwnership(buf);
+ }
}
+ }
+ }
+
+ private long getBufferSize(VectorAccessible batch) {
+ long size = 0;
+ for (VectorWrapper w : batch) {
+ DrillBuf[] bufs = w.getValueVector().getBuffers(false);
+ for (DrillBuf buf : bufs) {
+ if (buf.isRootBuffer()) {
+ size += buf.capacity();
+ }
+ }
+ }
+ return size;
}
private SelectionVector2 newSV2() throws OutOfMemoryException {
@@ -401,8 +449,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
throw new RuntimeException();
}
batchesSinceLastSpill = 0;
- if (!sv2.allocateNew(incoming.getRecordCount())) {
- throw new OutOfMemoryException();
+ int waitTime = 1;
+ while (true) {
+ try {
+ Thread.sleep(waitTime * 1000);
+ } catch (InterruptedException e) {
+ throw new OutOfMemoryException(e);
+ }
+ waitTime *= 2;
+ if (sv2.allocateNew(incoming.getRecordCount())) {
+ break;
+ }
+ if (waitTime >= 32) {
+ throw new OutOfMemoryException("Unable to allocate sv2 buffer after repeated attempts");
+ }
}
}
for (int i = 0; i < incoming.getRecordCount(); i++) {
@@ -412,35 +472,21 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return sv2;
}
- private void constructHyperBatch(List<BatchGroup> batchGroupList, VectorContainer cont) {
+ private VectorContainer constructHyperBatch(List<BatchGroup> batchGroupList) {
+ VectorContainer cont = new VectorContainer();
for (MaterializedField field : schema) {
- ValueVector[] vectors = new ValueVector[batchGroupList.size() * 2];
+ ValueVector[] vectors = new ValueVector[batchGroupList.size()];
int i = 0;
for (BatchGroup group : batchGroupList) {
vectors[i++] = group.getValueAccessorById(
field.getValueClass(),
- group.getValueVectorId(field.getPath()).getFieldIds()
- ).getValueVector();
- if (group.hasSecond()) {
- VectorContainer c = group.getSecondContainer();
- vectors[i++] = c.getValueAccessorById(
- field.getValueClass(),
- c.getValueVectorId(field.getPath()).getFieldIds()
- ).getValueVector();
- } else {
- vectors[i] = vectors[i - 1].getTransferPair().getTo(); //this vector should never be used. Just want to avoid having null elements in the hyper vector
- i++;
- }
+ group.getValueVectorId(field.getPath()).getFieldIds())
+ .getValueVector();
}
cont.add(vectors);
}
cont.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE);
- }
-
- private void constructSV4() throws SchemaChangeException {
- BufferAllocator.PreAllocator preAlloc = oContext.getAllocator().getNewPreAllocator();
- preAlloc.preAllocate(4 * TARGET_RECORD_COUNT);
- sv4 = new SelectionVector4(preAlloc.getAllocation(), TARGET_RECORD_COUNT, TARGET_RECORD_COUNT);
+ return cont;
}
private MSorter createNewMSorter() throws ClassTransformationException, IOException, SchemaChangeException {
@@ -526,23 +572,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
g.getEvalBlock()._return(JExpr.lit(0));
}
- private PriorityQueueSelector createSelector() throws SchemaChangeException {
- CodeGenerator<PriorityQueueSelector> cg = CodeGenerator.get(PriorityQueueSelector.TEMPLATE_DEFINITION, context.getFunctionRegistry());
- ClassGenerator<PriorityQueueSelector> g = cg.getRoot();
-
- generateComparisons(g, this);
-
- try {
- PriorityQueueSelector c = context.getImplementationClass(cg);
- return c;
- } catch (ClassTransformationException e) {
- throw new RuntimeException(e);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList, VectorContainer outputContainer, int recordCount) throws SchemaChangeException {
+ private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList, VectorContainer outputContainer) throws SchemaChangeException {
try {
if (copier == null) {
CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry());
@@ -562,7 +592,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
outputContainer.add(v);
allocators.add(VectorAllocator.getAllocator(v, 110));
}
- copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators, recordCount);
+ copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators);
} catch (ClassTransformationException e) {
throw new RuntimeException(e);
} catch (IOException e) {
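
Note: the copier's target batch size is now derived from a per-record size estimate rather than a fixed constant, and newSV2() trades latency for memory by retrying allocation with sleeps of 1, 2, 4, 8, and 16 seconds before giving up. A worked sketch of the sizing arithmetic above, assuming a two-column schema where one type has a known fixed width and the other falls back to the 50-byte default:

int estimatedRecordSize = 0;
estimatedRecordSize += 4;  // e.g. an INT column, via TypeHelper.getSize()
estimatedRecordSize += 50; // e.g. a VARCHAR column: getSize() unsupported, fallback
// Aim for roughly 250 KB per merged batch:
int targetRecordCount = Math.max(1, 250 * 1000 / estimatedRecordSize);
// 250000 / 54 = 4629 records per call to copier.next(targetRecordCount)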
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
index f2da71754..0eda0a6d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java
@@ -32,8 +32,9 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator;
import java.util.List;
public interface PriorityQueueCopier {
- public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators, int recordCnt) throws SchemaChangeException;
- public int next();
+ public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
+ VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException;
+ public int next(int targetRecordCount);
public List<VectorAllocator> getAllocators();
public void cleanup();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
index 6e9c355c0..b1c316ca1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java
@@ -44,7 +44,8 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT;
@Override
- public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators, int recordCnt) throws SchemaChangeException {
+ public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups,
+ VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException {
this.context = context;
this.allocator = allocator;
this.hyperBatch = hyperBatch;
@@ -60,20 +61,15 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
queueSize = 0;
for (int i = 0; i < size; i++) {
- vector4.set(i, i * 2, batchGroups.get(i).getNextIndex());
+ vector4.set(i, i, batchGroups.get(i).getNextIndex());
siftUp();
queueSize++;
}
-
- // Check if the we have enough records to create BatchData with two containers.
- if (recordCnt < (2 * targetRecordCount)) {
- targetRecordCount = (recordCnt / 2);
- }
}
@Override
- public int next() {
- allocateVectors();
+ public int next(int targetRecordCount) {
+ allocateVectors(targetRecordCount);
for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
if (queueSize == 0) {
cleanup();
@@ -81,15 +77,12 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
}
int compoundIndex = vector4.get(0);
int batch = compoundIndex >>> 16;
- assert batch < batchGroups.size() * 2 : String.format("batch: %d batchGroups: %d", batch, batchGroups.size());
- int batchGroup = batch / 2;
+ assert batch < batchGroups.size() : String.format("batch: %d batchGroups: %d", batch, batchGroups.size());
if (!doCopy(compoundIndex, outgoingIndex)) {
setValueCount(outgoingIndex);
return outgoingIndex;
}
- int nextIndex = batchGroups.get(batchGroup).getNextIndex();
- batch = batch & 0xFFFE;
- batch += batchGroups.get(batchGroup).getBatchPointer();
+ int nextIndex = batchGroups.get(batch).getNextIndex();
if (nextIndex < 0) {
vector4.set(0, vector4.get(--queueSize));
} else {
@@ -117,6 +110,9 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
for (VectorWrapper w: outgoing) {
w.getValueVector().clear();
}
+ for (VectorWrapper w : hyperBatch) {
+ w.clear();
+ }
}
@Override
@@ -136,7 +132,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier
}
}
- private void allocateVectors() {
+ private void allocateVectors(int targetRecordCount) {
for(VectorAllocator a : allocators){
a.alloc(targetRecordCount);
}
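
Note: entries in the four-byte selection vector pack a batch number and a record offset into one int, which is why next() decodes with an unsigned shift. A sketch of the encoding this code assumes, with 16 bits on each side (the doubling and masking removed above dated from the old two-containers-per-group layout):

int compound = (batch << 16) | (recordIndex & 0xFFFF); // what vector4.set(i, batch, index) stores
int batchOut = compound >>> 16;   // which batch group, as decoded in next()
int indexOut = compound & 0xFFFF; // record offset within that batch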
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java
deleted file mode 100644
index 786667a8e..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.xsort;
-
-import org.apache.drill.exec.compile.TemplateClassDefinition;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
-
-import java.util.List;
-
-public interface PriorityQueueSelector {
- public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException;
- public int next();
- public void cleanup();
-
- public static TemplateClassDefinition<PriorityQueueSelector> TEMPLATE_DEFINITION = new TemplateClassDefinition<PriorityQueueSelector>(PriorityQueueSelector.class, PriorityQueueSelectorTemplate.class);
-
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java
deleted file mode 100644
index 65a072ba7..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.physical.impl.xsort;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.record.VectorAccessible;
-import org.apache.drill.exec.record.selection.SelectionVector4;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
-
-import javax.inject.Named;
-import java.util.List;
-
-public abstract class PriorityQueueSelectorTemplate implements PriorityQueueSelector {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueSelectorTemplate.class);
-
- private SelectionVector4 sv4;
- private SelectionVector4 vector4;
- private List<BatchGroup> batchGroups;
- private FragmentContext context;
- private BufferAllocator allocator;
- private int size;
- private int queueSize = 0;
- private int targetRecordCount = ExternalSortBatch.TARGET_RECORD_COUNT;
- private VectorAccessible hyperBatch;
-
- @Override
- public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException {
- this.context = context;
- this.allocator = allocator;
- this.sv4 = sv4;
- this.batchGroups = batchGroups;
- this.size = batchGroups.size();
- this.hyperBatch = hyperBatch;
-
- BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator();
- preAlloc.preAllocate(4 * size);
- vector4 = new SelectionVector4(preAlloc.getAllocation(), size, Character.MAX_VALUE);
- doSetup(context, hyperBatch, null);
-
- for (int i = 0; i < size; i++, queueSize++) {
- vector4.set(i, i * 2, batchGroups.get(i).getNextIndex());
- siftUp();
- }
- }
-
- @Override
- public int next() {
- if (queueSize == 0) {
- cleanup();
- return 0;
- }
- rotate();
- for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) {
- int compoundIndex = vector4.get(0);
- int batch = compoundIndex >> 16;
- int batchGroup = batch / 2; //two containers per batch group
- sv4.set(outgoingIndex, compoundIndex);
- int nextIndex = batchGroups.get(batchGroup).getNextIndex();
- batch = batchGroup * 2 + batchGroups.get(batchGroup).getBatchPointer(); // increment batch pointer if batchGroup is currently using second container
- if (nextIndex == -1) {
- vector4.set(0, vector4.get(--queueSize));
- } else if (nextIndex == -2) {
- vector4.set(0, batch - 1, 0);
- sv4.setCount(outgoingIndex);
- assert outgoingIndex != 0;
- return outgoingIndex;
- } else {
- vector4.set(0, batch, nextIndex);
- }
- if (queueSize == 0) {
- sv4.setCount(++outgoingIndex);
- return outgoingIndex;
- }
- siftDown();
- }
- sv4.setCount(targetRecordCount);
- return targetRecordCount;
- }
-
- private void rotate() {
- for (BatchGroup group : batchGroups) {
- group.rotate();
- }
- for (int i = 0; i < vector4.getTotalCount(); i++) {
- vector4.set(i, vector4.get(i) & 0xFFFEFFFF);
- }
- }
-
- @Override
- public void cleanup() {
- vector4.clear();
- }
-
- private final void siftUp() {
- int p = queueSize;
- while (p > 0) {
- if (compare(p, (p - 1) / 2) < 0) {
- swap(p, (p - 1) / 2);
- p = (p - 1) / 2;
- } else {
- break;
- }
- }
- }
-
- private final void siftDown() {
- int p = 0;
- int next;
- while (p * 2 + 1 < queueSize) {
- if (p * 2 + 2 >= queueSize) {
- next = p * 2 + 1;
- } else {
- if (compare(p * 2 + 1, p * 2 + 2) <= 0) {
- next = p * 2 + 1;
- } else {
- next = p * 2 + 2;
- }
- }
- if (compare(p, next) > 0) {
- swap(p, next);
- p = next;
- } else {
- break;
- }
- }
- }
-
-
- public final void swap(int sv0, int sv1) {
- int tmp = vector4.get(sv0);
- vector4.set(sv0, vector4.get(sv1));
- vector4.set(sv1, tmp);
- }
-
- public final int compare(int leftIndex, int rightIndex) {
- int sv1 = vector4.get(leftIndex);
- int sv2 = vector4.get(rightIndex);
- return doEval(sv1, sv2);
- }
-
- public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing);
- public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index 381011513..b9690a6ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -143,7 +143,7 @@ public class WritableBatch {
continue;
}
- for (DrillBuf b : vv.getBuffers()) {
+ for (DrillBuf b : vv.getBuffers(true)) {
buffers.add(b);
}
// remove vv access to buffers.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index 038bb2f4c..104301144 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -45,10 +45,14 @@ public class SelectionVector2 implements Closeable{
return recordCount;
}
- public DrillBuf getBuffer()
- {
- DrillBuf bufferHandle = this.buffer;
+ public DrillBuf getBuffer() {
+ return getBuffer(true);
+ }
+ public DrillBuf getBuffer(boolean clear) {
+ DrillBuf bufferHandle = this.buffer;
+
+ if (clear) {
/* Increment the ref count for this buffer */
bufferHandle.retain();
@@ -56,8 +60,9 @@ public class SelectionVector2 implements Closeable{
* caller. clear the buffer from within our selection vector
*/
clear();
+ }
- return bufferHandle;
+ return bufferHandle;
}
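
Note: this mirrors the getBuffers(boolean) split on value vectors. getBuffer(false) peeks without retain()/clear(), which is what lets ExternalSortBatch test sv2.getBuffer(false).isRootBuffer() earlier in this diff without disturbing the vector's reference count. A small sketch (sv2 and allocator assumed in scope):

DrillBuf peek = sv2.getBuffer(false); // no retain(), sv2 still owns the buffer
if (peek.isRootBuffer()) {
  allocator.takeOwnership(peek);      // as done in ExternalSortBatch above
}
DrillBuf owned = sv2.getBuffer();     // default path: retain() + clear(), caller owns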
public void setBuffer(DrillBuf bufferHandle)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
index 4cc06c81f..209ec8fd7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -18,11 +18,15 @@
package org.apache.drill.exec.store;
import java.util.Collection;
+import java.util.Map;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.record.MaterializedField.Key;
+import org.apache.drill.exec.vector.ValueVector;
public abstract class AbstractRecordReader implements RecordReader {
private static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
@@ -59,4 +63,10 @@ public abstract class AbstractRecordReader implements RecordReader {
}).isPresent();
}
+ @Override
+ public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
+ for (ValueVector v : vectorMap.values()) {
+ v.allocateNew();
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index 17454217e..42cdcc385 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -19,7 +19,13 @@ package org.apache.drill.exec.store;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.MaterializedField.Key;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.util.Map;
public interface RecordReader {
@@ -36,6 +42,8 @@ public interface RecordReader {
*/
public abstract void setup(OutputMutator output) throws ExecutionSetupException;
+ public abstract void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException;
+
/**
* Set the operator context. The Reader can use this to access the operator context and allocate direct memory
* if needed
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
index ee78c39b5..2bd9df576 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.complex.fn.JsonReader;
@@ -45,7 +46,7 @@ import org.apache.hadoop.fs.Path;
import com.google.common.collect.Lists;
-public class JSONRecordReader2 implements RecordReader{
+public class JSONRecordReader2 extends AbstractRecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader2.class);
private OutputMutator mutator;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index 81505ae89..0714ab840 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -30,6 +30,8 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.MaterializedField.Key;
+import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockColumn;
import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry;
@@ -37,8 +39,9 @@ import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import java.util.List;
+import java.util.Map;
-public class MockRecordReader implements RecordReader {
+public class MockRecordReader extends AbstractRecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class);
private OutputMutator output;
@@ -120,6 +123,17 @@ public class MockRecordReader implements RecordReader {
}
@Override
+ public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
+ try {
+ for (ValueVector v : vectorMap.values()) {
+ AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
+ }
+ } catch (NullPointerException e) {
+ throw new OutOfMemoryException();
+ }
+ }
+
+ @Override
public void cleanup() {
}
}
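
The mock reader sizes vectors through AllocationHelper and converts a NullPointerException from the helper into the checked OutOfMemoryException, presumably because a failed buffer allocation surfaces as a null inside the helper; the Parquet and POJO readers below repeat the same pattern. A standalone sketch of that translation; the parameter meanings (value count, average width in bytes, average repetition) are assumptions, not documented Drill semantics:

    import java.util.Map;

    class OomSketch extends Exception { }

    interface SizedVector { void allocate(int valueCount, int avgWidth, int avgRepeat); }

    class AllocateAll {
      static void allocate(Map<String, SizedVector> vectorMap) throws OomSketch {
        try {
          for (SizedVector v : vectorMap.values()) {
            v.allocate(Character.MAX_VALUE, 50, 10);  // 65535 values, ~50 B each (assumed)
          }
        } catch (NullPointerException e) {
          throw new OomSketch();  // allocation failure reported as out-of-memory
        }
      }
    }
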
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 6c2d44c4b..c72e75030 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
@@ -36,14 +37,17 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.MaterializedField.Key;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.NullableBitVector;
+import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
import org.apache.hadoop.fs.FileSystem;
@@ -320,6 +324,18 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
}
+ @Override
+ public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
+ try {
+ for (ValueVector v : vectorMap.values()) {
+ AllocationHelper.allocate(v, recordsPerBatch, 50, 10);
+ }
+ } catch (NullPointerException e) {
+ throw new OutOfMemoryException();
+ }
+ }
+
private SchemaPath toFieldName(String[] paths) {
return SchemaPath.getCompoundPath(paths);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 7a864f026..14075f3a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -24,12 +24,16 @@ import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField.Key;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.RepeatedFixedWidthVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
@@ -114,6 +118,17 @@ public class DrillParquetReader extends AbstractRecordReader {
}
@Override
+ public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
+ try {
+ for (ValueVector v : vectorMap.values()) {
+ AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
+ }
+ } catch (NullPointerException e) {
+ throw new OutOfMemoryException();
+ }
+ }
+
+ @Override
public void setup(OutputMutator output) throws ExecutionSetupException {
try {
@@ -202,7 +217,11 @@ public class DrillParquetReader extends AbstractRecordReader {
if (v instanceof VariableWidthVector) {
filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity());
}
+// if (v instanceof RepeatedFixedWidthVector) {
+// filled = Math.max(filled, ((RepeatedFixedWidthVector) v).getAccessor().getGroupCount() * 100)
+// }
}
+ logger.debug("Percent filled: {}", filled);
return filled;
}
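
The fill check above reports the most-full variable-width vector: each vector contributes currentSizeInBytes * 100 / byteCapacity, and the maximum decides when the batch is considered full. A small self-contained illustration of that computation:

    // The fullest vector drives the result; integer division truncates toward zero.
    class FillCheck {
      static int percentFilled(int[] sizeInBytes, int[] byteCapacity) {
        int filled = 0;
        for (int i = 0; i < sizeInBytes.length; i++) {
          filled = Math.max(filled, sizeInBytes[i] * 100 / byteCapacity[i]);
        }
        return filled;
      }

      public static void main(String[] args) {
        // 30 KiB of 64 KiB used (46%) and 60 KiB of 64 KiB used (93%) -> prints 93.
        System.out.println(percentFilled(new int[]{30720, 61440}, new int[]{65536, 65536}));
      }
    }
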
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 48e09aa61..38160df8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -22,12 +22,15 @@ import java.lang.reflect.Modifier;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.record.MaterializedField.Key;
+import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.pojo.Writers.BitWriter;
import org.apache.drill.exec.store.pojo.Writers.DoubleWriter;
import org.apache.drill.exec.store.pojo.Writers.EnumWriter;
@@ -39,12 +42,12 @@ import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter;
import org.apache.drill.exec.store.pojo.Writers.StringWriter;
+import org.apache.drill.exec.vector.AllocationHelper;
+import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Lists;
-
-
-public class PojoRecordReader<T> implements RecordReader{
+public class PojoRecordReader<T> extends AbstractRecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
public final int forJsonIgnore = 1;
@@ -120,6 +123,13 @@ public class PojoRecordReader<T> implements RecordReader{
}
+ @Override
+ public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException {
+ for (ValueVector v : vectorMap.values()) {
+ AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10);
+ }
+ }
+
private void allocate(){
for(PojoWriter writer : writers){
writer.allocate();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 2031aeee7..ef65f2a3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -127,8 +127,8 @@ public class DrillTextRecordReader extends AbstractRecordReader {
@Override
public int next() {
- logger.debug("vector value capacity {}", vector.getValueCapacity());
- logger.debug("vector byte capacity {}", vector.getByteCapacity());
+// logger.debug("vector value capacity {}", vector.getValueCapacity());
+// logger.debug("vector byte capacity {}", vector.getByteCapacity());
int batchSize = 0;
try {
int recordCount = 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index b711c66df..5db0299c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -45,7 +45,10 @@ public abstract class BaseDataValueVector extends BaseValueVector{
*/
@Override
public void clear() {
- if (data != null) {
+ if (data == null) {
+ data = DeadBuf.DEAD_BUFFER;
+ }
+ if (data != DeadBuf.DEAD_BUFFER) {
data.release();
data = data.getAllocator().getEmpty();
valueCount = 0;
@@ -62,16 +65,20 @@ public abstract class BaseDataValueVector extends BaseValueVector{
@Override
- public DrillBuf[] getBuffers(){
+ public DrillBuf[] getBuffers(boolean clear){
DrillBuf[] out;
if(valueCount == 0){
out = new DrillBuf[0];
}else{
out = new DrillBuf[]{data};
- data.readerIndex(0);
- data.retain();
+ if (clear) {
+ data.readerIndex(0);
+ data.retain();
+ }
+ }
+ if (clear) {
+ clear();
}
- clear();
return out;
}
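
The new boolean on getBuffers() separates exposing buffers from transferring them: with clear == true the vector retains the buffer on the caller's behalf and then clears itself, so the caller ends up owning the reference; with clear == false the caller only peeks and reference counts stay untouched. A toy model of that contract, with a hand-rolled reference count standing in for DrillBuf:

    // Toy model: clear=true moves ownership of one reference to the caller.
    class RefBuf {
      int refCnt = 1;
      void retain()  { refCnt++; }
      void release() { refCnt--; }
    }

    class ToyVector {
      RefBuf data = new RefBuf();
      int valueCount = 10;

      RefBuf[] getBuffers(boolean clear) {
        RefBuf[] out = (valueCount == 0) ? new RefBuf[0] : new RefBuf[]{data};
        if (clear) {
          if (out.length > 0) {
            data.retain();   // one extra reference, now owned by the caller
          }
          clear();           // vector drops its own reference and resets
        }
        return out;
      }

      void clear() {
        data.release();
        data = new RefBuf(); // stands in for allocator.getEmpty()
        valueCount = 0;
      }
    }
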
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index 1b3705e48..18da67d5f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -17,12 +17,11 @@
*/
package org.apache.drill.exec.vector;
+import io.netty.buffer.DrillBuf;
+
import java.util.Iterator;
import org.apache.drill.common.expression.FieldReference;
-
-import io.netty.buffer.ByteBuf;
-
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
@@ -62,7 +61,7 @@ public abstract class BaseValueVector implements ValueVector{
public abstract int getCurrentValueCount();
public abstract void setCurrentValueCount(int count);
- abstract public ByteBuf getData();
+ abstract public DrillBuf getData();
abstract static class BaseAccessor implements ValueVector.Accessor{
public abstract int getValueCount();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index 2c215ffcd..032ccc243 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -136,7 +136,7 @@ public class ObjectVector extends BaseValueVector{
}
@Override
- public ByteBuf getData() {
+ public DrillBuf getData() {
throw new UnsupportedOperationException("ObjectVector does not support this");
}
@@ -166,7 +166,7 @@ public class ObjectVector extends BaseValueVector{
}
@Override
- public DrillBuf[] getBuffers() {
+ public DrillBuf[] getBuffers(boolean clear) {
throw new UnsupportedOperationException("ObjectVector does not support this");
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
index f7f010abd..3433537ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java
@@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf;
import java.io.Closeable;
import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryRuntimeException;
import org.apache.drill.exec.proto.UserBitShared.SerializedField;
import org.apache.drill.exec.record.MaterializedField;
@@ -99,9 +100,12 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> {
* this buffer so it only should be used for in-context access. Also note that this buffer changes regularly thus
* external classes shouldn't hold a reference to it (unless they change it).
*
- * @return The underlying DrillBuf.
+ * @param clear
+ * Whether to clear the vector after returning its buffers, leaving the caller as owner of the returned references
+ *
+ * @return The underlying DrillBuf(s).
*/
- public abstract DrillBuf[] getBuffers();
+ public abstract DrillBuf[] getBuffers(boolean clear);
/**
* Load the data provided in the buffer. Typically used when deserializing from the wire.
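
From the caller's side, the flag makes getBuffers() usable for read-only inspection as well as the old move-style handoff. A hedged usage sketch (assumes the Drill jars on the classpath; readableBytes() comes from the Netty ByteBuf API that DrillBuf extends):

    // Peek at a vector's footprint without consuming it: clear=false leaves
    // reference counts and the vector's contents untouched.
    static long sizeInBytes(org.apache.drill.exec.vector.ValueVector vector) {
      long bytes = 0;
      for (io.netty.buffer.DrillBuf buf : vector.getBuffers(false)) {
        bytes += buf.readableBytes();
      }
      return bytes;
    }
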
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index c91c39712..834719cd1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -276,10 +276,10 @@ public class MapVector extends AbstractContainerVector {
}
@Override
- public DrillBuf[] getBuffers() {
+ public DrillBuf[] getBuffers(boolean clear) {
List<DrillBuf> bufs = Lists.newArrayList();
for(ValueVector v : vectors.values()){
- for(DrillBuf b : v.getBuffers()){
+ for(DrillBuf b : v.getBuffers(clear)){
bufs.add(b);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index f903b0ca4..fef416f82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -355,8 +355,8 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
}
@Override
- public DrillBuf[] getBuffers() {
- return ArrayUtils.addAll(offsets.getBuffers(), vector.getBuffers());
+ public DrillBuf[] getBuffers(boolean clear) {
+ return ArrayUtils.addAll(offsets.getBuffers(clear), vector.getBuffers(clear));
}
private void setVector(ValueVector v){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 3fd1c12b7..678439b4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -307,11 +307,11 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
}
@Override
- public DrillBuf[] getBuffers() {
- List<DrillBuf> bufs = Lists.newArrayList(offsets.getBuffers());
+ public DrillBuf[] getBuffers(boolean clear) {
+ List<DrillBuf> bufs = Lists.newArrayList(offsets.getBuffers(clear));
for(ValueVector v : vectors.values()){
- for(DrillBuf b : v.getBuffers()){
+ for(DrillBuf b : v.getBuffers(clear)){
bufs.add(b);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index bb56e1046..912f9560b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.data.DataRpcConfig;
@@ -59,7 +60,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
}
if (batch.getHeader().getIsOutOfMemory()) {
logger.debug("Setting autoread false");
- if (!outOfMemory.get() && !buffer.peekFirst().getHeader().getIsOutOfMemory()) {
+ RawFragmentBatch firstBatch = buffer.peekFirst();
+ FragmentRecordBatch header = firstBatch == null ? null : firstBatch.getHeader();
+ if (!outOfMemory.get() && header != null && header.getIsOutOfMemory()) {
buffer.addFirst(batch);
}
outOfMemory.set(true);
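
The crash being fixed: Deque.peekFirst() returns null on an empty buffer, and the old code dereferenced the result unconditionally. A standalone illustration of the null-safe pattern now used:

    import java.util.ArrayDeque;
    import java.util.Deque;

    class PeekSafely {
      public static void main(String[] args) {
        Deque<String> buffer = new ArrayDeque<>();
        String first = buffer.peekFirst();        // null: the deque is empty
        // first.isEmpty() here would throw NullPointerException; guard it instead:
        boolean headEmpty = first != null && first.isEmpty();
        System.out.println(headEmpty);            // prints false, no NPE
      }
    }
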
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 9521e65d6..603aeab96 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -45,6 +45,7 @@ import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.opt.BasicOptimizer;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.impl.SendingAccountor;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.planner.fragment.Fragment;
@@ -333,6 +334,24 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
return;
}
+ int sortCount = 0;
+ for (PhysicalOperator op : plan.getSortedOperators()) {
+ if (op instanceof ExternalSort) sortCount++;
+ }
+
+ if (sortCount > 0) {
+ long maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
+ long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(), context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC));
+ maxAllocPerNode = Math.min(maxAllocPerNode, context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
+ long maxSortAlloc = maxAllocPerNode / (sortCount * maxWidthPerNode);
+ logger.debug("Max sort alloc: {}", maxSortAlloc);
+ for (PhysicalOperator op : plan.getSortedOperators()) {
+ if (op instanceof ExternalSort) {
+ ((ExternalSort)op).setMaxAllocation(maxSortAlloc);
+ }
+ }
+ }
+
PlanningSet planningSet = StatsCollector.collectStats(rootFragment);
SimpleParallelizer parallelizer = new SimpleParallelizer(context);
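
The budget above splits the per-node allowance evenly: every ExternalSort instance in the plan, times the maximum number of fragments per node, gets an equal slice. A worked example with assumed values (illustrative numbers, not Drill defaults):

    public class SortBudget {
      public static void main(String[] args) {
        long maxAllocPerNode = 8L * 1024 * 1024 * 1024; // assume 8 GiB usable per node
        int sortCount = 2;                              // two ExternalSorts in the plan
        long maxWidthPerNode = 4;                       // at most 4 fragments per node
        long maxSortAlloc = maxAllocPerNode / (sortCount * maxWidthPerNode);
        System.out.println(maxSortAlloc);               // 1073741824 -> 1 GiB per sort
      }
    }
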
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 2a8db6784..6b4ee9b01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -114,7 +114,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
}
}
} catch (AssertionError | Exception e) {
- logger.debug("Failure while initializing operator tree", e);
+ logger.debug("Error while initializing or executing fragment", e);
context.fail(e);
internalFail(e);
} finally {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 877ffc25f..fb0d1df5e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -180,6 +180,7 @@ public class TestParquetWriter extends BaseTestQuery {
}
@Test
+ @Ignore
public void testRepeatedBool() throws Exception {
String inputTable = "cp.`parquet/repeated_bool_data.json`";
runTestAndValidate("*", "*", inputTable, "repeated_bool_parquet");
@@ -225,7 +226,7 @@ public class TestParquetWriter extends BaseTestQuery {
public void runTestAndValidate(String selection, String validationSelection, String inputTable, String outputFile) throws Exception {
- Path path = new Path("/tmp/drilltest/" + outputFile);
+ Path path = new Path("/tmp/" + outputFile);
if (fs.exists(path)) {
fs.delete(path, true);
}