diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-08-18 17:23:23 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-08-29 17:51:51 -0700 |
commit | 437366f03e5de4c4692703007bff2f5a134720dd (patch) | |
tree | 637752098e129163b40ac3946986e56f68d12c86 /exec/java-exec/src | |
parent | fa3c8d52a9da0913cc734ff8d3f3968d0e25fab8 (diff) |
DRILL-1329: External sort memory fixes
Diffstat (limited to 'exec/java-exec/src')
38 files changed, 392 insertions, 490 deletions
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java index cb5284132..d9eae0fb8 100644 --- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java @@ -73,9 +73,11 @@ public final class ${className} extends BaseValueVector implements <#if type.maj } @Override - public DrillBuf[] getBuffers() { - DrillBuf[] buffers = ObjectArrays.concat(bits.getBuffers(), values.getBuffers(), DrillBuf.class); - clear(); + public DrillBuf[] getBuffers(boolean clear) { + DrillBuf[] buffers = ObjectArrays.concat(bits.getBuffers(clear), values.getBuffers(clear), DrillBuf.class); + if (clear) { + clear(); + } return buffers; } diff --git a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java index cb37a1b52..2fbbc6fb1 100644 --- a/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java +++ b/exec/java-exec/src/main/codegen/templates/ParquetOutputRecordWriter.java @@ -41,7 +41,7 @@ import org.apache.drill.exec.vector.complex.reader.FieldReader; import parquet.io.api.RecordConsumer; import parquet.schema.MessageType; import parquet.io.api.Binary; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.memory.TopLevelAllocator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; @@ -220,7 +220,7 @@ public abstract class ParquetOutputRecordWriter extends AbstractRecordWriter imp //consumer.endField(fieldName, fieldId); <#else> reader.read(holder); - ByteBuf buf = holder.buffer; + DrillBuf buf = holder.buffer; consumer.startField(fieldName, fieldId); consumer.addBinary(Binary.fromByteBuffer(holder.buffer.nioBuffer(holder.start, holder.end - holder.start))); consumer.endField(fieldName, fieldId); diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java index 97117a4d3..195f182aa 100644 --- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java @@ -71,7 +71,7 @@ package org.apache.drill.exec.vector; } public void setCurrentValueCount(int count) { - values.setCurrentValueCount(count); + values.setCurrentValueCount(offsets.getAccessor().get(count)); } public int getBufferSize(){ @@ -259,9 +259,11 @@ package org.apache.drill.exec.vector; </#if> @Override - public DrillBuf[] getBuffers() { - DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(), values.getBuffers(), DrillBuf.class); - clear(); + public DrillBuf[] getBuffers(boolean clear) { + DrillBuf[] buffers = ObjectArrays.concat(offsets.getBuffers(clear), values.getBuffers(clear), DrillBuf.class); + if (clear) { + clear(); + } return buffers; } diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java index 95cd3cc3c..fe255a844 100644 --- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java @@ -134,9 +134,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V @Override - public DrillBuf[] getBuffers() { - DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(), super.getBuffers(), DrillBuf.class); - clear(); + public DrillBuf[] getBuffers(boolean clear) { + DrillBuf[] buffers = ObjectArrays.concat(offsetVector.getBuffers(clear), super.getBuffers(clear), DrillBuf.class); + if (clear) { + clear(); + } return buffers; } diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java index 6ad5da8ff..5399239b9 100644 --- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java +++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java @@ -663,6 +663,10 @@ public final class DrillBuf extends AbstractByteBuf { return new DrillBuf(allocator, a); } + public boolean isRootBuffer(){ + return rootBuffer; + } + public static DrillBuf wrapByteBuffer(ByteBuffer b){ if(!b.isDirect()){ throw new IllegalStateException("DrillBufs can only refer to direct memory."); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java index e007bcc69..83b3d5a8e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java @@ -27,6 +27,7 @@ import java.util.List; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.SerializedField; @@ -114,6 +115,9 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { int dataLength = metaData.getBufferLength(); MaterializedField field = MaterializedField.create(metaData); DrillBuf buf = allocator.buffer(dataLength); + if (buf == null) { + throw new IOException(new OutOfMemoryException()); + } buf.writeBytes(input, dataLength); ValueVector vector = TypeHelper.getNewVector(field, allocator); vector.load(metaData, buf); @@ -143,10 +147,10 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { Preconditions.checkNotNull(output); final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time(); - ByteBuf[] incomingBuffers = batch.getBuffers(); + DrillBuf[] incomingBuffers = batch.getBuffers(); UserBitShared.RecordBatchDef batchDef = batch.getDef(); - /* ByteBuf associated with the selection vector */ + /* DrillBuf associated with the selection vector */ DrillBuf svBuf = null; Integer svCount = null; @@ -171,7 +175,7 @@ public class VectorAccessibleSerializable extends AbstractStreamSerializable { } /* Dump the array of ByteBuf's associated with the value vectors */ - for (ByteBuf buf : incomingBuffers) + for (DrillBuf buf : incomingBuffers) { /* dump the buffer into the OutputStream */ int bufLength = buf.readableBytes(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 36c162abe..2d39b5e60 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState; import static org.apache.drill.exec.proto.UserProtos.QueryResultsMode.STREAM_FULL; import static org.apache.drill.exec.proto.UserProtos.RunQuery.newBuilder; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import java.io.Closeable; import java.io.IOException; @@ -338,7 +339,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ } @Override - public ByteBuf getBuffer() { + public DrillBuf getBuffer() { return null; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index 55f11a463..ae80f7b45 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -162,7 +162,7 @@ public class TopLevelAllocator implements BufferAllocator { public DrillBuf buffer(int size, int max) { if(size == 0) return empty; if(!childAcct.reserve(size)){ - logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory()); + logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory(), new Exception()); return null; }; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java index 8af69fa67..c196a962c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/ExternalSort.java @@ -32,6 +32,8 @@ import com.fasterxml.jackson.annotation.JsonTypeName; public class ExternalSort extends Sort { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSort.class); + private long initialAllocation = 20000000; + @JsonCreator public ExternalSort(@JsonProperty("child") PhysicalOperator child, @JsonProperty("orderings") List<Ordering> orderings, @JsonProperty("reverse") boolean reverse) { super(child, orderings, reverse); @@ -52,7 +54,9 @@ public class ExternalSort extends Sort { @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - return new ExternalSort(child, orderings, reverse); + ExternalSort newSort = new ExternalSort(child, orderings, reverse); + newSort.setMaxAllocation(getMaxAllocation()); + return newSort; } @Override @@ -60,5 +64,12 @@ public class ExternalSort extends Sort { return CoreOperatorType.EXTERNAL_SORT_VALUE; } + public void setMaxAllocation(long maxAllocation) { + this.maxAllocation = Math.max(initialAllocation, maxAllocation); + } + + public long getInitialAllocation() { + return initialAllocation; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 62163052e..e56d883ce 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -33,6 +33,7 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.holders.NullableVarCharHolder; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.base.PhysicalOperator; @@ -60,9 +61,6 @@ import com.google.common.collect.Maps; public class ScanBatch implements RecordBatch { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class); - private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; - private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; - private static final int MAX_RECORD_CNT = Character.MAX_VALUE; private final Map<MaterializedField.Key, ValueVector> fieldVectorMap = Maps.newHashMap(); @@ -147,7 +145,15 @@ public class ScanBatch implements RecordBatch { long t1 = System.nanoTime(); oContext.getStats().startProcessing(); try { - mutator.allocate(MAX_RECORD_CNT); + try { + currentReader.allocate(fieldVectorMap); + } catch (OutOfMemoryException e) { + logger.debug("Caught OutOfMemoryException"); + for (ValueVector v : fieldVectorMap.values()) { + v.clear(); + } + return IterOutcome.OUT_OF_MEMORY; + } while ((recordCount = currentReader.next()) == 0) { try { if (!readers.hasNext()) { @@ -169,7 +175,15 @@ public class ScanBatch implements RecordBatch { currentReader = readers.next(); partitionValues = partitionColumns.hasNext() ? partitionColumns.next() : null; currentReader.setup(mutator); - mutator.allocate(MAX_RECORD_CNT); + try { + currentReader.allocate(fieldVectorMap); + } catch (OutOfMemoryException e) { + logger.debug("Caught OutOfMemoryException"); + for (ValueVector v : fieldVectorMap.values()) { + v.clear(); + } + return IterOutcome.OUT_OF_MEMORY; + } addPartitionVectors(); } finally { oContext.getStats().stopSetup(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index c8929d1c5..734088e12 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -80,7 +80,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private boolean hasRemainder = false; private int remainderIndex = 0; private int recordCount; - + private static final String EMPTY_STRING = ""; private class ClassifierResult { @@ -137,7 +137,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ // VectorUtil.showVectorAccessibleContent(incoming, ","); int incomingRecordCount = incoming.getRecordCount(); - doAlloc(); + if (!doAlloc()) { + outOfMemory = true; + return; + } int outputRecords = projector.projectRecords(0, incomingRecordCount, 0); if (outputRecords < incomingRecordCount) { @@ -161,7 +164,10 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private void handleRemainder() { int remainingRecordCount = incoming.getRecordCount() - remainderIndex; - doAlloc(); + if (!doAlloc()) { + outOfMemory = true; + return; + } int projRecords = projector.projectRecords(remainderIndex, remainingRecordCount, 0); if (projRecords < remainingRecordCount) { setValueCount(projRecords); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java index 5c2ab22eb..237007046 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.xsort; import java.io.IOException; import java.util.Iterator; +import java.util.concurrent.TimeUnit; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.cache.VectorAccessibleSerializable; @@ -41,36 +42,23 @@ import com.google.common.base.Stopwatch; public class BatchGroup implements VectorAccessible { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchGroup.class); - private final VectorContainer firstContainer; - private final VectorContainer secondContainer; private VectorContainer currentContainer; private SelectionVector2 sv2; private int pointer = 0; - private int batchPointer = 0; - private boolean hasSecond = false; private FSDataInputStream inputStream; private FSDataOutputStream outputStream; private Path path; private FileSystem fs; private BufferAllocator allocator; private int spilledBatches = 0; - private boolean done = false; public BatchGroup(VectorContainer container, SelectionVector2 sv2) { - this.firstContainer = container; - this.secondContainer = null; this.sv2 = sv2; - this.currentContainer = firstContainer; + this.currentContainer = container; } - public BatchGroup(VectorContainer firstContainer, VectorContainer secondContainer, FileSystem fs, String path, BufferAllocator allocator) { - assert firstContainer.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.NONE; - assert secondContainer.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.NONE; - - this.firstContainer = firstContainer; - this.secondContainer = secondContainer; - currentContainer = firstContainer; - this.hasSecond = true; + public BatchGroup(VectorContainer container, FileSystem fs, String path, BufferAllocator allocator) { + currentContainer = container; this.fs = fs; this.path = new Path(path); this.allocator = allocator; @@ -93,7 +81,7 @@ public class BatchGroup implements VectorAccessible { watch.start(); outputBatch.writeToStream(outputStream); newContainer.zeroVectors(); -// logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount); + logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount); spilledBatches++; } @@ -107,132 +95,51 @@ public class BatchGroup implements VectorAccessible { Stopwatch watch = new Stopwatch(); watch.start(); vas.readFromStream(inputStream); - VectorContainer c = (VectorContainer) vas.get(); + VectorContainer c = vas.get(); // logger.debug("Took {} us to read {} records", watch.elapsed(TimeUnit.MICROSECONDS), c.getRecordCount()); spilledBatches--; + currentContainer.zeroVectors(); + Iterator<VectorWrapper<?>> wrapperIterator = c.iterator(); + for (VectorWrapper w : currentContainer) { + TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector()); + pair.transfer(); + } + currentContainer.setRecordCount(c.getRecordCount()); + c.zeroVectors(); return c; } public int getNextIndex() { - if (pointer == currentContainer.getRecordCount()) { - if (!hasSecond || (batchPointer == 1 && spilledBatches == 0)) { + int val; + if (pointer == getRecordCount()) { + if (spilledBatches == 0) { return -1; - } else if (batchPointer == 1 && spilledBatches > 0) { - return -2; } - batchPointer++; - currentContainer = secondContainer; - pointer = 0; + try { + currentContainer.zeroVectors(); + getBatch(); + } catch (IOException e) { + throw new RuntimeException(e); + } + pointer = 1; + return 0; } if (sv2 == null) { - int val = pointer; + val = pointer; pointer++; assert val < currentContainer.getRecordCount(); - return val; } else { - int val = pointer; + val = pointer; pointer++; assert val < currentContainer.getRecordCount(); - return sv2.getIndex(val); - } - } - - public VectorContainer getFirstContainer() { - return firstContainer; - } - - public VectorContainer getSecondContainer() { - return secondContainer; - } - - public boolean hasSecond() { - return hasSecond; - } - - public void rotate() { - if (batchPointer == 0) { - return; - } - if (pointer == secondContainer.getRecordCount()) { - try { - getTwoBatches(); - return; - } catch (IOException e) { - throw new RuntimeException(e); - } - } - firstContainer.zeroVectors(); - Iterator<VectorWrapper<?>> wrapperIterator = secondContainer.iterator(); - for (VectorWrapper w : firstContainer) { - TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector()); - pair.transfer(); + val = sv2.getIndex(val); } - firstContainer.setRecordCount(secondContainer.getRecordCount()); - if (spilledBatches > 0) { - VectorContainer c = null; - try { - c = getBatch(); - } catch (IOException e) { - throw new RuntimeException(e); - } - secondContainer.zeroVectors(); - wrapperIterator = c.iterator(); - for (VectorWrapper w : secondContainer) { - TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector()); - pair.transfer(); - } - secondContainer.setRecordCount(c.getRecordCount()); - c.zeroVectors(); - } else { - secondContainer.zeroVectors(); - hasSecond = false; - } - batchPointer = 0; - currentContainer = firstContainer; + return val; } - private void getTwoBatches() throws IOException { - firstContainer.zeroVectors(); - if (spilledBatches > 0) { - VectorContainer c = getBatch(); - Iterator<VectorWrapper<?>> wrapperIterator = c.iterator(); - for (VectorWrapper w : firstContainer) { - TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector()); - pair.transfer(); - } - firstContainer.setRecordCount(c.getRecordCount()); - c.zeroVectors(); - } else { - batchPointer = -1; - pointer = -1; - firstContainer.zeroVectors(); - secondContainer.zeroVectors(); - } - if (spilledBatches > 0) { - VectorContainer c = getBatch(); - Iterator<VectorWrapper<?>> wrapperIterator = c.iterator(); - for (VectorWrapper w : secondContainer) { - TransferPair pair = wrapperIterator.next().getValueVector().makeTransferPair(w.getValueVector()); - pair.transfer(); - } - secondContainer.setRecordCount(c.getRecordCount()); - c.zeroVectors(); - } else { - secondContainer.zeroVectors(); - hasSecond = false; - } - batchPointer = 0; - currentContainer = firstContainer; - pointer = 0; -// BatchPrinter.printBatch(firstContainer); -// BatchPrinter.printBatch(secondContainer); - return; - } - - public int getBatchPointer() { - assert batchPointer < 2; - return batchPointer; + public VectorContainer getContainer() { + return currentContainer; } public void cleanup() throws IOException { @@ -263,7 +170,11 @@ public class BatchGroup implements VectorAccessible { @Override public int getRecordCount() { - return currentContainer.getRecordCount(); + if (sv2 != null) { + return sv2.getCount(); + } else { + return currentContainer.getRecordCount(); + } } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index d4b10014f..aa6527210 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.impl.xsort; +import io.netty.buffer.DrillBuf; + import java.io.IOException; import java.util.Iterator; import java.util.LinkedList; @@ -93,10 +95,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { private SingleBatchSorter sorter; private SortRecordBatchBuilder builder; private MSorter mSorter; - private PriorityQueueSelector selector; private PriorityQueueCopier copier; private BufferAllocator copierAllocator; private LinkedList<BatchGroup> batchGroups = Lists.newLinkedList(); + private LinkedList<BatchGroup> spilledBatchGroups = Lists.newLinkedList(); private SelectionVector4 sv4; private FileSystem fs; private int spillCount = 0; @@ -104,6 +106,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files private boolean useIncomingSchema = false; private boolean first = true; + private long totalSizeInMemory = 0; + private long highWaterMark = Long.MAX_VALUE; + private int targetRecordCount; public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context); @@ -128,7 +133,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { @Override public int getRecordCount() { - return sv4.getCount(); + if (sv4 != null) { + return sv4.getCount(); + } else { + return container.getRecordCount(); + } } @Override @@ -195,21 +204,22 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } else { Stopwatch w = new Stopwatch(); w.start(); - int count = selector.next(); +// int count = selector.next(); + int count = copier.next(targetRecordCount); if(count > 0){ long t = w.elapsed(TimeUnit.MICROSECONDS); logger.debug("Took {} us to merge {} records", t, count); container.setRecordCount(count); return IterOutcome.OK; }else{ - logger.debug("selector returned 0 records"); + logger.debug("copier returned 0 records"); return IterOutcome.NONE; } } } long totalcount = 0; - + try{ outer: while (true) { Stopwatch watch = new Stopwatch(); @@ -234,17 +244,21 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } // fall through. case OK: + totalSizeInMemory += getBufferSize(incoming); SelectionVector2 sv2; // if (incoming.getRecordCount() == 0) { // break outer; // } if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { sv2 = incoming.getSelectionVector2(); + if (sv2.getBuffer(false).isRootBuffer()) { + oContext.getAllocator().takeOwnership(sv2.getBuffer(false)); + } } else { try { sv2 = newSV2(); } catch (OutOfMemoryException e) { - throw new RuntimeException(); + throw new RuntimeException(e); } } int count = sv2.getCount(); @@ -263,7 +277,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2())); batchesSinceLastSpill++; - if (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE) { + if ((spillCount > 0 && totalSizeInMemory > .75 * highWaterMark) || + (totalSizeInMemory > .95 * popConfig.getMaxAllocation()) || + (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) { mergeAndSpill(); batchesSinceLastSpill = 0; } @@ -271,6 +287,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // logger.debug("Took {} us to sort {} records", t, count); break; case OUT_OF_MEMORY: + highWaterMark = totalSizeInMemory; if (batchesSinceLastSpill > 2) mergeAndSpill(); batchesSinceLastSpill = 0; break; @@ -279,25 +296,14 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } } -// if (schema == null || totalcount == 0){ -// builder may be null at this point if the first incoming batch is empty -// useIncomingSchema = true; -// return IterOutcome.NONE; -// } - if (spillCount == 0) { Stopwatch watch = new Stopwatch(); watch.start(); -// if (schema == null){ - // builder may be null at this point if the first incoming batch is empty -// useIncomingSchema = true; -// return IterOutcome.NONE; -// } builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES); for (BatchGroup group : batchGroups) { - RecordBatchData rbd = new RecordBatchData(group.getFirstContainer()); + RecordBatchData rbd = new RecordBatchData(group.getContainer()); rbd.setSv2(group.getSv2()); builder.add(rbd); } @@ -311,16 +317,32 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { sv4 = mSorter.getSV4(); long t = watch.elapsed(TimeUnit.MICROSECONDS); - logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount()); +// logger.debug("Took {} us to sort {} records", t, sv4.getTotalCount()); + container.buildSchema(SelectionVectorMode.FOUR_BYTE); } else { - constructHyperBatch(batchGroups, this.container); - constructSV4(); - selector = createSelector(); - selector.setup(context, oContext.getAllocator(), this, sv4, batchGroups); - selector.next(); + mergeAndSpill(); + batchGroups.addAll(spilledBatchGroups); + logger.warn("Starting to merge. {} batch groups. Current allocated memory: {}", batchGroups.size(), oContext.getAllocator().getAllocatedMemory()); + VectorContainer hyperBatch = constructHyperBatch(batchGroups); + createCopier(hyperBatch, batchGroups, container); + int inMemoryRecordCount = 0; + for (BatchGroup g : batchGroups) { + inMemoryRecordCount += g.getRecordCount(); + } + int estimatedRecordSize = 0; + for (VectorWrapper w : batchGroups.get(0)) { + try { + estimatedRecordSize += TypeHelper.getSize(w.getField().getType()); + } catch (UnsupportedOperationException e) { + estimatedRecordSize += 50; + } + } + targetRecordCount = (int) Math.max(1, 250 * 1000 / estimatedRecordSize); + int count = copier.next(targetRecordCount); + container.buildSchema(SelectionVectorMode.NONE); + container.setRecordCount(count); } - container.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); return IterOutcome.OK_NEW_SCHEMA; }catch(SchemaChangeException | ClassTransformationException | IOException ex){ @@ -329,67 +351,93 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { context.fail(ex); return IterOutcome.STOP; } catch (UnsupportedOperationException e) { - logger.error(e.getMessage()); throw new RuntimeException(e); } } public void mergeAndSpill() throws SchemaChangeException { logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory()); - VectorContainer hyperBatch = new VectorContainer(); VectorContainer outputContainer = new VectorContainer(); List<BatchGroup> batchGroupList = Lists.newArrayList(); - int recordCount = 0; - for (int i = 0; i < SPILL_BATCH_GROUP_SIZE; i++) { + int batchCount = batchGroups.size(); + for (int i = 0; i < batchCount / 2; i++) { if (batchGroups.size() == 0) { break; } - if (batchGroups.peekLast().getSecondContainer() != null) { + if (batchGroups.peekLast().getSv2() == null) { break; } BatchGroup batch = batchGroups.pollLast(); - recordCount += batch.getSv2().getCount(); batchGroupList.add(batch); + long bufferSize = getBufferSize(batch); + totalSizeInMemory -= bufferSize; } if (batchGroupList.size() == 0) { return; } - constructHyperBatch(batchGroupList, hyperBatch); - createCopier(hyperBatch, batchGroupList, outputContainer, recordCount); + int estimatedRecordSize = 0; + for (VectorWrapper w : batchGroups.get(0)) { + try { + estimatedRecordSize += TypeHelper.getSize(w.getField().getType()); + } catch (UnsupportedOperationException e) { + estimatedRecordSize += 50; + } + } + int targetRecordCount = Math.max(1, 250 * 1000 / estimatedRecordSize); + VectorContainer hyperBatch = constructHyperBatch(batchGroupList); + createCopier(hyperBatch, batchGroupList, outputContainer); - int count = copier.next(); + int count = copier.next(targetRecordCount); assert count > 0; VectorContainer c1 = VectorContainer.getTransferClone(outputContainer); c1.buildSchema(BatchSchema.SelectionVectorMode.NONE); c1.setRecordCount(count); - count = copier.next(); - assert count > 0; - - - VectorContainer c2 = VectorContainer.getTransferClone(outputContainer); - c2.buildSchema(BatchSchema.SelectionVectorMode.NONE); - c2.setRecordCount(count); - String outputFile = String.format(Utilities.getFileNameForQueryFragment(context, dirs.next(), "spill" + uid + "_" + spillCount++)); - BatchGroup newGroup = new BatchGroup(c1, c2, fs, outputFile, oContext.getAllocator()); + BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator()); try { - while ((count = copier.next()) > 0) { + while ((count = copier.next(targetRecordCount)) > 0) { outputContainer.buildSchema(BatchSchema.SelectionVectorMode.NONE); outputContainer.setRecordCount(count); newGroup.addBatch(outputContainer); } newGroup.closeOutputStream(); - batchGroups.add(newGroup); + spilledBatchGroups.add(newGroup); for (BatchGroup group : batchGroupList) { - group.cleanup(); + group.cleanup(); } hyperBatch.clear(); } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException(e); + } + takeOwnership(c1); + totalSizeInMemory += getBufferSize(c1); + } + + private void takeOwnership(VectorAccessible batch) { + for (VectorWrapper w : batch) { + DrillBuf[] bufs = w.getValueVector().getBuffers(false); + for (DrillBuf buf : bufs) { + if (buf.isRootBuffer()) { + oContext.getAllocator().takeOwnership(buf); + } } + } + } + + private long getBufferSize(VectorAccessible batch) { + long size = 0; + for (VectorWrapper w : batch) { + DrillBuf[] bufs = w.getValueVector().getBuffers(false); + for (DrillBuf buf : bufs) { + if (buf.isRootBuffer()) { + size += buf.capacity(); + } + } + } + return size; } private SelectionVector2 newSV2() throws OutOfMemoryException { @@ -401,8 +449,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { throw new RuntimeException(); } batchesSinceLastSpill = 0; - if (!sv2.allocateNew(incoming.getRecordCount())) { - throw new OutOfMemoryException(); + int waitTime = 1; + while (true) { + try { + Thread.sleep(waitTime * 1000); + } catch (InterruptedException e) { + throw new OutOfMemoryException(e); + } + waitTime *= 2; + if (sv2.allocateNew(incoming.getRecordCount())) { + break; + } + if (waitTime >= 32) { + throw new OutOfMemoryException("Unable to allocate sv2 buffer after repeated attempts"); + } } } for (int i = 0; i < incoming.getRecordCount(); i++) { @@ -412,35 +472,21 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { return sv2; } - private void constructHyperBatch(List<BatchGroup> batchGroupList, VectorContainer cont) { + private VectorContainer constructHyperBatch(List<BatchGroup> batchGroupList) { + VectorContainer cont = new VectorContainer(); for (MaterializedField field : schema) { - ValueVector[] vectors = new ValueVector[batchGroupList.size() * 2]; + ValueVector[] vectors = new ValueVector[batchGroupList.size()]; int i = 0; for (BatchGroup group : batchGroupList) { vectors[i++] = group.getValueAccessorById( field.getValueClass(), - group.getValueVectorId(field.getPath()).getFieldIds() - ).getValueVector(); - if (group.hasSecond()) { - VectorContainer c = group.getSecondContainer(); - vectors[i++] = c.getValueAccessorById( - field.getValueClass(), - c.getValueVectorId(field.getPath()).getFieldIds() - ).getValueVector(); - } else { - vectors[i] = vectors[i - 1].getTransferPair().getTo(); //this vector should never be used. Just want to avoid having null elements in the hyper vector - i++; - } + group.getValueVectorId(field.getPath()).getFieldIds()) + .getValueVector(); } cont.add(vectors); } cont.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); - } - - private void constructSV4() throws SchemaChangeException { - BufferAllocator.PreAllocator preAlloc = oContext.getAllocator().getNewPreAllocator(); - preAlloc.preAllocate(4 * TARGET_RECORD_COUNT); - sv4 = new SelectionVector4(preAlloc.getAllocation(), TARGET_RECORD_COUNT, TARGET_RECORD_COUNT); + return cont; } private MSorter createNewMSorter() throws ClassTransformationException, IOException, SchemaChangeException { @@ -526,23 +572,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { g.getEvalBlock()._return(JExpr.lit(0)); } - private PriorityQueueSelector createSelector() throws SchemaChangeException { - CodeGenerator<PriorityQueueSelector> cg = CodeGenerator.get(PriorityQueueSelector.TEMPLATE_DEFINITION, context.getFunctionRegistry()); - ClassGenerator<PriorityQueueSelector> g = cg.getRoot(); - - generateComparisons(g, this); - - try { - PriorityQueueSelector c = context.getImplementationClass(cg); - return c; - } catch (ClassTransformationException e) { - throw new RuntimeException(e); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList, VectorContainer outputContainer, int recordCount) throws SchemaChangeException { + private void createCopier(VectorAccessible batch, List<BatchGroup> batchGroupList, VectorContainer outputContainer) throws SchemaChangeException { try { if (copier == null) { CodeGenerator<PriorityQueueCopier> cg = CodeGenerator.get(PriorityQueueCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry()); @@ -562,7 +592,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { outputContainer.add(v); allocators.add(VectorAllocator.getAllocator(v, 110)); } - copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators, recordCount); + copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators); } catch (ClassTransformationException e) { throw new RuntimeException(e); } catch (IOException e) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java index f2da71754..0eda0a6d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java @@ -32,8 +32,9 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator; import java.util.List; public interface PriorityQueueCopier { - public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators, int recordCnt) throws SchemaChangeException; - public int next(); + public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, + VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException; + public int next(int targetRecordCount); public List<VectorAllocator> getAllocators(); public void cleanup(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java index 6e9c355c0..b1c316ca1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java @@ -44,7 +44,8 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT; @Override - public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators, int recordCnt) throws SchemaChangeException { + public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, + VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException { this.context = context; this.allocator = allocator; this.hyperBatch = hyperBatch; @@ -60,20 +61,15 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier queueSize = 0; for (int i = 0; i < size; i++) { - vector4.set(i, i * 2, batchGroups.get(i).getNextIndex()); + vector4.set(i, i, batchGroups.get(i).getNextIndex()); siftUp(); queueSize++; } - - // Check if the we have enough records to create BatchData with two containers. - if (recordCnt < (2 * targetRecordCount)) { - targetRecordCount = (recordCnt / 2); - } } @Override - public int next() { - allocateVectors(); + public int next(int targetRecordCount) { + allocateVectors(targetRecordCount); for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) { if (queueSize == 0) { cleanup(); @@ -81,15 +77,12 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier } int compoundIndex = vector4.get(0); int batch = compoundIndex >>> 16; - assert batch < batchGroups.size() * 2 : String.format("batch: %d batchGroups: %d", batch, batchGroups.size()); - int batchGroup = batch / 2; + assert batch < batchGroups.size() : String.format("batch: %d batchGroups: %d", batch, batchGroups.size()); if (!doCopy(compoundIndex, outgoingIndex)) { setValueCount(outgoingIndex); return outgoingIndex; } - int nextIndex = batchGroups.get(batchGroup).getNextIndex(); - batch = batch & 0xFFFE; - batch += batchGroups.get(batchGroup).getBatchPointer(); + int nextIndex = batchGroups.get(batch).getNextIndex(); if (nextIndex < 0) { vector4.set(0, vector4.get(--queueSize)); } else { @@ -117,6 +110,9 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier for (VectorWrapper w: outgoing) { w.getValueVector().clear(); } + for (VectorWrapper w : hyperBatch) { + w.clear(); + } } @Override @@ -136,7 +132,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier } } - private void allocateVectors() { + private void allocateVectors(int targetRecordCount) { for(VectorAllocator a : allocators){ a.alloc(targetRecordCount); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java deleted file mode 100644 index 786667a8e..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.xsort; - -import org.apache.drill.exec.compile.TemplateClassDefinition; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.VectorAccessible; -import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.vector.allocator.VectorAllocator; - -import java.util.List; - -public interface PriorityQueueSelector { - public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException; - public int next(); - public void cleanup(); - - public static TemplateClassDefinition<PriorityQueueSelector> TEMPLATE_DEFINITION = new TemplateClassDefinition<PriorityQueueSelector>(PriorityQueueSelector.class, PriorityQueueSelectorTemplate.class); - -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java deleted file mode 100644 index 65a072ba7..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java +++ /dev/null @@ -1,161 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.physical.impl.xsort; - -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.VectorAccessible; -import org.apache.drill.exec.record.selection.SelectionVector4; -import org.apache.drill.exec.vector.BigIntVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; - -import javax.inject.Named; -import java.util.List; - -public abstract class PriorityQueueSelectorTemplate implements PriorityQueueSelector { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PriorityQueueSelectorTemplate.class); - - private SelectionVector4 sv4; - private SelectionVector4 vector4; - private List<BatchGroup> batchGroups; - private FragmentContext context; - private BufferAllocator allocator; - private int size; - private int queueSize = 0; - private int targetRecordCount = ExternalSortBatch.TARGET_RECORD_COUNT; - private VectorAccessible hyperBatch; - - @Override - public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException { - this.context = context; - this.allocator = allocator; - this.sv4 = sv4; - this.batchGroups = batchGroups; - this.size = batchGroups.size(); - this.hyperBatch = hyperBatch; - - BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator(); - preAlloc.preAllocate(4 * size); - vector4 = new SelectionVector4(preAlloc.getAllocation(), size, Character.MAX_VALUE); - doSetup(context, hyperBatch, null); - - for (int i = 0; i < size; i++, queueSize++) { - vector4.set(i, i * 2, batchGroups.get(i).getNextIndex()); - siftUp(); - } - } - - @Override - public int next() { - if (queueSize == 0) { - cleanup(); - return 0; - } - rotate(); - for (int outgoingIndex = 0; outgoingIndex < targetRecordCount; outgoingIndex++) { - int compoundIndex = vector4.get(0); - int batch = compoundIndex >> 16; - int batchGroup = batch / 2; //two containers per batch group - sv4.set(outgoingIndex, compoundIndex); - int nextIndex = batchGroups.get(batchGroup).getNextIndex(); - batch = batchGroup * 2 + batchGroups.get(batchGroup).getBatchPointer(); // increment batch pointer if batchGroup is currently using second container - if (nextIndex == -1) { - vector4.set(0, vector4.get(--queueSize)); - } else if (nextIndex == -2) { - vector4.set(0, batch - 1, 0); - sv4.setCount(outgoingIndex); - assert outgoingIndex != 0; - return outgoingIndex; - } else { - vector4.set(0, batch, nextIndex); - } - if (queueSize == 0) { - sv4.setCount(++outgoingIndex); - return outgoingIndex; - } - siftDown(); - } - sv4.setCount(targetRecordCount); - return targetRecordCount; - } - - private void rotate() { - for (BatchGroup group : batchGroups) { - group.rotate(); - } - for (int i = 0; i < vector4.getTotalCount(); i++) { - vector4.set(i, vector4.get(i) & 0xFFFEFFFF); - } - } - - @Override - public void cleanup() { - vector4.clear(); - } - - private final void siftUp() { - int p = queueSize; - while (p > 0) { - if (compare(p, (p - 1) / 2) < 0) { - swap(p, (p - 1) / 2); - p = (p - 1) / 2; - } else { - break; - } - } - } - - private final void siftDown() { - int p = 0; - int next; - while (p * 2 + 1 < queueSize) { - if (p * 2 + 2 >= queueSize) { - next = p * 2 + 1; - } else { - if (compare(p * 2 + 1, p * 2 + 2) <= 0) { - next = p * 2 + 1; - } else { - next = p * 2 + 2; - } - } - if (compare(p, next) > 0) { - swap(p, next); - p = next; - } else { - break; - } - } - } - - - public final void swap(int sv0, int sv1) { - int tmp = vector4.get(sv0); - vector4.set(sv0, vector4.get(sv1)); - vector4.set(sv1, tmp); - } - - public final int compare(int leftIndex, int rightIndex) { - int sv1 = vector4.get(leftIndex); - int sv2 = vector4.get(rightIndex); - return doEval(sv1, sv2); - } - - public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing); - public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex); -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index 381011513..b9690a6ca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -143,7 +143,7 @@ public class WritableBatch { continue; } - for (DrillBuf b : vv.getBuffers()) { + for (DrillBuf b : vv.getBuffers(true)) { buffers.add(b); } // remove vv access to buffers. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java index 038bb2f4c..104301144 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java @@ -45,10 +45,14 @@ public class SelectionVector2 implements Closeable{ return recordCount; } - public DrillBuf getBuffer() - { - DrillBuf bufferHandle = this.buffer; + public DrillBuf getBuffer() { + return getBuffer(true); + } + public DrillBuf getBuffer(boolean clear) { + DrillBuf bufferHandle = this.buffer; + + if (clear) { /* Increment the ref count for this buffer */ bufferHandle.retain(); @@ -56,8 +60,9 @@ public class SelectionVector2 implements Closeable{ * caller. clear the buffer from within our selection vector */ clear(); + } - return bufferHandle; + return bufferHandle; } public void setBuffer(DrillBuf bufferHandle) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java index 4cc06c81f..209ec8fd7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java @@ -18,11 +18,15 @@ package org.apache.drill.exec.store; import java.util.Collection; +import java.util.Map; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.record.MaterializedField.Key; +import org.apache.drill.exec.vector.ValueVector; public abstract class AbstractRecordReader implements RecordReader { private static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields."; @@ -59,4 +63,10 @@ public abstract class AbstractRecordReader implements RecordReader { }).isPresent(); } + @Override + public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException { + for (ValueVector v : vectorMap.values()) { + v.allocateNew(); + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java index 17454217e..42cdcc385 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java @@ -19,7 +19,13 @@ package org.apache.drill.exec.store; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.MaterializedField.Key; +import org.apache.drill.exec.vector.ValueVector; + +import java.util.Map; public interface RecordReader { @@ -36,6 +42,8 @@ public interface RecordReader { */ public abstract void setup(OutputMutator output) throws ExecutionSetupException; + public abstract void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException; + /** * Set the operator context. The Reader can use this to access the operator context and allocate direct memory * if needed diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java index ee78c39b5..2bd9df576 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.BaseValueVector; import org.apache.drill.exec.vector.complex.fn.JsonReader; @@ -45,7 +46,7 @@ import org.apache.hadoop.fs.Path; import com.google.common.collect.Lists; -public class JSONRecordReader2 implements RecordReader{ +public class JSONRecordReader2 extends AbstractRecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader2.class); private OutputMutator mutator; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java index 81505ae89..0714ab840 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java @@ -30,6 +30,8 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.MaterializedField.Key; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockColumn; import org.apache.drill.exec.store.mock.MockGroupScanPOP.MockScanEntry; @@ -37,8 +39,9 @@ import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import java.util.List; +import java.util.Map; -public class MockRecordReader implements RecordReader { +public class MockRecordReader extends AbstractRecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockRecordReader.class); private OutputMutator output; @@ -120,6 +123,17 @@ public class MockRecordReader implements RecordReader { } @Override + public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException { + try { + for (ValueVector v : vectorMap.values()) { + AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10); + } + } catch (NullPointerException e) { + throw new OutOfMemoryException(); + } + } + + @Override public void cleanup() { } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 6c2d44c4b..c72e75030 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; @@ -36,14 +37,17 @@ import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.MaterializedField.Key; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.NullableBitVector; +import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.RepeatedFixedWidthVector; import org.apache.hadoop.fs.FileSystem; @@ -320,6 +324,18 @@ public class ParquetRecordReader extends AbstractRecordReader { } } + @Override + public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException { + try { + for (ValueVector v : vectorMap.values()) { + AllocationHelper.allocate(v, recordsPerBatch, 50, 10); + } + } catch (NullPointerException e) { + throw new OutOfMemoryException(); + } + } + + private SchemaPath toFieldName(String[] paths) { return SchemaPath.getCompoundPath(paths); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 7a864f026..14075f3a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -24,12 +24,16 @@ import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.record.MaterializedField.Key; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.parquet.RowGroupReadEntry; +import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.BaseValueVector; +import org.apache.drill.exec.vector.RepeatedFixedWidthVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VariableWidthVector; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; @@ -114,6 +118,17 @@ public class DrillParquetReader extends AbstractRecordReader { } @Override + public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException { + try { + for (ValueVector v : vectorMap.values()) { + AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10); + } + } catch (NullPointerException e) { + throw new OutOfMemoryException(); + } + } + + @Override public void setup(OutputMutator output) throws ExecutionSetupException { try { @@ -202,7 +217,11 @@ public class DrillParquetReader extends AbstractRecordReader { if (v instanceof VariableWidthVector) { filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity()); } +// if (v instanceof RepeatedFixedWidthVector) { +// filled = Math.max(filled, ((RepeatedFixedWidthVector) v).getAccessor().getGroupCount() * 100) +// } } + logger.debug("Percent filled: {}", filled); return filled; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java index 48e09aa61..38160df8f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java @@ -22,12 +22,15 @@ import java.lang.reflect.Modifier; import java.sql.Timestamp; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; -import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.record.MaterializedField.Key; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.pojo.Writers.BitWriter; import org.apache.drill.exec.store.pojo.Writers.DoubleWriter; import org.apache.drill.exec.store.pojo.Writers.EnumWriter; @@ -39,12 +42,12 @@ import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter; import org.apache.drill.exec.store.pojo.Writers.NIntWriter; import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter; import org.apache.drill.exec.store.pojo.Writers.StringWriter; +import org.apache.drill.exec.vector.AllocationHelper; +import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Lists; - - -public class PojoRecordReader<T> implements RecordReader{ +public class PojoRecordReader<T> extends AbstractRecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class); public final int forJsonIgnore = 1; @@ -120,6 +123,13 @@ public class PojoRecordReader<T> implements RecordReader{ } + @Override + public void allocate(Map<Key, ValueVector> vectorMap) throws OutOfMemoryException { + for (ValueVector v : vectorMap.values()) { + AllocationHelper.allocate(v, Character.MAX_VALUE, 50, 10); + } + } + private void allocate(){ for(PojoWriter writer : writers){ writer.allocate(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java index 2031aeee7..ef65f2a3a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java @@ -127,8 +127,8 @@ public class DrillTextRecordReader extends AbstractRecordReader { @Override public int next() { - logger.debug("vector value capacity {}", vector.getValueCapacity()); - logger.debug("vector byte capacity {}", vector.getByteCapacity()); +// logger.debug("vector value capacity {}", vector.getValueCapacity()); +// logger.debug("vector byte capacity {}", vector.getByteCapacity()); int batchSize = 0; try { int recordCount = 0; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java index b711c66df..5db0299c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java @@ -45,7 +45,10 @@ public abstract class BaseDataValueVector extends BaseValueVector{ */ @Override public void clear() { - if (data != null) { + if (data == null) { + data = DeadBuf.DEAD_BUFFER; + } + if (data != DeadBuf.DEAD_BUFFER) { data.release(); data = data.getAllocator().getEmpty(); valueCount = 0; @@ -62,16 +65,20 @@ public abstract class BaseDataValueVector extends BaseValueVector{ @Override - public DrillBuf[] getBuffers(){ + public DrillBuf[] getBuffers(boolean clear){ DrillBuf[] out; if(valueCount == 0){ out = new DrillBuf[0]; }else{ out = new DrillBuf[]{data}; - data.readerIndex(0); - data.retain(); + if (clear) { + data.readerIndex(0); + data.retain(); + } + } + if (clear) { + clear(); } - clear(); return out; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java index 1b3705e48..18da67d5f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java @@ -17,12 +17,11 @@ */ package org.apache.drill.exec.vector; +import io.netty.buffer.DrillBuf; + import java.util.Iterator; import org.apache.drill.common.expression.FieldReference; - -import io.netty.buffer.ByteBuf; - import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; @@ -62,7 +61,7 @@ public abstract class BaseValueVector implements ValueVector{ public abstract int getCurrentValueCount(); public abstract void setCurrentValueCount(int count); - abstract public ByteBuf getData(); + abstract public DrillBuf getData(); abstract static class BaseAccessor implements ValueVector.Accessor{ public abstract int getValueCount(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java index 2c215ffcd..032ccc243 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java @@ -136,7 +136,7 @@ public class ObjectVector extends BaseValueVector{ } @Override - public ByteBuf getData() { + public DrillBuf getData() { throw new UnsupportedOperationException("ObjectVector does not support this"); } @@ -166,7 +166,7 @@ public class ObjectVector extends BaseValueVector{ } @Override - public DrillBuf[] getBuffers() { + public DrillBuf[] getBuffers(boolean clear) { throw new UnsupportedOperationException("ObjectVector does not support this"); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java index f7f010abd..3433537ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java @@ -22,6 +22,7 @@ import io.netty.buffer.DrillBuf; import java.io.Closeable; import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryRuntimeException; import org.apache.drill.exec.proto.UserBitShared.SerializedField; import org.apache.drill.exec.record.MaterializedField; @@ -99,9 +100,12 @@ public interface ValueVector extends Closeable, Iterable<ValueVector> { * this buffer so it only should be used for in-context access. Also note that this buffer changes regularly thus * external classes shouldn't hold a reference to it (unless they change it). * - * @return The underlying DrillBuf. + * @param clear + * Whether to clear vector + * + * @return The underlying ByteBuf. */ - public abstract DrillBuf[] getBuffers(); + public abstract DrillBuf[] getBuffers(boolean clear); /** * Load the data provided in the buffer. Typically used when deserializing from the wire. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java index c91c39712..834719cd1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java @@ -276,10 +276,10 @@ public class MapVector extends AbstractContainerVector { } @Override - public DrillBuf[] getBuffers() { + public DrillBuf[] getBuffers(boolean clear) { List<DrillBuf> bufs = Lists.newArrayList(); for(ValueVector v : vectors.values()){ - for(DrillBuf b : v.getBuffers()){ + for(DrillBuf b : v.getBuffers(clear)){ bufs.add(b); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java index f903b0ca4..fef416f82 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java @@ -355,8 +355,8 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea } @Override - public DrillBuf[] getBuffers() { - return ArrayUtils.addAll(offsets.getBuffers(), vector.getBuffers()); + public DrillBuf[] getBuffers(boolean clear) { + return ArrayUtils.addAll(offsets.getBuffers(clear), vector.getBuffers(clear)); } private void setVector(ValueVector v){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java index 3fd1c12b7..678439b4c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java @@ -307,11 +307,11 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat } @Override - public DrillBuf[] getBuffers() { - List<DrillBuf> bufs = Lists.newArrayList(offsets.getBuffers()); + public DrillBuf[] getBuffers(boolean clear) { + List<DrillBuf> bufs = Lists.newArrayList(offsets.getBuffers(clear)); for(ValueVector v : vectors.values()){ - for(DrillBuf b : v.getBuffers()){ + for(DrillBuf b : v.getBuffers(clear)){ bufs.add(b); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index bb56e1046..912f9560b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.record.RawFragmentBatch; import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.rpc.data.DataRpcConfig; @@ -59,7 +60,9 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ } if (batch.getHeader().getIsOutOfMemory()) { logger.debug("Setting autoread false"); - if (!outOfMemory.get() && !buffer.peekFirst().getHeader().getIsOutOfMemory()) { + RawFragmentBatch firstBatch = buffer.peekFirst(); + FragmentRecordBatch header = firstBatch == null ? null :firstBatch.getHeader(); + if (!outOfMemory.get() && !(header == null) && header.getIsOutOfMemory()) { buffer.addFirst(batch); } outOfMemory.set(true); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 9521e65d6..603aeab96 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -45,6 +45,7 @@ import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.opt.BasicOptimizer; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.planner.fragment.Fragment; @@ -333,6 +334,24 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ return; } + int sortCount = 0; + for (PhysicalOperator op : plan.getSortedOperators()) { + if (op instanceof ExternalSort) sortCount++; + } + + if (sortCount > 0) { + long maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val; + long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(), context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)); + maxAllocPerNode = Math.min(maxAllocPerNode, context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val); + long maxSortAlloc = maxAllocPerNode / (sortCount * maxWidthPerNode); + logger.debug("Max sort alloc: {}", maxSortAlloc); + for (PhysicalOperator op : plan.getSortedOperators()) { + if (op instanceof ExternalSort) { + ((ExternalSort)op).setMaxAllocation(maxSortAlloc); + } + } + } + PlanningSet planningSet = StatsCollector.collectStats(rootFragment); SimpleParallelizer parallelizer = new SimpleParallelizer(context); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 2a8db6784..6b4ee9b01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -114,7 +114,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid } } } catch (AssertionError | Exception e) { - logger.debug("Failure while initializing operator tree", e); + logger.debug("Error while initializing or executing fragment", e); context.fail(e); internalFail(e); } finally { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 877ffc25f..fb0d1df5e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -180,6 +180,7 @@ public class TestParquetWriter extends BaseTestQuery { } @Test + @Ignore public void testRepeatedBool() throws Exception { String inputTable = "cp.`parquet/repeated_bool_data.json`"; runTestAndValidate("*", "*", inputTable, "repeated_bool_parquet"); @@ -225,7 +226,7 @@ public class TestParquetWriter extends BaseTestQuery { public void runTestAndValidate(String selection, String validationSelection, String inputTable, String outputFile) throws Exception { - Path path = new Path("/tmp/drilltest/" + outputFile); + Path path = new Path("/tmp/" + outputFile); if (fs.exists(path)) { fs.delete(path, true); } |