diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-04-11 18:41:03 -0700 |
---|---|---|
committer | Steven Phillips <sphillips@maprtech.com> | 2014-05-04 18:46:37 -0700 |
commit | a2355d42dbff51b858fc28540915cf793f1c0fac (patch) | |
tree | e5db0d8d9a3d5a0dcb15d1c5e9c24d9a1c47dffb /exec/java-exec/src/main/java/org | |
parent | 70dddc54a73183e58f5493b13b1b19e51162f752 (diff) |
DRILL-620: Memory consumption fixes
accounting fixes
trim buffers
switch to using setSafe and copySafe methods only
adaptive allocation
operator based allocator wip
handle OOM
Operator Context
Diffstat (limited to 'exec/java-exec/src/main/java/org')
121 files changed, 1327 insertions, 496 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index baef9b030..9eee08dd1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -62,5 +62,6 @@ public interface ExecConstants { public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size"; public static final String FILESYSTEM_PARTITION_COLUMN_LABEL = "drill.exec.storage.file.partition.column.label"; public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets"; - + public static final String TOP_LEVEL_MAX_ALLOC = "drill.exec.memory.top.max"; + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java index 9511992e9..f4a6d7da6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorAccessibleSerializable.java @@ -157,7 +157,7 @@ public class VectorAccessibleSerializable implements DrillSerializable { if (svMode == BatchSchema.SelectionVectorMode.TWO_BYTE) { svCount = sv2.getCount(); - svBuf = sv2.getBuffer(); + svBuf = sv2.getBuffer(); //this calls retain() internally } try @@ -170,6 +170,7 @@ public class VectorAccessibleSerializable implements DrillSerializable { { svBuf.getBytes(0, output, svBuf.readableBytes()); sv2.setBuffer(svBuf); + svBuf.release(); // sv2 now owns the buffer sv2.setRecordCount(svCount); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index bbd3e4233..fc650b96e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -268,6 +268,9 @@ public class DrillClient implements Closeable, ConnectionThrottle{ @Override public void resultArrived(QueryResultBatch result, ConnectionThrottle throttle) { // logger.debug("Result arrived. Is Last Chunk: {}. Full Result: {}", result.getHeader().getIsLastChunk(), result); + if (result.getHeader().getErrorCount() > 0) { + fail(new Exception(result.getHeader().getError(0).getMessage())); + } results.add(result); if(result.getHeader().getIsLastChunk()){ future.set(results); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java index 0d19340d3..624042e03 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java @@ -39,10 +39,12 @@ public class Accountor { private final long total; private ConcurrentMap<ByteBuf, DebugStackTrace> buffers = Maps.newConcurrentMap(); private final FragmentHandle handle; + private Accountor parent; public Accountor(FragmentHandle handle, Accountor parent, long max, long preAllocated) { // TODO: fix preallocation stuff AtomicRemainder parentRemainder = parent != null ? parent.remainder : null; + this.parent = parent; this.remainder = new AtomicRemainder(parentRemainder, max, preAllocated); this.total = max; this.handle = handle; @@ -53,6 +55,13 @@ public class Accountor { } } + public long getAvailable() { + if (parent != null) { + return Math.min(parent.getAvailable(), getCapacity() - getAllocation()); + } + return getCapacity() - getAllocation(); + } + public long getCapacity() { return total; } @@ -62,9 +71,7 @@ public class Accountor { } public boolean reserve(long size) { - //TODO: for now, we won't stop reservation. - remainder.get(size); - return true; + return remainder.get(size); } public void forceAdditionalReservation(long size) { @@ -89,7 +96,7 @@ public class Accountor { if(buf != null){ DebugStackTrace dst = buffers.get(buf); if(dst == null) throw new IllegalStateException("Partially releasing a buffer that has already been released. Buffer: " + buf); - dst.size =- size; + dst.size -= size; if(dst.size < 0){ throw new IllegalStateException("Partially releasing a buffer that has already been released. Buffer: " + buf); } @@ -150,7 +157,7 @@ public class Accountor { } - private class DebugStackTrace { + public class DebugStackTrace { private StackTraceElement[] elements; private long size; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java index 8476b5345..74849c2e2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.memory; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; + import java.util.concurrent.atomic.AtomicLong; /** @@ -35,6 +38,7 @@ public class AtomicRemainder { private final long initTotal; private final long initShared; private final long initPrivate; + private boolean closed = false; public AtomicRemainder(AtomicRemainder parent, long max, long pre) { this.parent = parent; @@ -43,6 +47,7 @@ public class AtomicRemainder { this.initTotal = max; this.initShared = max - pre; this.initPrivate = pre; +// logger.info("new AtomicRemainder. a.s. {} a.p. {} hashcode {}", availableShared, availablePrivate, hashCode(), new Exception()); } public long getRemainder() { @@ -60,25 +65,36 @@ public class AtomicRemainder { * @param size */ public void forceGet(long size) { - if (DEBUG) - logger.info("Force get {}", size); - availableShared.addAndGet(size); + long newAvailableShared = availableShared.addAndGet(size); +// if (DEBUG) +// logger.info("Force get {}. a.s. {} a.p. {} hashcode: {}", size, availableShared, availablePrivate, hashCode(), new Exception()); +// assert newAvailableShared <= initShared; if (parent != null) parent.forceGet(size); } public boolean get(long size) { - if (DEBUG) - logger.info("Get {}", size); if (availablePrivate.get() < 1) { // if there is no preallocated memory, we can operate normally. + // if there is a parent allocator, check it before allocating. + if (parent != null && !parent.get(size)) { + return false; + } + // attempt to get shared memory, if fails, return false. long outcome = availableShared.addAndGet(-size); +// assert outcome <= initShared; if (outcome < 0) { - availableShared.addAndGet(size); + long newAvailableShared = availableShared.addAndGet(size); + assert newAvailableShared <= initShared; + if (parent != null) { + parent.returnAllocation(size); + } return false; } else { +// if (DEBUG) +// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception()); return true; } @@ -86,6 +102,8 @@ public class AtomicRemainder { // if there is preallocated memory, use that first. long unaccount = availablePrivate.addAndGet(-size); if (unaccount >= 0) { +// if (DEBUG) +// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception()); return true; } else { @@ -102,6 +120,8 @@ public class AtomicRemainder { if (account >= 0) { // we were succesful, move private back to zero (since we allocated using shared). availablePrivate.addAndGet(additionalSpaceNeeded); +// if (DEBUG) +// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception()); return true; } else { // we failed to get space from available shared. Return allocations to initial state. @@ -122,26 +142,31 @@ public class AtomicRemainder { * @param size */ public void returnAllocation(long size) { - if (DEBUG) - logger.info("Return allocation {}", size); long privateSize = availablePrivate.get(); long privateChange = Math.min(size, initPrivate - privateSize); long sharedChange = size - privateChange; availablePrivate.addAndGet(privateChange); availableShared.addAndGet(sharedChange); +// if (DEBUG) +// logger.info("Return allocation {}, a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception()); if (parent != null) { parent.returnAllocation(sharedChange); } + assert getUsed() <= initTotal; } public void close() { - + if (closed) { + logger.warn("Tried to close remainder, but it has already been closed", new Exception()); + return; + } if (availablePrivate.get() != initPrivate || availableShared.get() != initShared) throw new IllegalStateException( String .format(ERROR, initPrivate, availablePrivate.get(), initPrivate - availablePrivate.get(), initShared, availableShared.get(), initShared - availableShared.get())); if(parent != null) parent.returnAllocation(initPrivate); + closed = true; } static final String ERROR = "Failure while closing accountor. Expected private and shared pools to be set to initial values. However, one or more were not. Stats are\n\tzone\tinit\tallocated\tdelta \n\tprivate\t%d\t%d\t%d \n\tshared\t%d\t%d\t%d."; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index e71c9c958..0b2add2d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -22,10 +22,13 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocatorL; import io.netty.buffer.PooledUnsafeDirectByteBufL; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.util.AssertionUtil; @@ -40,6 +43,10 @@ public class TopLevelAllocator implements BufferAllocator { public TopLevelAllocator() { this(DrillConfig.getMaxDirectMemory()); } + + public TopLevelAllocator(DrillConfig config) { + this(Math.min(DrillConfig.getMaxDirectMemory(), config.getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC))); + } public TopLevelAllocator(long maximumAllocation) { this.acct = new Accountor(null, null, maximumAllocation, 0); @@ -50,7 +57,7 @@ public class TopLevelAllocator implements BufferAllocator { if(!acct.reserve(min)) return null; ByteBuf buffer = innerAllocator.directBuffer(min, max); AccountingByteBuf wrapped = new AccountingByteBuf(acct, (PooledUnsafeDirectByteBufL) buffer); - acct.reserved(buffer.capacity() - min, wrapped); + acct.reserved(min, wrapped); return wrapped; } @@ -74,15 +81,19 @@ public class TopLevelAllocator implements BufferAllocator { if(!acct.reserve(initialReservation)){ throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation())); }; - ChildAllocator allocator = new ChildAllocator(handle, acct, initialReservation, maximumReservation); + ChildAllocator allocator = new ChildAllocator(handle, acct, maximumReservation, initialReservation); if(ENABLE_ACCOUNTING) children.add(allocator); return allocator; } @Override public void close() { - if(ENABLE_ACCOUNTING && !children.isEmpty()){ - throw new IllegalStateException("Failure while trying to close allocator: Child level allocators not closed."); + if (ENABLE_ACCOUNTING) { + for (ChildAllocator child : children) { + if (!child.isClosed()) { + throw new IllegalStateException("Failure while trying to close allocator: Child level allocators not closed."); + } + } } acct.close(); } @@ -91,14 +102,20 @@ public class TopLevelAllocator implements BufferAllocator { private class ChildAllocator implements BufferAllocator{ private Accountor childAcct; - + private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>(); + private boolean closed = false; + private FragmentHandle handle; + public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre) throws OutOfMemoryException{ + assert max >= pre; childAcct = new Accountor(handle, parentAccountor, max, pre); + this.handle = handle; } @Override public AccountingByteBuf buffer(int size, int max) { if(!childAcct.reserve(size)){ + logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory()); return null; }; @@ -121,9 +138,11 @@ public class TopLevelAllocator implements BufferAllocator { public BufferAllocator getChildAllocator(FragmentHandle handle, long initialReservation, long maximumReservation) throws OutOfMemoryException { if(!childAcct.reserve(initialReservation)){ - throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getCapacity() - childAcct.getAllocation())); + throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getAvailable())); }; - return new ChildAllocator(handle, childAcct, maximumReservation, initialReservation); + ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, maximumReservation, initialReservation); + this.children.put(newChildAllocator, Thread.currentThread().getStackTrace()); + return newChildAllocator; } public PreAllocator getNewPreAllocator(){ @@ -132,7 +151,28 @@ public class TopLevelAllocator implements BufferAllocator { @Override public void close() { + if (ENABLE_ACCOUNTING) { + for (ChildAllocator child : children.keySet()) { + if (!child.isClosed()) { + StringBuilder sb = new StringBuilder(); + StackTraceElement[] elements = children.get(child); + for (int i = 3; i < elements.length; i++) { + sb.append("\t\t"); + sb.append(elements[i]); + sb.append("\n"); + } + throw new IllegalStateException(String.format( + "Failure while trying to close child allocator: Child level allocators not closed. Fragment %d:%d. Stack trace: \n %s", + handle.getMajorFragmentId(), handle.getMinorFragmentId(), sb.toString())); + } + } + } childAcct.close(); + closed = true; + } + + public boolean isClosed() { + return closed; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 2035aa0c0..f3bcfef6e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -26,6 +26,7 @@ import net.hydromatic.optiq.SchemaPlus; import net.hydromatic.optiq.tools.Frameworks; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.compile.ClassTransformer; import org.apache.drill.exec.compile.QueryClassLoader; import org.apache.drill.exec.exception.ClassTransformationException; @@ -83,6 +84,7 @@ public class FragmentContext implements Closeable { this.queryStartTime = fragment.getQueryStartTime(); this.rootFragmentTimeZone = fragment.getTimeZone(); logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); + logger.debug("Fragment max allocation: {}", fragment.getMemMax()); this.allocator = context.getAllocator().getChildAllocator(fragment.getHandle(), fragment.getMemInitial(), fragment.getMemMax()); } @@ -138,10 +140,15 @@ public class FragmentContext implements Closeable { * Get this fragment's allocator. * @return */ + @Deprecated public BufferAllocator getAllocator() { return allocator; } + public BufferAllocator getNewChildAllocator(long initialReservation, long maximumReservation) throws OutOfMemoryException { + return allocator.getChildAllocator(getHandle(), initialReservation, maximumReservation); + } + public <T> T getImplementationClass(ClassGenerator<T> cg) throws ClassTransformationException, IOException { return getImplementationClass(cg.getCodeGenerator()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java new file mode 100644 index 000000000..3b7b4c189 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import org.apache.drill.common.util.Hook.Closeable; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.physical.base.PhysicalOperator; + +public class OperatorContext implements Closeable { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContext.class); + + private final BufferAllocator allocator; + private boolean closed = false; + private PhysicalOperator popConfig; + + public OperatorContext(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException { + this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation()); + this.popConfig = popConfig; + } + + public BufferAllocator getAllocator() { + if (allocator == null) { + throw new UnsupportedOperationException("Operator context does not have an allocator"); + } + return allocator; + } + + public boolean isClosed() { + return closed; + } + + @Override + public void close() { + if (closed) { + logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null); + return; + } + logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null); + if (allocator != null) { + allocator.close(); + } + closed = true; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index 7eced4d94..a79cbc3c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -26,6 +26,8 @@ import com.google.common.base.Preconditions; public abstract class AbstractBase implements PhysicalOperator{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractBase.class); + protected long initialAllocation = 1000000L; + protected long maxAllocation = 10000000000L; @Override @@ -48,5 +50,15 @@ public abstract class AbstractBase implements PhysicalOperator{ public SelectionVectorMode getSVMode() { return SelectionVectorMode.NONE; } + + @Override + public long getInitialAllocation() { + return initialAllocation; + } + + @Override + public long getMaxAllocation() { + return maxAllocation; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index 69fc44776..f4cee2ab1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.base; import java.util.Iterator; import java.util.List; +import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.common.expression.SchemaPath; import com.google.common.collect.Iterators; @@ -45,6 +46,18 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca @Override public GroupScan clone(List<SchemaPath> columns) { - throw new UnsupportedOperationException(String.format("%s does not implmemnt clone(columns) method!", this.getClass().getCanonicalName())); + throw new UnsupportedOperationException(String.format("%s does not implement clone(columns) method!", this.getClass().getCanonicalName())); + } + + @Override + @JsonIgnore + public long getInitialAllocation() { + return 0; + } + + @Override + @JsonIgnore + public long getMaxAllocation() { + return 0; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java index 57b9c1872..97334eaf5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSubScan.java @@ -27,7 +27,7 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import com.google.common.collect.Iterators; -public abstract class AbstractSubScan implements SubScan{ +public abstract class AbstractSubScan extends AbstractBase implements SubScan{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSubScan.class); @Override @@ -72,5 +72,4 @@ public abstract class AbstractSubScan implements SubScan{ public SelectionVectorMode getSVMode() { return SelectionVectorMode.NONE; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index 66e1b4612..db57922f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -87,4 +87,14 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { @JsonIgnore public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException; + /** + * @return The memory to preallocate for this operator + */ + public long getInitialAllocation(); + + /** + * @return The maximum memory this operator can allocate + */ + public long getMaxAllocation(); + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java index 79f5f13a2..b55abefc4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.RecordBatch; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index e93fbcc33..73ed72322 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -32,20 +32,19 @@ import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.record.*; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.TypedFieldId; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.util.BatchPrinter; +import org.apache.drill.exec.util.VectorUtil; import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Maps; @@ -58,12 +57,16 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator; public class ScanBatch implements RecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class); + private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; + private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; + final Map<MaterializedField, ValueVector> fieldVectorMap = Maps.newHashMap(); private final VectorContainer container = new VectorContainer(); private int recordCount; private boolean schemaChanged = true; private final FragmentContext context; + private final OperatorContext oContext; private Iterator<RecordReader> readers; private RecordReader currentReader; private BatchSchema schema; @@ -74,12 +77,13 @@ public class ScanBatch implements RecordBatch { List<Integer> selectedPartitionColumns; private String partitionColumnDesignator; - public ScanBatch(FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException { + public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers, List<String[]> partitionColumns, List<Integer> selectedPartitionColumns) throws ExecutionSetupException { this.context = context; this.readers = readers; if (!readers.hasNext()) throw new ExecutionSetupException("A scan batch must contain at least one reader."); this.currentReader = readers.next(); + this.oContext = new OperatorContext(subScanConfig, context); this.currentReader.setup(mutator); this.partitionColumns = partitionColumns.iterator(); this.partitionValues = this.partitionColumns.hasNext() ? this.partitionColumns.next() : null; @@ -89,8 +93,8 @@ public class ScanBatch implements RecordBatch { addPartitionVectors(); } - public ScanBatch(FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException { - this(context, readers, Collections.EMPTY_LIST, Collections.EMPTY_LIST); + public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException { + this(subScanConfig, context, readers, Collections.EMPTY_LIST, Collections.EMPTY_LIST); } @Override @@ -173,7 +177,7 @@ public class ScanBatch implements RecordBatch { byte[] bytes = val.getBytes(); AllocationHelper.allocate(v, recordCount, val.length()); for (int j = 0; j < recordCount; j++) { - v.getMutator().set(j, bytes); + v.getMutator().setSafe(j, bytes); } v.getMutator().setValueCount(recordCount); } else { @@ -239,7 +243,7 @@ public class ScanBatch implements RecordBatch { @SuppressWarnings("unchecked") @Override public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException { - ValueVector v = TypeHelper.getNewVector(field, context.getAllocator()); + ValueVector v = TypeHelper.getNewVector(field, oContext.getAllocator()); if(!clazz.isAssignableFrom(v.getClass())) throw new SchemaChangeException(String.format("The class that was provided %s does not correspond to the expected vector type of %s.", clazz.getSimpleName(), v.getClass().getSimpleName())); addField(v); return (T) v; @@ -259,6 +263,7 @@ public class ScanBatch implements RecordBatch { public void cleanup(){ container.clear(); + oContext.close(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 2fc854ae5..a0ff28aec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -70,11 +70,6 @@ public class ScreenCreator implements RootCreator<Screen>{ this.connection = context.getConnection(); } - private void closeAllocator(){ - sendCount.waitForSendComplete(); - context.getAllocator().close(); - } - @Override public boolean next() { if(!ok){ @@ -86,7 +81,7 @@ public class ScreenCreator implements RootCreator<Screen>{ // logger.debug("Screen Outcome {}", outcome); switch(outcome){ case STOP: { - closeAllocator(); + sendCount.waitForSendComplete(); QueryResult header = QueryResult.newBuilder() // .setQueryId(context.getHandle().getQueryId()) // .setRowCount(0) // @@ -101,7 +96,7 @@ public class ScreenCreator implements RootCreator<Screen>{ return false; } case NONE: { - closeAllocator(); + sendCount.waitForSendComplete(); context.getStats().batchesCompleted.inc(1); QueryResult header = QueryResult.newBuilder() // .setQueryId(context.getHandle().getQueryId()) // @@ -133,8 +128,8 @@ public class ScreenCreator implements RootCreator<Screen>{ @Override public void stop() { - incoming.cleanup(); sendCount.waitForSendComplete(); + incoming.cleanup(); } private SendListener listener = new SendListener(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 17e233a52..7679701fe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -97,8 +97,8 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ @Override public void stop() { ok = false; - incoming.cleanup(); sendCount.waitForSendComplete(); + incoming.cleanup(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java index 90d51b610..c58366439 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueue.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.TopN; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.VectorContainer; @@ -26,7 +27,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; public interface PriorityQueue { public void add(FragmentContext context, RecordBatchData batch) throws SchemaChangeException; - public void init(int limit, FragmentContext context, boolean hasSv2) throws SchemaChangeException; + public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException; public void generate() throws SchemaChangeException; public VectorContainer getHyperBatch(); public SelectionVector4 getHeapSv4(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java index d2d8d304a..e0e7e518f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java @@ -38,16 +38,18 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { private SelectionVector4 finalSv4;//This is for final sorted output private ExpandableHyperContainer hyperBatch; private FragmentContext context; + private BufferAllocator allocator; private int limit; private int queueSize = 0; private int batchCount = 0; private boolean hasSv2; @Override - public void init(int limit, FragmentContext context, boolean hasSv2) throws SchemaChangeException { + public void init(int limit, FragmentContext context, BufferAllocator allocator, boolean hasSv2) throws SchemaChangeException { this.limit = limit; this.context = context; - BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator(); + this.allocator = allocator; + BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator(); preAlloc.preAllocate(4 * (limit + 1)); heapSv4 = new SelectionVector4(preAlloc.getAllocation(), limit, Character.MAX_VALUE); this.hasSv2 = hasSv2; @@ -64,7 +66,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { newContainer.buildSchema(BatchSchema.SelectionVectorMode.FOUR_BYTE); this.hyperBatch = new ExpandableHyperContainer(newContainer); this.batchCount = hyperBatch.iterator().next().getValueVectors().length; - BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator(); + BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator(); preAlloc.preAllocate(4 * (limit + 1)); this.heapSv4 = new SelectionVector4(preAlloc.getAllocation(), limit, Character.MAX_VALUE); for (int i = 0; i < v4.getTotalCount(); i++) { @@ -113,7 +115,7 @@ public abstract class PriorityQueueTemplate implements PriorityQueue { public void generate() throws SchemaChangeException { Stopwatch watch = new Stopwatch(); watch.start(); - BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator(); + BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator(); preAlloc.preAllocate(4 * queueSize); finalSv4 = new SelectionVector4(preAlloc.getAllocation(), queueSize, 4000); for (int i = queueSize - 1; i >= 0; i--) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 7073a6cc9..2a57aaa34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -35,6 +35,8 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.HoldingContainerExpression; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.TopN; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; @@ -57,6 +59,8 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNBatch.class); private static final long MAX_SORT_BYTES = 1L * 1024 * 1024 * 1024; + public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; + public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; private final int batchPurgeThreshold; public final MappingSet MAIN_MAPPING = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_SCALAR_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); @@ -73,7 +77,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { private int batchCount; private Copier copier; - public TopNBatch(TopN popConfig, FragmentContext context, RecordBatch incoming) { + public TopNBatch(TopN popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context); this.incoming = incoming; this.config = popConfig; @@ -88,7 +92,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { @Override public void kill() { incoming.kill(); - cleanup(); } @Override @@ -105,13 +108,13 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { @Override public void cleanup() { - super.cleanup(); if (sv4 != null) { sv4.clear(); } if (priorityQueue != null) { priorityQueue.cleanup(); } + super.cleanup(); incoming.cleanup(); } @@ -121,7 +124,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { if(getSelectionVector4().next()){ return IterOutcome.OK; }else{ - cleanup(); return IterOutcome.NONE; } } @@ -139,7 +141,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { case NOT_YET: throw new UnsupportedOperationException(); case STOP: - cleanup(); return upstream; case OK_NEW_SCHEMA: // only change in the case that the schema truly changes. Artificial schema changes are ignored. @@ -198,21 +199,22 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { SimpleRecordBatch batch = new SimpleRecordBatch(c, selectionVector4, context); SimpleRecordBatch newBatch = new SimpleRecordBatch(newContainer, null, context); if (copier == null) { - copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch); + copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch); } else { List<VectorAllocator> allocators = Lists.newArrayList(); for(VectorWrapper<?> i : batch){ - ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator()); + ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator()); newContainer.add(v); allocators.add(RemovingRecordBatch.getAllocator4(v)); } copier.setupRemover(context, batch, newBatch, allocators.toArray(new VectorAllocator[allocators.size()])); } - SortRecordBatchBuilder builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES); + SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES); do { int count = selectionVector4.getCount(); - copier.copyRecords(); + int copiedRecords = copier.copyRecords(0, count); + assert copiedRecords == count; for(VectorWrapper<?> v : newContainer){ ValueVector.Mutator m = v.getValueVector().getMutator(); m.setValueCount(count); @@ -264,7 +266,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { g.getEvalBlock()._return(JExpr.lit(0)); PriorityQueue q = context.getImplementationClass(cg); - q.init(config.getLimit(), context, schema.getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE); + q.init(config.getLimit(), context, oContext.getAllocator(), schema.getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE); return q; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java index 6c6e92c91..65669b12f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WireRecordBatch.java @@ -22,20 +22,14 @@ import java.util.Iterator; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.RawFragmentBatch; -import org.apache.drill.exec.record.RawFragmentBatchProvider; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.record.TypedFieldId; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.*; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; -public class WireRecordBatch implements RecordBatch{ +public class WireRecordBatch implements RecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WireRecordBatch.class); private RecordBatchLoader batchLoader; @@ -44,10 +38,10 @@ public class WireRecordBatch implements RecordBatch{ private BatchSchema schema; - public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) { + public WireRecordBatch(FragmentContext context, RawFragmentBatchProvider fragProvider) throws OutOfMemoryException { this.fragProvider = fragProvider; this.context = context; - this.batchLoader = new RecordBatchLoader(context.getAllocator()); + this.batchLoader = new RecordBatchLoader(null); } @Override @@ -69,7 +63,7 @@ public class WireRecordBatch implements RecordBatch{ public void kill() { fragProvider.kill(context); } - + @Override public Iterator<VectorWrapper<?>> iterator() { return batchLoader.iterator(); @@ -101,15 +95,19 @@ public class WireRecordBatch implements RecordBatch{ RawFragmentBatch batch = fragProvider.getNext(); // skip over empty batches. we do this since these are basically control messages. - while(batch != null && batch.getHeader().getDef().getRecordCount() == 0){ + while(batch != null && !batch.getHeader().getIsOutOfMemory() && batch.getHeader().getDef().getRecordCount() == 0){ batch = fragProvider.getNext(); } - + if (batch == null){ batchLoader.clear(); return IterOutcome.NONE; } - + + if (batch.getHeader().getIsOutOfMemory()) { + return IterOutcome.OUT_OF_MEMORY; + } + // logger.debug("Next received batch {}", batch); @@ -136,7 +134,7 @@ public class WireRecordBatch implements RecordBatch{ @Override public void cleanup() { + fragProvider.cleanup(); } - - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index a75aac9e7..ee5cfa842 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.ExpressionPosition; @@ -42,6 +43,7 @@ import org.apache.drill.exec.expr.HoldingContainerExpression; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.holders.IntHolder; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.record.AbstractRecordBatch; @@ -85,7 +87,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { private final MappingSet UpdateAggrValuesMapping = new MappingSet("incomingRowIdx" /* read index */, "outRowIdx" /* write index */, "htRowIdx" /* workspace index */, "incoming" /* read container */, "outgoing" /* write container */, "aggrValuesContainer" /* workspace container */, UPDATE_AGGR_INSIDE, UPDATE_AGGR_OUTSIDE, UPDATE_AGGR_INSIDE); - public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) { + public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException { super(popConfig, context); this.incoming = incoming; } @@ -197,7 +199,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { if(expr == null) continue; final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); - ValueVector vv = TypeHelper.getNewVector(outputField, context.getAllocator()); + ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator()); keyAllocators.add(VectorAllocator.getAllocator(vv, 50)); // add this group-by vector to the output container @@ -213,7 +215,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { if(expr == null) continue; final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); - ValueVector vv = TypeHelper.getNewVector(outputField, context.getAllocator()); + ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator()); valueAllocators.add(VectorAllocator.getAllocator(vv, 50)); aggrOutFieldIds[i] = container.add(vv); @@ -227,7 +229,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { container.buildSchema(SelectionVectorMode.NONE); HashAggregator agg = context.getImplementationClass(top); - agg.setup(popConfig, context, incoming, this, + agg.setup(popConfig, context, oContext.getAllocator(), incoming, this, aggrExprs, cgInner.getWorkspaceTypes(), groupByOutFieldIds, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index b0f81efd1..d7abcd224 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -35,6 +35,7 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.holders.IntHolder; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.common.ChainedHashTable; @@ -58,6 +59,9 @@ import com.google.common.collect.Lists; public abstract class HashAggTemplate implements HashAggregator { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); + + private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; + private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; private static final boolean EXTRA_DEBUG_1 = false; private static final boolean EXTRA_DEBUG_2 = false; @@ -75,6 +79,7 @@ public abstract class HashAggTemplate implements HashAggregator { private VectorAllocator[] keyAllocators; private VectorAllocator[] valueAllocators; private FragmentContext context; + private BufferAllocator allocator; private HashAggregate hashAggrConfig; private HashTable htable; @@ -101,7 +106,7 @@ public abstract class HashAggTemplate implements HashAggregator { for(int i = 0; i < materializedValueFields.length; i++) { MaterializedField outputField = materializedValueFields[i]; // Create a type-specific ValueVector for this value - vector = TypeHelper.getNewVector(outputField, context.getAllocator()) ; + vector = TypeHelper.getNewVector(outputField, allocator) ; VectorAllocator.getAllocator(vector, 50 /* avg. width */).alloc(HashTable.BATCH_SIZE) ; aggrValuesContainer.add(vector) ; @@ -149,7 +154,7 @@ public abstract class HashAggTemplate implements HashAggregator { @Override - public void setup(HashAggregate hashAggrConfig, FragmentContext context, RecordBatch incoming, RecordBatch outgoing, + public void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, RecordBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, @@ -164,6 +169,7 @@ public abstract class HashAggTemplate implements HashAggregator { } this.context = context; + this.allocator = allocator; this.incoming = incoming; this.schema = incoming.getSchema(); this.keyAllocators = keyAllocators; @@ -193,7 +199,7 @@ public abstract class HashAggTemplate implements HashAggregator { } } - ChainedHashTable ht = new ChainedHashTable(hashAggrConfig.getHtConfig(), context, incoming, null /* no incoming probe */, outgoing) ; + ChainedHashTable ht = new ChainedHashTable(hashAggrConfig.getHtConfig(), context, allocator, incoming, null /* no incoming probe */, outgoing) ; this.htable = ht.createAndSetupHashTable(groupByOutFieldIds) ; batchHolders = new ArrayList<BatchHolder>(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index b23dbeec6..9032f2a51 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -25,6 +25,7 @@ import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.record.RecordBatch; @@ -40,7 +41,7 @@ public interface HashAggregator { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR } - public abstract void setup(HashAggregate hashAggrConfig, FragmentContext context, RecordBatch incoming, + public abstract void setup(HashAggregate hashAggrConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incoming, RecordBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index c942dc67d..88bada5ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -39,6 +39,8 @@ import org.apache.drill.exec.expr.HoldingContainerExpression; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.impl.aggregate.StreamingAggregator.AggOutcome; @@ -64,7 +66,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private final RecordBatch incoming; private boolean done = false; - public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) { + public StreamingAggBatch(StreamingAggregate popConfig, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { super(popConfig, context); this.incoming = incoming; } @@ -105,7 +107,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { logger.debug("Aggregator response {}, records {}", out, aggregator.getOutputCount()); switch(out){ case CLEANUP_AND_RETURN: - incoming.cleanup(); container.clear(); done = true; return aggregator.getOutcome(); @@ -165,7 +166,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { if(expr == null) continue; keyExprs[i] = expr; final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); - ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator()); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); allocators.add(VectorAllocator.getAllocator(vector, 50)); keyOutputIds[i] = container.add(vector); } @@ -176,7 +177,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { if(expr == null) continue; final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); - ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator()); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); allocators.add(VectorAllocator.getAllocator(vector, 50)); TypedFieldId id = container.add(vector); valueExprs[i] = new ValueVectorWriteExpression(id, expr, true); @@ -315,7 +316,11 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } - + @Override + public void cleanup() { + super.cleanup(); + incoming.cleanup(); + } @Override protected void killIncoming() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index 4b9e3adda..0a0158368 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -128,6 +128,6 @@ public class BroadcastSenderRootExec implements RootExec { @Override public void stop() { ok = false; - incoming.kill(); + incoming.cleanup(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index ec579fc5b..e1179d05a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -41,6 +41,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.expr.fn.impl.BitFunctions; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; @@ -93,18 +94,21 @@ public class ChainedHashTable { private HashTableConfig htConfig; private final FragmentContext context; + private final BufferAllocator allocator; private final RecordBatch incomingBuild; private final RecordBatch incomingProbe; private final RecordBatch outgoing; public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, + BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing) { this.htConfig = htConfig; this.context = context; + this.allocator = allocator; this.incomingBuild = incomingBuild; this.incomingProbe = incomingProbe; this.outgoing = outgoing; @@ -136,7 +140,7 @@ public class ChainedHashTable { final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); // create a type-specific ValueVector for this key - ValueVector vv = TypeHelper.getNewVector(outputField, context.getAllocator()); + ValueVector vv = TypeHelper.getNewVector(outputField, allocator); VectorAllocator.getAllocator(vv, 50 /* avg width */).alloc(HashTable.BATCH_SIZE); htKeyFieldIds[i] = htContainerOrig.add(vv); @@ -171,7 +175,7 @@ public class ChainedHashTable { setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, keyExprsProbe); HashTable ht = context.getImplementationClass(top); - ht.setup(htConfig, context, incomingBuild, incomingProbe, outgoing, htContainerOrig); + ht.setup(htConfig, context, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig); return ht; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index 2f1172a6c..e5959f2ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.common; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.expr.holders.IntHolder; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; @@ -43,7 +44,7 @@ public interface HashTable { static final public int BATCH_SIZE = Character.MAX_VALUE+1; static final public int BATCH_MASK = 0x0000FFFF; - public void setup(HashTableConfig htConfig, FragmentContext context, + public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index f67939e14..23a0cf5b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -29,6 +29,7 @@ import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.types.Types; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorContainer; @@ -76,6 +77,8 @@ public abstract class HashTableTemplate implements HashTable { private FragmentContext context; + private BufferAllocator allocator; + // The incoming build side record batch private RecordBatch incomingBuild; @@ -119,7 +122,7 @@ public abstract class HashTableTemplate implements HashTable { } else { // otherwise create a new one using the original's fields htContainer = new VectorContainer(); for (VectorWrapper<?> w : htContainerOrig) { - ValueVector vv = TypeHelper.getNewVector(w.getField(), context.getAllocator()); + ValueVector vv = TypeHelper.getNewVector(w.getField(), allocator); VectorAllocator.getAllocator(vv, 50 /* avg width */).alloc(HashTable.BATCH_SIZE); htContainer.add(vv); } @@ -131,10 +134,10 @@ public abstract class HashTableTemplate implements HashTable { private void init(IntVector links, IntVector hashValues, int size) { for (int i=0; i < size; i++) { - links.getMutator().set(i, EMPTY_SLOT); + links.getMutator().setSafe(i, EMPTY_SLOT); } for (int i=0; i < size; i++) { - hashValues.getMutator().set(i, 0); + hashValues.getMutator().setSafe(i, 0); } links.getMutator().setValueCount(size); hashValues.getMutator().setValueCount(size); @@ -181,8 +184,8 @@ public abstract class HashTableTemplate implements HashTable { // since this is the last entry in the hash chain, the links array at position currentIdx // will point to a null (empty) slot - links.getMutator().set(currentIdxWithinBatch, EMPTY_SLOT); - hashValues.getMutator().set(currentIdxWithinBatch, hashValue); + links.getMutator().setSafe(currentIdxWithinBatch, EMPTY_SLOT); + hashValues.getMutator().setSafe(currentIdxWithinBatch, hashValue); maxOccupiedIdx = Math.max(maxOccupiedIdx, currentIdxWithinBatch); @@ -192,7 +195,7 @@ public abstract class HashTableTemplate implements HashTable { } private void updateLinks(int lastEntryIdxWithinBatch, int currentIdx) { - links.getMutator().set(lastEntryIdxWithinBatch, currentIdx); + links.getMutator().setSafe(lastEntryIdxWithinBatch, currentIdx); } private void rehash(int numbuckets, IntVector newStartIndices, int batchStartIdx) { @@ -211,9 +214,9 @@ public abstract class HashTableTemplate implements HashTable { int newStartIdx = newStartIndices.getAccessor().get(bucketIdx); if (newStartIdx == EMPTY_SLOT) { // new bucket was empty - newStartIndices.getMutator().set(bucketIdx, entryIdx); // update the start index to point to entry - newLinks.getMutator().set(entryIdxWithinBatch, EMPTY_SLOT); - newHashValues.getMutator().set(entryIdxWithinBatch, hash); + newStartIndices.getMutator().setSafe(bucketIdx, entryIdx); // update the start index to point to entry + newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); + newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash); if (EXTRA_DEBUG) logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); @@ -224,9 +227,9 @@ public abstract class HashTableTemplate implements HashTable { while (true) { idxWithinBatch = idx & BATCH_MASK; if (newLinks.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) { - newLinks.getMutator().set(idxWithinBatch, entryIdx); - newLinks.getMutator().set(entryIdxWithinBatch, EMPTY_SLOT); - newHashValues.getMutator().set(entryIdxWithinBatch, hash); + newLinks.getMutator().setSafe(idxWithinBatch, entryIdx); + newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); + newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash); if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); @@ -319,7 +322,7 @@ public abstract class HashTableTemplate implements HashTable { @Override - public void setup(HashTableConfig htConfig, FragmentContext context, + public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) { float loadf = htConfig.getLoadFactor(); @@ -333,6 +336,7 @@ public abstract class HashTableTemplate implements HashTable { this.htConfig = htConfig; this.context = context; + this.allocator = allocator; this.incomingBuild = incomingBuild; this.incomingProbe = incomingProbe; this.outgoing = outgoing; @@ -419,7 +423,7 @@ public abstract class HashTableTemplate implements HashTable { if (insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, lastEntryIdxWithinBatch)) { // update the start index array - startIndices.getMutator().set(getBucketIndex(hash, numBuckets()), currentIdx); + startIndices.getMutator().setSafe(getBucketIndex(hash, numBuckets()), currentIdx); htIdxHolder.value = currentIdx; return PutStatus.KEY_ADDED; } @@ -600,10 +604,10 @@ public abstract class HashTableTemplate implements HashTable { } private IntVector allocMetadataVector(int size, int initialValue) { - IntVector vector = (IntVector) TypeHelper.getNewVector(dummyIntField, context.getAllocator()); + IntVector vector = (IntVector) TypeHelper.getNewVector(dummyIntField, allocator); vector.allocateNew(size); for (int i=0; i < size; i++) { - vector.getMutator().set(i, initialValue); + vector.getMutator().setSafe(i, initialValue); } vector.getMutator().setValueCount(size); return vector; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java index 5f2bc4dbd..c5c81c6e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.filter; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.impl.BatchCreator; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 1cd418c2d..566dfe0aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.record.*; @@ -50,7 +51,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ private BufferAllocator.PreAllocator svAllocator; private Filterer filter; - public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) { + public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); } @@ -78,17 +79,18 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ protected void doWork() { int recordCount = incoming.getRecordCount(); filter.filterBatch(recordCount); - for(VectorWrapper<?> v : container){ - ValueVector.Mutator m = v.getValueVector().getMutator(); - m.setValueCount(recordCount); - } +// for(VectorWrapper<?> v : container){ +// ValueVector.Mutator m = v.getValueVector().getMutator(); +// m.setValueCount(recordCount); +// } } @Override public void cleanup() { - super.cleanup(); if(sv2 != null) sv2.clear(); + if(sv4 != null) sv4.clear(); + super.cleanup(); } @Override @@ -97,16 +99,16 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ switch(incoming.getSchema().getSelectionVectorMode()){ case NONE: - sv2 = new SelectionVector2(context.getAllocator()); + sv2 = new SelectionVector2(oContext.getAllocator()); this.filter = generateSV2Filterer(); break; case TWO_BYTE: - sv2 = new SelectionVector2(context.getAllocator()); + sv2 = new SelectionVector2(oContext.getAllocator()); this.filter = generateSV2Filterer(); break; case FOUR_BYTE: // set up the multi-batch selection vector - this.svAllocator = context.getAllocator().getNewPreAllocator(); + this.svAllocator = oContext.getAllocator().getNewPreAllocator(); if (!svAllocator.preAllocate(incoming.getRecordCount()*4)) throw new SchemaChangeException("Attempted to filter an SV4 which exceeds allowed memory (" + incoming.getRecordCount() * 4 + " bytes)"); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 5f0cc94d7..b624b3082 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -20,6 +20,8 @@ package org.apache.drill.exec.physical.impl.join; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.JoinCondition; import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.record.*; import org.eigenbase.rel.JoinRelType; @@ -50,6 +52,10 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { + + public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; + public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; + // Probe side record batch private final RecordBatch left; @@ -136,7 +142,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { if (hashJoinProbe == null) { // Initialize the hash join helper context - hjHelper = new HashJoinHelper(context); + hjHelper = new HashJoinHelper(context, oContext.getAllocator()); /* Build phase requires setting up the hash table. Hash table will * materialize both the build and probe side expressions while @@ -185,13 +191,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } // No more output records, clean up and return - cleanup(); return IterOutcome.NONE; } catch (ClassTransformationException | SchemaChangeException | IOException e) { context.fail(e); killIncoming(); - cleanup(); return IterOutcome.STOP; } } @@ -222,7 +226,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { // Create the chained hash table - ChainedHashTable ht = new ChainedHashTable(htConfig, context, this.right, this.left, null); + ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null); hashTable = ht.createAndSetupHashTable(null); } @@ -322,15 +326,16 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), fieldId, true)); JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), fieldId, false)); - - g.getEvalBlock().add(outVV.invoke("copyFrom") - .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE))) - .arg(outIndex) - .arg(inVV.component(buildIndex.shrz(JExpr.lit(16))))); + g.getEvalBlock()._if(outVV.invoke("copyFromSafe") + .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE))) + .arg(outIndex) + .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE); fieldId++; } } + g.rotateBlock(); + g.getEvalBlock()._return(JExpr.TRUE); // Generate the code to project probe side records g.setMappingSet(projectProbeMapping); @@ -350,14 +355,18 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(vv.getField().getType(), fieldId, false)); JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(vv.getField().getType(), outputFieldId, false)); - g.getEvalBlock().add(outVV.invoke("copyFrom").arg(probeIndex).arg(outIndex).arg(inVV)); + g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE); fieldId++; outputFieldId++; } + g.rotateBlock(); + g.getEvalBlock()._return(JExpr.TRUE); + recordCount = left.getRecordCount(); } + HashJoinProbe hj = context.getImplementationClass(cg); hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType); @@ -370,7 +379,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } } - public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) { + public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException { super(popConfig, context); this.left = left; this.right = right; @@ -382,13 +391,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { public void killIncoming() { this.left.kill(); this.right.kill(); - cleanup(); } @Override public void cleanup() { - left.cleanup(); - right.cleanup(); + hyperContainer.clear(); hjHelper.clear(); container.clear(); @@ -398,5 +405,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { hashTable.clear(); } super.cleanup(); + left.cleanup(); + right.cleanup(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java index 0728ac95e..b1ed07ee1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinHelper.java @@ -25,6 +25,7 @@ import java.util.List; import io.netty.buffer.ByteBuf; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.physical.impl.common.HashTable; @@ -60,6 +61,7 @@ public class HashJoinHelper { // Fragment context FragmentContext context; + BufferAllocator allocator; // Constant to indicate index is empty. static final int INDEX_EMPTY = -1; @@ -67,8 +69,9 @@ public class HashJoinHelper { // bits to shift while obtaining batch index from SV4 static final int SHIFT_SIZE = 16; - public HashJoinHelper(FragmentContext context) { + public HashJoinHelper(FragmentContext context, BufferAllocator allocator) { this.context = context; + this.allocator = allocator; } public void addStartIndexBatch() throws SchemaChangeException { @@ -102,7 +105,7 @@ public class HashJoinHelper { public SelectionVector4 getNewSV4(int recordCount) throws SchemaChangeException { - ByteBuf vector = context.getAllocator().buffer((recordCount * 4)); + ByteBuf vector = allocator.buffer((recordCount * 4)); SelectionVector4 sv4 = new SelectionVector4(vector, recordCount, recordCount); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java index 0ffdf52a1..160d35285 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java @@ -52,6 +52,6 @@ public interface HashJoinProbe { JoinRelType joinRelType); public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing); public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException; - public abstract void projectBuildRecord(int buildIndex, int outIndex); - public abstract void projectProbeRecord(int probeIndex, int outIndex); + public abstract boolean projectBuildRecord(int buildIndex, int outIndex); + public abstract boolean projectProbeRecord(int probeIndex, int outIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index 3d6f4d6e6..0abf678a8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -94,7 +94,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { public void executeProjectRightPhase() { while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < recordsToProcess) { - projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++); + boolean success = projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), outputRecords++); + assert success; } } @@ -146,8 +147,10 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { */ hjHelper.setRecordMatched(currentCompositeIdx); - projectBuildRecord(currentCompositeIdx, outputRecords); - projectProbeRecord(recordsProcessed, outputRecords); + boolean success = projectBuildRecord(currentCompositeIdx, outputRecords); + assert success; + success = projectProbeRecord(recordsProcessed, outputRecords); + assert success; outputRecords++; /* Projected single row from the build side with matching key but there @@ -179,7 +182,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { } else { hjHelper.setRecordMatched(currentCompositeIdx); - projectBuildRecord(currentCompositeIdx, outputRecords); + boolean success = projectBuildRecord(currentCompositeIdx, outputRecords); + assert success; projectProbeRecord(recordsProcessed, outputRecords); outputRecords++; @@ -221,6 +225,6 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { public abstract void doSetup(@Named("context") FragmentContext context, @Named("buildBatch") VectorContainer buildBatch, @Named("probeBatch") RecordBatch probeBatch, @Named("outgoing") RecordBatch outgoing); - public abstract void projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex); - public abstract void projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex); + public abstract boolean projectBuildRecord(@Named("buildIndex") int buildIndex, @Named("outIndex") int outIndex); + public abstract boolean projectProbeRecord(@Named("probeIndex") int probeIndex, @Named("outIndex") int outIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index bbdfbe5db..db90085de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -38,6 +38,8 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.join.JoinWorker.JoinOutcome; @@ -64,6 +66,9 @@ import com.sun.codemodel.JVar; public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinBatch.class); + + public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; + public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; public final MappingSet setupMapping = new MappingSet("null", "null", @@ -103,9 +108,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private JoinWorker worker; public MergeJoinBatchBuilder batchBuilder; - protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) { + protected MergeJoinBatch(MergeJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException { super(popConfig, context); - + if (popConfig.getConditions().size() == 0) { throw new UnsupportedOperationException("Merge Join currently does not support cartesian join. This join operator was configured with 0 conditions"); } @@ -113,7 +118,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { this.right = right; this.joinType = popConfig.getJoinType(); this.status = new JoinStatus(left, right, this); - this.batchBuilder = new MergeJoinBatchBuilder(context, status); + this.batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status); this.conditions = popConfig.getConditions(); } @@ -204,7 +209,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } public void resetBatchBuilder() { - batchBuilder = new MergeJoinBatchBuilder(context, status); + batchBuilder = new MergeJoinBatchBuilder(oContext.getAllocator(), status); } public void addRightToBatchBuilder() { @@ -384,7 +389,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { // add fields from both batches if (leftCount > 0) { for (VectorWrapper<?> w : left) { - ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator()); + ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), oContext.getAllocator()); VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / left.getRecordCount())).alloc(joinBatchSize); container.add(outgoingVector); } @@ -392,7 +397,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { if (rightCount > 0) { for (VectorWrapper<?> w : right) { - ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), context.getAllocator()); + ValueVector outgoingVector = TypeHelper.getNewVector(w.getField(), oContext.getAllocator()); VectorAllocator.getAllocator(outgoingVector, (int) Math.ceil(w.getValueVector().getBufferSize() / right.getRecordCount())).alloc(joinBatchSize); container.add(outgoingVector); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java index f665c1f61..a75437ce3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatchBuilder.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.join; import com.google.common.collect.ArrayListMultimap; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator.PreAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; @@ -38,10 +39,10 @@ public class MergeJoinBatchBuilder { private PreAllocator svAllocator; private JoinStatus status; - public MergeJoinBatchBuilder(FragmentContext context, JoinStatus status) { + public MergeJoinBatchBuilder(BufferAllocator allocator, JoinStatus status) { this.container = new VectorContainer(); this.status = status; - this.svAllocator = context.getAllocator().getNewPreAllocator(); + this.svAllocator = allocator.getNewPreAllocator(); } public boolean add(RecordBatch batch) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index dcd452eb8..3f2ec27fe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.limit; import com.beust.jcommander.internal.Lists; import com.google.common.base.Objects; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.record.*; @@ -38,9 +39,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { private boolean skipBatch; List<TransferPair> transfers = Lists.newArrayList(); - public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) { + public LimitRecordBatch(Limit popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context, incoming); - outgoingSv = new SelectionVector2(context.getAllocator()); + outgoingSv = new SelectionVector2(oContext.getAllocator()); recordsToSkip = popConfig.getFirst(); noEndLimit = popConfig.getLast() == null; if(!noEndLimit) { @@ -79,8 +80,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { @Override public IterOutcome next() { if(!noEndLimit && recordsLeft <= 0) { - killIncoming(); - cleanup(); + // don't kill incoming batches or call cleanup yet, as this could close allocators before the buffers have been cleared return IterOutcome.NONE; } @@ -160,7 +160,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { @Override public void cleanup(){ - super.cleanup(); outgoingSv.clear(); + super.cleanup(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java index 9096018ad..46a156f97 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverGeneratorBase.java @@ -35,7 +35,7 @@ public interface MergingReceiverGeneratorBase { public abstract int doCompare(MergingRecordBatch.Node left, MergingRecordBatch.Node right); - public abstract void doCopy(int inBatch, int inIndex, int outIndex); + public abstract boolean doCopy(int inBatch, int inIndex, int outIndex); public static TemplateClassDefinition<MergingReceiverGeneratorBase> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(MergingReceiverGeneratorBase.class, MergingReceiverTemplate.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java index 6945b4dac..197e96039 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingReceiverTemplate.java @@ -55,7 +55,7 @@ public abstract class MergingReceiverTemplate implements MergingReceiverGenerato * @param inIndex incoming record position to copy from * @param outIndex outgoing record position to copy to */ - public abstract void doCopy(@Named("inBatch") int inBatch, @Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract boolean doCopy(@Named("inBatch") int inBatch, @Named("inIndex") int inIndex, @Named("outIndex") int outIndex); // public abstract void doEval(@Named("inBatch") int inBatch, @Named("inIndex") int inIndex, @Named("outIndex") int outIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index dcfe02f09..a2c424fe2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -41,19 +41,12 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.proto.UserBitShared; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.RawFragmentBatch; -import org.apache.drill.exec.record.RawFragmentBatchProvider; -import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.record.RecordBatchLoader; -import org.apache.drill.exec.record.SchemaBuilder; -import org.apache.drill.exec.record.TypedFieldId; -import org.apache.drill.exec.record.VectorContainer; -import org.apache.drill.exec.record.VectorWrapper; -import org.apache.drill.exec.record.WritableBatch; +import org.apache.drill.exec.record.*; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.ValueVector; @@ -74,9 +67,12 @@ import com.sun.codemodel.JVar; /** * The MergingRecordBatch merges pre-sorted record batches from remote senders. */ -public class MergingRecordBatch implements RecordBatch { +public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> implements RecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingRecordBatch.class); + private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; + private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; + private RecordBatchLoader[] batchLoaders; private RawFragmentBatchProvider[] fragProviders; private FragmentContext context; @@ -98,8 +94,9 @@ public class MergingRecordBatch implements RecordBatch { public MergingRecordBatch(FragmentContext context, MergingReceiverPOP config, - RawFragmentBatchProvider[] fragProviders) { + RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException { + super(config, context); this.fragProviders = fragProviders; this.context = context; this.config = config; @@ -154,7 +151,7 @@ public class MergingRecordBatch implements RecordBatch { batchLoaders = new RecordBatchLoader[senderCount]; for (int i = 0; i < senderCount; ++i) { incomingBatches[i] = rawBatches.get(i); - batchLoaders[i] = new RecordBatchLoader(context.getAllocator()); + batchLoaders[i] = new RecordBatchLoader(oContext.getAllocator()); } int i = 0; @@ -182,7 +179,7 @@ public class MergingRecordBatch implements RecordBatch { bldr.addField(v.getField()); // allocate a new value vector - ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator()); + ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), oContext.getAllocator()); VectorAllocator allocator = VectorAllocator.getAllocator(outgoingVector, 50); allocator.alloc(DEFAULT_ALLOC_RECORD_COUNT); allocators.add(allocator); @@ -226,7 +223,10 @@ public class MergingRecordBatch implements RecordBatch { while (!pqueue.isEmpty()) { // pop next value from pq and copy to outgoing batch Node node = pqueue.peek(); - copyRecordToOutgoingBatch(pqueue.poll()); + if (!copyRecordToOutgoingBatch(node)) { + break; + } + pqueue.poll(); if (isOutgoingFull()) { // set a flag so that we reallocate on the next iteration @@ -320,10 +320,15 @@ public class MergingRecordBatch implements RecordBatch { @Override public void kill() { + cleanup(); for (RawFragmentBatchProvider provider : fragProviders) { provider.kill(context); } - + } + + @Override + protected void killIncoming() { + //No op } @Override @@ -593,17 +598,19 @@ public class MergingRecordBatch implements RecordBatch { // ((IntVector) outgoingVectors[i]).copyFrom(inIndex, // outgoingBatch.getRecordCount(), // (IntVector) vv1); - cg.getEvalBlock().add( + cg.getEvalBlock()._if( ((JExpression) JExpr.cast(vvClass, outgoingVectors.component(JExpr.lit(fieldIdx)))) - .invoke("copyFrom") + .invoke("copyFromSafe") .arg(inIndex) .arg(outIndex) .arg(JExpr.cast(vvClass, ((JExpression) incomingVectors.component(JExpr.direct("inBatch"))) - .component(JExpr.lit(fieldIdx))))); + .component(JExpr.lit(fieldIdx)))).not())._then()._return(JExpr.FALSE); ++fieldIdx; } + cg.rotateBlock(); + cg.getEvalBlock()._return(JExpr.TRUE); // compile generated code and call the generated setup method MergingReceiverGeneratorBase newMerger; @@ -618,12 +625,17 @@ public class MergingRecordBatch implements RecordBatch { /** * Copy the record referenced by the supplied node to the next output position. - * Side Effect: increments outgoing position + * Side Effect: increments outgoing position if successful * * @param node Reference to the next record to copy from the incoming batches */ - private void copyRecordToOutgoingBatch(Node node) { - merger.doCopy(node.batchId, node.valueIndex, outgoingPosition++); + private boolean copyRecordToOutgoingBatch(Node node) { + if (!merger.doCopy(node.batchId, node.valueIndex, outgoingPosition)) { + return false; + } else { + outgoingPosition++; + return true; + } } /** @@ -647,6 +659,12 @@ public class MergingRecordBatch implements RecordBatch { rbl.clear(); } } + oContext.close(); + if (fragProviders != null) { + for (RawFragmentBatchProvider f : fragProviders) { + f.cleanup(); + } + } } }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java index 8563d1c94..dd7011afb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionProjectorTemplate.java @@ -64,7 +64,9 @@ public abstract class OrderedPartitionProjectorTemplate implements OrderedPartit int counter = 0; for (int i = 0; i < countN; i++, firstOutputIndex++) { int partition = getPartition(i); - partitionValues.getMutator().set(i, partition); + if (!partitionValues.getMutator().setSafe(i, partition)) { + throw new RuntimeException(); + } counter++; } for(TransferPair t : transfers){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 36428cea9..4641de6e1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -41,12 +41,15 @@ import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.impl.sort.SortBatch; @@ -84,6 +87,9 @@ import com.sun.codemodel.JExpr; public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPartitionSender> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrderedPartitionRecordBatch.class); + private static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; + private static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; + public final MappingSet mainMapping = new MappingSet( (String) null, null, ClassGenerator.DEFAULT_CONSTANT_MAP, ClassGenerator.DEFAULT_SCALAR_MAP); public final MappingSet incomingMapping = new MappingSet("inIndex", null, "incoming", null, @@ -115,7 +121,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart private final String mapKey; private List<VectorContainer> sampledIncomingBatches; - public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context) { + public OrderedPartitionRecordBatch(OrderedPartitionSender pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { super(pop, context); this.incoming = incoming; this.partitions = pop.getDestinations().size(); @@ -134,13 +140,14 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart SchemaPath outputPath = popConfig.getRef(); MaterializedField outputField = MaterializedField.create(outputPath, Types.required(TypeProtos.MinorType.INT)); - this.partitionKeyVector = (IntVector) TypeHelper.getNewVector(outputField, context.getAllocator()); + this.partitionKeyVector = (IntVector) TypeHelper.getNewVector(outputField, oContext.getAllocator()); } @Override public void cleanup() { + incoming.cleanup(); super.cleanup(); this.partitionVectors.clear(); this.partitionKeyVector.clear(); @@ -153,7 +160,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Start collecting batches until recordsToSample records have been collected - SortRecordBatchBuilder builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES); + SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES); builder.add(incoming); recordsSampled += incoming.getRecordCount(); @@ -190,9 +197,20 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // popConfig.orderings. VectorContainer containerToCache = new VectorContainer(); - SampleCopier copier = getCopier(sv4, sortedSamples, containerToCache, popConfig.getOrderings()); - copier.copyRecords(recordsSampled / (samplingFactor * partitions), 0, samplingFactor * partitions); - + List<ValueVector> localAllocationVectors = Lists.newArrayList(); + SampleCopier copier = getCopier(sv4, sortedSamples, containerToCache, popConfig.getOrderings(), localAllocationVectors); + int allocationSize = 50; + while (true) { + for (ValueVector vv : localAllocationVectors) { + AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize); + } + if (copier.copyRecords(recordsSampled / (samplingFactor * partitions), 0, samplingFactor * partitions)) { + break; + } else { + containerToCache.zeroVectors(); + allocationSize *= 2; + } + } for (VectorWrapper<?> vw : containerToCache) { vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords()); } @@ -202,7 +220,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // into a serializable wrapper object, and then add to distributed map WritableBatch batch = WritableBatch.getBatchNoHVWrap(containerToCache.getRecordCount(), containerToCache, false); - VectorAccessibleSerializable sampleToSave = new VectorAccessibleSerializable(batch, context.getAllocator()); + VectorAccessibleSerializable sampleToSave = new VectorAccessibleSerializable(batch, oContext.getAllocator()); mmap.put(mapKey, sampleToSave); this.sampledIncomingBatches = builder.getHeldRecordBatches(); @@ -283,7 +301,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Get all samples from distributed map - SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES); + SortRecordBatchBuilder containerBuilder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES); for (VectorAccessibleSerializable w : mmap.get(mapKey)) { containerBuilder.add(w.get()); } @@ -306,12 +324,25 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // Copy every Nth record from the samples into a candidate partition table, where N = totalSampledRecords/partitions // Attempt to push this to the distributed map. Only the first candidate to get pushed will be used. VectorContainer candidatePartitionTable = new VectorContainer(); - SampleCopier copier = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs); - int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions; - copier.copyRecords(skipRecords, skipRecords, partitions - 1); - assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions); - for (VectorWrapper<?> vw : candidatePartitionTable) { - vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords()); + SampleCopier copier = null; + List<ValueVector> localAllocationVectors = Lists.newArrayList(); + copier = getCopier(newSv4, allSamplesContainer, candidatePartitionTable, orderDefs, localAllocationVectors); + int allocationSize = 50; + while (true) { + for (ValueVector vv : localAllocationVectors) { + AllocationHelper.allocate(vv, samplingFactor * partitions, allocationSize); + } + int skipRecords = containerBuilder.getSv4().getTotalCount() / partitions; + if (copier.copyRecords(skipRecords, skipRecords, partitions - 1)) { + assert copier.getOutputRecords() == partitions - 1 : String.format("output records: %d partitions: %d", copier.getOutputRecords(), partitions); + for (VectorWrapper<?> vw : candidatePartitionTable) { + vw.getValueVector().getMutator().setValueCount(copier.getOutputRecords()); + } + break; + } else { + candidatePartitionTable.zeroVectors(); + allocationSize *= 2; + } } candidatePartitionTable.setRecordCount(copier.getOutputRecords()); WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false); @@ -339,8 +370,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart * @throws SchemaChangeException */ private SampleCopier getCopier(SelectionVector4 sv4, VectorContainer incoming, VectorContainer outgoing, - List<Ordering> orderings) throws SchemaChangeException { - List<ValueVector> localAllocationVectors = Lists.newArrayList(); + List<Ordering> orderings, List<ValueVector> localAllocationVectors) throws SchemaChangeException { final ErrorCollector collector = new ErrorCollectorImpl(); final ClassGenerator<SampleCopier> cg = CodeGenerator.getRoot(SampleCopier.TEMPLATE_DEFINITION, context.getFunctionRegistry()); @@ -358,16 +388,15 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart "Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } - ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator()); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); localAllocationVectors.add(vector); TypedFieldId fid = outgoing.add(vector); - ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr); - cg.addExpr(write); - logger.debug("Added eval."); - } - for (ValueVector vv : localAllocationVectors) { - AllocationHelper.allocate(vv, samplingFactor * partitions, 50); + ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); + HoldingContainer hc = cg.addExpr(write); + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); } + cg.rotateBlock(); + cg.getEvalBlock()._return(JExpr.TRUE); outgoing.buildSchema(BatchSchema.SelectionVectorMode.NONE); try { SampleCopier sampleCopier = context.getImplementationClass(cg); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopier.java index ddb605bde..3af35723f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopier.java @@ -27,7 +27,7 @@ public interface SampleCopier { public static TemplateClassDefinition<SampleCopier> TEMPLATE_DEFINITION = new TemplateClassDefinition<SampleCopier>(SampleCopier.class, SampleCopierTemplate.class); public void setupCopier(FragmentContext context, SelectionVector4 sv4, VectorAccessible incoming, VectorAccessible outgoing) throws SchemaChangeException; - public abstract void copyRecords(int skip, int start, int total); + public abstract boolean copyRecords(int skip, int start, int total); public int getOutputRecords(); }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java index ddb56c1f7..73fcd1fd7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/SampleCopierTemplate.java @@ -44,19 +44,22 @@ public abstract class SampleCopierTemplate implements SampleCopier { @Override - public void copyRecords(int skip, int start, int total) { + public boolean copyRecords(int skip, int start, int total) { final int recordCount = sv4.getCount(); int outgoingPosition = 0; int increment = skip > 0 ? skip : 1; for(int svIndex = start; svIndex < sv4.getCount() && outputRecords < total; svIndex += increment, outgoingPosition++){ int deRefIndex = sv4.get(svIndex); - doEval(deRefIndex, outgoingPosition); + if (!doEval(deRefIndex, outgoingPosition)) { + return false; + } outputRecords++; } + return true; } public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing); - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java index 94fd38531..6e115a7fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/OutgoingRecordBatch.java @@ -24,6 +24,7 @@ import java.util.Iterator; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.SendingAccountor; @@ -42,6 +43,7 @@ import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.util.VectorUtil; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.apache.drill.exec.work.ErrorHelper; @@ -60,6 +62,7 @@ public class OutgoingRecordBatch implements VectorAccessible { private final HashPartitionSender operator; private final RecordBatch incoming; private final FragmentContext context; + private final BufferAllocator allocator; private final VectorContainer vectorContainer = new VectorContainer(); private final SendingAccountor sendCount; private final int oppositeMinorFragmentId; @@ -72,9 +75,11 @@ public class OutgoingRecordBatch implements VectorAccessible { private static int DEFAULT_ALLOC_SIZE = 20000; private static int DEFAULT_VARIABLE_WIDTH_SIZE = 2048; - public OutgoingRecordBatch(SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, RecordBatch incoming, FragmentContext context, int oppositeMinorFragmentId) { + public OutgoingRecordBatch(SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel, RecordBatch incoming, + FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) { this.incoming = incoming; this.context = context; + this.allocator = allocator; this.operator = operator; this.tunnel = tunnel; this.sendCount = sendCount; @@ -111,6 +116,7 @@ public class OutgoingRecordBatch implements VectorAccessible { w.getValueVector().getMutator().setValueCount(recordCount); } + // BatchPrinter.printBatch(vectorContainer); FragmentWritableBatch writableBatch = new FragmentWritableBatch(isLast, @@ -170,7 +176,7 @@ public class OutgoingRecordBatch implements VectorAccessible { bldr.addField(v.getField()); // allocate a new value vector - ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), context.getAllocator()); + ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), allocator); VectorAllocator.getAllocator(outgoingVector, 100).alloc(recordCapacity); vectorContainer.add(outgoingVector); // logger.debug("Reallocating to cap " + recordCapacity + " because of newly init'd vector : " + v.getValueVector()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 3e3157b48..604808547 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -29,13 +29,17 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.physical.impl.SendingAccountor; import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; @@ -57,6 +61,7 @@ public class PartitionSenderRootExec implements RootExec { private OutgoingRecordBatch[] outgoing; private Partitioner partitioner; private FragmentContext context; + private OperatorContext oContext; private boolean ok = true; private AtomicLong batchesSent = new AtomicLong(0); private final SendingAccountor sendCount = new SendingAccountor(); @@ -64,11 +69,12 @@ public class PartitionSenderRootExec implements RootExec { public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, - HashPartitionSender operator) { + HashPartitionSender operator) throws OutOfMemoryException { this.incoming = incoming; this.operator = operator; this.context = context; + this.oContext = new OperatorContext(operator, context); this.outgoing = new OutgoingRecordBatch[operator.getDestinations().size()]; int fieldId = 0; for (CoordinationProtos.DrillbitEndpoint endpoint : operator.getDestinations()) { @@ -77,6 +83,7 @@ public class PartitionSenderRootExec implements RootExec { context.getDataTunnel(endpoint, opposite), incoming, context, + oContext.getAllocator(), fieldId); fieldId++; } @@ -252,16 +259,17 @@ public class PartitionSenderRootExec implements RootExec { // ((IntVector) outgoingVectors[bucket][0]).copyFrom(inIndex, // outgoingBatches[bucket].getRecordCount(), // vv1); - cg.getEvalBlock().add( + cg.getEvalBlock()._if( ((JExpression) JExpr.cast(vvClass, ((JExpression) outgoingVectors .component(bucket)) .component(JExpr.lit(fieldId)))) - .invoke("copyFrom") + .invoke("copyFromSafe") .arg(inIndex) .arg(((JExpression) outgoingBatches.component(bucket)).invoke("getRecordCount")) - .arg(incomingVV)); + .arg(incomingVV).not())._then().add(((JExpression) outgoingBatches.component(bucket)).invoke("flush")) + ._return(); ++fieldId; } @@ -306,7 +314,8 @@ public class PartitionSenderRootExec implements RootExec { for(OutgoingRecordBatch b : outgoing){ b.clear(); } - incoming.cleanup(); sendCount.waitForSendComplete(); + oContext.close(); + incoming.cleanup(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index e8ee3ccb8..347092a36 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -22,6 +22,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import com.sun.codemodel.JExpr; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.FieldReference; @@ -34,13 +35,16 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Project; +import org.apache.drill.exec.physical.impl.filter.ReturnValueExpression; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; @@ -48,6 +52,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.util.VectorUtil; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; @@ -59,26 +64,81 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private Projector projector; private List<ValueVector> allocationVectors; + private boolean hasRemainder = false; + private int remainderIndex = 0; + private int recordCount; - public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context){ + public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); } @Override public int getRecordCount() { - return incoming.getRecordCount(); + return recordCount; + } + + @Override + public IterOutcome next() { + if (hasRemainder) { + handleRemainder(); + return IterOutcome.OK; + } + return super.next(); } @Override protected void doWork() { - int recordCount = incoming.getRecordCount(); +// VectorUtil.showVectorAccessibleContent(incoming, ","); + int incomingRecordCount = incoming.getRecordCount(); for(ValueVector v : this.allocationVectors){ - AllocationHelper.allocate(v, recordCount, 250); + AllocationHelper.allocate(v, incomingRecordCount, 250); } - projector.projectRecords(recordCount, 0); - for(VectorWrapper<?> v : container){ - ValueVector.Mutator m = v.getValueVector().getMutator(); - m.setValueCount(recordCount); + int outputRecords = projector.projectRecords(0, incomingRecordCount, 0); + if (outputRecords < incomingRecordCount) { + for(VectorWrapper<?> v : container){ + ValueVector.Mutator m = v.getValueVector().getMutator(); + m.setValueCount(outputRecords); + } + hasRemainder = true; + remainderIndex = outputRecords; + this.recordCount = remainderIndex; + } else { + for(VectorWrapper<?> v : container){ + ValueVector.Mutator m = v.getValueVector().getMutator(); + m.setValueCount(incomingRecordCount); + } + for(VectorWrapper<?> v: incoming) { + v.clear(); + } + this.recordCount = outputRecords; + } + } + + private void handleRemainder() { + int remainingRecordCount = incoming.getRecordCount() - remainderIndex; + for(ValueVector v : this.allocationVectors){ + AllocationHelper.allocate(v, remainingRecordCount, 250); + } + int outputIndex = projector.projectRecords(remainderIndex, remainingRecordCount, 0); + if (outputIndex < incoming.getRecordCount()) { + for(VectorWrapper<?> v : container){ + ValueVector.Mutator m = v.getValueVector().getMutator(); + m.setValueCount(outputIndex - remainderIndex); + } + hasRemainder = true; + this.recordCount = outputIndex - remainderIndex; + remainderIndex = outputIndex; + } else { + for(VectorWrapper<?> v : container){ + ValueVector.Mutator m = v.getValueVector().getMutator(); + m.setValueCount(remainingRecordCount); + } + hasRemainder = false; + remainderIndex = 0; + for(VectorWrapper<?> v: incoming) { + v.clear(); + } + this.recordCount = remainingRecordCount; } } @@ -156,17 +216,20 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ // logger.debug("Added transfer."); }else{ // need to do evaluation. - ValueVector vector = TypeHelper.getNewVector(outputField, context.getAllocator()); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); allocationVectors.add(vector); TypedFieldId fid = container.add(vector); - ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr); - cg.addExpr(write); -// logger.debug("Added eval."); + ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); + HoldingContainer hc = cg.addExpr(write); + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + logger.debug("Added eval."); } - } + } } + cg.rotateBlock(); + cg.getEvalBlock()._return(JExpr.TRUE); container.buildSchema(incoming.getSchema().getSelectionVectorMode()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java index 2857fe19a..ebfce4100 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/Projector.java @@ -28,7 +28,7 @@ import org.apache.drill.exec.record.TransferPair; public interface Projector { public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException; - public abstract int projectRecords(int recordCount, int firstOutputIndex); + public abstract int projectRecords(int startIndex, int recordCount, int firstOutputIndex); public static TemplateClassDefinition<Projector> TEMPLATE_DEFINITION = new TemplateClassDefinition<Projector>(Projector.class, ProjectorTemplate.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java index bd26ce48b..60e599384 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectorTemplate.java @@ -43,7 +43,7 @@ public abstract class ProjectorTemplate implements Projector { } @Override - public final int projectRecords(final int recordCount, int firstOutputIndex) { + public final int projectRecords(int startIndex, final int recordCount, int firstOutputIndex) { switch(svMode){ case FOUR_BYTE: throw new UnsupportedOperationException(); @@ -60,8 +60,17 @@ public abstract class ProjectorTemplate implements Projector { case NONE: final int countN = recordCount; - for (int i = 0; i < countN; i++, firstOutputIndex++) { - doEval(i, firstOutputIndex); + int i; + for (i = startIndex; i < startIndex + countN; i++, firstOutputIndex++) { + if (!doEval(i, firstOutputIndex)) { + break; + } + } + if (i < recordCount || startIndex > 0) { + for(TransferPair t : transfers){ + t.splitAndTransfer(startIndex, i - startIndex); + } + return i; } for(TransferPair t : transfers){ t.transfer(); @@ -91,7 +100,7 @@ public abstract class ProjectorTemplate implements Projector { } public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 29e629aa6..375276e0b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -32,6 +32,8 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.record.AbstractRecordBatch; @@ -60,10 +62,10 @@ public class SortBatch extends AbstractRecordBatch<Sort> { private Sorter sorter; private BatchSchema schema; - public SortBatch(Sort popConfig, FragmentContext context, RecordBatch incoming) { + public SortBatch(Sort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context); this.incoming = incoming; - this.builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES); + this.builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES); } @Override @@ -74,7 +76,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> { @Override public void kill() { incoming.kill(); - cleanup(); } @Override @@ -91,9 +92,9 @@ public class SortBatch extends AbstractRecordBatch<Sort> { @Override public void cleanup() { + builder.clear(); super.cleanup(); incoming.cleanup(); - builder.clear(); } @Override @@ -116,7 +117,6 @@ public class SortBatch extends AbstractRecordBatch<Sort> { case NOT_YET: throw new UnsupportedOperationException(); case STOP: - cleanup(); return upstream; case OK_NEW_SCHEMA: // only change in the case that the schema truly changes. Artificial schema changes are ignored. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java index dad885802..0aab7b2ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java @@ -28,7 +28,7 @@ public interface Copier { public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Copier>(Copier.class, CopierTemplate4.class); public void setupRemover(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, VectorAllocator[] allocators) throws SchemaChangeException; - public abstract void copyRecords(); + public abstract int copyRecords(int index, int recordCount); }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java index 6d1273139..2f589a515 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java @@ -48,22 +48,20 @@ public abstract class CopierTemplate2 implements Copier{ } @Override - public void copyRecords(){ - final int recordCount = sv2.getCount(); + public int copyRecords(int index, int recordCount){ allocateVectors(recordCount); int outgoingPosition = 0; - for(int svIndex = 0; svIndex < recordCount; svIndex++, outgoingPosition++){ - doEval(sv2.getIndex(svIndex), outgoingPosition); - } -// logger.debug("This: {}, Incoming: {}", System.identityHashCode(this), incoming); - for(VectorWrapper<?> v : incoming){ - v.clear(); + for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){ + if (!doEval(sv2.getIndex(svIndex), outgoingPosition)) { + break; + } } + return outgoingPosition; } public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java index 9f4ae7ede..a7aba6e73 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java @@ -50,24 +50,21 @@ public abstract class CopierTemplate4 implements Copier{ @Override - public void copyRecords(){ + public int copyRecords(int index, int recordCount){ // logger.debug("Copying records."); - final int recordCount = sv4.getCount(); allocateVectors(recordCount); int outgoingPosition = 0; - for(int svIndex = 0; svIndex < sv4.getCount(); svIndex++, outgoingPosition++){ + for(int svIndex = index; svIndex < index + recordCount; svIndex++, outgoingPosition++){ int deRefIndex = sv4.get(svIndex); - doEval(deRefIndex, outgoingPosition); + if (!doEval(deRefIndex, outgoingPosition)) { + break; + } } - -// for(VectorWrapper<?> v : incoming){ -// v.clear(); -// } - + return outgoingPosition; } public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); - public abstract void doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 499f4d1eb..4018991b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -25,6 +25,8 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.record.*; @@ -54,8 +56,10 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private Copier copier; private int recordCount; - - public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) { + private boolean hasRemainder; + private int remainderIndex; + + public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context, incoming); logger.debug("Created."); } @@ -88,12 +92,64 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } @Override + public IterOutcome next() { + if (hasRemainder) { + handleRemainder(); + return IterOutcome.OK; + } + return super.next(); + } + + @Override protected void doWork() { recordCount = incoming.getRecordCount(); - copier.copyRecords(); - for(VectorWrapper<?> v : container){ - ValueVector.Mutator m = v.getValueVector().getMutator(); - m.setValueCount(recordCount); + int copiedRecords = copier.copyRecords(0, recordCount); + if (copiedRecords < recordCount) { + for(VectorWrapper<?> v : container){ + ValueVector.Mutator m = v.getValueVector().getMutator(); + m.setValueCount(copiedRecords); + } + hasRemainder = true; + remainderIndex = copiedRecords; + this.recordCount = remainderIndex; + } else { + for(VectorWrapper<?> v : container){ + ValueVector.Mutator m = v.getValueVector().getMutator(); + m.setValueCount(recordCount); + } + if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) { + for(VectorWrapper<?> v: incoming) { + v.clear(); + } + if (incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE) { + incoming.getSelectionVector2().clear(); + } + } + } + } + + private void handleRemainder() { + int remainingRecordCount = incoming.getRecordCount() - remainderIndex; + int copiedRecords = copier.copyRecords(0, recordCount); + if (copiedRecords < remainingRecordCount) { + for(VectorWrapper<?> v : container){ + ValueVector.Mutator m = v.getValueVector().getMutator(); + m.setValueCount(copiedRecords); + } + remainderIndex += copiedRecords; + this.recordCount = copiedRecords; + } else { + for(VectorWrapper<?> v : container){ + ValueVector.Mutator m = v.getValueVector().getMutator(); + m.setValueCount(remainingRecordCount); + } + if (incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.FOUR_BYTE) { + for(VectorWrapper<?> v: incoming) { + v.clear(); + } + } + remainderIndex = 0; + hasRemainder = false; } } @@ -116,10 +172,12 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } @Override - public void copyRecords() { + public int copyRecords(int index, int recordCount) { + assert index == 0 && recordCount == incoming.getRecordCount() : "Straight copier cannot split batch"; for(TransferPair tp : pairs){ tp.transfer(); } + return recordCount; } public List<ValueVector> getOut() { @@ -140,7 +198,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect List<VectorAllocator> allocators = Lists.newArrayList(); for(VectorWrapper<?> i : incoming){ - ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator()); + ValueVector v = TypeHelper.getNewVector(i.getField(), oContext.getAllocator()); container.add(v); allocators.add(VectorAllocator.getAllocator(i.getValueVector(), v)); } @@ -158,15 +216,15 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private Copier getGenerated4Copier() throws SchemaChangeException { Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE); - return getGenerated4Copier(incoming, context, container, this); + return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this); } - public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{ + public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing) throws SchemaChangeException{ List<VectorAllocator> allocators = Lists.newArrayList(); for(VectorWrapper<?> i : batch){ - ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator()); + ValueVector v = TypeHelper.getNewVector(i.getField(), allocator); container.add(v); allocators.add(getAllocator4(v)); } @@ -195,23 +253,27 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect if(hyper){ - g.getEvalBlock().add( + g.getEvalBlock()._if( outVV - .invoke("copyFrom") + .invoke("copyFromSafe") .arg( inIndex.band(JExpr.lit((int) Character.MAX_VALUE))) .arg(outIndex) .arg( inVV.component(inIndex.shrz(JExpr.lit(16))) ) - ); + .not() + ) + ._then()._return(JExpr.FALSE); }else{ - g.getEvalBlock().add(outVV.invoke("copyFrom").arg(inIndex).arg(outIndex).arg(inVV)); + g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(inIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE); } fieldId++; } + g.rotateBlock(); + g.getEvalBlock()._return(JExpr.TRUE); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index 00c3d2fe2..b012cec1d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -111,7 +111,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { } WritableBatch batch = WritableBatch.getBatchNoHVWrap(incoming.getRecordCount(), incoming, incomingHasSv2 ? true : false); - VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, context.getAllocator()); + VectorAccessibleSerializable wrap = new VectorAccessibleSerializable(batch, sv, oContext.getAllocator()); try { wrap.writeToStreamAndRetain(fos); @@ -119,6 +119,9 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { throw new RuntimeException(e); } batch.reconstructContainer(container); + if (incomingHasSv2) { + sv = wrap.getSv2(); + } } @Override @@ -161,6 +164,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { logger.error("Unable to close file descriptors for file: " + getFileName()); } super.cleanup(); + incoming.cleanup(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java index 0fc5e0f67..c27b3c8c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionRecordBatch.java @@ -18,7 +18,9 @@ package org.apache.drill.exec.physical.impl.union; import com.google.common.collect.Lists; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.Union; import org.apache.drill.exec.record.*; import org.apache.drill.exec.record.selection.SelectionVector2; @@ -39,7 +41,7 @@ public class UnionRecordBatch extends AbstractRecordBatch<Union> { private ArrayList<TransferPair> transfers; private int outRecordCount; - public UnionRecordBatch(Union config, List<RecordBatch> children, FragmentContext context) { + public UnionRecordBatch(Union config, List<RecordBatch> children, FragmentContext context) throws OutOfMemoryException { super(config, context); this.incoming = children; this.incomingIterator = incoming.iterator(); @@ -47,7 +49,6 @@ public class UnionRecordBatch extends AbstractRecordBatch<Union> { sv = null; } - @Override public int getRecordCount() { return outRecordCount; @@ -119,10 +120,10 @@ public class UnionRecordBatch extends AbstractRecordBatch<Union> { transfer.transfer(); } - for (VectorWrapper<?> vw : this.container) { - ValueVector.Mutator m = vw.getValueVector().getMutator(); - m.setValueCount(outRecordCount); - } +// for (VectorWrapper<?> vw : this.container) { +// ValueVector.Mutator m = vw.getValueVector().getMutator(); +// m.setValueCount(outRecordCount); +// } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java index 20540dd3b..d87a9f58c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java @@ -88,7 +88,7 @@ public class BatchGroup implements VectorAccessible { watch.start(); outputBatch.writeToStream(outputStream); newContainer.zeroVectors(); - logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount); +// logger.debug("Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), recordCount); spilledBatches++; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index 8bb3d43d1..930f851bc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -38,6 +38,7 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; @@ -82,6 +83,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { private MSorter mSorter; private PriorityQueueSelector selector; private PriorityQueueCopier copier; + private BufferAllocator copierAllocator; private LinkedList<BatchGroup> batchGroups = Lists.newLinkedList(); private SelectionVector4 sv4; private FileSystem fs; @@ -89,7 +91,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { private int batchesSinceLastSpill = 0; private long uid;//used for spill files to ensure multiple sorts within same fragment don't clobber each others' files - public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) { + public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context); this.incoming = incoming; DrillConfig config = context.getConfig(); @@ -107,6 +109,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { SPILL_DIRECTORIES = config.getStringList(ExecConstants.EXTERNAL_SORT_SPILL_DIRS); dirs = Iterators.cycle(Lists.newArrayList(SPILL_DIRECTORIES)); uid = System.nanoTime(); + copierAllocator = oContext.getAllocator().getChildAllocator(context.getHandle(), 10000000, 20000000); } @Override @@ -117,7 +120,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { @Override public void kill() { incoming.kill(); - cleanup(); } @Override @@ -134,8 +136,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { @Override public void cleanup() { - super.cleanup(); - incoming.cleanup(); if (batchGroups != null) { for (BatchGroup group: batchGroups) { try { @@ -151,6 +151,9 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { if (sv4 != null) { sv4.clear(); } + copierAllocator.close(); + super.cleanup(); + incoming.cleanup(); } @Override @@ -170,11 +173,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { int count = selector.next(); if(count > 0){ long t = w.elapsed(TimeUnit.MICROSECONDS); -// logger.debug("Took {} us to merge {} records", t, count); + logger.debug("Took {} us to merge {} records", t, count); container.setRecordCount(count); return IterOutcome.OK; }else{ - cleanup(); + logger.debug("selector returned 0 records"); return IterOutcome.NONE; } } @@ -192,7 +195,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { case NOT_YET: throw new UnsupportedOperationException(); case STOP: - cleanup(); return upstream; case OK_NEW_SCHEMA: // only change in the case that the schema truly changes. Artificial schema changes are ignored. @@ -207,7 +209,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { sv2 = incoming.getSelectionVector2(); } else { - sv2 = newSV2(); + try { + sv2 = newSV2(); + } catch (OutOfMemoryException e) { + throw new RuntimeException(); + } } int count = sv2.getCount(); assert sv2.getCount() > 0; @@ -225,6 +231,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { long t = w.elapsed(TimeUnit.MICROSECONDS); // logger.debug("Took {} us to sort {} records", t, count); break; + case OUT_OF_MEMORY: + mergeAndSpill(); + batchesSinceLastSpill = 0; + break; default: throw new UnsupportedOperationException(); } @@ -243,7 +253,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { return IterOutcome.NONE; } - builder = new SortRecordBatchBuilder(context.getAllocator(), MAX_SORT_BYTES); + builder = new SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES); for (BatchGroup group : batchGroups) { RecordBatchData rbd = new RecordBatchData(group.getFirstContainer()); @@ -254,7 +264,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { builder.build(context, container); sv4 = builder.getSv4(); mSorter = createNewMSorter(); - mSorter.setup(context, getSelectionVector4(), this.container); + mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container); mSorter.sort(this.container); sv4 = mSorter.getSV4(); @@ -265,7 +275,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { constructHyperBatch(batchGroups, this.container); constructSV4(); selector = createSelector(); - selector.setup(context, this, sv4, batchGroups); + selector.setup(context, oContext.getAllocator(), this, sv4, batchGroups); selector.next(); } @@ -284,12 +294,22 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } public void mergeAndSpill() throws SchemaChangeException { + logger.debug("Copier allocator current allocation {}", copierAllocator.getAllocatedMemory()); VectorContainer hyperBatch = new VectorContainer(); VectorContainer outputContainer = new VectorContainer(); List<BatchGroup> batchGroupList = Lists.newArrayList(); for (int i = 0; i < SPILL_BATCH_GROUP_SIZE; i++) { + if (batchGroups.size() == 0) { + break; + } + if (batchGroups.peekLast().getSecondContainer() != null) { + break; + } batchGroupList.add(batchGroups.pollLast()); } + if (batchGroupList.size() == 0) { + return; + } constructHyperBatch(batchGroupList, hyperBatch); createCopier(hyperBatch, batchGroupList, outputContainer); @@ -309,7 +329,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { c2.setRecordCount(count); String outputFile = String.format(Utilities.getFileNameForQueryFragment(context, dirs.next(), "spill" + uid + "_" + spillCount++)); - BatchGroup newGroup = new BatchGroup(c1, c2, fs, outputFile, context.getAllocator()); + BatchGroup newGroup = new BatchGroup(c1, c2, fs, outputFile, oContext.getAllocator()); try { while ((count = copier.next()) > 0) { @@ -328,9 +348,19 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } } - private SelectionVector2 newSV2() { - SelectionVector2 sv2 = new SelectionVector2(context.getAllocator()); - sv2.allocateNew(incoming.getRecordCount()); + private SelectionVector2 newSV2() throws OutOfMemoryException { + SelectionVector2 sv2 = new SelectionVector2(oContext.getAllocator()); + if (!sv2.allocateNew(incoming.getRecordCount())) { + try { + mergeAndSpill(); + } catch (SchemaChangeException e) { + throw new RuntimeException(); + } + batchesSinceLastSpill = 0; + if (!sv2.allocateNew(incoming.getRecordCount())) { + throw new OutOfMemoryException(); + } + } for (int i = 0; i < incoming.getRecordCount(); i++) { sv2.setIndex(i, (char) i); } @@ -360,7 +390,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } private void constructSV4() throws SchemaChangeException { - BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator(); + BufferAllocator.PreAllocator preAlloc = oContext.getAllocator().getNewPreAllocator(); preAlloc.preAllocate(4 * TARGET_RECORD_COUNT); sv4 = new SelectionVector4(preAlloc.getAllocation(), TARGET_RECORD_COUNT, TARGET_RECORD_COUNT); } @@ -476,11 +506,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { List<VectorAllocator> allocators = Lists.newArrayList(); for(VectorWrapper<?> i : batch){ - ValueVector v = TypeHelper.getNewVector(i.getField(), context.getAllocator()); + ValueVector v = TypeHelper.getNewVector(i.getField(), copierAllocator); outputContainer.add(v); allocators.add(VectorAllocator.getAllocator(v, 110)); } - copier.setup(context, batch, batchGroupList, outputContainer, allocators); + copier.setup(context, copierAllocator, batch, batchGroupList, outputContainer, allocators); } catch (ClassTransformationException e) { throw new RuntimeException(e); } catch (IOException e) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index e87774b39..c54b2b7df 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -33,15 +33,18 @@ import java.util.Queue; public abstract class MSortTemplate implements MSorter, IndexedSortable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MSortTemplate.class); - + + private BufferAllocator allocator; private SelectionVector4 vector4; private SelectionVector4 aux; private long compares; private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue(); private Queue<Integer> newRunStarts; - - public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{ + + @Override + public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException{ + this.allocator = allocator; // we pass in the local hyperBatch since that is where we'll be reading data. Preconditions.checkNotNull(vector4); this.vector4 = vector4.createNewWrapperCurrent(); @@ -60,7 +63,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{ throw new UnsupportedOperationException("Missing batch"); } } - BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator(); + BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator(); preAlloc.preAllocate(4 * this.vector4.getTotalCount()); aux = new SelectionVector4(preAlloc.getAllocation(), this.vector4.getTotalCount(), Character.MAX_VALUE); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java index 6ad4e3ddb..1300830f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java @@ -19,12 +19,13 @@ package org.apache.drill.exec.physical.impl.xsort; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; public interface MSorter { - public void setup(FragmentContext context, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException; + public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch) throws SchemaChangeException; public void sort(VectorContainer container); public SelectionVector4 getSV4(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java index b31e28721..712296352 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopier.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.xsort; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.physical.impl.svremover.Copier; @@ -31,7 +32,7 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator; import java.util.List; public interface PriorityQueueCopier { - public void setup(FragmentContext context, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException; + public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException; public int next(); public List<VectorAllocator> getAllocators(); public void cleanup(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java index bc2c19c60..4221ae284 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueCopierTemplate.java @@ -42,6 +42,7 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier private List<BatchGroup> batchGroups; private VectorAccessible hyperBatch; private FragmentContext context; + private BufferAllocator allocator; private VectorAccessible outgoing; private List<VectorAllocator> allocators; private int size; @@ -49,19 +50,21 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier private int targetRecordCount = ExternalSortBatch.SPILL_TARGET_RECORD_COUNT; @Override - public void setup(FragmentContext context, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException { + public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, List<BatchGroup> batchGroups, VectorAccessible outgoing, List<VectorAllocator> allocators) throws SchemaChangeException { this.context = context; + this.allocator = allocator; this.hyperBatch = hyperBatch; this.batchGroups = batchGroups; this.outgoing = outgoing; this.allocators = allocators; this.size = batchGroups.size(); - BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator(); + BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator(); preAlloc.preAllocate(4 * size); vector4 = new SelectionVector4(preAlloc.getAllocation(), size, Character.MAX_VALUE); doSetup(context, hyperBatch, outgoing); + queueSize = 0; for (int i = 0; i < size; i++) { vector4.set(i, i * 2, batchGroups.get(i).getNextIndex()); siftUp(); @@ -79,10 +82,14 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier } int compoundIndex = vector4.get(0); int batch = compoundIndex >>> 16; - assert batch < batchGroups.size() * 2; + assert batch < batchGroups.size() * 2 : String.format("batch: %d batchGroups: %d", batch, batchGroups.size()); int batchGroup = batch / 2; - doCopy(compoundIndex, outgoingIndex); + if (!doCopy(compoundIndex, outgoingIndex)) { + setValueCount(outgoingIndex); + return outgoingIndex; + } int nextIndex = batchGroups.get(batchGroup).getNextIndex(); + batch = batch & 0xFFFE; batch += batchGroups.get(batchGroup).getBatchPointer(); if (nextIndex < 0) { vector4.set(0, vector4.get(--queueSize)); @@ -172,6 +179,6 @@ public abstract class PriorityQueueCopierTemplate implements PriorityQueueCopier public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") VectorAccessible outgoing); public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex); - public abstract void doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract boolean doCopy(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java index 963b8b03c..786667a8e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelector.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.xsort; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -27,7 +28,7 @@ import org.apache.drill.exec.vector.allocator.VectorAllocator; import java.util.List; public interface PriorityQueueSelector { - public void setup(FragmentContext context, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException; + public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException; public int next(); public void cleanup(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java index ccb26cfee..65a072ba7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/PriorityQueueSelectorTemplate.java @@ -35,20 +35,22 @@ public abstract class PriorityQueueSelectorTemplate implements PriorityQueueSele private SelectionVector4 vector4; private List<BatchGroup> batchGroups; private FragmentContext context; + private BufferAllocator allocator; private int size; private int queueSize = 0; private int targetRecordCount = ExternalSortBatch.TARGET_RECORD_COUNT; private VectorAccessible hyperBatch; @Override - public void setup(FragmentContext context, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException { + public void setup(FragmentContext context, BufferAllocator allocator, VectorAccessible hyperBatch, SelectionVector4 sv4, List<BatchGroup> batchGroups) throws SchemaChangeException { this.context = context; + this.allocator = allocator; this.sv4 = sv4; this.batchGroups = batchGroups; this.size = batchGroups.size(); this.hyperBatch = hyperBatch; - BufferAllocator.PreAllocator preAlloc = context.getAllocator().getNewPreAllocator(); + BufferAllocator.PreAllocator preAlloc = allocator.getNewPreAllocator(); preAlloc.preAllocate(4 * size); vector4 = new SelectionVector4(preAlloc.getAllocation(), size, Character.MAX_VALUE); doSetup(context, hyperBatch, null); @@ -78,6 +80,7 @@ public abstract class PriorityQueueSelectorTemplate implements PriorityQueueSele } else if (nextIndex == -2) { vector4.set(0, batch - 1, 0); sv4.setCount(outgoingIndex); + assert outgoingIndex != 0; return outgoingIndex; } else { vector4.set(0, batch, nextIndex); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java index d518f04bf..0ba84f9fe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.physical.impl.xsort; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.sort.Sorter; @@ -29,6 +30,7 @@ import org.apache.hadoop.util.IndexedSortable; import org.apache.hadoop.util.QuickSort; import javax.inject.Named; +import java.util.concurrent.TimeUnit; public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, IndexedSortable{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SingleBatchSorterTemplate.class); @@ -44,7 +46,10 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In @Override public void sort(SelectionVector2 vector2){ QuickSort qs = new QuickSort(); + Stopwatch watch = new Stopwatch(); + watch.start(); qs.sort(this, 0, vector2.getCount()); + logger.debug("Took {} us to sort {} records", watch.elapsed(TimeUnit.MICROSECONDS), vector2.getCount()); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java index 70200a9a1..87078a2d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -33,6 +33,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate @Override public PhysicalOperator visitExchange(Exchange exchange, IndexedFragmentNode iNode) throws ExecutionSetupException { + iNode.addAllocation(exchange); if(exchange == iNode.getNode().getSendingExchange()){ // this is a sending exchange. @@ -56,6 +57,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate @Override public PhysicalOperator visitSubScan(SubScan subScan, IndexedFragmentNode value) throws ExecutionSetupException { + value.addAllocation(subScan); // TODO - implement this return super.visitOp(subScan, value); } @@ -63,6 +65,8 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate @Override public PhysicalOperator visitStore(Store store, IndexedFragmentNode iNode) throws ExecutionSetupException { PhysicalOperator child = store.getChild().accept(this, iNode); + + iNode.addAllocation(store); try { PhysicalOperator o = store.getSpecificStore(child, iNode.getMinorFragmentId()); @@ -75,6 +79,7 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate @Override public PhysicalOperator visitOp(PhysicalOperator op, IndexedFragmentNode iNode) throws ExecutionSetupException { + iNode.addAllocation(op); // logger.debug("Visiting catch all: {}", op); List<PhysicalOperator> children = Lists.newArrayList(); for(PhysicalOperator child : op){ @@ -104,6 +109,10 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate public Wrapper getInfo() { return info; } + + public void addAllocation(PhysicalOperator pop) { + info.addAllocation(pop); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index 581499d1e..6e951df0a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -132,7 +132,9 @@ public class SimpleParallelizer { .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) // .setLeafFragment(isLeafFragment) // .setQueryStartTime(queryStartTime) - .setTimeZone(timeZone) + .setTimeZone(timeZone)// + .setMemInitial(wrapper.getInitialAllocation())// + .setMemMax(wrapper.getMaxAllocation()) .build(); if (isRootNode) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java index ca933c6ab..6d720a7d1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java @@ -80,9 +80,9 @@ public class StatsCollector { } @Override - public Void visitSubScan(SubScan subScan, Wrapper value) throws RuntimeException { + public Void visitSubScan(SubScan subScan, Wrapper wrapper) throws RuntimeException { // TODO - implement this - return super.visitOp(subScan, value); + return super.visitOp(subScan, wrapper); } @Override @@ -93,9 +93,9 @@ public class StatsCollector { } @Override - public Void visitLimit(Limit limit, Wrapper value) throws RuntimeException { + public Void visitLimit(Limit limit, Wrapper wrapper) throws RuntimeException { // TODO: Implement this - return visitOp(limit, value); + return visitOp(limit, wrapper); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java index 94fcac51c..8602bf002 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java @@ -48,6 +48,8 @@ public class Wrapper { private final Stats stats; private boolean endpointsAssigned; private Map<DrillbitEndpoint, EndpointAffinity> endpointAffinity = Maps.newHashMap(); + private long initialAllocation = 0; + private long maxAllocation = 0; // a list of assigned endpoints. Technically, there could repeated endpoints in this list if we'd like to assign the // same fragment multiple times to the same endpoint. @@ -99,6 +101,19 @@ public class Wrapper { return node; } + public long getInitialAllocation() { + return initialAllocation; + } + + public long getMaxAllocation() { + return maxAllocation; + } + + public void addAllocation(PhysicalOperator pop) { + initialAllocation += pop.getInitialAllocation(); + maxAllocation += pop.getMaxAllocation(); + } + private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index 7898937dc..5521c4e08 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -21,15 +21,7 @@ import java.util.Iterator; import net.hydromatic.optiq.tools.RuleSet; -import org.apache.drill.exec.planner.physical.FilterPrule; -import org.apache.drill.exec.planner.physical.LimitPrule; -import org.apache.drill.exec.planner.physical.MergeJoinPrule; -import org.apache.drill.exec.planner.physical.ProjectPrule; -import org.apache.drill.exec.planner.physical.ScanPrule; -import org.apache.drill.exec.planner.physical.ScreenPrule; -import org.apache.drill.exec.planner.physical.SortConvertPrule; -import org.apache.drill.exec.planner.physical.SortPrule; -import org.apache.drill.exec.planner.physical.StreamAggPrule; +import org.apache.drill.exec.planner.physical.*; import org.eigenbase.rel.RelFactories; import org.eigenbase.rel.rules.MergeProjectRule; import org.eigenbase.rel.rules.PushFilterPastJoinRule; @@ -116,9 +108,9 @@ public class DrillRuleSets { StreamAggPrule.INSTANCE, MergeJoinPrule.INSTANCE, FilterPrule.INSTANCE, - LimitPrule.INSTANCE + LimitPrule.INSTANCE, -// PushLimitToTopN.INSTANCE + PushLimitToTopN.INSTANCE // ExpandConversionRule.INSTANCE, // SwapJoinRule.INSTANCE, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java index 344be4ebc..998ed0a8b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SortPrel.java @@ -51,11 +51,11 @@ public class SortPrel extends SortRel implements Prel { PhysicalOperator childPOP = child.getPhysicalOperator(creator); - childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE); - Sort g = new Sort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false); +// childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE); +// Sort g = new Sort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false); -// childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE); -// Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false); + childPOP = PrelUtil.removeSvIfRequired(childPOP, SelectionVectorMode.NONE, SelectionVectorMode.TWO_BYTE); + Sort g = new ExternalSort(childPOP, PrelUtil.getOrdering(this.collation, getChild().getRowType()), false); return g; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index d4f458f33..214f81c95 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -20,7 +20,11 @@ package org.apache.drill.exec.record; import java.util.Iterator; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -31,11 +35,13 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements protected final VectorContainer container = new VectorContainer(); protected final T popConfig; protected final FragmentContext context; - - protected AbstractRecordBatch(T popConfig, FragmentContext context) { + protected final OperatorContext oContext; + + protected AbstractRecordBatch(T popConfig, FragmentContext context) throws OutOfMemoryException { super(); this.context = context; this.popConfig = popConfig; + this.oContext = new OperatorContext(popConfig, context); } @Override @@ -60,13 +66,13 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements @Override public void kill() { killIncoming(); - cleanup(); } protected abstract void killIncoming(); public void cleanup(){ container.clear(); + oContext.close(); } @Override @@ -97,6 +103,4 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements return batch; } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index 7b832e43a..dd2cfe059 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -18,16 +18,19 @@ package org.apache.drill.exec.record; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> extends AbstractRecordBatch<T> { final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass()); - + protected final RecordBatch incoming; private boolean first = true; + protected boolean outOfMemory = false; - public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) { + public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context); this.incoming = incoming; } @@ -46,7 +49,8 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte case NONE: case NOT_YET: case STOP: - cleanup(); + return upstream; + case OUT_OF_MEMORY: return upstream; case OK_NEW_SCHEMA: try{ @@ -60,6 +64,10 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte // fall through. case OK: doWork(); + if (outOfMemory) { + outOfMemory = false; + return IterOutcome.OUT_OF_MEMORY; + } return upstream; // change if upstream changed, otherwise normal. default: throw new UnsupportedOperationException(); @@ -69,8 +77,8 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte @Override public void cleanup() { // logger.debug("Cleaning up."); - incoming.cleanup(); super.cleanup(); + incoming.cleanup(); } protected abstract void setupNewSchema() throws SchemaChangeException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java index 6f5f7a720..acbd8bd81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatchProvider.java @@ -25,5 +25,5 @@ public interface RawFragmentBatchProvider { public RawFragmentBatch getNext() throws IOException; public void kill(FragmentContext context); - + public void cleanup(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index b77a6a8fc..31283c61b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -45,7 +45,8 @@ public interface RecordBatch extends VectorAccessible { OK_NEW_SCHEMA, // A full collection of records STOP, // Informs parent nodes that the query has terminated. In this case, a consumer can consume their QueryContext // to understand the current state of things. - NOT_YET // used by batches that haven't received incoming data yet. + NOT_YET, // used by batches that haven't received incoming data yet. + OUT_OF_MEMORY // an upstream operator was unable to allocate memory. A batch receiving this should release memory if it can } public static enum SetupOutcome { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java index 96a1c227b..ba2c7b2b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TransferPair.java @@ -21,6 +21,7 @@ import org.apache.drill.exec.vector.ValueVector; public interface TransferPair { public void transfer(); + public void splitAndTransfer(int startIndex, int length); public ValueVector getTo(); public void copyValue(int from, int to); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index 288aa7f77..396834c49 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -71,6 +71,7 @@ public class WritableBatch { cbb.addComponent(buf); } + List<FieldMetadata> fields = def.getFieldList(); int bufferOffset = 0; @@ -83,7 +84,10 @@ public class WritableBatch { for (VectorWrapper<?> vv : container) { FieldMetadata fmd = fields.get(vectorIndex); ValueVector v = vv.getValueVector(); - v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength())); + ByteBuf bb = cbb.slice(bufferOffset, fmd.getBufferLength()); +// v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength())); + v.load(fmd, bb); + bb.release(); vectorIndex++; bufferOffset += fmd.getBufferLength(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java index f7b5155b6..af1e2b6fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java @@ -77,10 +77,14 @@ public class SelectionVector2 implements Closeable{ public void setIndex(int index, char value){ buffer.setChar(index * RECORD_SIZE, value); } - - public void allocateNew(int size){ + + public boolean allocateNew(int size){ clear(); buffer = allocator.buffer(size * RECORD_SIZE); + if (buffer == null) { + return false; + } + return true; } public SelectionVector2 clone(){ @@ -98,7 +102,7 @@ public class SelectionVector2 implements Closeable{ } public void clear() { - if (buffer != DeadBuf.DEAD_BUFFER) { + if (buffer != null && buffer != DeadBuf.DEAD_BUFFER) { buffer.release(); buffer = DeadBuf.DEAD_BUFFER; recordCount = 0; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java index c665949a5..d50a64e64 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClientWithConnection.java @@ -46,7 +46,7 @@ public abstract class BasicClientWithConnection<T extends EnumLite, HANDSHAKE_SE protected GenericFutureListener<ChannelFuture> getCloseHandler(ServerConnection clientConnection) { return getCloseHandler(clientConnection.getChannel()); } - + @Override protected Response handle(ServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException { return handleReponse( (ConnectionThrottle) connection, rpcType, pBody, dBody); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java index f5e77f10c..8f533e3cf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java @@ -73,7 +73,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection ch.closeFuture().addListener(getCloseHandler(connection)); ch.pipeline().addLast( // - getDecoder(connection.getAllocator()), // + getDecoder(connection.getAllocator(), getOutOfMemoryHandler()), // new RpcDecoder("s-" + rpcConfig.getName()), // new RpcEncoder("s-" + rpcConfig.getName()), // getHandshakeHandler(connection), new InboundHandler(connection), // @@ -85,7 +85,11 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection }); } - public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator); + public OutOfMemoryHandler getOutOfMemoryHandler() { + return OutOfMemoryHandler.DEFAULT_INSTANCE; + } + + public abstract ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler); @Override public boolean isClient() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java index 1527e7942..8fc446fd4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/InboundRpcMessage.java @@ -41,7 +41,7 @@ public class InboundRpcMessage extends RpcMessage{ } void release(){ - pBody.release(); + if (pBody != null) pBody.release(); if(dBody != null) dBody.release(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java new file mode 100644 index 000000000..5d7db478e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/OutOfMemoryHandler.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc; + +public interface OutOfMemoryHandler { + + public static OutOfMemoryHandler DEFAULT_INSTANCE = new OutOfMemoryHandler() { + @Override + public void handle() { + throw new UnsupportedOperationException(); + } + }; + + public void handle(); + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java index 23fa46d06..473e3e6c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.rpc; import io.netty.buffer.ByteBuf; +import io.netty.buffer.SwappedByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.CorruptedFrameException; @@ -27,6 +29,7 @@ import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; import com.google.protobuf.CodedInputStream; +import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; /** * Modified version of ProtobufVarint32FrameDecoder that avoids bytebuf copy. @@ -34,12 +37,13 @@ import com.google.protobuf.CodedInputStream; public class ProtobufLengthDecoder extends ByteToMessageDecoder { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtobufLengthDecoder.class); - private BufferAllocator allocator; + private OutOfMemoryHandler outOfMemoryHandler; - public ProtobufLengthDecoder(BufferAllocator allocator) { + public ProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { super(); this.allocator = allocator; + this.outOfMemoryHandler = outOfMemoryHandler; } @@ -82,6 +86,7 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder { if(outBuf == null){ logger.warn("Failure allocating buffer on incoming stream due to memory limits. Current Allocation: {}.", allocator.getAllocatedMemory()); in.resetReaderIndex(); + outOfMemoryHandler.handle(); return; } outBuf.writeBytes(in, in.readerIndex(), length); @@ -104,4 +109,9 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder { } + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + ctx.fireChannelReadComplete(); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index 1a422ee63..30101b2f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -149,7 +149,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp if (!ctx.channel().isOpen()) return; if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Received message {}", msg); switch (msg.mode) { - case REQUEST: + case REQUEST: { // handle message and ack. Response r = handle(connection, msg.rpcType, msg.pBody, msg.dBody); msg.release(); // we release our ownership. Handle could have taken over ownership. @@ -159,6 +159,7 @@ public abstract class RpcBus<T extends EnumLite, C extends RemoteConnection> imp if (RpcConstants.EXTRA_DEBUGGING) logger.debug("Adding message to outbound buffer. {}", outMessage); ctx.writeAndFlush(outMessage); break; + } case RESPONSE: try{ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java index af2b58bf5..178ac43af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlClient.java @@ -26,12 +26,7 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.BitControl.BitControlHandshake; import org.apache.drill.exec.proto.BitControl.RpcType; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.rpc.BasicClient; -import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -import org.apache.drill.exec.rpc.Response; -import org.apache.drill.exec.rpc.RpcBus; -import org.apache.drill.exec.rpc.RpcConnectionHandler; -import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.*; import org.apache.drill.exec.server.BootStrapContext; import org.apache.drill.exec.work.batch.ControlMessageHandler; @@ -99,7 +94,7 @@ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitCo @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { - return new ControlProtobufLengthDecoder(allocator); + return new ControlProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java index c00dc5483..7edfe204e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlProtobufLengthDecoder.java @@ -23,14 +23,15 @@ import io.netty.channel.ChannelHandlerContext; import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; /** * Purely to simplify memory debugging. */ public class ControlProtobufLengthDecoder extends ProtobufLengthDecoder{ - public ControlProtobufLengthDecoder(BufferAllocator allocator) { - super(allocator); + public ControlProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { + super(allocator, outOfMemoryHandler); } protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java index 4b6a85d4e..3e1a2a4c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlServer.java @@ -25,10 +25,7 @@ import io.netty.util.concurrent.GenericFutureListener; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.BitControl.BitControlHandshake; import org.apache.drill.exec.proto.BitControl.RpcType; -import org.apache.drill.exec.rpc.BasicServer; -import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -import org.apache.drill.exec.rpc.Response; -import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.*; import org.apache.drill.exec.server.BootStrapContext; import org.apache.drill.exec.work.batch.ControlMessageHandler; @@ -98,8 +95,8 @@ public class ControlServer extends BasicServer<RpcType, ControlConnection>{ } @Override - public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { - return new ControlProtobufLengthDecoder(allocator); + public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { + return new ControlProtobufLengthDecoder(allocator, outOfMemoryHandler); } private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java index af4da41fe..58fa403fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java @@ -17,25 +17,33 @@ */ package org.apache.drill.exec.rpc.data; +import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; +import org.apache.drill.exec.memory.AccountingByteBuf; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.rpc.RemoteConnection; import org.apache.drill.exec.work.fragment.FragmentManager; -class BitServerConnection extends RemoteConnection{ +public class BitServerConnection extends RemoteConnection{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServerConnection.class); - private final BufferAllocator initialAllocator; + private AllocatorProxy proxy = new AllocatorProxy(); private volatile FragmentManager manager; public BitServerConnection(Channel channel, BufferAllocator initialAllocator) { super(channel); - this.initialAllocator = initialAllocator; + proxy.setAllocator(initialAllocator); } void setManager(FragmentManager manager){ this.manager = manager; + if (manager != null) { // Do this check for TestBitRpc test + this.proxy.setAllocator(manager.getFragmentContext().getAllocator()); + manager.addConnection(this); + } } @Override @@ -43,13 +51,76 @@ class BitServerConnection extends RemoteConnection{ if(manager != null){ return manager.getFragmentContext().getAllocator(); } - - return initialAllocator; - + return proxy; } public FragmentManager getFragmentManager(){ return manager; } + + final static String ERROR_MESSAGE = "Attempted to access AllocatorProxy"; + + private static class AllocatorProxy implements BufferAllocator { + private BufferAllocator allocator; + + public void setAllocator(BufferAllocator allocator) { + this.allocator = allocator; + } + + @Override + public AccountingByteBuf buffer(int size) { + if (allocator == null) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + return allocator.buffer(size); + } + + @Override + public AccountingByteBuf buffer(int minSize, int maxSize) { + if (allocator == null) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + return allocator.buffer(minSize, maxSize); + } + + @Override + public ByteBufAllocator getUnderlyingAllocator() { + if (allocator == null) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + return allocator.getUnderlyingAllocator(); + } + + @Override + public BufferAllocator getChildAllocator(FragmentHandle handle, long initialReservation, long maximumReservation) throws OutOfMemoryException { + if (allocator == null) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + return allocator.getChildAllocator(handle, initialReservation, maximumReservation); + } + + @Override + public PreAllocator getNewPreAllocator() { + if (allocator == null) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + return allocator.getNewPreAllocator(); + } + + @Override + public void close() { + if (allocator != null) { + allocator.close(); + } + } + + @Override + public long getAllocatedMemory() { + if (allocator == null) { + throw new UnsupportedOperationException(ERROR_MESSAGE); + } + return allocator.getAllocatedMemory(); + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java index 6c57f22f7..e22df7c71 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java @@ -28,11 +28,7 @@ import org.apache.drill.exec.proto.BitData.BitServerHandshake; import org.apache.drill.exec.proto.BitData.RpcType; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.RpcChannel; -import org.apache.drill.exec.rpc.BasicClient; -import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -import org.apache.drill.exec.rpc.Response; -import org.apache.drill.exec.rpc.RpcConnectionHandler; -import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.*; import org.apache.drill.exec.rpc.control.ControlProtobufLengthDecoder; import org.apache.drill.exec.server.BootStrapContext; @@ -95,6 +91,6 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { - return new DataProtobufLengthDecoder(allocator); + return new DataProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java index d4391480b..b648c72de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataProtobufLengthDecoder.java @@ -23,12 +23,13 @@ import io.netty.channel.ChannelHandlerContext; import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; public class DataProtobufLengthDecoder extends ProtobufLengthDecoder{ - public DataProtobufLengthDecoder(BufferAllocator allocator) { - super(allocator); + public DataProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { + super(allocator, outOfMemoryHandler); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index 3dd7912cd..7354d72f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -22,16 +22,15 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.GenericFutureListener; +import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.BitData.BitClientHandshake; import org.apache.drill.exec.proto.BitData.BitServerHandshake; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.proto.BitData.RpcType; import org.apache.drill.exec.proto.UserBitShared.RpcChannel; -import org.apache.drill.exec.rpc.BasicServer; -import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -import org.apache.drill.exec.rpc.Response; -import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.record.RawFragmentBatch; +import org.apache.drill.exec.rpc.*; import org.apache.drill.exec.rpc.control.WorkEventBus; import org.apache.drill.exec.server.BootStrapContext; @@ -44,6 +43,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { private final BootStrapContext context; private final WorkEventBus workBus; private final DataResponseHandler dataHandler; + private BitServerConnection connection; public DataServer(BootStrapContext context, WorkEventBus workBus, DataResponseHandler dataHandler) { super(DataRpcConfig.MAPPING, context.getAllocator().getUnderlyingAllocator(), context.getBitLoopGroup()); @@ -65,7 +65,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { @Override public BitServerConnection initRemoteConnection(Channel channel) { - return new BitServerConnection(channel, context.getAllocator()); + return connection = new BitServerConnection(channel, context.getAllocator()); } @Override @@ -89,6 +89,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { }; } + @Override protected Response handle(BitServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf body) throws RpcException { assert rpcType == RpcType.REQ_RECORD_BATCH_VALUE; @@ -121,8 +122,26 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { } + private final static FragmentRecordBatch OOM_FRAGMENT = FragmentRecordBatch.newBuilder().setIsOutOfMemory(true).build(); + + @Override + public OutOfMemoryHandler getOutOfMemoryHandler() { + return new OutOfMemoryHandler() { + @Override + public void handle() { + try { + logger.debug("Setting autoRead false"); + connection.getFragmentManager().setAutoRead(false); + connection.getFragmentManager().handle(new RawFragmentBatch(connection, OOM_FRAGMENT, null)); + } catch (FragmentSetupException e) { + throw new RuntimeException(); + } + } + }; + } + @Override - public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { - return new DataProtobufLengthDecoder(allocator); + public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { + return new DataProtobufLengthDecoder(allocator, outOfMemoryHandler); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java index 5345b31a8..37d8d676a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java @@ -67,23 +67,26 @@ public class QueryResultHandler { failAll(); } } - + if(failed){ l.submissionFailed(new RpcException("Remote failure while running query." + batch.getHeader().getErrorList())); resultsListener.remove(result.getQueryId(), l); }else{ + try { l.resultArrived(batch, throttle); + } catch (Exception e) { + batch.release(); + l.submissionFailed(new RpcException(e)); + } } if ( (failed || result.getIsLastChunk()) - && + && (!(l instanceof BufferingListener) || ((BufferingListener)l).output != null) ) { resultsListener.remove(result.getQueryId(), l); } - - } private void failAll() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 50d456df0..f497d39b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -30,11 +30,7 @@ import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.UserProtos.UserProperties; import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; -import org.apache.drill.exec.rpc.BasicClientWithConnection; -import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -import org.apache.drill.exec.rpc.Response; -import org.apache.drill.exec.rpc.RpcConnectionHandler; -import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.*; import com.google.protobuf.MessageLite; @@ -104,7 +100,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { - return new UserProtobufLengthDecoder(allocator); + return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java index 680a07d49..99e77774c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserProtobufLengthDecoder.java @@ -23,12 +23,13 @@ import io.netty.channel.ChannelHandlerContext; import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; public class UserProtobufLengthDecoder extends ProtobufLengthDecoder{ - public UserProtobufLengthDecoder(BufferAllocator allocator) { - super(allocator); + public UserProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { + super(allocator, outOfMemoryHandler); } protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index ae4b01a53..acd841202 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -34,13 +34,7 @@ import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.UserProtos.UserProperties; import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; -import org.apache.drill.exec.rpc.Acks; -import org.apache.drill.exec.rpc.BasicServer; -import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -import org.apache.drill.exec.rpc.RemoteConnection; -import org.apache.drill.exec.rpc.Response; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.*; import org.apache.drill.exec.work.user.UserWorker; import com.google.protobuf.InvalidProtocolBufferException; @@ -150,7 +144,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec } @Override - public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { - return new UserProtobufLengthDecoder(allocator); + public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { + return new UserProtobufLengthDecoder(allocator, outOfMemoryHandler); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java index 102201678..fb8a014b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java @@ -22,6 +22,9 @@ import org.apache.drill.exec.physical.impl.OutputMutator; public interface RecordReader { + public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; + public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; + /** * Configure the RecordReader with the provided schema and the record batch that should be written to. * diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java index 2c5ef42c7..1d6aa4d15 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/VectorHolder.java @@ -25,16 +25,27 @@ public class VectorHolder { private int length; private ValueVector vector; private int currentLength; + private boolean repeated; public VectorHolder(int length, ValueVector vector) { this.length = length; this.vector = vector; + if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) { + repeated = true; + } } public VectorHolder(ValueVector vector) { this.length = vector.getValueCapacity(); this.vector = vector; + if (vector instanceof RepeatedFixedWidthVector || vector instanceof RepeatedVariableWidthVector) { + repeated = true; + } + } + + public boolean isRepeated() { + return repeated; } public ValueVector getValueVector() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index 7ae10f814..67502ef9c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -153,7 +153,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements } } - return new ScanBatch(context, readers.iterator(), partitionColumns, selectedPartitionColumns); + return new ScanBatch(scan, context, readers.iterator(), partitionColumns, selectedPartitionColumns); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java index fb16edfa6..84587a946 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java @@ -32,6 +32,6 @@ public class DirectBatchCreator implements BatchCreator<DirectSubScan>{ @Override public RecordBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children) throws ExecutionSetupException { - return new ScanBatch(context, Collections.singleton(config.getReader()).iterator()); + return new ScanBatch(config, context, Collections.singleton(config.getReader()).iterator()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index 2e8cd2ede..1c8539cc0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -40,6 +40,7 @@ import org.apache.drill.exec.expr.holders.NullableBitHolder; import org.apache.drill.exec.expr.holders.NullableFloat4Holder; import org.apache.drill.exec.expr.holders.NullableIntHolder; import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; @@ -65,7 +66,7 @@ import com.google.common.collect.Maps; public class JSONRecordReader implements RecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JSONRecordReader.class); - private static final int DEFAULT_LENGTH = 256 * 1024; // 256kb + private static final int DEFAULT_LENGTH = 4000; public static final Charset UTF_8 = Charset.forName("UTF-8"); private final Map<String, VectorHolder> valueVectorMap; @@ -78,22 +79,20 @@ public class JSONRecordReader implements RecordReader { private RecordSchema currentSchema; private List<Field> removedFields; private OutputMutator outputMutator; - private BufferAllocator allocator; private int batchSize; private final List<SchemaPath> columns; public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, int batchSize, - List<SchemaPath> columns) { + List<SchemaPath> columns) throws OutOfMemoryException { this.hadoopPath = new Path(inputPath); this.fileSystem = fileSystem; - this.allocator = fragmentContext.getAllocator(); this.batchSize = batchSize; valueVectorMap = Maps.newHashMap(); this.columns = columns; } public JSONRecordReader(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, - List<SchemaPath> columns) { + List<SchemaPath> columns) throws OutOfMemoryException { this(fragmentContext, inputPath, fileSystem, DEFAULT_LENGTH, columns); } @@ -162,7 +161,10 @@ public class JSONRecordReader implements RecordReader { } for (VectorHolder holder : valueVectorMap.values()) { - holder.populateVectorLength(); + if (holder.isRepeated()) { + holder.setGroupCount(nextRowIndex); + } + holder.getValueVector().getMutator().setValueCount(nextRowIndex); } return nextRowIndex; @@ -200,10 +202,6 @@ public class JSONRecordReader implements RecordReader { return removedFields; } - public BufferAllocator getAllocator() { - return allocator; - } - private boolean fieldSelected(String field){ SchemaPath sp = SchemaPath.getCompoundPath(field.split("\\.")); @@ -523,11 +521,10 @@ public class JSONRecordReader implements RecordReader { MaterializedField f = MaterializedField.create(SchemaPath.getCompoundPath(fullFieldName.split("\\.")), type); - ValueVector v = TypeHelper.getNewVector(f, allocator); + ValueVector v = outputMutator.addField(f, TypeHelper.getValueVectorClass(minorType, type.getMode())); AllocationHelper.allocate(v, batchSize, 50); holder = new VectorHolder(v); valueVectorMap.put(fullFieldName, holder); - outputMutator.addField(v); return holder; } return holder; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java index 07e0cbe91..2544d2b6f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -23,11 +23,14 @@ import java.util.Properties; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; @@ -209,10 +212,10 @@ public class HiveRecordReader implements RecordReader { try { for (int i = 0; i < columnNames.size(); i++) { PrimitiveCategory pCat = primitiveCategories.get(i); - MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), getMajorType(pCat)); - ValueVector vv = TypeHelper.getNewVector(field, context.getAllocator()); + MajorType type = getMajorType(pCat); + MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(columnNames.get(i)), type); + ValueVector vv = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); vectors.add(vv); - output.addField(vv); } for (int i = 0; i < selectedPartitionNames.size(); i++) { String type = selectedPartitionTypes.get(i); @@ -249,7 +252,7 @@ public class HiveRecordReader implements RecordReader { TinyIntVector v = (TinyIntVector) vector; byte value = (byte) val; for (int j = 0; j < recordCount; j++) { - v.getMutator().set(j, value); + v.getMutator().setSafe(j, value); } break; } @@ -257,7 +260,7 @@ public class HiveRecordReader implements RecordReader { Float8Vector v = (Float8Vector) vector; double value = (double) val; for (int j = 0; j < recordCount; j++) { - v.getMutator().set(j, value); + v.getMutator().setSafe(j, value); } break; } @@ -265,7 +268,7 @@ public class HiveRecordReader implements RecordReader { Float4Vector v = (Float4Vector) vector; float value = (float) val; for (int j = 0; j < recordCount; j++) { - v.getMutator().set(j, value); + v.getMutator().setSafe(j, value); } break; } @@ -273,7 +276,7 @@ public class HiveRecordReader implements RecordReader { IntVector v = (IntVector) vector; int value = (int) val; for (int j = 0; j < recordCount; j++) { - v.getMutator().set(j, value); + v.getMutator().setSafe(j, value); } break; } @@ -281,7 +284,7 @@ public class HiveRecordReader implements RecordReader { BigIntVector v = (BigIntVector) vector; long value = (long) val; for (int j = 0; j < recordCount; j++) { - v.getMutator().set(j, value); + v.getMutator().setSafe(j, value); } break; } @@ -289,7 +292,7 @@ public class HiveRecordReader implements RecordReader { SmallIntVector v = (SmallIntVector) vector; short value = (short) val; for (int j = 0; j < recordCount; j++) { - v.getMutator().set(j, value); + v.getMutator().setSafe(j, value); } break; } @@ -297,7 +300,7 @@ public class HiveRecordReader implements RecordReader { VarCharVector v = (VarCharVector) vector; byte[] value = (byte[]) val; for (int j = 0; j < recordCount; j++) { - v.getMutator().set(j, value); + v.getMutator().setSafe(j, value); } break; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java index b155661ed..62f2ec736 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScanBatchCreator.java @@ -66,6 +66,6 @@ public class HiveScanBatchCreator implements BatchCreator<HiveSubScan> { } } } - return new ScanBatch(context, readers.iterator()); + return new ScanBatch(config, context, readers.iterator()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java index f1821965e..125ee13a6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java @@ -80,8 +80,7 @@ public class HiveTextRecordReader extends HiveRecordReader { for (int i = start; (b = bytes[i]) != delimiter; i++) { value = (value * 10) + b - 48; } - ((NullableIntVector) vv).getMutator().set(index, value); // No need to use setSafe for fixed length vectors - return true; + return ((NullableIntVector) vv).getMutator().setSafe(index, value); } case LONG: { long value = 0; @@ -89,8 +88,7 @@ public class HiveTextRecordReader extends HiveRecordReader { for (int i = start; (b = bytes[i]) != delimiter; i++) { value = (value * 10) + b - 48; } - ((NullableBigIntVector) vv).getMutator().set(index, value); // No need to use setSafe for fixed length vectors - return true; + return ((NullableBigIntVector) vv).getMutator().setSafe(index, value); } case SHORT: throw new UnsupportedOperationException(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java index a40024529..a7e814638 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java @@ -33,6 +33,6 @@ public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{ @Override public RecordBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children) throws ExecutionSetupException { RecordReader rr = new RowRecordReader(context, config.getTable(), context.getRootSchema()); - return new ScanBatch(context, Collections.singleton(rr).iterator()); + return new ScanBatch(config, context, Collections.singleton(rr).iterator()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java index 5d723dc99..ac601d43f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/RowRecordReader.java @@ -23,6 +23,8 @@ import net.hydromatic.optiq.SchemaPlus; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.RecordReader; @@ -38,9 +40,13 @@ import org.apache.drill.exec.vector.ValueVector; public class RowRecordReader implements RecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RowRecordReader.class); + public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; + public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; + protected final VectorSet batch; protected final RowProvider provider; protected final FragmentContext context; + protected final BufferAllocator allocator; protected OutputMutator output; private int bufSize = 256*1024; @@ -50,14 +56,16 @@ public class RowRecordReader implements RecordReader { * @param context * @param vectors */ - public RowRecordReader(FragmentContext context, VectorSet batch, RowProvider provider) { + public RowRecordReader(FragmentContext context, VectorSet batch, RowProvider provider) throws OutOfMemoryException { this.context = context; + this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION); this.provider = provider; this.batch = batch; } - public RowRecordReader(FragmentContext context, SelectedTable table, SchemaPlus rootSchema){ + public RowRecordReader(FragmentContext context, SelectedTable table, SchemaPlus rootSchema) throws OutOfMemoryException { this.context = context; + this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION); this.provider = table.getProvider(rootSchema); this.batch = table.getFixedTable(); } @@ -68,7 +76,7 @@ public class RowRecordReader implements RecordReader { @Override public void setup(OutputMutator output) throws ExecutionSetupException { this.output = output; - batch.createVectors(context.getAllocator()); + batch.createVectors(allocator); // Inform drill of the output columns. They were set up when the vector handler was created. // Note we are currently working with fixed tables. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java index 7a2ed1b9d..eb9e7a693 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java @@ -23,6 +23,8 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; @@ -38,11 +40,12 @@ public class MockRecordReader implements RecordReader { private OutputMutator output; private MockScanEntry config; private FragmentContext context; + private BufferAllocator alcator; private ValueVector[] valueVectors; private int recordsRead; private int batchRecordCount; - public MockRecordReader(FragmentContext context, MockScanEntry config) { + public MockRecordReader(FragmentContext context, MockScanEntry config) throws OutOfMemoryException { this.context = context; this.config = config; } @@ -55,14 +58,11 @@ public class MockRecordReader implements RecordReader { return x; } - private ValueVector getVector(String name, MajorType type, int length) { + private MaterializedField getVector(String name, MajorType type, int length) { assert context != null : "Context shouldn't be null."; MaterializedField f = MaterializedField.create(SchemaPath.getSimplePath(name), type); - ValueVector v; - v = TypeHelper.getNewVector(f, context.getAllocator()); - AllocationHelper.allocate(v, length, 50, 4); - return v; + return f; } @@ -75,8 +75,8 @@ public class MockRecordReader implements RecordReader { batchRecordCount = 250000 / estimateRowSize; for (int i = 0; i < config.getTypes().length; i++) { - valueVectors[i] = getVector(config.getTypes()[i].getName(), config.getTypes()[i].getMajorType(), batchRecordCount); - output.addField(valueVectors[i]); + MajorType type = config.getTypes()[i].getMajorType(); + valueVectors[i] = output.addField(getVector(config.getTypes()[i].getName(), type, batchRecordCount), TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); } output.setNewSchema(); } catch (SchemaChangeException e) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java index 5c51a5af6..0bfd03886 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java @@ -41,6 +41,6 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{ for(MockScanEntry e : entries){ readers.add(new MockRecordReader(context, e)); } - return new ScanBatch(context, readers.iterator()); + return new ScanBatch(config, context, readers.iterator()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java index 4c060f2b7..16c27159c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/NullableBitReader.java @@ -52,8 +52,10 @@ final class NullableBitReader extends ColumnReader { defLevel = pageReadStatus.definitionLevels.readInteger(); // if the value is defined if (defLevel == columnDescriptor.getMaxDefinitionLevel()){ - ((NullableBitVector)valueVecHolder.getValueVector()).getMutator().set(i + valuesReadInCurrentPass, - pageReadStatus.valueReader.readBoolean() ? 1 : 0 ); + if (!((NullableBitVector)valueVecHolder.getValueVector()).getMutator().setSafe(i + valuesReadInCurrentPass, + pageReadStatus.valueReader.readBoolean() ? 1 : 0 )) { + throw new RuntimeException(); + } } // otherwise the value is skipped, because the bit vector indicating nullability is zero filled } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java index 6e17fbae9..9acb55745 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordReader.java @@ -29,6 +29,7 @@ import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; @@ -77,7 +78,6 @@ class ParquetRecordReader implements RecordReader { private List<ColumnReader> columnStatuses; FileSystem fileSystem; - BufferAllocator allocator; private long batchSize; Path hadoopPath; private VarLenBinaryReader varLengthReader; @@ -107,7 +107,6 @@ class ParquetRecordReader implements RecordReader { String path, int rowGroupIndex, FileSystem fs, CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer, List<SchemaPath> columns) throws ExecutionSetupException { - this.allocator = fragmentContext.getAllocator(); hadoopPath = new Path(path); fileSystem = fs; this.codecFactoryExposer = codecFactoryExposer; @@ -214,13 +213,13 @@ class ParquetRecordReader implements RecordReader { for (int i = 0; i < columns.size(); ++i) { column = columns.get(i); columnChunkMetaData = footer.getBlocks().get(0).getColumns().get(i); - field = MaterializedField.create(toFieldName(column.getPath()), - toMajorType(column.getType(), getDataMode(column))); + MajorType type = toMajorType(column.getType(), getDataMode(column)); + field = MaterializedField.create(toFieldName(column.getPath()), type); // the field was not requested to be read if ( ! fieldSelected(field)) continue; fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY; - ValueVector v = TypeHelper.getNewVector(field, allocator); + ValueVector v = output.addField(field, TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode())); if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { createFixedColumnReader(fieldFixedLength, column, columnChunkMetaData, recordsPerBatch, v); } else { @@ -237,17 +236,8 @@ class ParquetRecordReader implements RecordReader { throw new ExecutionSetupException(e); } - output.removeAllFields(); +// output.removeAllFields(); try { - for (ColumnReader crs : columnStatuses) { - output.addField(crs.valueVecHolder.getValueVector()); - } - for (VarLenBinaryReader.VarLengthColumn r : varLengthReader.columns) { - output.addField(r.valueVecHolder.getValueVector()); - } - for (VarLenBinaryReader.NullableVarLengthColumn r : varLengthReader.nullableColumns) { - output.addField(r.valueVecHolder.getValueVector()); - } output.setNewSchema(); }catch(SchemaChangeException e) { throw new ExecutionSetupException("Error setting up output mutator.", e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index 6278a7974..df6581fb8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -123,6 +123,6 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan } } - return new ScanBatch(context, readers.iterator(), partitionColumns, selectedPartitionColumns); + return new ScanBatch(rowGroupScan, context, readers.iterator(), partitionColumns, selectedPartitionColumns); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java index e9e54f00f..86aec44c8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/AllocationHelper.java @@ -32,7 +32,7 @@ public class AllocationHelper { }else if(v instanceof RepeatedFixedWidthVector){ ((RepeatedFixedWidthVector) v).allocateNew(valueCount, valueCount * repeatedPerTop); }else if(v instanceof RepeatedVariableWidthVector){ - ((RepeatedVariableWidthVector) v).allocateNew(valueCount * bytesPerValue, valueCount, valueCount * repeatedPerTop); + ((RepeatedVariableWidthVector) v).allocateNew(valueCount * bytesPerValue * repeatedPerTop, valueCount, valueCount * repeatedPerTop); }else{ throw new UnsupportedOperationException(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java index 619fdad38..155d7d6cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.exec.expr.holders.BitHolder; import org.apache.drill.exec.expr.holders.NullableBitHolder; +import org.apache.drill.exec.memory.AccountingByteBuf; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared.FieldMetadata; import org.apache.drill.exec.record.MaterializedField; @@ -38,6 +39,9 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe private final Accessor accessor = new Accessor(); private final Mutator mutator = new Mutator(); + private int allocationValueCount = 4000; + private int allocationMonitor = 0; + private int valueCapacity; public BitVector(MaterializedField field, BufferAllocator allocator) { @@ -57,6 +61,19 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe return (int) Math.ceil((float)valueCount / 8.0); } + private int getByteIndex(int index) { + return (int) Math.floor((float) index / 8.0); + } + + public void allocateNew() { + clear(); + if (allocationMonitor > 5) { + allocationValueCount = Math.min(1, (int)(allocationValueCount * 0.9)); + } else if (allocationMonitor < -5) { + allocationValueCount = (int) (allocationValueCount * 1.1); + } + } + /** * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. * @@ -132,6 +149,37 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe clear(); } + public void splitAndTransferTo(int startIndex, int length, BitVector target) { + assert startIndex + length <= valueCount; + int firstByte = getByteIndex(startIndex); + int lastByte = getSizeFromCount(startIndex + length) - 1; + int offset = startIndex % 8; + if (offset == 0) { + // slice + target.data = this.data.slice(firstByte, lastByte - firstByte + 1); + target.data.retain(); + } else { + // Copy data + target.clear(); + target.allocateNew(length); + if ((startIndex + length) % 8 == 0) { + lastByte++; + } + int i = firstByte; + // TODO maybe do this one word at a time, rather than byte? + for (; i <= lastByte - 1; i++) { + target.data.setByte(i - firstByte, (((this.data.getByte(i) & 0xFF) >>> offset) + (this.data.getByte(i + 1) << (8 - offset)))); + } + if (startIndex + length == this.valueCount) { + target.data.setByte(i - firstByte, ((this.data.getByte(lastByte) & 0xFF) >>> offset)); + } + } + } + + private void copyTo(int startIndex, int length, BitVector target) { + + } + private class TransferImpl implements TransferPair { BitVector to; @@ -151,6 +199,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe transferTo(to); } + public void splitAndTransfer(int startIndex, int length) { + splitAndTransferTo(startIndex, length, to); + } + @Override public void copyValue(int fromIndex, int toIndex) { to.copyFrom(fromIndex, toIndex, BitVector.this); @@ -237,7 +289,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } public boolean setSafe(int index, int value) { - if(index >= getValueCapacity()) return false; + if(index >= getValueCapacity()) { + allocationMonitor--; + return false; + } set(index, value); return true; } @@ -256,7 +311,15 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe public final void setValueCount(int valueCount) { BitVector.this.valueCount = valueCount; - data.writerIndex(getSizeFromCount(valueCount)); + int idx = getSizeFromCount(valueCount); + if (((float) data.capacity()) / idx > 1.1) { + allocationMonitor++; + } + data.writerIndex(idx); + if (data instanceof AccountingByteBuf) { + data.capacity(idx); + data.writerIndex(idx); + } } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java index 24e3473ef..258b354b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ValueVector.java @@ -33,6 +33,11 @@ import org.apache.drill.exec.record.TransferPair; */ public interface ValueVector extends Closeable { + /** + * Allocate new buffers. ValueVector implements logic to determine how much to allocate. + */ + public void allocateNew(); + public int getBufferSize(); /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index b6d441cff..27f82216c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -22,11 +22,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.cache.DistributedCache; @@ -71,7 +67,7 @@ public class WorkManager implements Closeable{ private final UserWorker userWorker; private final WorkerBee bee; private final WorkEventBus workBus; - private Executor executor; + private ExecutorService executor; private final EventThread eventThread; public WorkManager(BootStrapContext context){ @@ -108,6 +104,11 @@ public class WorkManager implements Closeable{ @Override public void close() throws IOException { + try { + executor.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + logger.warn("Executor interrupted while awaiting termination"); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java index 30e6df285..76db1ed4e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java @@ -81,6 +81,11 @@ public abstract class AbstractDataCollector implements DataCollector{ public abstract void streamFinished(int minorFragmentId); public boolean batchArrived(int minorFragmentId, RawFragmentBatch batch) throws IOException { + if (batch.getHeader().getIsOutOfMemory()) { + for (RawBatchBuffer buffer : buffers) { + buffer.enqueue(batch); + } + } boolean decremented = false; if (remainders.compareAndSet(minorFragmentId, 0, 1)) { int rem = remainingRequired.decrementAndGet(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java index 3cb18b631..9b3b870d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/IncomingBuffers.java @@ -64,6 +64,16 @@ public class IncomingBuffers { public boolean batchArrived(RawFragmentBatch batch) throws FragmentSetupException { // no need to do anything if we've already enabled running. // logger.debug("New Batch Arrived {}", batch); + if (batch.getHeader().getIsOutOfMemory()) { + for (DataCollector fSet : fragCounts.values()) { + try { + fSet.batchArrived(0, batch); + } catch (IOException e) { + throw new RuntimeException(); + } + } + return false; + } if(batch.getHeader().getIsLastBatch()){ streamsRemaining.decrementAndGet(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java index afac86fbf..c8a15258d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java @@ -29,13 +29,17 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.proto.ExecProtos; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.record.RawFragmentBatch; import org.apache.drill.exec.rpc.RemoteConnection; +import org.apache.drill.exec.rpc.data.BitServerConnection; import org.apache.drill.exec.store.LocalSyncableFileSystem; +import org.apache.drill.exec.work.fragment.FragmentManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -55,20 +59,27 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { private static String DRILL_LOCAL_IMPL_STRING = "fs.drill-local.impl"; private static final float STOP_SPOOLING_FRACTION = (float) 0.5; + public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024; + public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000; private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer = Queues.newLinkedBlockingDeque(); private volatile boolean finished = false; private volatile long queueSize = 0; private long threshold; private FragmentContext context; + private BufferAllocator allocator; private volatile AtomicBoolean spooling = new AtomicBoolean(false); private FileSystem fs; private Path path; private FSDataOutputStream outputStream; private FSDataInputStream inputStream; + private boolean outOfMemory = false; + private boolean closed = false; + private FragmentManager fragmentManager; - public SpoolingRawBatchBuffer(FragmentContext context) throws IOException { + public SpoolingRawBatchBuffer(FragmentContext context) throws IOException, OutOfMemoryException { this.context = context; + this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION); this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY); Configuration conf = new Configuration(); conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM)); @@ -86,6 +97,20 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { @Override public synchronized void enqueue(RawFragmentBatch batch) throws IOException { + if (batch.getHeader().getIsOutOfMemory()) { + if (fragmentManager == null) { + fragmentManager = ((BitServerConnection) batch.getConnection()).getFragmentManager(); + } +// fragmentManager.setAutoRead(false); +// logger.debug("Setting autoRead false"); + if (!outOfMemory && !buffer.peekFirst().isOutOfMemory()) { + logger.debug("Adding OOM message to front of queue. Current queue size: {}", buffer.size()); + buffer.addFirst(new RawFragmentBatchWrapper(batch, true)); + } else { + logger.debug("ignoring duplicate OOM message"); + } + return; + } RawFragmentBatchWrapper wrapper; boolean spool = spooling.get(); wrapper = new RawFragmentBatchWrapper(batch, !spool); @@ -105,7 +130,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { @Override public void kill(FragmentContext context) { - cleanup(); + allocator.close(); } @@ -116,6 +141,11 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { @Override public RawFragmentBatch getNext() throws IOException { + if (outOfMemory && buffer.size() < 10) { + outOfMemory = false; + fragmentManager.setAutoRead(true); + logger.debug("Setting autoRead true"); + } boolean spool = spooling.get(); RawFragmentBatchWrapper w = buffer.poll(); RawFragmentBatch batch; @@ -123,21 +153,27 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { try { w = buffer.take(); batch = w.get(); + if (batch.getHeader().getIsOutOfMemory()) { + outOfMemory = true; + return batch; + } queueSize -= w.getBodySize(); return batch; } catch (InterruptedException e) { - cleanup(); return null; } } if (w == null) { - cleanup(); return null; } batch = w.get(); + if (batch.getHeader().getIsOutOfMemory()) { + outOfMemory = true; + return batch; + } queueSize -= w.getBodySize(); - assert queueSize >= 0; +// assert queueSize >= 0; if (spool && queueSize < threshold * STOP_SPOOLING_FRACTION) { logger.debug("buffer size {} less than {}x threshold. Stop spooling.", queueSize, STOP_SPOOLING_FRACTION); spooling.set(false); @@ -145,7 +181,13 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { return batch; } - private void cleanup() { + public void cleanup() { + if (closed) { + logger.warn("Tried cleanup twice"); + return; + } + closed = true; + allocator.close(); try { if (outputStream != null) { outputStream.close(); @@ -171,6 +213,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { private boolean available; private CountDownLatch latch = new CountDownLatch(1); private int bodyLength; + private boolean outOfMemory = false; public RawFragmentBatchWrapper(RawFragmentBatch batch, boolean available) { Preconditions.checkNotNull(batch); @@ -225,7 +268,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { Stopwatch watch = new Stopwatch(); watch.start(); BitData.FragmentRecordBatch header = BitData.FragmentRecordBatch.parseDelimitedFrom(stream); - ByteBuf buf = context.getAllocator().buffer(bodyLength); + ByteBuf buf = allocator.buffer(bodyLength); buf.writeBytes(stream, bodyLength); batch = new RawFragmentBatch(null, header, buf); available = true; @@ -233,6 +276,14 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { long t = watch.elapsed(TimeUnit.MICROSECONDS); logger.debug("Took {} us to read {} from disk. Rate {} mb/s", t, bodyLength, bodyLength / t); } + + private boolean isOutOfMemory() { + return outOfMemory; + } + + private void setOutOfMemory(boolean outOfMemory) { + this.outOfMemory = outOfMemory; + } } private String getFileName() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index 97d8d34ce..4853d329a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -52,6 +52,11 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ } @Override + public void cleanup() { + + } + + @Override public void kill(FragmentContext context) { while(!buffer.isEmpty()){ RawFragmentBatch batch = buffer.poll(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java index 7d92c9a64..0a4b2350f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentManager.java @@ -21,6 +21,7 @@ import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.RawFragmentBatch; +import org.apache.drill.exec.rpc.RemoteConnection; /** * The Fragment Manager is responsible managing incoming data and executing a fragment. Once enough data and resources @@ -51,4 +52,12 @@ public interface FragmentManager { public abstract FragmentHandle getHandle(); public abstract FragmentContext getFragmentContext(); + + public abstract void addConnection(RemoteConnection connection); + + /** + * Sets autoRead property on all connections + * @param autoRead + */ + public abstract void setAutoRead(boolean autoRead); }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java index d82c1c001..c8f20212e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.work.fragment; import java.io.IOException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.FragmentSetupException; @@ -31,6 +33,7 @@ import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.RawFragmentBatch; +import org.apache.drill.exec.rpc.RemoteConnection; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.work.batch.IncomingBuffers; @@ -46,6 +49,7 @@ public class NonRootFragmentManager implements FragmentManager { private volatile boolean cancel = false; private final FragmentContext context; private final PhysicalPlanReader reader; + private List<RemoteConnection> connections = new CopyOnWriteArrayList<>(); public NonRootFragmentManager(PlanFragment fragment, DrillbitContext context) throws FragmentSetupException{ try{ @@ -118,8 +122,17 @@ public class NonRootFragmentManager implements FragmentManager { public FragmentContext getFragmentContext() { return context; } - - - + @Override + public void addConnection(RemoteConnection connection) { + connections.add(connection); + } + + @Override + public void setAutoRead(boolean autoRead) { + for (RemoteConnection c : connections) { + c.setAutoRead(autoRead); + } + } + }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java index 40f4d2b30..c763d5568 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java @@ -21,8 +21,12 @@ import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.RawFragmentBatch; +import org.apache.drill.exec.rpc.RemoteConnection; import org.apache.drill.exec.work.batch.IncomingBuffers; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + public class RootFragmentManager implements FragmentManager{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class); @@ -30,6 +34,7 @@ public class RootFragmentManager implements FragmentManager{ private final FragmentExecutor runner; private final FragmentHandle handle; private volatile boolean cancel = false; + private List<RemoteConnection> connections = new CopyOnWriteArrayList<>(); public RootFragmentManager(FragmentHandle handle, IncomingBuffers buffers, FragmentExecutor runner) { super(); @@ -66,7 +71,17 @@ public class RootFragmentManager implements FragmentManager{ public FragmentContext getFragmentContext() { return runner.getContext(); } - - - + + @Override + public void addConnection(RemoteConnection connection) { + connections.add(connection); + } + + @Override + public void setAutoRead(boolean autoRead) { + for (RemoteConnection c : connections) { + c.setAutoRead(autoRead); + } + } + } |