author    Jacques Nadeau <jacques@apache.org>    2015-04-18 16:40:02 -0700
committer Jacques Nadeau <jacques@apache.org>    2015-05-02 19:33:54 -0700
commit    88bb05194b023467d590ac747ec5fa14d04249f5 (patch)
tree      5be7744f49f6b22f501bc681d965c780d88b69e3 /exec/java-exec/src
parent    636177df12c593368676d42bc17b65d684c8d000 (diff)
DRILL-2826: Simplify and centralize Operator Cleanup
- Remove the cleanup method from the RecordBatch interface
- Make OperatorContext creation and closing the responsibility of FragmentContext
- Make OperatorContext an abstract class whose implementation is only available to FragmentContext
- Make RecordBatch closing the responsibility of the RootExec
- Make all closes suppressing closes to maximize memory release on failure
- Add a new CloseableRecordBatch interface used by RootExec
- Make RootExec AutoCloseable
- Update BatchCreator to return CloseableRecordBatches so that RootExec can maintain the list
- Generate the list of operators through a change in ImplCreator
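The common mechanism behind all of these bullets is the suppressing close: every resource has its close() invoked even when an earlier close throws, so one failing operator cannot strand the memory held by the rest. A minimal sketch of the pattern (illustrative only, not Drill source; it assumes an SLF4J logger and mirrors the shape of the suppressingClose helper that FragmentContext.close() uses below):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch: a failed close() is logged and swallowed so the
// remaining resources are still closed and their memory released.
final class SuppressingCloser {
  private static final Logger logger = LoggerFactory.getLogger(SuppressingCloser.class);

  static void suppressingClose(final AutoCloseable closeable) {
    try {
      if (closeable != null) {
        closeable.close();
      }
    } catch (final Exception e) {
      logger.warn("Failure while closing {}; continuing with remaining closes.", closeable, e);
    }
  }

  static void closeAll(final Iterable<? extends AutoCloseable> resources) {
    for (final AutoCloseable resource : resources) {
      suppressingClose(resource); // never aborts the loop
    }
  }
}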
Diffstat (limited to 'exec/java-exec/src')
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java | 21
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java | 28
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java | 88
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java | 110
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java | 28
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java | 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java | 45
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java | 7
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java | 10
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java | 17
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java | 10
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java | 9
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java | 6
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java | 8
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java | 9
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java | 8
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java | 9
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java | 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java | 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | 6
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java | 9
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java | 21
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java | 7
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java | 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java | 8
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java | 11
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java | 11
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java | 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java | 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java | 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java | 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java | 19
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java | 11
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java | 7
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java | 8
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java | 14
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java | 7
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java | 12
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java | 7
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java | 22
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java | 16
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java | 1
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java | 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java | 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java | 9
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java | 1
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java | 30
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java | 4
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java | 6
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java | 2
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java | 2
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java | 18
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java | 51
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java | 5
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java | 16
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java | 2
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java | 4
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java | 2
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java | 11
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java | 2
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java | 2
91 files changed, 489 insertions, 415 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index d22651e23..22fcb8e08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -118,10 +118,15 @@ public class TopLevelAllocator implements BufferAllocator {
}
@Override
- public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, boolean applyFragmentLimit) throws OutOfMemoryException {
+ public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation,
+ boolean applyFragmentLimit) {
if(!acct.reserve(initialReservation)){
logger.debug(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
- throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
+ throw new OutOfMemoryRuntimeException(
+ String
+ .format(
+ "You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.",
+ initialReservation, acct.getCapacity() - acct.getAllocation()));
};
logger.debug("New child allocator with initial reservation {}", initialReservation);
ChildAllocator allocator = new ChildAllocator(context, acct, maximumReservation, initialReservation, childrenMap, applyFragmentLimit);
@@ -191,7 +196,7 @@ public class TopLevelAllocator implements BufferAllocator {
long pre,
Map<ChildAllocator,
StackTraceElement[]> map,
- boolean applyFragmentLimit) throws OutOfMemoryException{
+ boolean applyFragmentLimit) {
assert max >= pre;
this.applyFragmentLimit=applyFragmentLimit;
DrillConfig drillConf = context != null ? context.getConfig() : null;
@@ -240,10 +245,14 @@ public class TopLevelAllocator implements BufferAllocator {
}
@Override
- public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation, boolean applyFragmentLimit)
- throws OutOfMemoryException {
+ public BufferAllocator getChildAllocator(FragmentContext context, long initialReservation, long maximumReservation,
+ boolean applyFragmentLimit) {
if (!childAcct.reserve(initialReservation)) {
- throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getAvailable()));
+ throw new OutOfMemoryRuntimeException(
+ String
+ .format(
+ "You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.",
+ initialReservation, childAcct.getAvailable()));
};
logger.debug("New child allocator with initial reservation {}", initialReservation);
ChildAllocator newChildAllocator = new ChildAllocator(context, childAcct, maximumReservation, initialReservation, null, applyFragmentLimit);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index e6d5acd97..09a75689e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -27,7 +27,6 @@ import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.jdbc.SimpleCalciteSchema;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.ClassTransformationException;
@@ -36,6 +35,7 @@ import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -57,6 +57,7 @@ import org.apache.drill.exec.work.batch.IncomingBuffers;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -66,6 +67,8 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
private final Map<DrillbitEndpoint, AccountingDataTunnel> tunnels = Maps.newHashMap();
+ private final List<OperatorContextImpl> contexts = Lists.newLinkedList();
+
private final DrillbitContext context;
private final UserClientConnection connection; // is null if this context is for non-root fragment
private final QueryContext queryContext; // is null if this context is for non-root fragment
@@ -145,8 +148,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
// Add the fragment context to the root allocator.
// The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
try {
- allocator = context.getAllocator().getChildAllocator(
- this, fragment.getMemInitial(), fragment.getMemMax(), true);
+ allocator = context.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true);
Preconditions.checkNotNull(allocator, "Unable to acquire allocator");
} catch(final Throwable e) {
throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
@@ -314,6 +316,20 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return buffers;
}
+ public OperatorContext newOperatorContext(PhysicalOperator popConfig, OperatorStats stats, boolean applyFragmentLimit)
+ throws OutOfMemoryException {
+ OperatorContextImpl context = new OperatorContextImpl(popConfig, this, stats, applyFragmentLimit);
+ contexts.add(context);
+ return context;
+ }
+
+ public OperatorContext newOperatorContext(PhysicalOperator popConfig, boolean applyFragmentLimit)
+ throws OutOfMemoryException {
+ OperatorContextImpl context = new OperatorContextImpl(popConfig, this, applyFragmentLimit);
+ contexts.add(context);
+ return context;
+ }
+
@VisibleForTesting
@Deprecated
public Throwable getFailureCause() {
@@ -359,6 +375,12 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
@Override
public void close() {
waitForSendComplete();
+
+ // close operator context
+ for (OperatorContextImpl opContext : contexts) {
+ suppressingClose(opContext);
+ }
+
suppressingClose(bufferManager);
suppressingClose(buffers);
suppressingClose(allocator);
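Taken together, these hooks move the whole OperatorContext lifecycle into FragmentContext: operators obtain a context from the factory methods above, every OperatorContextImpl is remembered in contexts, and close() suppress-closes each one before the buffers and allocator. A hedged call-site sketch (the same shape the SingleSenderCreator and ScanBatch hunks take later in this diff; here config is the operator's PhysicalOperator and context its owning FragmentContext):

// Operators request a context rather than constructing one, and never
// close it themselves -- FragmentContext.close() does that for them.
OperatorContext oContext = context.newOperatorContext(config, false /* applyFragmentLimit */);

// Managed buffers are tracked by the context and released when it closes.
DrillBuf workspace = oContext.getManagedBuffer(4096);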
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index ccafa6783..7cc52bafe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -21,57 +21,20 @@ import io.netty.buffer.DrillBuf;
import java.util.Iterator;
-import org.apache.drill.common.util.Hook.Closeable;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.physical.base.PhysicalOperator;
-import com.carrotsearch.hppc.LongObjectOpenHashMap;
+public abstract class OperatorContext {
-public class OperatorContext implements Closeable {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContext.class);
+ public abstract DrillBuf replace(DrillBuf old, int newSize);
- private final BufferAllocator allocator;
- private boolean closed = false;
- private PhysicalOperator popConfig;
- private OperatorStats stats;
- private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
- private final boolean applyFragmentLimit;
+ public abstract DrillBuf getManagedBuffer();
- public OperatorContext(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException {
- this.applyFragmentLimit=applyFragmentLimit;
- this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
- this.popConfig = popConfig;
+ public abstract DrillBuf getManagedBuffer(int size);
- OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
- this.stats = context.getStats().getOperatorStats(def, allocator);
- }
-
- public OperatorContext(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
- this.applyFragmentLimit=applyFragmentLimit;
- this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
- this.popConfig = popConfig;
- this.stats = stats;
- }
-
- public DrillBuf replace(DrillBuf old, int newSize) {
- if (managedBuffers.remove(old.memoryAddress()) == null) {
- throw new IllegalStateException("Tried to remove unmanaged buffer.");
- }
- old.release();
- return getManagedBuffer(newSize);
- }
+ public abstract BufferAllocator getAllocator();
- public DrillBuf getManagedBuffer() {
- return getManagedBuffer(256);
- }
-
- public DrillBuf getManagedBuffer(int size) {
- DrillBuf newBuf = allocator.buffer(size);
- managedBuffers.put(newBuf.memoryAddress(), newBuf);
- newBuf.setOperatorContext(this);
- return newBuf;
- }
+ public abstract OperatorStats getStats();
public static int getChildCount(PhysicalOperator popConfig) {
Iterator<PhysicalOperator> iter = popConfig.iterator();
@@ -87,41 +50,4 @@ public class OperatorContext implements Closeable {
return i;
}
- public BufferAllocator getAllocator() {
- if (allocator == null) {
- throw new UnsupportedOperationException("Operator context does not have an allocator");
- }
- return allocator;
- }
-
- public boolean isClosed() {
- return closed;
- }
-
- @Override
- public void close() {
- if (closed) {
- logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null);
- return;
- }
- logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
-
- // release managed buffers.
- Object[] buffers = ((LongObjectOpenHashMap<Object>)(Object)managedBuffers).values;
- for (int i =0; i < buffers.length; i++) {
- if (managedBuffers.allocated[i]) {
- ((DrillBuf)buffers[i]).release();
- }
- }
-
- if (allocator != null) {
- allocator.close();
- }
- closed = true;
- }
-
- public OperatorStats getStats() {
- return stats;
- }
-
-}
+} \ No newline at end of file
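The effect of this rewrite is a visibility contract: the public abstract OperatorContext no longer exposes close() or isClosed() at all, while the AutoCloseable half moves into the package-private OperatorContextImpl added in the next file, so only code in org.apache.drill.exec.ops (in practice, FragmentContext) can construct or close a context. A toy schematic of the pattern, with stand-in names rather than Drill's types:

// Public surface: behavior only, no lifecycle methods.
public abstract class Context {
  public abstract long allocate(int size);
}

// Package-private impl: the only AutoCloseable, so only same-package
// code (the owning "fragment") can end its lifecycle.
class ContextImpl extends Context implements AutoCloseable {
  @Override public long allocate(int size) { return 0L; /* toy body */ }
  @Override public void close() { /* release everything allocated here */ }
}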
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
new file mode 100644
index 000000000..6dbd880b8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ops;
+
+import io.netty.buffer.DrillBuf;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+
+import com.carrotsearch.hppc.LongObjectOpenHashMap;
+
+class OperatorContextImpl extends OperatorContext implements AutoCloseable {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
+
+ private final BufferAllocator allocator;
+ private boolean closed = false;
+ private PhysicalOperator popConfig;
+ private OperatorStats stats;
+ private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
+ private final boolean applyFragmentLimit;
+
+ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException {
+ this.applyFragmentLimit=applyFragmentLimit;
+ this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
+ this.popConfig = popConfig;
+
+ OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), popConfig.getOperatorType(), getChildCount(popConfig));
+ this.stats = context.getStats().getOperatorStats(def, allocator);
+ }
+
+ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats, boolean applyFragmentLimit) throws OutOfMemoryException {
+ this.applyFragmentLimit=applyFragmentLimit;
+ this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation(), applyFragmentLimit);
+ this.popConfig = popConfig;
+ this.stats = stats;
+ }
+
+ public DrillBuf replace(DrillBuf old, int newSize) {
+ if (managedBuffers.remove(old.memoryAddress()) == null) {
+ throw new IllegalStateException("Tried to remove unmanaged buffer.");
+ }
+ old.release();
+ return getManagedBuffer(newSize);
+ }
+
+ public DrillBuf getManagedBuffer() {
+ return getManagedBuffer(256);
+ }
+
+ public DrillBuf getManagedBuffer(int size) {
+ DrillBuf newBuf = allocator.buffer(size);
+ managedBuffers.put(newBuf.memoryAddress(), newBuf);
+ newBuf.setOperatorContext(this);
+ return newBuf;
+ }
+
+ public BufferAllocator getAllocator() {
+ if (allocator == null) {
+ throw new UnsupportedOperationException("Operator context does not have an allocator");
+ }
+ return allocator;
+ }
+
+ public boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ logger.debug("Attempted to close Operator context for {}, but context is already closed", popConfig != null ? popConfig.getClass().getName() : null);
+ return;
+ }
+ logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null);
+
+ // release managed buffers.
+ Object[] buffers = ((LongObjectOpenHashMap<Object>)(Object)managedBuffers).values;
+ for (int i =0; i < buffers.length; i++) {
+ if (managedBuffers.allocated[i]) {
+ ((DrillBuf)buffers[i]).release();
+ }
+ }
+
+ if (allocator != null) {
+ allocator.close();
+ }
+ closed = true;
+ }
+
+ public OperatorStats getStats() {
+ return stats;
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 628dcd3ba..accce43ed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.physical.impl;
+import java.util.List;
+
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OpProfileDef;
@@ -24,6 +26,7 @@ import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
@@ -33,9 +36,10 @@ public abstract class BaseRootExec implements RootExec {
protected OperatorStats stats = null;
protected OperatorContext oContext = null;
protected FragmentContext fragmentContext = null;
+ private List<CloseableRecordBatch> operators;
public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
- this.oContext = new OperatorContext(config, fragmentContext, stats, true);
+ this.oContext = fragmentContext.newOperatorContext(config, stats, true);
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorContext.getChildCount(config)),
oContext.getAllocator());
@@ -43,15 +47,20 @@ public abstract class BaseRootExec implements RootExec {
this.fragmentContext = fragmentContext;
}
- public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext, final PhysicalOperator config) throws OutOfMemoryException {
+ public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext,
+ final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = oContext;
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
- config.getOperatorType(), OperatorContext.getChildCount(config)),
+ config.getOperatorType(), OperatorContext.getChildCount(config)),
oContext.getAllocator());
fragmentContext.getStats().addOperatorStats(this.stats);
this.fragmentContext = fragmentContext;
}
+ void setOperators(List<CloseableRecordBatch> operators) {
+ this.operators = operators;
+ }
+
@Override
public final boolean next() {
// Stats should have been initialized
@@ -95,7 +104,7 @@ public abstract class BaseRootExec implements RootExec {
}
@Override
- public void stop() {
+ public void close() throws Exception {
// We want to account for the time spent waiting here as Wait time in the operator profile
try {
stats.startProcessing();
@@ -105,5 +114,16 @@ public abstract class BaseRootExec implements RootExec {
stats.stopWait();
stats.stopProcessing();
}
+
+ // close all operators.
+ if (operators != null) {
+ for (CloseableRecordBatch b : operators) {
+ try {
+ b.close();
+ } catch (Exception e) {
+ fragmentContext.fail(e);
+ }
+ }
+ }
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
index 1cf7da7b6..af99b5ea1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
@@ -20,13 +20,14 @@ package org.apache.drill.exec.physical.impl;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
public interface BatchCreator<T extends PhysicalOperator> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class);
- public RecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
+ public CloseableRecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children)
+ throws ExecutionSetupException;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index 912dfd73b..5cea748b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -27,14 +28,15 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.util.AssertionUtil;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
-import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.hadoop.security.UserGroupInformation;
/**
* Create RecordBatch tree (PhysicalOperator implementations) for a given PhysicalOperator tree.
@@ -42,15 +44,23 @@ import org.apache.hadoop.security.UserGroupInformation;
public class ImplCreator {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ImplCreator.class);
- private static final ImplCreator INSTANCE = new ImplCreator();
+ private RootExec root = null;
+ private LinkedList<CloseableRecordBatch> operators = Lists.newLinkedList();
private ImplCreator() {}
+ private List<CloseableRecordBatch> getOperators() {
+ return operators;
+ }
+
/**
* Create and return fragment RootExec for given FragmentRoot. RootExec has one or more RecordBatches as children
* (which may contain child RecordBatches and so on).
- * @param context FragmentContext.
- * @param root FragmentRoot.
+ *
+ * @param context
+ * FragmentContext.
+ * @param root
+ * FragmentRoot.
* @return RootExec of fragment.
* @throws ExecutionSetupException
*/
@@ -61,10 +71,16 @@ public class ImplCreator {
if (AssertionUtil.isAssertionsEnabled()) {
root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
}
-
+ final ImplCreator creator = new ImplCreator();
Stopwatch watch = new Stopwatch();
watch.start();
- final RootExec rootExec = INSTANCE.getRootExec(root, context);
+ final RootExec rootExec = creator.getRootExec(root, context);
+
+ // skip over this for SimpleRootExec (testing)
+ if (rootExec instanceof BaseRootExec) {
+ ((BaseRootExec) rootExec).setOperators(creator.getOperators());
+ }
+
logger.debug("Took {} ms to create RecordBatch tree", watch.elapsed(TimeUnit.MILLISECONDS));
if (rootExec == null) {
throw new ExecutionSetupException(
@@ -72,6 +88,7 @@ public class ImplCreator {
}
return rootExec;
+
}
/** Create RootExec and its children (RecordBatches) for given FragmentRoot */
@@ -96,6 +113,7 @@ public class ImplCreator {
}
}
+
/** Create a RecordBatch and its children for given PhysicalOperator */
private RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
Preconditions.checkNotNull(op);
@@ -107,7 +125,10 @@ public class ImplCreator {
try {
return proxyUgi.doAs(new PrivilegedExceptionAction<RecordBatch>() {
public RecordBatch run() throws Exception {
- return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches);
+ final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(
+ context, op, childRecordBatches);
+ operators.addFirst(batch);
+ return batch;
}
});
} catch (InterruptedException | IOException e) {
@@ -116,7 +137,10 @@ public class ImplCreator {
throw new ExecutionSetupException(errMsg, e);
}
} else {
- return ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context, op, childRecordBatches);
+ final CloseableRecordBatch batch = ((BatchCreator<PhysicalOperator>) getOpCreator(op, context)).getBatch(context,
+ op, childRecordBatches);
+ operators.addFirst(batch);
+ return batch;
}
}
@@ -141,4 +165,5 @@ public class ImplCreator {
return children;
}
-} \ No newline at end of file
+
+}
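One reading of the bookkeeping above: getRecordBatch() builds the children before their parent, and each finished batch is addFirst()-ed, so operators ends up ordered from the root-most batch down to the leaves. BaseRootExec.close() then iterates front to back, closing each consumer before the producer that feeds it. A runnable toy (names are illustrative, not Drill classes) showing that ordering:

import java.util.LinkedList;

public class CloseOrderDemo {
  public static void main(String[] args) {
    LinkedList<String> operators = new LinkedList<>();
    // Creation order is leaf-first, as in ImplCreator's recursion...
    operators.addFirst("Scan");   // leaf, created first
    operators.addFirst("Filter"); // its consumer
    operators.addFirst("Screen"); // root-most consumer, created last
    // ...so iteration closes consumers before producers:
    for (String op : operators) {
      System.out.println("closing " + op); // Screen, Filter, Scan
    }
  }
}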
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
index daef44cce..1bf6c0105 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
@@ -32,7 +32,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP>
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergingReceiverCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context,
+ public MergingRecordBatch getBatch(FragmentContext context,
MergingReceiverPOP receiver,
List<RecordBatch> children)
throws ExecutionSetupException, OutOfMemoryException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index 8fd68b224..5e366fb43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
* A FragmentRoot is a node which is the last processing node in a query plan. FragmentTerminals include Exchange
output nodes and storage nodes. They are the driving force behind the completion of a query.
*/
-public interface RootExec {
+public interface RootExec extends AutoCloseable {
/**
* Do the next batch of work.
* @return Whether or not additional batches of work are necessary. False means that this fragment is done.
@@ -31,11 +31,6 @@ public interface RootExec {
public boolean next();
/**
- * Inform all children to clean up and go away.
- */
- public void stop();
-
- /**
* Inform sender that receiving fragment is finished and doesn't need any more data
* @param handle
*/
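With stop() gone and RootExec extending AutoCloseable, a fragment driver can pump the root inside try-with-resources and let close() do the centralized teardown, cascading into every CloseableRecordBatch registered by ImplCreator. A hypothetical driver loop under those assumptions (getExec is ImplCreator's static entry point whose javadoc appears earlier; the real FragmentExecutor logic is more involved):

// Sketch only: close() runs even if next() throws.
try (RootExec root = ImplCreator.getExec(fragmentContext, rootOperator)) {
  while (root.next()) {
    // keep producing batches until the fragment reports completion
  }
} catch (Exception e) {
  fragmentContext.fail(e); // same failure path BaseRootExec uses per operator
}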
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index ca2a0486d..6ea43cda4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -24,7 +24,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
@@ -38,8 +37,8 @@ import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
@@ -59,7 +58,7 @@ import com.google.common.collect.Maps;
/**
* Record batch used for a particular scan. Operators against one or more
*/
-public class ScanBatch implements RecordBatch {
+public class ScanBatch implements CloseableRecordBatch {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanBatch.class);
private static final int MAX_RECORD_CNT = Character.MAX_VALUE;
@@ -115,7 +114,7 @@ public class ScanBatch implements RecordBatch {
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, Iterator<RecordReader> readers) throws ExecutionSetupException {
this(subScanConfig, context,
- new OperatorContext(subScanConfig, context, false /* ScanBatch is not subject to fragment memory limit */),
+ context.newOperatorContext(subScanConfig, false /* ScanBatch is not subject to fragment memory limit */),
readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList());
}
@@ -343,7 +342,7 @@ public class ScanBatch implements RecordBatch {
return WritableBatch.get(this);
}
- public void cleanup() {
+ public void close() {
container.clear();
if (tempContainer != null) {
tempContainer.clear();
@@ -353,7 +352,6 @@ public class ScanBatch implements RecordBatch {
}
fieldVectorMap.clear();
currentReader.cleanup();
- oContext.close();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 2069d35bf..5b4d7bddc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -81,12 +81,10 @@ public class ScreenCreator implements RootCreator<Screen>{
logger.trace("Screen Outcome {}", outcome);
switch (outcome) {
case STOP:
- this.internalStop();
return false;
case NONE:
if (firstBatch) {
// this is the only data message sent to the client and may contain the schema
- this.internalStop();
QueryWritableBatch batch;
QueryData header = QueryData.newBuilder() //
.setQueryId(context.getHandle().getQueryId()) //
@@ -130,21 +128,6 @@ public class ScreenCreator implements RootCreator<Screen>{
stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount());
}
-
- private void internalStop(){
- oContext.close();
- incoming.cleanup();
- }
-
- @Override
- public void stop() {
- super.stop();
- if (!oContext.isClosed()) {
- internalStop();
- }
- injector.injectPause(context.getExecutionControls(), "send-complete", logger);
- }
-
RecordBatch getIncoming() {
return incoming;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 18ea71d71..67062f3e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -24,7 +24,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.BatchSchema;
@@ -64,7 +63,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
}
public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
- super(context, new OperatorContext(config, context, null, false), config);
+ super(context, context.newOperatorContext(config, null, false), config);
this.incoming = batch;
assert(incoming != null);
this.handle = context.getHandle();
@@ -136,13 +135,6 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
}
@Override
- public void stop() {
- super.stop();
- oContext.close();
- incoming.cleanup();
- }
-
- @Override
public void receivingFragmentFinished(FragmentHandle handle) {
done = true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 681c3e339..9f6bea99b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -117,15 +117,14 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
@Override
- public void cleanup() {
+ public void close() {
if (sv4 != null) {
sv4.clear();
}
if (priorityQueue != null) {
priorityQueue.cleanup();
}
- super.cleanup();
- incoming.cleanup();
+ super.close();
}
public void buildSchema() throws SchemaChangeException {
@@ -424,10 +423,6 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
}
@Override
- public void cleanup() {
- }
-
- @Override
public Iterator<VectorWrapper<?>> iterator() {
return container.iterator();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
index aa8b6115d..e815bff31 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
@@ -31,7 +31,8 @@ public class TopNSortBatchCreator implements BatchCreator<TopN>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNSortBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children) throws ExecutionSetupException {
+ public TopNBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new TopNBatch(config, context, children.iterator().next());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index b419f7102..15fb7b576 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -24,7 +24,6 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.Writer;
@@ -179,7 +178,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
}
@Override
- public void cleanup() {
+ public void close() {
try {
if (recordWriter != null) {
recordWriter.cleanup();
@@ -188,8 +187,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
logger.error("Failure while closing record writer", ex);
throw new RuntimeException("Failed to close RecordWriter", ex);
}
- super.cleanup();
- incoming.cleanup();
+ super.close();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index c29fbf246..b75357498 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -30,7 +30,6 @@ import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.BlockType;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
@@ -135,7 +134,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
return aggregator.getOutcome();
case UPDATE_AGGREGATOR:
context.fail(new SchemaChangeException("Hash aggregate does not support schema changes"));
- cleanup();
+ close();
killIncoming(false);
return IterOutcome.STOP;
default:
@@ -273,12 +272,11 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
}
@Override
- public void cleanup() {
+ public void close() {
if (aggregator != null) {
aggregator.cleanup();
}
- super.cleanup();
- incoming.cleanup();
+ super.close();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
index 8c605415d..1397342f6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
@@ -31,7 +31,8 @@ public class HashAggBatchCreator implements BatchCreator<HashAggregate>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children) throws ExecutionSetupException {
+ public HashAggBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new HashAggBatch(config, children.iterator().next(), context);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 87cd4d604..1b90dd8ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -30,7 +30,6 @@ import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.types.TypeProtos.MajorType;
-import org.apache.drill.common.types.Types;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
@@ -52,14 +51,12 @@ import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.ObjectVector;
-import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VariableWidthVector;
-import com.google.common.collect.Lists;
-
public abstract class HashAggTemplate implements HashAggregator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggregator.class);
@@ -351,10 +348,6 @@ public abstract class HashAggTemplate implements HashAggregator {
outputCurrentBatch();
- // cleanup incoming batch since output of aggregation does not need
- // any references to the incoming
-
- incoming.cleanup();
// return setOkAndReturn();
return AggOutcome.RETURN_OUTCOME;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ed5b415e5..c1c5cb947 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -30,7 +30,6 @@ import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
-import org.apache.drill.exec.expr.ClassGenerator.BlockType;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
@@ -177,7 +176,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
return outcome;
case UPDATE_AGGREGATOR:
context.fail(new SchemaChangeException("Streaming aggregate does not support schema changes"));
- cleanup();
+ close();
killIncoming(false);
return IterOutcome.STOP;
default:
@@ -409,9 +408,8 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
@Override
- public void cleanup() {
- super.cleanup();
- incoming.cleanup();
+ public void close() {
+ super.close();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
index 0203b8144..cac5b06d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
@@ -31,7 +31,8 @@ public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children) throws ExecutionSetupException {
+ public StreamingAggBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new StreamingAggBatch(config, children.iterator().next(), context);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index b3a6a8f72..d2282c8b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -23,7 +23,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.BroadcastSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
@@ -62,7 +61,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
public BroadcastSenderRootExec(FragmentContext context,
RecordBatch incoming,
BroadcastSender config) throws OutOfMemoryException {
- super(context, new OperatorContext(config, context, null, false), config);
+ super(context, context.newOperatorContext(config, null, false), config);
this.ok = true;
this.context = context;
this.incoming = incoming;
@@ -153,10 +152,4 @@ public class BroadcastSenderRootExec extends BaseRootExec {
stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount());
}
- @Override
- public void stop() {
- super.stop();
- oContext.close();
- incoming.cleanup();
- }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
index 7f2fe8e24..e9b305169 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
@@ -31,7 +31,8 @@ public class FilterBatchCreator implements BatchCreator<Filter>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children) throws ExecutionSetupException {
+ public FilterRecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new FilterRecordBatch(config, children.iterator().next(), context);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 064d5c894..5eee9dfe5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -86,14 +86,14 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
@Override
- public void cleanup() {
+ public void close() {
if (sv2 != null) {
sv2.clear();
}
if (sv4 != null) {
sv4.clear();
}
- super.cleanup();
+ super.close();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
index 6f0282434..94203d81b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
@@ -22,7 +22,6 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.FlattenPOP;
-import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -32,7 +31,8 @@ public class FlattenBatchCreator implements BatchCreator<FlattenPOP>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, FlattenPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ public FlattenRecordBatch getBatch(FragmentContext context, FlattenPOP config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new FlattenRecordBatch(config, children.iterator().next(), context);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 79fe17740..dd53477aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -516,7 +516,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
}
@Override
- public void cleanup() {
+ public void close() {
if (hjHelper != null) {
hjHelper.clear();
}
@@ -529,9 +529,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
if (hashTable != null) {
hashTable.clear();
}
- super.cleanup();
- right.cleanup();
- left.cleanup();
+ super.close();
}
}
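
[Annotation] With right.cleanup() and left.cleanup() gone, a batch's close() releases only its own state (helper, hash table, container); visiting children is no longer the parent's job, which removes the risk of closing the same child twice. Per the commit message the centralized sweep is a suppressing close, so one failed close cannot strand the memory held by the operators after it. A sketch of that sweep; the helper name is hypothetical:

    import java.util.List;

    final class CloseSweepSketch {
      // Close every operator even if some fail, collecting failures as
      // suppressed exceptions instead of aborting the sweep.
      static void closeAll(List<? extends AutoCloseable> operators) throws Exception {
        Exception first = null;
        for (AutoCloseable op : operators) {
          try {
            op.close();
          } catch (Exception e) {
            if (first == null) { first = e; } else { first.addSuppressed(e); }
          }
        }
        if (first != null) {
          throw first;
        }
      }
    }
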
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
index bfe89c020..140276943 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
@@ -30,7 +30,8 @@ import com.google.common.base.Preconditions;
public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> {
@Override
- public RecordBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ public HashJoinBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 2);
return new HashJoinBatch(config, context, children.get(0), children.get(1));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 1a7e60e7c..6466f70b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -38,7 +38,6 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
@@ -256,14 +255,6 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
right.kill(sendUpstream);
}
- @Override
- public void cleanup() {
- super.cleanup();
-
- left.cleanup();
- right.cleanup();
- }
-
private void generateDoCompareNextLeft(ClassGenerator<JoinWorker> cg, JVar incomingRecordBatch,
LogicalExpression[] leftExpression, JVar incomingLeftRecordBatch, JVar joinStatus,
ErrorCollector collector) throws ClassTransformationException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
index 7d100afa9..24f5533db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
@@ -32,7 +32,8 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ public MergeJoinBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 2);
if(config.getJoinType() == JoinRelType.RIGHT){
return new MergeJoinBatch(config.flipIfRight(), context, children.get(1), children.get(0));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
index 4fb14095e..d20bfa18f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatch.java
@@ -17,9 +17,9 @@
*/
package org.apache.drill.exec.physical.impl.join;
-import com.sun.codemodel.JExpr;
-import com.sun.codemodel.JExpression;
-import com.sun.codemodel.JVar;
+import java.io.IOException;
+import java.util.LinkedList;
+
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
@@ -40,12 +40,11 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.AllocationHelper;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.AbstractContainerVector;
-import com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.util.LinkedList;
+import com.google.common.base.Preconditions;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JExpression;
+import com.sun.codemodel.JVar;
/*
* RecordBatch implementation for the nested loop join operator
@@ -309,12 +308,10 @@ public class NestedLoopJoinBatch extends AbstractRecordBatch<NestedLoopJoinPOP>
}
@Override
- public void cleanup() {
+ public void close() {
rightContainer.clear();
rightCounts.clear();
- super.cleanup();
- right.cleanup();
- left.cleanup();
+ super.close();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
index 12588acba..2e708a6ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
@@ -17,17 +17,18 @@
*/
package org.apache.drill.exec.physical.impl.join;
+import java.util.List;
+
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
-import java.util.List;
-
public class NestedLoopJoinBatchCreator implements BatchCreator<NestedLoopJoinPOP> {
@Override
- public RecordBatch getBatch(FragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ public NestedLoopJoinBatch getBatch(FragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children)
+ throws ExecutionSetupException {
return new NestedLoopJoinBatch(config, context, children.get(0), children.get(1));
}
}
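
[Annotation] The narrowed creator signatures feed the other half of the change: everything handed back during tree construction is statically closeable, so the builder can record a flat operator list for the root to close later. A hypothetical sketch of that registration pass; Node and buildBatch are stand-ins, not Drill's ImplCreator API:

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: tree construction that records every closeable batch it creates.
    final class TreeBuilderSketch {
      interface Node { List<Node> children(); }

      private final List<AutoCloseable> operators = new ArrayList<>();

      AutoCloseable build(Node node) {
        List<AutoCloseable> childBatches = new ArrayList<>();
        for (Node child : node.children()) {
          childBatches.add(build(child));
        }
        AutoCloseable batch = buildBatch(node, childBatches);
        operators.add(batch);  // the root exec later closes this flat list
        return batch;
      }

      List<AutoCloseable> operatorList() { return operators; }

      private AutoCloseable buildBatch(Node node, List<AutoCloseable> children) {
        return () -> {};       // stand-in for a real record batch
      }
    }
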
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
index e71dababa..f954e7202 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
@@ -29,7 +29,8 @@ import com.google.common.collect.Iterables;
public class LimitBatchCreator implements BatchCreator<Limit> {
@Override
- public RecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children) throws ExecutionSetupException {
+ public LimitRecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children)
+ throws ExecutionSetupException {
return new LimitRecordBatch(config, context, Iterables.getOnlyElement(children));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index 7e668935b..eff9e6155 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -194,9 +194,9 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
}
@Override
- public void cleanup(){
+ public void close(){
outgoingSv.clear();
- super.cleanup();
+ super.close();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 40cbc8931..c36b0d3d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -44,7 +44,6 @@ import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -130,7 +129,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
public MergingRecordBatch(final FragmentContext context,
final MergingReceiverPOP config,
final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
- super(config, context, true, new OperatorContext(config, context, false));
+ super(config, context, true, context.newOperatorContext(config, false));
//super(config, context);
this.fragProviders = fragProviders;
this.context = context;
@@ -487,7 +486,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
if (sendUpstream) {
informSenders();
} else {
- cleanup();
+ close();
for (final RawFragmentBatchProvider provider : fragProviders) {
provider.kill(context);
}
@@ -697,7 +696,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
@Override
- public void cleanup() {
+ public void close() {
outgoingContainer.clear();
if (batchLoaders != null) {
for (final RecordBatchLoader rbl : batchLoaders) {
@@ -706,7 +705,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
}
}
- oContext.close();
if (fragProviders != null) {
for (final RawFragmentBatchProvider f : fragProviders) {
f.cleanup();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index b26c78a03..63b7eba21 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -158,9 +158,8 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
@Override
- public void cleanup() {
- incoming.cleanup();
- super.cleanup();
+ public void close() {
+ super.close();
this.partitionVectors.clear();
this.partitionKeyVector.clear();
}
@@ -302,9 +301,9 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
for (VectorWrapper<?> w : finalTable.get()) {
partitionVectors.add(w.getValueVector());
}
+
} catch (ClassTransformationException | IOException | SchemaChangeException | InterruptedException ex) {
kill(false);
- logger.error("Failure while building final partition table.", ex);
context.fail(ex);
return false;
// TODO InterruptedException
@@ -467,7 +466,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
// If this is the first iteration, we need to generate the partition vectors before we can proceed
if (this.first && upstream == IterOutcome.OK_NEW_SCHEMA) {
if (!getPartitionVectors()) {
- cleanup();
+ close();
return IterOutcome.STOP;
}
@@ -503,7 +502,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
case NONE:
case NOT_YET:
case STOP:
- cleanup();
+ close();
recordCount = 0;
return upstream;
case OK_NEW_SCHEMA:
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 7e3f4b20e..cf7ba1610 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -35,7 +35,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
-import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.HashPartitionSender;
@@ -101,7 +100,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
public PartitionSenderRootExec(FragmentContext context,
RecordBatch incoming,
HashPartitionSender operator) throws OutOfMemoryException {
- super(context, new OperatorContext(operator, context, null, false), operator);
+ super(context, context.newOperatorContext(operator, null, false), operator);
this.incoming = incoming;
this.operator = operator;
this.context = context;
@@ -141,8 +140,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
public boolean innerNext() {
if (!ok) {
- stop();
-
return false;
}
@@ -322,17 +319,15 @@ public class PartitionSenderRootExec extends BaseRootExec {
}
}
- public void stop() {
+ public void close() throws Exception {
logger.debug("Partition sender stopping.");
- super.stop();
+ super.close();
ok = false;
if (partitioner != null) {
updateAggregateStats();
partitioner.clear();
}
- oContext.close();
- incoming.cleanup();
}
public void sendEmptyBatch(boolean isLast) {
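
[Annotation] Renaming stop() to close() throws Exception is what makes the root operator a plain AutoCloseable: the fragment driver can run the whole fragment inside try-with-resources and still hit the cleanup path on failure. A hedged sketch of such a driver; the next()/run() shape is assumed, not Drill's exact executor:

    // Sketch: with the root extending AutoCloseable, the driver relies on
    // try-with-resources instead of hand-written stop() calls.
    interface RootExecSketch extends AutoCloseable {
      boolean next();  // returns false when the fragment is done
    }

    final class FragmentDriverSketch {
      void run(RootExecSketch root) throws Exception {
        try (RootExecSketch r = root) {
          while (r.next()) {
            // pump batches until the root reports completion
          }
        }  // close() runs on success and on failure alike
      }
    }
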
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index c2d616667..35bf3cdf9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -187,7 +187,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
@Override
- public void cleanup() {
+ public void close() {
stop = true;
try {
cleanUpLatch.await();
@@ -195,9 +195,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
// TODO InterruptedException
} finally {
- super.cleanup();
+ super.close();
clearQueue();
- incoming.cleanup();
}
}
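
[Annotation] close() above blocks on cleanUpLatch so the producer thread is finished touching shared state before the consumer clears it. A minimal model of that handshake; the producer body is illustrative:

    import java.util.concurrent.CountDownLatch;

    // Sketch: consumer-side close() waits for the producer thread to signal
    // completion before shared buffers are cleared.
    final class ProducerConsumerSketch {
      private final CountDownLatch cleanUpLatch = new CountDownLatch(1);
      private volatile boolean stop;

      void producerLoop() {
        try {
          while (!stop) {
            // fill the queue with batches...
          }
        } finally {
          cleanUpLatch.countDown();  // always signal, even on failure
        }
      }

      void close() {
        stop = true;                 // ask the producer to wind down
        try {
          cleanUpLatch.await();      // don't clear state under its feet
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        } finally {
          // clear queue / containers here
        }
      }
    }
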
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
index c568ed4d4..6542576b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
@@ -29,7 +29,8 @@ import com.google.common.collect.Iterables;
public class ProducerConsumerBatchCreator implements BatchCreator<ProducerConsumer> {
@Override
- public RecordBatch getBatch(FragmentContext context, ProducerConsumer config, List<RecordBatch> children) throws ExecutionSetupException {
+ public ProducerConsumerBatch getBatch(FragmentContext context, ProducerConsumer config, List<RecordBatch> children)
+ throws ExecutionSetupException {
return new ProducerConsumerBatch(config, context, Iterables.getOnlyElement(children));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
index 0df949105..f2495402f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
@@ -32,7 +32,8 @@ public class ComplexToJsonBatchCreator implements BatchCreator<ComplexToJson> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexToJsonBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children) throws ExecutionSetupException {
+ public ProjectRecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new ProjectRecordBatch(new Project(null, flatten.getChild()),
children.iterator().next(),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
index cb1d4f1b5..e7a6b0542 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
@@ -31,7 +31,8 @@ public class ProjectBatchCreator implements BatchCreator<Project>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, Project config, List<RecordBatch> children) throws ExecutionSetupException {
+ public ProjectRecordBatch getBatch(FragmentContext context, Project config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new ProjectRecordBatch(config, children.iterator().next(), context);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index 0a097c1f4..74b7d859a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -84,10 +84,9 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
}
@Override
- public void cleanup() {
+ public void close() {
builder.clear();
- super.cleanup();
- incoming.cleanup();
+ super.close();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
index 217acf216..559558f49 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
@@ -31,7 +31,8 @@ public class SortBatchCreator implements BatchCreator<Sort>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children) throws ExecutionSetupException {
+ public SortBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new SortBatch(config, context, children.iterator().next());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 1fa759ca3..aa9297e10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -187,8 +187,8 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}
@Override
- public void cleanup(){
- super.cleanup();
+ public void close(){
+ super.close();
}
private class StraightCopier implements Copier{
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
index 455a5f920..9ab39a399 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
@@ -31,7 +31,8 @@ public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SVRemoverCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) throws ExecutionSetupException {
+ public RemovingRecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new RemovingRecordBatch(config, context, children.iterator().next());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
index 12afa3303..40ef2bb12 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
@@ -30,7 +30,7 @@ public class TraceBatchCreator implements BatchCreator<Trace> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children)
+ public TraceRecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children)
throws ExecutionSetupException {
// Preconditions.checkArgument(children.size() == 1);
return new TraceRecordBatch(config, children.iterator().next(), context);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 8a7d65925..af45815cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -155,7 +155,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
}
@Override
- public void cleanup() {
+ public void close() {
/* Release the selection vector */
if (sv != null) {
sv.clear();
@@ -167,8 +167,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
} catch (IOException e) {
logger.error("Unable to close file descriptors for file: " + getFileName());
}
- super.cleanup();
- incoming.cleanup();
+ super.close();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
index 7f7e11038..1ef3142b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
@@ -31,7 +31,8 @@ public class UnionAllBatchCreator implements BatchCreator<UnionAll>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, UnionAll config, List<RecordBatch> children) throws ExecutionSetupException {
+ public UnionAllRecordBatch getBatch(FragmentContext context, UnionAll config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() >= 1);
return new UnionAllRecordBatch(config, children, context);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 52b179460..d7ea3bb7d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -20,15 +20,15 @@ package org.apache.drill.exec.physical.impl.union;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
-import com.google.common.collect.Lists;
+
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
@@ -38,6 +38,7 @@ import org.apache.drill.exec.expr.ValueVectorReadExpression;
import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
@@ -53,7 +54,8 @@ import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.physical.config.UnionAll;
+
+import com.google.common.collect.Lists;
public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllRecordBatch.class);
@@ -132,12 +134,6 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
return WritableBatch.get(this);
}
- @Override
- public void cleanup() {
- super.cleanup();
- unionAllInput.cleanup();
- }
-
private void setValueCount(int count) {
for (ValueVector v : allocationVectors) {
ValueVector.Mutator m = v.getMutator();
@@ -505,11 +501,6 @@ public class UnionAllRecordBatch extends AbstractRecordBatch<UnionAll> {
return rightSide.getRecordBatch();
}
- public void cleanup() {
- leftSide.getRecordBatch().cleanup();
- rightSide.getRecordBatch().cleanup();
- }
-
private class OneSideInput {
private IterOutcome upstream = IterOutcome.NOT_YET;
private RecordBatch recordBatch;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 094865ee4..09cb7add2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -37,9 +37,9 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.record.RawFragmentBatchProvider;
-import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
@@ -50,7 +50,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
-public class UnorderedReceiverBatch implements RecordBatch {
+public class UnorderedReceiverBatch implements CloseableRecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverBatch.class);
private final RecordBatchLoader batchLoader;
@@ -60,7 +60,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
private final OperatorStats stats;
private boolean first = true;
private final UnorderedReceiver config;
- OperatorContext oContext;
+ private final OperatorContext oContext;
public enum Metric implements MetricDef {
BYTES_RECEIVED,
@@ -77,7 +77,7 @@ public class UnorderedReceiverBatch implements RecordBatch {
this.context = context;
// In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
// we may need an allocator for the new offset vector. Therefore, here we pass the context's allocator to batchLoader.
- oContext = new OperatorContext(config, context, false);
+ oContext = context.newOperatorContext(config, false);
this.batchLoader = new RecordBatchLoader(oContext.getAllocator());
this.stats = context.getStats().getOperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), 1), null);
@@ -194,10 +194,9 @@ public class UnorderedReceiverBatch implements RecordBatch {
}
@Override
- public void cleanup() {
+ public void close() {
batchLoader.clear();
fragProvider.cleanup();
- oContext.close();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
index d9864f976..649ecd967 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
@@ -31,7 +31,7 @@ public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver>
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnorderedReceiverCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children)
+ public UnorderedReceiverBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children)
throws ExecutionSetupException {
assert children == null || children.isEmpty();
IncomingBuffers bufHolder = context.getBuffers();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
index 66ec22f79..2ae53aa67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorBatchIterator.java
@@ -20,9 +20,9 @@ package org.apache.drill.exec.physical.impl.validate;
import java.util.Iterator;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
@@ -32,7 +32,7 @@ import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.vector.VectorValidator;
-public class IteratorValidatorBatchIterator implements RecordBatch {
+public class IteratorValidatorBatchIterator implements CloseableRecordBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorBatchIterator.class);
static final boolean VALIDATE_VECTORS = false;
@@ -144,8 +144,7 @@ public class IteratorValidatorBatchIterator implements RecordBatch {
}
@Override
- public void cleanup() {
- incoming.cleanup();
+ public void close() {
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
index 5d08afb0b..cc30326fd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
@@ -31,7 +31,8 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator>
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, IteratorValidator config, List<RecordBatch> children)
+ public IteratorValidatorBatchIterator getBatch(FragmentContext context, IteratorValidator config,
+ List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new IteratorValidatorBatchIterator(children.iterator().next());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
index d526a8432..2298df596 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
@@ -34,7 +34,7 @@ import com.google.common.collect.Iterators;
public class ValuesBatchCreator implements BatchCreator<Values> {
@Override
- public RecordBatch getBatch(FragmentContext context, Values config, List<RecordBatch> children)
+ public ScanBatch getBatch(FragmentContext context, Values config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children.isEmpty();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
index 285e2cd30..59bc1159c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
@@ -18,19 +18,21 @@
package org.apache.drill.exec.physical.impl.window;
-import com.google.common.base.Preconditions;
+import java.util.List;
+
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.WindowPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
-import java.util.List;
+import com.google.common.base.Preconditions;
public class WindowFrameBatchCreator implements BatchCreator<WindowPOP> {
@Override
- public RecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ public WindowFrameRecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new WindowFrameRecordBatch(config, context, children.iterator().next());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index bc86390fd..86d11d58a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -17,8 +17,9 @@
*/
package org.apache.drill.exec.physical.impl.window;
-import com.google.common.collect.Lists;
-import com.sun.codemodel.JExpr;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -49,8 +50,8 @@ import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
-import java.io.IOException;
-import java.util.List;
+import com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
/**
* support for OVER(PARTITION BY expression1,expression2,... [ORDER BY expressionA, expressionB,...])
@@ -333,13 +334,12 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
}
@Override
- public void cleanup() {
+ public void close() {
if (framer != null) {
framer.cleanup();
framer = null;
}
- super.cleanup();
- incoming.cleanup();
+ super.close();
}
@Override
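
[Annotation] Nulling framer after framer.cleanup() makes the method safe to call more than once, which matters now that a close can arrive from both a failure path and the fragment's final sweep. The guard pattern in isolation:

    // Sketch: null-out after release so a second close() is a harmless no-op.
    final class IdempotentCloseSketch implements AutoCloseable {
      private Runnable resource = () -> { /* release something */ };

      @Override
      public void close() {
        if (resource != null) {
          resource.run();
          resource = null;  // a repeated close() skips the released resource
        }
      }
    }
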
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index ca93a7214..e88bc677f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.physical.impl.xsort;
-import com.google.common.base.Joiner;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
@@ -68,6 +67,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.calcite.rel.RelFieldCollation.Direction;
+import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@@ -145,7 +145,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
@Override
- public void cleanup() {
+ public void close() {
if (batchGroups != null) {
for (BatchGroup group: batchGroups) {
try {
@@ -165,8 +165,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
copier.cleanup();
}
copierAllocator.close();
- super.cleanup();
- incoming.cleanup();
+ super.close();
}
public void buildSchema() throws SchemaChangeException {
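
[Annotation] Note the ordering the sort preserves: spill groups and the copier release their buffers before copierAllocator.close(); closing an allocator that still has outstanding buffers is exactly the failure this avoids. A toy version of the ordering (treating zero-outstanding-on-close as a general allocator convention, assumed here rather than taken from Drill's code):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: release buffer holders first, then close the allocator they
    // drew from; reversing the order would close an allocator that still
    // has live allocations.
    final class OrderedReleaseSketch implements AutoCloseable {
      private final List<AutoCloseable> buffers = new ArrayList<>();
      private final AutoCloseable allocator = () -> { /* verify nothing outstanding */ };

      @Override
      public void close() throws Exception {
        for (AutoCloseable b : buffers) {
          b.close();        // 1. free everything drawn from the allocator
        }
        allocator.close();  // 2. only now retire the allocator itself
      }
    }
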
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
index eb5d83bf4..b9f639649 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
@@ -31,7 +31,8 @@ public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExternalSortBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children) throws ExecutionSetupException {
+ public ExternalSortBatch getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new ExternalSortBatch(config, context, children.iterator().next());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index c96cb7c4d..4e348bb6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
-public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements RecordBatch{
+public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements CloseableRecordBatch {
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(this.getClass());
protected final VectorContainer container; //= new VectorContainer();
@@ -42,14 +42,15 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
protected BatchState state;
protected AbstractRecordBatch(final T popConfig, final FragmentContext context) throws OutOfMemoryException {
- this(popConfig, context, true, new OperatorContext(popConfig, context, true));
+ this(popConfig, context, true, context.newOperatorContext(popConfig, true));
}
protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema) throws OutOfMemoryException {
- this(popConfig, context, buildSchema, new OperatorContext(popConfig, context, true));
+ this(popConfig, context, buildSchema, context.newOperatorContext(popConfig, true));
}
- protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema, final OperatorContext oContext) throws OutOfMemoryException {
+ protected AbstractRecordBatch(final T popConfig, final FragmentContext context, final boolean buildSchema,
+ final OperatorContext oContext) throws OutOfMemoryException {
super();
this.context = context;
this.popConfig = popConfig;
@@ -171,9 +172,8 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
protected abstract void killIncoming(boolean sendUpstream);
- public void cleanup(){
+ public void close(){
container.clear();
- oContext.close();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 3cfe177b6..dd90cab45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -115,13 +115,6 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
}
@Override
- public void cleanup() {
-// logger.debug("Cleaning up.");
- super.cleanup();
- incoming.cleanup();
- }
-
- @Override
public BatchSchema getSchema() {
if (container.hasSchema()) {
return container.getSchema();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java
new file mode 100644
index 000000000..c52c3ee6d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/CloseableRecordBatch.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.record;
+
+public interface CloseableRecordBatch extends RecordBatch, AutoCloseable {
+
+}
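
[Annotation] The new interface declares nothing of its own; it simply lets one reference be both a RecordBatch and an AutoCloseable, so RootExec can hold a single typed list and language constructs like try-with-resources apply directly. A compact illustration of the union-interface idea with stand-in types:

    // Sketch: an empty "union" interface gives call sites one type carrying
    // both capabilities; iteration code sees a Batch, teardown code sees an
    // AutoCloseable, and neither needs casts.
    interface Batch { boolean next(); }

    interface CloseableBatch extends Batch, AutoCloseable {}

    final class UnionInterfaceDemo {
      static void drain(CloseableBatch batch) throws Exception {
        try (CloseableBatch b = batch) {
          while (b.next()) {
            // process records
          }
        }
      }
    }
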
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 0a8ece5d0..6f10a1cbb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.record;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -118,6 +117,4 @@ public interface RecordBatch extends VectorAccessible {
*/
public WritableBatch getWritableBatch();
- public void cleanup();
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 489a989c6..59999baf3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -29,7 +29,6 @@ import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.util.Utf8;
-
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
@@ -47,7 +46,6 @@ import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index f1271b117..b4efe70a4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractRecordReader;
@@ -43,11 +44,10 @@ import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
@@ -119,7 +119,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
public abstract RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
List<SchemaPath> columns) throws ExecutionSetupException;
- RecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
+ CloseableRecordBatch getReaderBatch(FragmentContext context, EasySubScan scan) throws ExecutionSetupException {
String partitionDesignator = context.getOptions()
.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
List<SchemaPath> columns = scan.getColumns();
@@ -153,9 +153,11 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
}
int numParts = 0;
- OperatorContext oContext = new OperatorContext(scan, context,
- false /* ScanBatch is not subject to fragment memory limit */);
- DrillFileSystem dfs;
+ OperatorContext oContext = context.newOperatorContext(scan,
+ false /* ScanBatch is not subject to fragment memory limit */);
+ final DrillFileSystem dfs;
try {
dfs = new DrillFileSystem(fsConf, oContext.getStats());
} catch (IOException e) {
@@ -190,7 +192,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
public abstract RecordWriter getRecordWriter(FragmentContext context, EasyWriter writer) throws IOException;
- public RecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, EasyWriter writer)
+ public CloseableRecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, EasyWriter writer)
throws ExecutionSetupException {
try {
return new WriterRecordBatch(writer, incoming, context, getRecordWriter(context, writer));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
index ac0d2e73c..f9dfd8bdb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyReaderBatchCreator.java
@@ -22,13 +22,14 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
public class EasyReaderBatchCreator implements BatchCreator<EasySubScan>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyReaderBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
+ public CloseableRecordBatch getBatch(FragmentContext context, EasySubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children == null || children.isEmpty();
return config.getFormatPlugin().getReaderBatch(context, config);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
index c91ceba2a..bfb4188cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriterBatchCreator.java
@@ -22,13 +22,14 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
public class EasyWriterBatchCreator implements BatchCreator<EasyWriter>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriterBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children)
+ public CloseableRecordBatch getBatch(FragmentContext context, EasyWriter config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children != null && children.size() == 1;
return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
index 84587a946..d59cda21b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/DirectBatchCreator.java
@@ -30,7 +30,7 @@ public class DirectBatchCreator implements BatchCreator<DirectSubScan>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DirectBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
+ public ScanBatch getBatch(FragmentContext context, DirectSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
return new ScanBatch(config, context, Collections.singleton(config.getReader()).iterator());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 2cccc6442..2666b2ebf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.util.List;
import com.google.common.collect.ImmutableList;
+
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
index b38a33fb1..2ef2333bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaBatchCreator.java
@@ -31,7 +31,8 @@ public class InfoSchemaBatchCreator implements BatchCreator<InfoSchemaSubScan>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InfoSchemaBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children) throws ExecutionSetupException {
+ public ScanBatch getBatch(FragmentContext context, InfoSchemaSubScan config, List<RecordBatch> children)
+ throws ExecutionSetupException {
RecordReader rr = config.getTable().getRecordReader(context.getRootSchema(), config.getFilter());
return new ScanBatch(config, context, Collections.singleton(rr).iterator());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
index 0bfd03886..74423bfda 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockScanBatchCreator.java
@@ -34,7 +34,8 @@ public class MockScanBatchCreator implements BatchCreator<MockSubScanPOP>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockScanBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ public ScanBatch getBatch(FragmentContext context, MockSubScanPOP config, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
List<MockScanEntry> entries = config.getReadEntries();
List<RecordReader> readers = Lists.newArrayList();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 7298f53a0..cfa4c9318 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordWriter;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.DrillPathFilter;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FileSystemConfig;
@@ -45,7 +46,6 @@ import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MagicString;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.mock.MockStorageEngine;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -151,7 +151,7 @@ public class ParquetFormatPlugin implements FormatPlugin{
return recordWriter;
}
- public RecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, ParquetWriter writer)
+ public WriterRecordBatch getWriterBatch(FragmentContext context, RecordBatch incoming, ParquetWriter writer)
throws ExecutionSetupException {
try {
return new WriterRecordBatch(writer, incoming, context, getRecordWriter(context, writer));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
index 3e3572169..3506ffafe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRecordWriter.java
@@ -99,7 +99,7 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException{
super();
- this.oContext=new OperatorContext(writer, context, true);
+ this.oContext = context.newOperatorContext(writer, true);
}
@Override
@@ -331,9 +331,6 @@ public class ParquetRecordWriter extends ParquetOutputRecordWriter {
if (pageStore != null) {
ColumnChunkPageWriteStoreExposer.close(pageStore);
}
- if(oContext!=null){
- oContext.close();
- }
if (!hasRecords) {
// the very last file is empty, delete it (DRILL-2408)
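Two related changes meet in this file: the operator now asks the fragment for its context instead of constructing one, and the explicit oContext.close() disappears because the fragment owns the context's lifetime and releases it even on failure paths. A condensed sketch of the new shape, with FooWriter standing in for any operator that holds a context (hypothetical name, for illustration only):

    import org.apache.drill.exec.memory.OutOfMemoryException;
    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.ops.OperatorContext;
    import org.apache.drill.exec.physical.base.PhysicalOperator;

    public class FooWriter {  // hypothetical operator
      private final OperatorContext oContext;

      public FooWriter(FragmentContext context, PhysicalOperator pop) throws OutOfMemoryException {
        // Before: new OperatorContext(pop, context, true), paired with a
        // matching oContext.close() somewhere in the operator's cleanup.
        // After: the fragment creates and tracks the context, and
        // FragmentContext.close() releases it even if cleanup never runs.
        this.oContext = context.newOperatorContext(pop, true /* apply fragment memory limit */);
      }
    }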
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 52dccd90c..d5586ce61 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -61,14 +61,17 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
@Override
- public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
+ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children)
+ throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
String partitionDesignator = context.getOptions()
.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
List<SchemaPath> columns = rowGroupScan.getColumns();
List<RecordReader> readers = Lists.newArrayList();
- OperatorContext oContext = new OperatorContext(rowGroupScan, context,
- false /* ScanBatch is not subject to fragment memory limit */);
+ OperatorContext oContext = context.newOperatorContext(rowGroupScan,
+ false /* ScanBatch is not subject to fragment memory limit */);
List<String[]> partitionColumns = Lists.newArrayList();
List<Integer> selectedPartitionColumns = Lists.newArrayList();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
index 10dd26da8..79c570953 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriterBatchCreator.java
@@ -22,13 +22,14 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.physical.impl.WriterRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
public class ParquetWriterBatchCreator implements BatchCreator<ParquetWriter>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetWriterBatchCreator.class);
@Override
- public RecordBatch getBatch(FragmentContext context, ParquetWriter config, List<RecordBatch> children)
+ public WriterRecordBatch getBatch(FragmentContext context, ParquetWriter config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children != null && children.size() == 1;
return config.getFormatPlugin().getWriterBatch(context, children.iterator().next(), config);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 4d837c1ef..921d13400 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
import java.util.Set;
import com.google.common.collect.Sets;
+
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
@@ -69,9 +70,9 @@ import parquet.schema.MessageType;
import parquet.schema.Type;
import parquet.schema.PrimitiveType;
-
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+
import parquet.schema.Types;
public class DrillParquetReader extends AbstractRecordReader {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
index 92f676af1..58bf433fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/SystemTableBatchCreator.java
@@ -39,7 +39,7 @@ public class SystemTableBatchCreator implements BatchCreator<SystemTableScan> {
@SuppressWarnings({ "rawtypes", "unchecked" })
@Override
- public RecordBatch getBatch(final FragmentContext context, final SystemTableScan scan,
+ public ScanBatch getBatch(final FragmentContext context, final SystemTableScan scan,
final List<RecordBatch> children)
throws ExecutionSetupException {
final SystemTable table = scan.getTable();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 336841266..87c78b278 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -25,6 +25,7 @@ import javax.annotation.Nullable;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
+
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 85262de10..d23655c6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -105,9 +105,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
public void cleanup() {
if (!isFinished() && context.shouldContinue()) {
final String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished.");
- logger.error(msg);
final IllegalStateException e = new IllegalStateException(msg);
- context.fail(e);
throw e;
}
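With cleanup now driven centrally, this method only throws; logging the error and calling context.fail(e) here would have reported the same failure twice once the caller also records the exception. A sketch of the resulting method, using only the fields visible in the hunk; note that %d placeholders would also let String.format do the interpolation the original message builds by string concatenation:

    public void cleanup() {
      if (!isFinished() && context.shouldContinue()) {
        // Throw only. The caller records the failure exactly once, so no
        // logger.error() or context.fail() is needed here.
        throw new IllegalStateException(String.format(
            "Cleanup before finished. %d out of %d streams have finished.",
            fragmentCount - streamCounter, fragmentCount));
      }
    }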
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index edbcfde2e..4249cbe4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -212,19 +212,23 @@ public class Foreman implements Runnable {
moveToState(QueryState.FAILED,
new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
} catch (final OutOfMemoryError e) {
- /*
- * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman.
- * So, if we die here, they should get notified about that, and cancel themselves; we don't have to
- * attempt to notify them, which might not work under these conditions.
- */
- /*
- * TODO this will kill everything in this JVM; why can't we just free all allocation
- * associated with this Foreman and allow others to continue?
- */
- System.out.println("Out of memory, exiting.");
- e.printStackTrace();
- System.out.flush();
- System.exit(-1);
+ if ("Direct buffer memory".equals(e.getMessage())) {
+ moveToState(QueryState.FAILED,
+ UserException.resourceError(e)
+ .message("One or more nodes ran out of memory while executing the query.")
+ .build());
+ } else {
+ /*
+ * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman. So, if we
+ * die here, they should get notified about that, and cancel themselves; we don't have to attempt to notify
+ * them, which might not work under these conditions.
+ */
+ System.out.println("Node ran out of Heap memory, exiting.");
+ e.printStackTrace();
+ System.out.flush();
+ System.exit(-1);
+ }
+
} finally {
/*
* Begin accepting external events.
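The new branch separates two very different out-of-memory situations: exhaustion of direct (off-heap) buffer memory, which the JDK reports with the message "Direct buffer memory" and which now fails only the current query, and heap exhaustion, where the JVM can no longer be trusted and the process exits as before. A condensed sketch of the pattern; runFragments() is a hypothetical stand-in for the fragment setup the surrounding method performs:

    try {
      runFragments();
    } catch (final OutOfMemoryError e) {
      if ("Direct buffer memory".equals(e.getMessage())) {
        // Off-heap exhaustion: fail this query, keep the Drillbit alive.
        moveToState(QueryState.FAILED,
            UserException.resourceError(e)
                .message("One or more nodes ran out of memory while executing the query.")
                .build());
      } else {
        // Heap exhaustion: the JVM may be unable to make progress, so exit.
        e.printStackTrace();
        System.exit(-1);
      }
    }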
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 0783fee16..fb2045f62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -272,12 +272,14 @@ public class FragmentExecutor implements Runnable {
private void closeOutResources() {
+ // first close the operators and release all memory.
try {
- root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
+ root.close();
} catch (final Exception e) {
fail(e);
}
+ // then close the fragment context.
fragmentContext.close();
}
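Order matters in this method: root.close() must run first so every operator returns its buffers before fragmentContext.close() tears down the fragment allocator, which would otherwise flag the outstanding memory as leaked. The failure from closing the operator tree is recorded rather than rethrown so the context still gets closed. The same code as the hunk, annotated for emphasis:

    private void closeOutResources() {
      try {
        root.close();          // 1. close every operator, releasing its memory
      } catch (final Exception e) {
        fail(e);               //    record the failure, keep tearing down
      }
      fragmentContext.close(); // 2. only now can the fragment allocator verify
                               //    that all operator memory was returned
    }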
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index b02051b80..d6e6d08a2 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -25,7 +25,6 @@ import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.base.Preconditions;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
@@ -45,9 +44,9 @@ import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.util.TestUtilities;
import org.apache.drill.exec.util.JsonStringArrayList;
import org.apache.drill.exec.util.JsonStringHashMap;
+import org.apache.drill.exec.util.TestUtilities;
import org.apache.drill.exec.util.VectorUtil;
import org.apache.hadoop.io.Text;
import org.junit.AfterClass;
@@ -57,6 +56,7 @@ import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
import com.google.common.io.Resources;
public class BaseTestQuery extends ExecTest {
@@ -219,7 +219,7 @@ public class BaseTestQuery extends ExecTest {
}
@AfterClass
- public static void closeClient() throws IOException{
+ public static void closeClient() throws IOException, InterruptedException {
if (client != null) {
client.close();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java
index 4258e60fb..13f9563ad 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/RunRootExec.java
@@ -65,7 +65,7 @@ public class RunRootExec {
}
System.out.println("ENDITER: " + i);
System.out.println("TIME: " + w.elapsed(TimeUnit.MILLISECONDS) + "ms");
- exec.stop();
+ exec.close();
}
context.close();
bit.close();
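Since RootExec is now AutoCloseable, a call site like this could equally use try-with-resources to guarantee the close even if next() throws. A sketch, assuming ImplCreator.getExec(context, root) with the signature the production executor appears to use:

    // inside a method declared `throws Exception`, since close() may throw
    try (RootExec exec = ImplCreator.getExec(context, (FragmentRoot) op)) {
      while (exec.next()) {
        // drive the operator tree to completion
      }
    } // exec.close() runs here on every exit path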
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
index f4f4966f5..7c58b19f5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/client/DumpCatTest.java
@@ -88,7 +88,7 @@ public class DumpCatTest extends ExecTest{
}
assertTrue(!context.isFailed());
- exec.stop();
+ exec.close();
FragmentHandle handle = context.getHandle();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
index 49421851d..04e198028 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/interp/ExpressionInterpreterTest.java
@@ -17,7 +17,11 @@
******************************************************************************/
package org.apache.drill.exec.fn.interp;
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
import org.antlr.runtime.ANTLRStringStream;
import org.antlr.runtime.CommonTokenStream;
import org.antlr.runtime.RecognitionException;
@@ -36,6 +40,7 @@ import org.apache.drill.exec.expr.fn.interpreter.InterpreterEvaluator;
import org.apache.drill.exec.expr.holders.TimeStampHolder;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryDateTimeInfo;
+import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.pop.PopUnitTestBase;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.record.MaterializedField;
@@ -49,10 +54,7 @@ import org.apache.drill.exec.vector.ValueVector;
import org.joda.time.DateTime;
import org.junit.Test;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
+import com.google.common.collect.Lists;
public class ExpressionInterpreterTest extends PopUnitTestBase {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExpressionInterpreterTest.class);
@@ -170,7 +172,7 @@ public class ExpressionInterpreterTest extends PopUnitTestBase {
MockGroupScanPOP.MockScanEntry entry = new MockGroupScanPOP.MockScanEntry(10, columns);
MockSubScanPOP scanPOP = new MockSubScanPOP("testTable", java.util.Collections.singletonList(entry));
- RecordBatch batch = createMockScanBatch(bit1, scanPOP, planFragment);
+ ScanBatch batch = createMockScanBatch(bit1, scanPOP, planFragment);
batch.next();
@@ -184,13 +186,13 @@ public class ExpressionInterpreterTest extends PopUnitTestBase {
showValueVectorContent(vv);
vv.clear();
- batch.cleanup();
+ batch.close();
batch.getContext().close();
bit1.close();
}
- private RecordBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) {
+ private ScanBatch createMockScanBatch(Drillbit bit, MockSubScanPOP scanPOP, BitControl.PlanFragment planFragment) {
List<RecordBatch> children = Lists.newArrayList();
MockScanBatchCreator creator = new MockScanBatchCreator();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
index 1f0951bbd..74ce22596 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/memory/TestAllocators.java
@@ -19,9 +19,13 @@
package org.apache.drill.exec.memory;
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
+import static org.junit.Assert.assertEquals;
import io.netty.buffer.DrillBuf;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.ExecConstants;
@@ -41,11 +45,9 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.RemoteServiceSet;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
public class TestAllocators {
@@ -100,23 +102,25 @@ public class TestAllocators {
OperatorStats stats;
//Use some bogus operator type to create a new operator context.
- def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE, OperatorContext.getChildCount(physicalOperator1));
+ def = new OpProfileDef(physicalOperator1.getOperatorId(), UserBitShared.CoreOperatorType.MOCK_SUB_SCAN_VALUE,
+ OperatorContext.getChildCount(physicalOperator1));
stats = fragmentContext1.getStats().getOperatorStats(def, fragmentContext1.getAllocator());
// Add a couple of Operator Contexts
// Initial allocation = 1000000 bytes for all operators
- OperatorContext oContext11 = new OperatorContext(physicalOperator1, fragmentContext1, true);
+ OperatorContext oContext11 = fragmentContext1.newOperatorContext(physicalOperator1, true);
DrillBuf b11=oContext11.getAllocator().buffer(1000000);
- OperatorContext oContext12 = new OperatorContext(physicalOperator2, fragmentContext1, stats, true);
+ OperatorContext oContext12 = fragmentContext1.newOperatorContext(physicalOperator2, stats, true);
DrillBuf b12=oContext12.getAllocator().buffer(500000);
- OperatorContext oContext21 = new OperatorContext(physicalOperator3, fragmentContext2, true);
+ OperatorContext oContext21 = fragmentContext2.newOperatorContext(physicalOperator3, true);
- def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE, OperatorContext.getChildCount(physicalOperator4));
+ def = new OpProfileDef(physicalOperator4.getOperatorId(), UserBitShared.CoreOperatorType.TEXT_WRITER_VALUE,
+ OperatorContext.getChildCount(physicalOperator4));
stats = fragmentContext2.getStats().getOperatorStats(def, fragmentContext2.getAllocator());
- OperatorContext oContext22 = new OperatorContext(physicalOperator4, fragmentContext2, stats, true);
+ OperatorContext oContext22 = fragmentContext2.newOperatorContext(physicalOperator4, stats, true);
DrillBuf b22=oContext22.getAllocator().buffer(2000000);
// New Fragment begins
@@ -127,15 +131,16 @@ public class TestAllocators {
FragmentContext fragmentContext3 = new FragmentContext(bitContext, pf3, null, functionRegistry);
// New fragment starts an operator that allocates an amount within the limit
- def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE, OperatorContext.getChildCount(physicalOperator5));
+ def = new OpProfileDef(physicalOperator5.getOperatorId(), UserBitShared.CoreOperatorType.UNION_VALUE,
+ OperatorContext.getChildCount(physicalOperator5));
stats = fragmentContext3.getStats().getOperatorStats(def, fragmentContext3.getAllocator());
- OperatorContext oContext31 = new OperatorContext(physicalOperator5, fragmentContext3, stats, true);
+ OperatorContext oContext31 = fragmentContext3.newOperatorContext(physicalOperator5, stats, true);
DrillBuf b31a = oContext31.getAllocator().buffer(200000);
//Previously running operator completes
b22.release();
- oContext22.close();
+ ((AutoCloseable) oContext22).close();
// Fragment 3 asks for more and fails
boolean outOfMem=false;
@@ -153,7 +158,7 @@ public class TestAllocators {
// Operator is Exempt from Fragment limits. Fragment 3 asks for more and succeeds
outOfMem=false;
- OperatorContext oContext32 = new OperatorContext(physicalOperator6, fragmentContext3, false);
+ OperatorContext oContext32 = fragmentContext3.newOperatorContext(physicalOperator6, false);
DrillBuf b32=null;
try {
b32=oContext32.getAllocator().buffer(4400000);
@@ -165,17 +170,17 @@ public class TestAllocators {
}else{
outOfMem=true;
}
- oContext32.close();
+ closeOp(oContext32);
}
assertEquals(false, (boolean)outOfMem);
b11.release();
- oContext11.close();
+ closeOp(oContext11);
b12.release();
- oContext12.close();
- oContext21.close();
+ closeOp(oContext12);
+ closeOp(oContext21);
b31a.release();
- oContext31.close();
+ closeOp(oContext31);
fragmentContext1.close();
fragmentContext2.close();
@@ -184,4 +189,8 @@ public class TestAllocators {
bit.close();
serviceSet.close();
}
+
+ private void closeOp(OperatorContext c) throws Exception {
+ ((AutoCloseable) c).close();
+ }
}
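The closeOp helper exists because OperatorContext is now abstract and no longer exposes close(); only the implementation handed out by FragmentContext is closeable, so a test that needs an early release must go through AutoCloseable. A simplified sketch of the split, not verbatim from the patch:

    public abstract class OperatorContext {
      public abstract BufferAllocator getAllocator();
      // no close() here: operators cannot release their own context
    }

    class OperatorContextImpl extends OperatorContext implements AutoCloseable {
      @Override
      public void close() {
        // release the operator allocator and any tracked buffers
      }
    }

    // FragmentContext keeps the OperatorContextImpl references and closes them
    // all; tests that must close early cast: ((AutoCloseable) ctx).close()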
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
index 2536bbbb9..42d2193f5 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java
@@ -101,7 +101,6 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
switch (incoming.next()) {
case NONE:
case STOP:
- incoming.cleanup();
return false;
default:
return true;
@@ -109,8 +108,8 @@ public class SimpleRootExec implements RootExec, Iterable<ValueVector> {
}
@Override
- public void stop() {
- screenRoot.stop();
+ public void close() throws Exception {
+ screenRoot.close();
}
@Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java
index e5448ac93..ffa876505 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestCastFunctions.java
@@ -111,7 +111,7 @@ public class TestCastFunctions extends PopUnitTestBase{
assertEquals(5, count);
}
- exec.stop();
+ exec.close();
context.close();
allocator.close();
@@ -159,7 +159,7 @@ public class TestCastFunctions extends PopUnitTestBase{
assertEquals(5, count);
}
- exec.stop();
+ exec.close();
context.close();
allocator.close();
@@ -205,7 +205,7 @@ public class TestCastFunctions extends PopUnitTestBase{
assertEquals(5, count);
}
- exec.stop();
+ exec.close();
context.close();
allocator.close();
@@ -252,7 +252,7 @@ public class TestCastFunctions extends PopUnitTestBase{
assertEquals(5, count);
}
- exec.stop();
+ exec.close();
context.close();
allocator.close();
@@ -299,7 +299,7 @@ public class TestCastFunctions extends PopUnitTestBase{
assertEquals(5, count);
}
- exec.stop();
+ exec.close();
context.close();
allocator.close();
@@ -346,7 +346,7 @@ public class TestCastFunctions extends PopUnitTestBase{
}
assertEquals(5, count);
}
- exec.stop();
+ exec.close();
context.close();
allocator.close();
@@ -392,7 +392,7 @@ public class TestCastFunctions extends PopUnitTestBase{
}
assertEquals(5, count);
}
- exec.stop();
+ exec.close();
context.close();
allocator.close();
@@ -428,7 +428,7 @@ public class TestCastFunctions extends PopUnitTestBase{
while(exec.next()){
}
- exec.stop();
+ exec.close();
context.close();
allocator.close();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
index 0f6fd43b9..c69c6f59c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java
@@ -80,7 +80,7 @@ public class TestComparisonFunctions extends ExecTest {
// }
}
- exec.stop();
+ exec.close();
context.close();
if (context.getFailureCause() != null) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
index a112d92a8..a069078ef 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/filter/TestSimpleFilter.java
@@ -74,7 +74,7 @@ public class TestSimpleFilter extends ExecTest {
assertEquals(50, exec.getRecordCount());
}
- exec.stop();
+ exec.close();
if(context.getFailureCause() != null){
throw context.getFailureCause();
@@ -106,7 +106,7 @@ public class TestSimpleFilter extends ExecTest {
}
recordCount += exec.getSelectionVector4().getCount();
}
- exec.stop();
+ exec.close();
assertEquals(50, recordCount);
if(context.getFailureCause() != null){
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
index ef3a330b6..6c067febe 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoin.java
@@ -93,7 +93,7 @@ public class TestHashJoin extends PopUnitTestBase {
while (exec.next()) {
totalRecordCount += exec.getRecordCount();
}
- exec.stop();
+ exec.close();
assertEquals(expectedRows, totalRecordCount);
System.out.println("Total Record Count: " + totalRecordCount);
if (context.getFailureCause() != null) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
index 46bcc6014..6a6a7e080 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
@@ -17,7 +17,10 @@
*/
package org.apache.drill.exec.physical.impl.partitionsender;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
@@ -46,8 +49,8 @@ import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.pop.PopUnitTestBase;
-import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.MetricValue;
import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -363,8 +366,8 @@ public class TestPartitionSender extends PlanTestBase {
super(context, incoming, operator);
}
- public void close() {
- oContext.close();
+ public void close() throws Exception {
+ ((AutoCloseable) oContext).close();
}
public int getNumberPartitions() {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
index d0d4005ba..b82846e6d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceMultiRecordBatch.java
@@ -82,7 +82,7 @@ public class TestTraceMultiRecordBatch extends ExecTest {
}
}
- exec.stop();
+ exec.close();
if(context.getFailureCause() != null){
throw context.getFailureCause();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
index f6766b1d5..1cb72ffa6 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/trace/TestTraceOutputDump.java
@@ -93,7 +93,7 @@ public class TestTraceOutputDump extends ExecTest {
while(exec.next()){
}
- exec.stop();
+ exec.close();
if(context.getFailureCause() != null){
throw context.getFailureCause();