diff options
author | Jacques Nadeau <jacques@apache.org> | 2015-04-18 16:40:02 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2015-05-02 19:33:54 -0700 |
commit | 88bb05194b023467d590ac747ec5fa14d04249f5 (patch) | |
tree | 5be7744f49f6b22f501bc681d965c780d88b69e3 /exec/java-exec/src | |
parent | 636177df12c593368676d42bc17b65d684c8d000 (diff) |
DRILL-2826: Simplify and centralize Operator Cleanup
- Remove cleanup method from RecordBatch interface
- Make OperatorContext creation and closing the management of FragmentContext
- Make OperatorContext an abstract class and the impl only available to FragmentContext
- Make RecordBatch closing the responsibility of the RootExec
- Make all closes be suppresing closes to maximize memory release in failure
- Add new CloseableRecordBatch interface used by RootExec
- Make RootExec AutoCloseable
- Update RecordBatchCreator to return CloseableRecordBatches so that RootExec can maintain list
- Generate list of operators through change in ImplCreator
Diffstat (limited to 'exec/java-exec/src')
91 files changed, 489 insertions, 415 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index d22651e23..22fcb8e08 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -118,10 +118,15 @@ public class TopLevelAllocator implements BufferAllocator { } @Override - public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, boolean applyFragmentLimit) throws OutOfMemoryException { + public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, + boolean applyFragmentLimit) { if(!acct.reserve(initialReservation)){ logger.debug(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation())); - throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation())); + throw new OutOfMemoryRuntimeException( + String + .format( + "You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", + initialReservation, acct.getCapacity() - acct.getAllocation())); }; logger.debug("New child allocator with initial reservation {}", initialReservation); ChildAllocator allocator = new ChildAllocator(context, acct, maximumReservation, initialReservation, childrenMap, applyFragmentLimit); @@ -191,7 +196,7 @@ public class TopLevelAllocator implements BufferAllocator { long pre, Map<ChildAllocator, StackTraceElement[]> map, - boolean applyFragmentLimit) throws OutOfMemoryException{ + boolean applyFragmentLimit) { assert max >= pre; this.applyFragmentLimit=applyFragmentLimit; DrillConfig drillConf = context != null ? context.getConfig() : null; @@ -240,10 +245,14 @@ public class TopLevelAllocator implements BufferAllocator { } @Override - public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, boolean applyFragmentLimit) - throws OutOfMemoryException { + public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, + boolean applyFragmentLimit) { if (!childAcct.reserve(initialReservation)) { - throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getAvailable())); + throw new OutOfMemoryRuntimeException( + String + .format( + "You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", + initialReservation, childAcct.getAvailable())); }; logger.debug("New child allocator with initial reservation {}", initialReservation); ChildAllocator newChildAllocator = new ChildAllocator(context, childAcct, maximumReservation, initialReservation, null, applyFragmentLimit); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index e6d5acd97..09a75689e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -27,7 +27,6 @@ import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.jdbc.SimpleCalciteSchema; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.ClassTransformationException; @@ -36,6 +35,7 @@ import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -57,6 +57,7 @@ import org.apache.drill.exec.work.batch.IncomingBuffers; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** @@ -66,6 +67,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap(); + private final List<OperatorContextImpl> contexts = Lists.newLinkedList(); + private final DrillbitContext context; private final UserClientConnection connection; // is null if this context is for non-root fragment private final QueryContext queryContext; // is null if this context is for non-root fragment @@ -145,8 +148,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { // Add the fragment context to the root allocator. // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments try { - allocator = context.getAllocator().getChildAllocator( - this, fragment.getMemInitial(), fragment.getMemMax(), true); + allocator = context.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true); Preconditions.checkNotNull(allocator, "Unable to acuqire allocator"); } catch(final Throwable e) { throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e); @@ -314,6 +316,20 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { return buffers; } + public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats, boolean applyFragmentLimit) + throws OutOfMemoryException { + OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats, applyFragmentLimit); + contexts.add(context); + return context; + } + + public OperatorContext newOperatorContext(PhysicalOperator popConfig, boolean applyFragmentLimit) + throws OutOfMemoryException { + OperatorContextImpl context = new OperatorContextImpl(popConfig, this, applyFragmentLimit); + contexts.add(context); + return context; + } + @VisibleForTesting @Deprecated public Throwable getFailureCause() { @@ -359,6 +375,12 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { @Override public void close() { waitForSendComplete(); + + // close operator context + for (OperatorContextImpl opContext : contexts) { + suppressingClose(opContext); + } + suppressingClose(bufferManager); suppressingClose(buffers); suppressingClose(allocator); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java index ccafa6783..7cc52bafe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java @@ -21,57 +21,20 @@ import io.netty.buffer.DrillBuf; import java.util.Iterator; -import org.apache.drill.common.util.Hook.Closeable; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.physical.base.PhysicalOperator; -import com.carrotsearch.hppc.LongObjectOpenHashMap; +public abstract class OperatorContext { -public class OperatorContext implements Closeable { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContext.class); + public abstract DrillBuf replace(DrillBuf old, int newSize); - private final BufferAllocator allocator; - private boolean closed = false; - private PhysicalOperator popConfig; - private OperatorStats stats; - private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>(); - private final boolean applyFragmentLimit; + public abstract DrillBuf getManagedBuffer(); - public OperatorContext(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException { - this.applyFragmentLimit=applyFragmentLimit; - this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit); - this.popConfig = popConfig; + public abstract DrillBuf getManagedBuffer(int size); - OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig)); - this.stats = context.getStats().getOperatorStats(def, allocator); - } - - public OperatorContext(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException { - this.applyFragmentLimit=applyFragmentLimit; - this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit); - this.popConfig = popConfig; - this.stats = stats; - } - - public DrillBuf replace(DrillBuf old, int newSize) { - if (managedBuffers.remove(old.memoryAddress()) == null) { - throw new IllegalStateException("Tried to remove unmanaged buffer."); - } - old.release(); - return getManagedBuffer(newSize); - } + public abstract BufferAllocator getAllocator(); - public DrillBuf getManagedBuffer() { - return getManagedBuffer(256); - } - - public DrillBuf getManagedBuffer(int size) { - DrillBuf newBuf = allocator.buffer(size); - managedBuffers.put(newBuf.memoryAddress(), newBuf); - newBuf.setOperatorContext(this); - return newBuf; - } + public abstract OperatorStats getStats(); public static int getChildCount(PhysicalOperator popConfig) { Iterator<PhysicalOperator> iter = popConfig.iterator(); @@ -87,41 +50,4 @@ public class OperatorContext implements Closeable { return i; } - public BufferAllocator getAllocator() { - if (allocator == null) { - throw new UnsupportedOperationException("Operator context does not have an allocator"); - } - return allocator; - } - - public boolean isClosed() { - return closed; - } - - @Override - public void close() { - if (closed) { - logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null); - return; - } - logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null); - - // release managed buffers. - Object[] buffers = ((LongObjectOpenHashMap<Object>)(Object)managedBuffers).values; - for (int i =0; i < buffers.length; i++) { - if (managedBuffers.allocated[i]) { - ((DrillBuf)buffers[i]).release(); - } - } - - if (allocator != null) { - allocator.close(); - } - closed = true; - } - - public OperatorStats getStats() { - return stats; - } - -} +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java new file mode 100644 index 000000000..6dbd880b8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ops; + +import io.netty.buffer.DrillBuf; + +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.physical.base.PhysicalOperator; + +import com.carrotsearch.hppc.LongObjectOpenHashMap; + +class OperatorContextImpl extends OperatorContext implements AutoCloseable { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class); + + private final BufferAllocator allocator; + private boolean closed = false; + private PhysicalOperator popConfig; + private OperatorStats stats; + private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>(); + private final boolean applyFragmentLimit; + + public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException { + this.applyFragmentLimit=applyFragmentLimit; + this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit); + this.popConfig = popConfig; + + OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig)); + this.stats = context.getStats().getOperatorStats(def, allocator); + } + + public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException { + this.applyFragmentLimit=applyFragmentLimit; + this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit); + this.popConfig = popConfig; + this.stats = stats; + } + + public DrillBuf replace(DrillBuf old, int newSize) { + if (managedBuffers.remove(old.memoryAddress()) == null) { + throw new IllegalStateException("Tried to remove unmanaged buffer."); + } + old.release(); + return getManagedBuffer(newSize); + } + + public DrillBuf getManagedBuffer() { + return getManagedBuffer(256); + } + + public DrillBuf getManagedBuffer(int size) { + DrillBuf newBuf = allocator.buffer(size); + managedBuffers.put(newBuf.memoryAddress(), newBuf); + newBuf.setOperatorContext(this); + return newBuf; + } + + public BufferAllocator getAllocator() { + if (allocator == null) { + throw new UnsupportedOperationException("Operator context does not have an allocator"); + } + return allocator; + } + + public boolean isClosed() { + return closed; + } + + @Override + public void close() { + if (closed) { + logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null); + return; + } + logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null); + + // release managed buffers. + Object[] buffers = ((LongObjectOpenHashMap<Object>)(Object)managedBuffers).values; + for (int i =0; i < buffers.length; i++) { + if (managedBuffers.allocated[i]) { + ((DrillBuf)buffers[i]).release(); + } + } + + if (allocator != null) { + allocator.close(); + } + closed = true; + } + + public OperatorStats getStats() { + return stats; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index 628dcd3ba..accce43ed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.physical.impl; +import java.util.List; + import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OpProfileDef; @@ -24,6 +26,7 @@ import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; @@ -33,9 +36,10 @@ public abstract class BaseRootExec implements RootExec { protected OperatorStats stats = null; protected OperatorContext oContext = null; protected FragmentContext fragmentContext = null; + private List<CloseableRecordBatch> operators; public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException { - this.oContext = new OperatorContext(config, fragmentContext, stats, true); + this.oContext = fragmentContext.newOperatorContext(config, stats, true); stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), OperatorContext.getChildCount(config)), oContext.getAllocator()); @@ -43,15 +47,20 @@ public abstract class BaseRootExec implements RootExec { this.fragmentContext = fragmentContext; } - public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext, final PhysicalOperator config) throws OutOfMemoryException { + public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext, + final PhysicalOperator config) throws OutOfMemoryException { this.oContext = oContext; stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), - config.getOperatorType(), OperatorContext.getChildCount(config)), + config.getOperatorType(), OperatorContext.getChildCount(config)), oContext.getAllocator()); fragmentContext.getStats().addOperatorStats(this.stats); this.fragmentContext = fragmentContext; } + void setOperators(List<CloseableRecordBatch> operators) { + this.operators = operators; + } + @Override public final boolean next() { // Stats should have been initialized @@ -95,7 +104,7 @@ public abstract class BaseRootExec implements RootExec { } @Override - public void stop() { + public void close() throws Exception { // We want to account for the time spent waiting here as Wait time in the operator profile try { stats.startProcessing(); @@ -105,5 +114,16 @@ public abstract class BaseRootExec implements RootExec { stats.stopWait(); stats.stopProcessing(); } + + // close all operators. + if (operators != null) { + for (CloseableRecordBatch b : operators) { + try { + b.close(); + } catch (Exception e) { + fragmentContext.fail(e); + } + } + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java index 1cf7da7b6..af99b5ea1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java @@ -20,13 +20,14 @@ package org.apache.drill.exec.physical.impl; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; public interface BatchCreator<T extends PhysicalOperator> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class); - public RecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException; + public CloseableRecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children) + throws ExecutionSetupException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index 912dfd73b..5cea748b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl; import java.io.IOException; import java.security.PrivilegedExceptionAction; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.TimeUnit; @@ -27,14 +28,15 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector; +import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.util.AssertionUtil; +import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.hadoop.security.UserGroupInformation; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; -import org.apache.drill.exec.util.ImpersonationUtil; -import org.apache.hadoop.security.UserGroupInformation; /** * Create RecordBatch tree (PhysicalOperator implementations) for a given PhysicalOperator tree. @@ -42,15 +44,23 @@ import org.apache.hadoop.security.UserGroupInformation; public class ImplCreator { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class); - private static final ImplCreator INSTANCE = new ImplCreator(); + private RootExec root = null; + private LinkedList<CloseableRecordBatch> operators = Lists.newLinkedList(); private ImplCreator() {} + private List<CloseableRecordBatch> getOperators() { + return operators; + } + /** * Create and return fragment RootExec for given FragmentRoot. RootExec has one or more RecordBatches as children * (which may contain child RecordBatches and so on). - * @param context FragmentContext. - * @param root FragmentRoot. + * + * @param context + * FragmentContext. + * @param root + * FragmentRoot. * @return RootExec of fragment. * @throws ExecutionSetupException */ @@ -61,10 +71,16 @@ public class ImplCreator { if (AssertionUtil.isAssertionsEnabled()) { root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root); } - + final ImplCreator creator = new ImplCreator(); Stopwatch watch = new Stopwatch(); watch.start(); - final RootExec rootExec = INSTANCE.getRootExec(root, context); + final RootExec rootExec = creator.getRootExec(root, context); + + // skip over this for SimpleRootExec (testing) + if (rootExec instanceof BaseRootExec) { + ((BaseRootExec) rootExec).setOperators(creator.getOperators()); + } + logger.debug("Took {} ms to create RecordBatch tree", watch.elapsed(TimeUnit.MILLISECONDS)); if (rootExec == null) { throw new ExecutionSetupException( @@ -72,6 +88,7 @@ public class ImplCreator { } return rootExec; + } /** Create RootExec and its children (RecordBatches) for given FragmentRoot */ @@ -96,6 +113,7 @@ public class ImplCreator { } } + /** Create a RecordBatch and its children for given PhysicalOperator */ private RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException { Preconditions.checkNotNull(op); @@ -107,7 +125,10 @@ public class ImplCreator { try { return proxyUgi.doAs(new PrivilegedExceptionAction<RecordBatch>() { public RecordBatch run() throws Exception { - return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches); + final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch( + context, op, childRecordBatches); + operators.addFirst(batch); + return batch; } }); } catch (InterruptedException | IOException e) { @@ -116,7 +137,10 @@ public class ImplCreator { throw new ExecutionSetupException(errMsg, e); } } else { - return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches); + final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, + op, childRecordBatches); + operators.addFirst(batch); + return batch; } } @@ -141,4 +165,5 @@ public class ImplCreator { return children; } -}
\ No newline at end of file + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java index daef44cce..1bf6c0105 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java @@ -32,7 +32,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP> static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, + public MergingRecordBatch getBatch(FragmentContext context, MergingReceiverPOP receiver, List<RecordBatch> children) throws ExecutionSetupException, OutOfMemoryException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java index 8fd68b224..5e366fb43 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java @@ -23,7 +23,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; * A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange * output nodes and storage nodes. They are there driving force behind the completion of a query. */ -public interface RootExec { +public interface RootExec extends AutoCloseable { /** * Do the next batch of work. * @return Whether or not additional batches of work are necessary. False means that this fragment is done. @@ -31,11 +31,6 @@ public interface RootExec { public boolean next(); /** - * Inform all children to clean up and go away. - */ - public void stop(); - - /** * Inform sender that receiving fragment is finished and doesn't need any more data * @param handle */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index ca2a0486d..6ea43cda4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -24,7 +24,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MinorType; @@ -38,8 +37,8 @@ import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; @@ -59,7 +58,7 @@ import com.google.common.collect.Maps; /** * Record batch used for a particular scan. Operators against one or more */ -public class ScanBatch implements RecordBatch { +public class ScanBatch implements CloseableRecordBatch { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class); private static final int MAX_RECORD_CNT = Character.MAX_VALUE; @@ -115,7 +114,7 @@ public class ScanBatch implements RecordBatch { public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException { this(subScanConfig, context, - new OperatorContext(subScanConfig, context, false /* ScanBatch is not subject to fragment memory limit */), + context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */), readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList()); } @@ -343,7 +342,7 @@ public class ScanBatch implements RecordBatch { return WritableBatch.get(this); } - public void cleanup() { + public void close() { container.clear(); if (tempContainer != null) { tempContainer.clear(); @@ -353,7 +352,6 @@ public class ScanBatch implements RecordBatch { } fieldVectorMap.clear(); currentReader.cleanup(); - oContext.close(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 2069d35bf..5b4d7bddc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -81,12 +81,10 @@ public class ScreenCreator implements RootCreator<Screen>{ logger.trace("Screen Outcome {}", outcome); switch (outcome) { case STOP: - this.internalStop(); return false; case NONE: if (firstBatch) { // this is the only data message sent to the client and may contain the schema - this.internalStop(); QueryWritableBatch batch; QueryData header = QueryData.newBuilder() // .setQueryId(context.getHandle().getQueryId()) // @@ -130,21 +128,6 @@ public class ScreenCreator implements RootCreator<Screen>{ stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount()); } - - private void internalStop(){ - oContext.close(); - incoming.cleanup(); - } - - @Override - public void stop() { - super.stop(); - if (!oContext.isClosed()) { - internalStop(); - } - injector.injectPause(context.getExecutionControls(), "send-complete", logger); - } - RecordBatch getIncoming() { return incoming; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 18ea71d71..67062f3e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -24,7 +24,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingDataTunnel; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; @@ -64,7 +63,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ } public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException { - super(context, new OperatorContext(config, context, null, false), config); + super(context, context.newOperatorContext(config, null, false), config); this.incoming = batch; assert(incoming != null); this.handle = context.getHandle(); @@ -136,13 +135,6 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ } @Override - public void stop() { - super.stop(); - oContext.close(); - incoming.cleanup(); - } - - @Override public void receivingFragmentFinished(FragmentHandle handle) { done = true; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 681c3e339..9f6bea99b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -117,15 +117,14 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - public void cleanup() { + public void close() { if (sv4 != null) { sv4.clear(); } if (priorityQueue != null) { priorityQueue.cleanup(); } - super.cleanup(); - incoming.cleanup(); + super.close(); } public void buildSchema() throws SchemaChangeException { @@ -424,10 +423,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } @Override - public void cleanup() { - } - - @Override public Iterator<VectorWrapper<?>> iterator() { return container.iterator(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java index aa8b6115d..e815bff31 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java @@ -31,7 +31,8 @@ public class TopNSortBatchCreator implements BatchCreator<TopN>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNSortBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children) throws ExecutionSetupException { + public TopNBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new TopNBatch(config, context, children.iterator().next()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index b419f7102..15fb7b576 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -24,7 +24,6 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.Writer; @@ -179,7 +178,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { } @Override - public void cleanup() { + public void close() { try { if (recordWriter != null) { recordWriter.cleanup(); @@ -188,8 +187,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { logger.error("Failure while closing record writer", ex); throw new RuntimeException("Failed to close RecordWriter", ex); } - super.cleanup(); - incoming.cleanup(); + super.close(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index c29fbf246..b75357498 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -30,7 +30,6 @@ import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.ClassGenerator.BlockType; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; @@ -135,7 +134,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { return aggregator.getOutcome(); case UPDATE_AGGREGATOR: context.fail(new SchemaChangeException("Hash aggregate does not support schema changes")); - cleanup(); + close(); killIncoming(false); return IterOutcome.STOP; default: @@ -273,12 +272,11 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { } @Override - public void cleanup() { + public void close() { if (aggregator != null) { aggregator.cleanup(); } - super.cleanup(); - incoming.cleanup(); + super.close(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java index 8c605415d..1397342f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java @@ -31,7 +31,8 @@ public class HashAggBatchCreator implements BatchCreator<HashAggregate>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children) throws ExecutionSetupException { + public HashAggBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new HashAggBatch(config, children.iterator().next(), context); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 87cd4d604..1b90dd8ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -30,7 +30,6 @@ import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.Types; import org.apache.drill.exec.compile.sig.RuntimeOverridden; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; @@ -52,14 +51,12 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.ObjectVector; -import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VariableWidthVector; -import com.google.common.collect.Lists; - public abstract class HashAggTemplate implements HashAggregator { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class); @@ -351,10 +348,6 @@ public abstract class HashAggTemplate implements HashAggregator { outputCurrentBatch(); - // cleanup incoming batch since output of aggregation does not need - // any references to the incoming - - incoming.cleanup(); // return setOkAndReturn(); return AggOutcome.RETURN_OUTCOME; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index ed5b415e5..c1c5cb947 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -30,7 +30,6 @@ import org.apache.drill.exec.compile.sig.MappingSet; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; -import org.apache.drill.exec.expr.ClassGenerator.BlockType; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; @@ -177,7 +176,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { return outcome; case UPDATE_AGGREGATOR: context.fail(new SchemaChangeException("Streaming aggregate does not support schema changes")); - cleanup(); + close(); killIncoming(false); return IterOutcome.STOP; default: @@ -409,9 +408,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } @Override - public void cleanup() { - super.cleanup(); - incoming.cleanup(); + public void close() { + super.close(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java index 0203b8144..cac5b06d8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java @@ -31,7 +31,8 @@ public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children) throws ExecutionSetupException { + public StreamingAggBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new StreamingAggBatch(config, children.iterator().next(), context); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index b3a6a8f72..d2282c8b9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -23,7 +23,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingDataTunnel; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.impl.BaseRootExec; @@ -62,7 +61,7 @@ public class BroadcastSenderRootExec extends BaseRootExec { public BroadcastSenderRootExec(FragmentContext context, RecordBatch incoming, BroadcastSender config) throws OutOfMemoryException { - super(context, new OperatorContext(config, context, null, false), config); + super(context, context.newOperatorContext(config, null, false), config); this.ok = true; this.context = context; this.incoming = incoming; @@ -153,10 +152,4 @@ public class BroadcastSenderRootExec extends BaseRootExec { stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount()); } - @Override - public void stop() { - super.stop(); - oContext.close(); - incoming.cleanup(); - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java index 7f2fe8e24..e9b305169 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java @@ -31,7 +31,8 @@ public class FilterBatchCreator implements BatchCreator<Filter>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children) throws ExecutionSetupException { + public FilterRecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new FilterRecordBatch(config, children.iterator().next(), context); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 064d5c894..5eee9dfe5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -86,14 +86,14 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ @Override - public void cleanup() { + public void close() { if (sv2 != null) { sv2.clear(); } if (sv4 != null) { sv4.clear(); } - super.cleanup(); + super.close(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java index 6f0282434..94203d81b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java @@ -22,7 +22,6 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.FlattenPOP; -import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -32,7 +31,8 @@ public class FlattenBatchCreator implements BatchCreator<FlattenPOP>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, FlattenPOP config, List<RecordBatch> children) throws ExecutionSetupException { + public FlattenRecordBatch getBatch(FragmentContext context, FlattenPOP config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new FlattenRecordBatch(config, children.iterator().next(), context); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 79fe17740..dd53477aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -516,7 +516,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { } @Override - public void cleanup() { + public void close() { if (hjHelper != null) { hjHelper.clear(); } @@ -529,9 +529,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { if (hashTable != null) { hashTable.clear(); } - super.cleanup(); - right.cleanup(); - left.cleanup(); + super.close(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java index bfe89c020..140276943 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java @@ -30,7 +30,8 @@ import com.google.common.base.Preconditions; public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> { @Override - public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { + public HashJoinBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 2); return new HashJoinBatch(config, context, children.get(0), children.get(1)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 1a7e60e7c..6466f70b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -38,7 +38,6 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; -import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; @@ -256,14 +255,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { right.kill(sendUpstream); } - @Override - public void cleanup() { - super.cleanup(); - - left.cleanup(); - right.cleanup(); - } - private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch, LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, JVar joinStatus, ErrorCollector collector) throws ClassTransformationException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java index 7d100afa9..24f5533db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java @@ -32,7 +32,8 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { + public MergeJoinBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 2); if(config.getJoinType() == JoinRelType.RIGHT){ return new MergeJoinBatch(config.flipIfRight(), context, children.get(1), children.get(0)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java index 4fb14095e..d20bfa18f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java @@ -17,9 +17,9 @@ */ package org.apache.drill.exec.physical.impl.join; -import com.sun.codemodel.JExpr; -import com.sun.codemodel.JExpression; -import com.sun.codemodel.JVar; +import java.io.IOException; +import java.util.LinkedList; + import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.compile.sig.GeneratorMapping; @@ -40,12 +40,11 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.AllocationHelper; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.complex.AbstractContainerVector; -import com.google.common.base.Preconditions; -import java.io.IOException; -import java.util.LinkedList; +import com.google.common.base.Preconditions; +import com.sun.codemodel.JExpr; +import com.sun.codemodel.JExpression; +import com.sun.codemodel.JVar; /* * RecordBatch implementation for the nested loop join operator @@ -309,12 +308,10 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP> } @Override - public void cleanup() { + public void close() { rightContainer.clear(); rightCounts.clear(); - super.cleanup(); - right.cleanup(); - left.cleanup(); + super.close(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java index 12588acba..2e708a6ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java @@ -17,17 +17,18 @@ */ package org.apache.drill.exec.physical.impl.join; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; - public class NestedLoopJoinBatchCreator implements BatchCreator<NestedLoopJoinPOP> { @Override - public RecordBatch getBatch(FragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { + public NestedLoopJoinBatch getBatch(FragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children) + throws ExecutionSetupException { return new NestedLoopJoinBatch(config, context, children.get(0), children.get(1)); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java index e71dababa..f954e7202 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java @@ -29,7 +29,8 @@ import com.google.common.collect.Iterables; public class LimitBatchCreator implements BatchCreator<Limit> { @Override - public RecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children) throws ExecutionSetupException { + public LimitRecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children) + throws ExecutionSetupException { return new LimitRecordBatch(config, context, Iterables.getOnlyElement(children)); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index 7e668935b..eff9e6155 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -194,9 +194,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { } @Override - public void cleanup(){ + public void close(){ outgoingSv.clear(); - super.cleanup(); + super.close(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index 40cbc8931..c36b0d3d9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -44,7 +44,6 @@ import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; @@ -130,7 +129,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> public MergingRecordBatch(final FragmentContext context, final MergingReceiverPOP config, final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException { - super(config, context, true, new OperatorContext(config, context, false)); + super(config, context, true, context.newOperatorContext(config, false)); //super(config, context); this.fragProviders = fragProviders; this.context = context; @@ -487,7 +486,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> if (sendUpstream) { informSenders(); } else { - cleanup(); + close(); for (final RawFragmentBatchProvider provider : fragProviders) { provider.kill(context); } @@ -697,7 +696,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } @Override - public void cleanup() { + public void close() { outgoingContainer.clear(); if (batchLoaders != null) { for (final RecordBatchLoader rbl : batchLoaders) { @@ -706,7 +705,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } } } - oContext.close(); if (fragProviders != null) { for (final RawFragmentBatchProvider f : fragProviders) { f.cleanup(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index b26c78a03..63b7eba21 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -158,9 +158,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart @Override - public void cleanup() { - incoming.cleanup(); - super.cleanup(); + public void close() { + super.close(); this.partitionVectors.clear(); this.partitionKeyVector.clear(); } @@ -302,9 +301,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart for (VectorWrapper<?> w : finalTable.get()) { partitionVectors.add(w.getValueVector()); } + } catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) { kill(false); - logger.error("Failure while building final partition table.", ex); context.fail(ex); return false; // TODO InterruptedException @@ -467,7 +466,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart // If this is the first iteration, we need to generate the partition vectors before we can proceed if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) { if (!getPartitionVectors()) { - cleanup(); + close(); return IterOutcome.STOP; } @@ -503,7 +502,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart case NONE: case NOT_YET: case STOP: - cleanup(); + close(); recordCount = 0; return upstream; case OK_NEW_SCHEMA: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 7e3f4b20e..cf7ba1610 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -35,7 +35,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingDataTunnel; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; -import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.HashPartitionSender; @@ -101,7 +100,7 @@ public class PartitionSenderRootExec extends BaseRootExec { public PartitionSenderRootExec(FragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { - super(context, new OperatorContext(operator, context, null, false), operator); + super(context, context.newOperatorContext(operator, null, false), operator); this.incoming = incoming; this.operator = operator; this.context = context; @@ -141,8 +140,6 @@ public class PartitionSenderRootExec extends BaseRootExec { public boolean innerNext() { if (!ok) { - stop(); - return false; } @@ -322,17 +319,15 @@ public class PartitionSenderRootExec extends BaseRootExec { } } - public void stop() { + public void close() throws Exception { logger.debug("Partition sender stopping."); - super.stop(); + super.close(); ok = false; if (partitioner != null) { updateAggregateStats(); partitioner.clear(); } - oContext.close(); - incoming.cleanup(); } public void sendEmptyBatch(boolean isLast) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index c2d616667..35bf3cdf9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -187,7 +187,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { } @Override - public void cleanup() { + public void close() { stop = true; try { cleanUpLatch.await(); @@ -195,9 +195,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e); // TODO InterruptedException } finally { - super.cleanup(); + super.close(); clearQueue(); - incoming.cleanup(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java index c568ed4d4..6542576b5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java @@ -29,7 +29,8 @@ import com.google.common.collect.Iterables; public class ProducerConsumerBatchCreator implements BatchCreator<ProducerConsumer> { @Override - public RecordBatch getBatch(FragmentContext context, ProducerConsumer config, List<RecordBatch> children) throws ExecutionSetupException { + public ProducerConsumerBatch getBatch(FragmentContext context, ProducerConsumer config, List<RecordBatch> children) + throws ExecutionSetupException { return new ProducerConsumerBatch(config, context, Iterables.getOnlyElement(children)); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java index 0df949105..f2495402f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java @@ -32,7 +32,8 @@ public class ComplexToJsonBatchCreator implements BatchCreator<ComplexToJson> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexToJsonBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children) throws ExecutionSetupException { + public ProjectRecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new ProjectRecordBatch(new Project(null, flatten.getChild()), children.iterator().next(), diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java index cb1d4f1b5..e7a6b0542 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java @@ -31,7 +31,8 @@ public class ProjectBatchCreator implements BatchCreator<Project>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, Project config, List<RecordBatch> children) throws ExecutionSetupException { + public ProjectRecordBatch getBatch(FragmentContext context, Project config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new ProjectRecordBatch(config, children.iterator().next(), context); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index 0a097c1f4..74b7d859a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -84,10 +84,9 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } @Override - public void cleanup() { + public void close() { builder.clear(); - super.cleanup(); - incoming.cleanup(); + super.close(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java index 217acf216..559558f49 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java @@ -31,7 +31,8 @@ public class SortBatchCreator implements BatchCreator<Sort>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children) throws ExecutionSetupException { + public SortBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new SortBatch(config, context, children.iterator().next()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 1fa759ca3..aa9297e10 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -187,8 +187,8 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } @Override - public void cleanup(){ - super.cleanup(); + public void close(){ + super.close(); } private class StraightCopier implements Copier{ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java index 455a5f920..9ab39a399 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java @@ -31,7 +31,8 @@ public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SVRemoverCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) throws ExecutionSetupException { + public RemovingRecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new RemovingRecordBatch(config, context, children.iterator().next()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java index 12afa3303..40ef2bb12 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java @@ -30,7 +30,7 @@ public class TraceBatchCreator implements BatchCreator<Trace> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) + public TraceRecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) throws ExecutionSetupException { // Preconditions.checkArgument(children.size() == 1); return new TraceRecordBatch(config, children.iterator().next(), context); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index 8a7d65925..af45815cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -155,7 +155,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { } @Override - public void cleanup() { + public void close() { /* Release the selection vector */ if (sv != null) { sv.clear(); @@ -167,8 +167,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { } catch (IOException e) { logger.error("Unable to close file descriptors for file: " + getFileName()); } - super.cleanup(); - incoming.cleanup(); + super.close(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java index 7f7e11038..1ef3142b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java @@ -31,7 +31,8 @@ public class UnionAllBatchCreator implements BatchCreator<UnionAll>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, UnionAll config, List<RecordBatch> children) throws ExecutionSetupException { + public UnionAllRecordBatch getBatch(FragmentContext context, UnionAll config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() >= 1); return new UnionAllRecordBatch(config, children, context); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 52b179460..d7ea3bb7d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -20,15 +20,15 @@ package org.apache.drill.exec.physical.impl.union; import java.io.IOException; import java.util.Iterator; import java.util.List; -import com.google.common.collect.Lists; + import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; @@ -38,6 +38,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression; import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; @@ -53,7 +54,8 @@ import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.FixedWidthVector; import org.apache.drill.exec.vector.SchemaChangeCallBack; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.physical.config.UnionAll; + +import com.google.common.collect.Lists; public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class); @@ -132,12 +134,6 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { return WritableBatch.get(this); } - @Override - public void cleanup() { - super.cleanup(); - unionAllInput.cleanup(); - } - private void setValueCount(int count) { for (ValueVector v : allocationVectors) { ValueVector.Mutator m = v.getMutator(); @@ -505,11 +501,6 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> { return rightSide.getRecordBatch(); } - public void cleanup() { - leftSide.getRecordBatch().cleanup(); - rightSide.getRecordBatch().cleanup(); - } - private class OneSideInput { private IterOutcome upstream = IterOutcome.NOT_YET; private RecordBatch recordBatch; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 094865ee4..09cb7add2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -37,9 +37,9 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RawFragmentBatch; import org.apache.drill.exec.record.RawFragmentBatchProvider; -import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; @@ -50,7 +50,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; -public class UnorderedReceiverBatch implements RecordBatch { +public class UnorderedReceiverBatch implements CloseableRecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class); private final RecordBatchLoader batchLoader; @@ -60,7 +60,7 @@ public class UnorderedReceiverBatch implements RecordBatch { private final OperatorStats stats; private boolean first = true; private final UnorderedReceiver config; - OperatorContext oContext; + private final OperatorContext oContext; public enum Metric implements MetricDef { BYTES_RECEIVED, @@ -77,7 +77,7 @@ public class UnorderedReceiverBatch implements RecordBatch { this.context = context; // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector, // we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader. - oContext = new OperatorContext(config, context, false); + oContext = context.newOperatorContext(config, false); this.batchLoader = new RecordBatchLoader(oContext.getAllocator()); this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null); @@ -194,10 +194,9 @@ public class UnorderedReceiverBatch implements RecordBatch { } @Override - public void cleanup() { + public void close() { batchLoader.clear(); fragProvider.cleanup(); - oContext.close(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java index d9864f976..649ecd967 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java @@ -31,7 +31,7 @@ public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver> static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children) + public UnorderedReceiverBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children) throws ExecutionSetupException { assert children == null || children.isEmpty(); IncomingBuffers bufHolder = context.getBuffers(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java index 66ec22f79..2ae53aa67 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java @@ -20,9 +20,9 @@ package org.apache.drill.exec.physical.impl.validate; import java.util.Iterator; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorContainer; @@ -32,7 +32,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.vector.VectorValidator; -public class IteratorValidatorBatchIterator implements RecordBatch { +public class IteratorValidatorBatchIterator implements CloseableRecordBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class); static final boolean VALIDATE_VECTORS = false; @@ -144,8 +144,7 @@ public class IteratorValidatorBatchIterator implements RecordBatch { } @Override - public void cleanup() { - incoming.cleanup(); + public void close() { } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java index 5d08afb0b..cc30326fd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java @@ -31,7 +31,8 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator> static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, IteratorValidator config, List<RecordBatch> children) + public IteratorValidatorBatchIterator getBatch(FragmentContext context, IteratorValidator config, + List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new IteratorValidatorBatchIterator(children.iterator().next()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java index d526a8432..2298df596 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java @@ -34,7 +34,7 @@ import com.google.common.collect.Iterators; public class ValuesBatchCreator implements BatchCreator<Values> { @Override - public RecordBatch getBatch(FragmentContext context, Values config, List<RecordBatch> children) + public ScanBatch getBatch(FragmentContext context, Values config, List<RecordBatch> children) throws ExecutionSetupException { assert children.isEmpty(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java index 285e2cd30..59bc1159c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java @@ -18,19 +18,21 @@ package org.apache.drill.exec.physical.impl.window; -import com.google.common.base.Preconditions; +import java.util.List; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; -import java.util.List; +import com.google.common.base.Preconditions; public class WindowFrameBatchCreator implements BatchCreator<WindowPOP> { @Override - public RecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) throws ExecutionSetupException { + public WindowFrameRecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new WindowFrameRecordBatch(config, context, children.iterator().next()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index bc86390fd..86d11d58a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -17,8 +17,9 @@ */ package org.apache.drill.exec.physical.impl.window; -import com.google.common.collect.Lists; -import com.sun.codemodel.JExpr; +import java.io.IOException; +import java.util.List; + import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; @@ -49,8 +50,8 @@ import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; -import java.io.IOException; -import java.util.List; +import com.google.common.collect.Lists; +import com.sun.codemodel.JExpr; /** * support for OVER(PARTITION BY expression1,expression2,... [ORDER BY expressionA, expressionB,...]) @@ -333,13 +334,12 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { } @Override - public void cleanup() { + public void close() { if (framer != null) { framer.cleanup(); framer = null; } - super.cleanup(); - incoming.cleanup(); + super.close(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index ca93a7214..e88bc677f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.physical.impl.xsort; -import com.google.common.base.Joiner; import io.netty.buffer.DrillBuf; import java.io.IOException; @@ -68,6 +67,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.calcite.rel.RelFieldCollation.Direction; +import com.google.common.base.Joiner; import com.google.common.base.Stopwatch; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -145,7 +145,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } @Override - public void cleanup() { + public void close() { if (batchGroups != null) { for (BatchGroup group: batchGroups) { try { @@ -165,8 +165,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { copier.cleanup(); } copierAllocator.close(); - super.cleanup(); - incoming.cleanup(); + super.close(); } public void buildSchema() throws SchemaChangeException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java index eb5d83bf4..b9f639649 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java @@ -31,7 +31,8 @@ public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children) throws ExecutionSetupException { + public ExternalSortBatch getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new ExternalSortBatch(config, context, children.iterator().next()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index c96cb7c4d..4e348bb6f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -30,7 +30,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; -public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{ +public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch { final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass()); protected final VectorContainer container; //= new VectorContainer(); @@ -42,14 +42,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements protected BatchState state; protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException { - this(popConfig, context, true, new OperatorContext(popConfig, context, true)); + this(popConfig, context, true, context.newOperatorContext(popConfig, true)); } protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema) throws OutOfMemoryException { - this(popConfig, context, buildSchema, new OperatorContext(popConfig, context, true)); + this(popConfig, context, buildSchema, context.newOperatorContext(popConfig, true)); } - protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, final OperatorContext oContext) throws OutOfMemoryException { + protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, + final OperatorContext oContext) throws OutOfMemoryException { super(); this.context = context; this.popConfig = popConfig; @@ -171,9 +172,8 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements protected abstract void killIncoming(boolean sendUpstream); - public void cleanup(){ + public void close(){ container.clear(); - oContext.close(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index 3cfe177b6..dd90cab45 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -115,13 +115,6 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte } @Override - public void cleanup() { -// logger.debug("Cleaning up."); - super.cleanup(); - incoming.cleanup(); - } - - @Override public BatchSchema getSchema() { if (container.hasSchema()) { return container.getSchema(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java new file mode 100644 index 000000000..c52c3ee6d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.record; + +public interface CloseableRecordBatch extends RecordBatch, AutoCloseable { + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java index 0a8ece5d0..6f10a1cbb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.record; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -118,6 +117,4 @@ public interface RecordBatch extends VectorAccessible { */ public WritableBatch getWritableBatch(); - public void cleanup(); - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java index 489a989c6..59999baf3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java @@ -29,7 +29,6 @@ import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.mapred.FsInput; import org.apache.avro.util.Utf8; - import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; @@ -47,7 +46,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index f1271b117..b4efe70a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -36,6 +36,7 @@ import org.apache.drill.exec.physical.base.AbstractWriter; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.physical.impl.WriterRecordBatch; +import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractRecordReader; @@ -43,11 +44,10 @@ import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.RecordWriter; import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.dfs.BasicFormatMatcher; +import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.FormatMatcher; import org.apache.drill.exec.store.dfs.FormatPlugin; -import org.apache.drill.exec.store.dfs.DrillFileSystem; -import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.CompressionCodecFactory; @@ -119,7 +119,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, List<SchemaPath> columns) throws ExecutionSetupException; - RecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException { + CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException { String partitionDesignator = context.getOptions() .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; List<SchemaPath> columns = scan.getColumns(); @@ -153,9 +153,11 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements } int numParts = 0; - OperatorContext oContext = new OperatorContext(scan, context, - false /* ScanBatch is not subject to fragment memory limit */); - DrillFileSystem dfs; + OperatorContext oContext = context.newOperatorContext(scan, false /* + * ScanBatch is not subject to fragment memory + * limit + */); + final DrillFileSystem dfs; try { dfs = new DrillFileSystem(fsConf, oContext.getStats()); } catch (IOException e) { @@ -190,7 +192,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException; - public RecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, EasyWriter writer) + public CloseableRecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, EasyWriter writer) throws ExecutionSetupException { try { return new WriterRecordBatch(writer, incoming, context, getRecordWriter(context, writer)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java index ac0d2e73c..f9dfd8bdb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java @@ -22,13 +22,14 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; public class EasyReaderBatchCreator implements BatchCreator<EasySubScan>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyReaderBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children) + public CloseableRecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children) throws ExecutionSetupException { assert children == null || children.isEmpty(); return config.getFormatPlugin().getReaderBatch(context, config); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java index c91ceba2a..bfb4188cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java @@ -22,13 +22,14 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; public class EasyWriterBatchCreator implements BatchCreator<EasyWriter>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriterBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children) + public CloseableRecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children) throws ExecutionSetupException { assert children != null && children.size() == 1; return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java index 84587a946..d59cda21b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java @@ -30,7 +30,7 @@ public class DirectBatchCreator implements BatchCreator<DirectSubScan>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children) + public ScanBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children) throws ExecutionSetupException { return new ScanBatch(config, context, Collections.singleton(config.getReader()).iterator()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index 2cccc6442..2666b2ebf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.util.List; import com.google.common.collect.ImmutableList; + import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java index b38a33fb1..2ef2333bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java @@ -31,7 +31,8 @@ public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children) throws ExecutionSetupException { + public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children) + throws ExecutionSetupException { RecordReader rr = config.getTable().getRecordReader(context.getRootSchema(), config.getFilter()); return new ScanBatch(config, context, Collections.singleton(rr).iterator()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java index 0bfd03886..74423bfda 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java @@ -34,7 +34,8 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException { + public ScanBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); List<MockScanEntry> entries = config.getReadEntries(); List<RecordReader> readers = Lists.newArrayList(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java index 7298f53a0..cfa4c9318 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java @@ -38,6 +38,7 @@ import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.RecordWriter; import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.dfs.BasicFormatMatcher; +import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.DrillPathFilter; import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.FileSystemConfig; @@ -45,7 +46,6 @@ import org.apache.drill.exec.store.dfs.FormatMatcher; import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.dfs.MagicString; -import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.mock.MockStorageEngine; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; @@ -151,7 +151,7 @@ public class ParquetFormatPlugin implements FormatPlugin{ return recordWriter; } - public RecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, ParquetWriter writer) + public WriterRecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, ParquetWriter writer) throws ExecutionSetupException { try { return new WriterRecordBatch(writer, incoming, context, getRecordWriter(context, writer)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java index 3e3572169..3506ffafe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java @@ -99,7 +99,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{ super(); - this.oContext=new OperatorContext(writer, context, true); + this.oContext = context.newOperatorContext(writer, true); } @Override @@ -331,9 +331,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter { if (pageStore != null) { ColumnChunkPageWriteStoreExposer.close(pageStore); } - if(oContext!=null){ - oContext.close(); - } if (!hasRecords) { // the very last file is empty, delete it (DRILL-2408) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index 52dccd90c..d5586ce61 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -61,14 +61,17 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read"; @Override - public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException { + public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) + throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); String partitionDesignator = context.getOptions() .getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; List<SchemaPath> columns = rowGroupScan.getColumns(); List<RecordReader> readers = Lists.newArrayList(); - OperatorContext oContext = new OperatorContext(rowGroupScan, context, - false /* ScanBatch is not subject to fragment memory limit */); + OperatorContext oContext = context.newOperatorContext(rowGroupScan, false /* + * ScanBatch is not subject to fragment + * memory limit + */); List<String[]> partitionColumns = Lists.newArrayList(); List<Integer> selectedPartitionColumns = Lists.newArrayList(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java index 10dd26da8..79c570953 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java @@ -22,13 +22,14 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.physical.impl.WriterRecordBatch; import org.apache.drill.exec.record.RecordBatch; public class ParquetWriterBatchCreator implements BatchCreator<ParquetWriter>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriterBatchCreator.class); @Override - public RecordBatch getBatch(FragmentContext context, ParquetWriter config, List<RecordBatch> children) + public WriterRecordBatch getBatch(FragmentContext context, ParquetWriter config, List<RecordBatch> children) throws ExecutionSetupException { assert children != null && children.size() == 1; return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 4d837c1ef..921d13400 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.Set; import com.google.common.collect.Sets; + import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; @@ -69,9 +70,9 @@ import parquet.schema.MessageType; import parquet.schema.Type; import parquet.schema.PrimitiveType; - import com.google.common.base.Preconditions; import com.google.common.collect.Lists; + import parquet.schema.Types; public class DrillParquetReader extends AbstractRecordReader { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java index 92f676af1..58bf433fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java @@ -39,7 +39,7 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> { @SuppressWarnings({ "rawtypes", "unchecked" }) @Override - public RecordBatch getBatch(final FragmentContext context, final SystemTableScan scan, + public ScanBatch getBatch(final FragmentContext context, final SystemTableScan scan, final List<RecordBatch> children) throws ExecutionSetupException { final SystemTable table = scan.getTable(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java index 336841266..87c78b278 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java @@ -25,6 +25,7 @@ import javax.annotation.Nullable; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; + import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.FieldReference; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index 85262de10..d23655c6a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -105,9 +105,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ public void cleanup() { if (!isFinished() && context.shouldContinue()) { final String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished."); - logger.error(msg); final IllegalStateException e = new IllegalStateException(msg); - context.fail(e); throw e; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index edbcfde2e..4249cbe4d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -212,19 +212,23 @@ public class Foreman implements Runnable { moveToState(QueryState.FAILED, new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex)); } catch (final OutOfMemoryError e) { - /* - * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman. - * So, if we die here, they should get notified about that, and cancel themselves; we don't have to - * attempt to notify them, which might not work under these conditions. - */ - /* - * TODO this will kill everything in this JVM; why can't we just free all allocation - * associated with this Foreman and allow others to continue? - */ - System.out.println("Out of memory, exiting."); - e.printStackTrace(); - System.out.flush(); - System.exit(-1); + if ("Direct buffer memory".equals(e.getMessage())) { + moveToState(QueryState.FAILED, + UserException.resourceError(e) + .message("One or more nodes ran out of memory while executing the query.") + .build()); + } else { + /* + * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman. So, if we + * die here, they should get notified about that, and cancel themselves; we don't have to attempt to notify + * them, which might not work under these conditions. + */ + System.out.println("Node ran out of Heap memory, exiting."); + e.printStackTrace(); + System.out.flush(); + System.exit(-1); + } + } finally { /* * Begin accepting external events. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 0783fee16..fb2045f62 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -272,12 +272,14 @@ public class FragmentExecutor implements Runnable { private void closeOutResources() { + // first close the operators and release all memory. try { - root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure + root.close(); } catch (final Exception e) { fail(e); } + // then close the fragment context. fragmentContext.close(); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index b02051b80..d6e6d08a2 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -25,7 +25,6 @@ import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.base.Preconditions; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; @@ -45,9 +44,9 @@ import org.apache.drill.exec.server.Drillbit; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.util.TestUtilities; import org.apache.drill.exec.util.JsonStringArrayList; import org.apache.drill.exec.util.JsonStringHashMap; +import org.apache.drill.exec.util.TestUtilities; import org.apache.drill.exec.util.VectorUtil; import org.apache.hadoop.io.Text; import org.junit.AfterClass; @@ -57,6 +56,7 @@ import org.junit.rules.TestWatcher; import org.junit.runner.Description; import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; import com.google.common.io.Resources; public class BaseTestQuery extends ExecTest { @@ -219,7 +219,7 @@ public class BaseTestQuery extends ExecTest { } @AfterClass - public static void closeClient() throws IOException{ + public static void closeClient() throws IOException, InterruptedException { if (client != null) { client.close(); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java index 4258e60fb..13f9563ad 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java @@ -65,7 +65,7 @@ public class RunRootExec { } System.out.println("ENDITER: " + i); System.out.println("TIME: " + w.elapsed(TimeUnit.MILLISECONDS) + "ms"); - exec.stop(); + exec.close(); } context.close(); bit.close(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java index f4f4966f5..7c58b19f5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java @@ -88,7 +88,7 @@ public class DumpCatTest extends ExecTest{ } assertTrue(!context.isFailed()); - exec.stop(); + exec.close(); FragmentHandle handle = context.getHandle(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java index 49421851d..04e198028 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java @@ -17,7 +17,11 @@ ******************************************************************************/ package org.apache.drill.exec.fn.interp; -import com.google.common.collect.Lists; +import static org.junit.Assert.assertEquals; + +import java.nio.ByteBuffer; +import java.util.List; + import org.antlr.runtime.ANTLRStringStream; import org.antlr.runtime.CommonTokenStream; import org.antlr.runtime.RecognitionException; @@ -36,6 +40,7 @@ import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator; import org.apache.drill.exec.expr.holders.TimeStampHolder; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.QueryDateTimeInfo; +import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.pop.PopUnitTestBase; import org.apache.drill.exec.proto.BitControl; import org.apache.drill.exec.record.MaterializedField; @@ -49,10 +54,7 @@ import org.apache.drill.exec.vector.ValueVector; import org.joda.time.DateTime; import org.junit.Test; -import java.nio.ByteBuffer; -import java.util.List; - -import static org.junit.Assert.assertEquals; +import com.google.common.collect.Lists; public class ExpressionInterpreterTest extends PopUnitTestBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionInterpreterTest.class); @@ -170,7 +172,7 @@ public class ExpressionInterpreterTest extends PopUnitTestBase { MockGroupScanPOP.MockScanEntry entry = new MockGroupScanPOP.MockScanEntry(10, columns); MockSubScanPOP scanPOP = new MockSubScanPOP("testTable", java.util.Collections.singletonList(entry)); - RecordBatch batch = createMockScanBatch(bit1, scanPOP, planFragment); + ScanBatch batch = createMockScanBatch(bit1, scanPOP, planFragment); batch.next(); @@ -184,13 +186,13 @@ public class ExpressionInterpreterTest extends PopUnitTestBase { showValueVectorContent(vv); vv.clear(); - batch.cleanup(); + batch.close(); batch.getContext().close(); bit1.close(); } - private RecordBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) { + private ScanBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) { List<RecordBatch> children = Lists.newArrayList(); MockScanBatchCreator creator = new MockScanBatchCreator(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java index 1f0951bbd..74ce22596 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java @@ -19,9 +19,13 @@ package org.apache.drill.exec.memory; -import com.google.common.base.Charsets; -import com.google.common.io.Files; +import static org.junit.Assert.assertEquals; import io.netty.buffer.DrillBuf; + +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.util.FileUtils; import org.apache.drill.exec.ExecConstants; @@ -41,11 +45,9 @@ import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.RemoteServiceSet; import org.apache.drill.exec.store.StoragePluginRegistry; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; +import com.google.common.base.Charsets; +import com.google.common.io.Files; public class TestAllocators { @@ -100,23 +102,25 @@ public class TestAllocators { OperatorStats stats; //Use some bogus operator type to create a new operator context. - def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, OperatorContext.getChildCount(physicalOperator1)); + def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, + OperatorContext.getChildCount(physicalOperator1)); stats = fragmentContext1.getStats().getOperatorStats(def, fragmentContext1.getAllocator()); // Add a couple of Operator Contexts // Initial allocation = 1000000 bytes for all operators - OperatorContext oContext11 = new OperatorContext(physicalOperator1, fragmentContext1, true); + OperatorContext oContext11 = fragmentContext1.newOperatorContext(physicalOperator1, true); DrillBuf b11=oContext11.getAllocator().buffer(1000000); - OperatorContext oContext12 = new OperatorContext(physicalOperator2, fragmentContext1, stats, true); + OperatorContext oContext12 = fragmentContext1.newOperatorContext(physicalOperator2, stats, true); DrillBuf b12=oContext12.getAllocator().buffer(500000); - OperatorContext oContext21 = new OperatorContext(physicalOperator3, fragmentContext2, true); + OperatorContext oContext21 = fragmentContext1.newOperatorContext(physicalOperator3, true); - def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE, OperatorContext.getChildCount(physicalOperator4)); + def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE, + OperatorContext.getChildCount(physicalOperator4)); stats = fragmentContext2.getStats().getOperatorStats(def, fragmentContext2.getAllocator()); - OperatorContext oContext22 = new OperatorContext(physicalOperator4, fragmentContext2, stats, true); + OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats, true); DrillBuf b22=oContext22.getAllocator().buffer(2000000); // New Fragment begins @@ -127,15 +131,16 @@ public class TestAllocators { FragmentContext fragmentContext3 = new FragmentContext(bitContext, pf3, null, functionRegistry); // New fragment starts an operator that allocates an amount within the limit - def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE, OperatorContext.getChildCount(physicalOperator5)); + def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE, + OperatorContext.getChildCount(physicalOperator5)); stats = fragmentContext3.getStats().getOperatorStats(def, fragmentContext3.getAllocator()); - OperatorContext oContext31 = new OperatorContext(physicalOperator5, fragmentContext3, stats, true); + OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats, true); DrillBuf b31a = oContext31.getAllocator().buffer(200000); //Previously running operator completes b22.release(); - oContext22.close(); + ((AutoCloseable) oContext22).close(); // Fragment 3 asks for more and fails boolean outOfMem=false; @@ -153,7 +158,7 @@ public class TestAllocators { // Operator is Exempt from Fragment limits. Fragment 3 asks for more and succeeds outOfMem=false; - OperatorContext oContext32 = new OperatorContext(physicalOperator6, fragmentContext3, false); + OperatorContext oContext32 = fragmentContext3.newOperatorContext(physicalOperator6, false); DrillBuf b32=null; try { b32=oContext32.getAllocator().buffer(4400000); @@ -165,17 +170,17 @@ public class TestAllocators { }else{ outOfMem=true; } - oContext32.close(); + closeOp(oContext32); } assertEquals(false, (boolean)outOfMem); b11.release(); - oContext11.close(); + closeOp(oContext11); b12.release(); - oContext12.close(); - oContext21.close(); + closeOp(oContext12); + closeOp(oContext21); b31a.release(); - oContext31.close(); + closeOp(oContext31); fragmentContext1.close(); fragmentContext2.close(); @@ -184,4 +189,8 @@ public class TestAllocators { bit.close(); serviceSet.close(); } + + private void closeOp(OperatorContext c) throws Exception { + ((AutoCloseable) c).close(); + } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java index 2536bbbb9..42d2193f5 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java @@ -101,7 +101,6 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> { switch (incoming.next()) { case NONE: case STOP: - incoming.cleanup(); return false; default: return true; @@ -109,8 +108,8 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> { } @Override - public void stop() { - screenRoot.stop(); + public void close() throws Exception { + screenRoot.close(); } @Override diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java index e5448ac93..ffa876505 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java @@ -111,7 +111,7 @@ public class TestCastFunctions extends PopUnitTestBase{ assertEquals(5, count); } - exec.stop(); + exec.close(); context.close(); allocator.close(); @@ -159,7 +159,7 @@ public class TestCastFunctions extends PopUnitTestBase{ assertEquals(5, count); } - exec.stop(); + exec.close(); context.close(); allocator.close(); @@ -205,7 +205,7 @@ public class TestCastFunctions extends PopUnitTestBase{ assertEquals(5, count); } - exec.stop(); + exec.close(); context.close(); allocator.close(); @@ -252,7 +252,7 @@ public class TestCastFunctions extends PopUnitTestBase{ assertEquals(5, count); } - exec.stop(); + exec.close(); context.close(); allocator.close(); @@ -299,7 +299,7 @@ public class TestCastFunctions extends PopUnitTestBase{ assertEquals(5, count); } - exec.stop(); + exec.close(); context.close(); allocator.close(); @@ -346,7 +346,7 @@ public class TestCastFunctions extends PopUnitTestBase{ } assertEquals(5, count); } - exec.stop(); + exec.close(); context.close(); allocator.close(); @@ -392,7 +392,7 @@ public class TestCastFunctions extends PopUnitTestBase{ } assertEquals(5, count); } - exec.stop(); + exec.close(); context.close(); allocator.close(); @@ -428,7 +428,7 @@ public class TestCastFunctions extends PopUnitTestBase{ while(exec.next()){ } - exec.stop(); + exec.close(); context.close(); allocator.close(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java index 0f6fd43b9..c69c6f59c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java @@ -80,7 +80,7 @@ public class TestComparisonFunctions extends ExecTest { // } } - exec.stop(); + exec.close(); context.close(); if (context.getFailureCause() != null) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java index a112d92a8..a069078ef 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java @@ -74,7 +74,7 @@ public class TestSimpleFilter extends ExecTest { assertEquals(50, exec.getRecordCount()); } - exec.stop(); + exec.close(); if(context.getFailureCause() != null){ throw context.getFailureCause(); @@ -106,7 +106,7 @@ public class TestSimpleFilter extends ExecTest { } recordCount += exec.getSelectionVector4().getCount(); } - exec.stop(); + exec.close(); assertEquals(50, recordCount); if(context.getFailureCause() != null){ diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java index ef3a330b6..6c067febe 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java @@ -93,7 +93,7 @@ public class TestHashJoin extends PopUnitTestBase { while (exec.next()) { totalRecordCount += exec.getRecordCount(); } - exec.stop(); + exec.close(); assertEquals(expectedRows, totalRecordCount); System.out.println("Total Record Count: " + totalRecordCount); if (context.getFailureCause() != null) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java index 46bcc6014..6a6a7e080 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java @@ -17,7 +17,10 @@ */ package org.apache.drill.exec.physical.impl.partitionsender; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import java.io.File; import java.io.IOException; @@ -46,8 +49,8 @@ import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.PlanningSet; import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.pop.PopUnitTestBase; -import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.proto.UserBitShared.MetricValue; import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; import org.apache.drill.exec.proto.UserBitShared.QueryId; @@ -363,8 +366,8 @@ public class TestPartitionSender extends PlanTestBase { super(context, incoming, operator); } - public void close() { - oContext.close(); + public void close() throws Exception { + ((AutoCloseable) oContext).close(); } public int getNumberPartitions() { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java index d0d4005ba..b82846e6d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java @@ -82,7 +82,7 @@ public class TestTraceMultiRecordBatch extends ExecTest { } } - exec.stop(); + exec.close(); if(context.getFailureCause() != null){ throw context.getFailureCause(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java index f6766b1d5..1cb72ffa6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java @@ -93,7 +93,7 @@ public class TestTraceOutputDump extends ExecTest { while(exec.next()){ } - exec.stop(); + exec.close(); if(context.getFailureCause() != null){ throw context.getFailureCause(); |