aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl
diff options
context:
space:
mode:
authorTimothy Farkas <timothyfarkas@apache.org>2018-01-11 14:59:41 -0800
committerArina Ielchiieva <arina.yelchiyeva@gmail.com>2018-01-26 13:42:27 +0200
commit186536d544d02ffc01339a4645e2a533545a2f86 (patch)
tree24ddcb013b243ee501d5dfa1d73eb9c0a1511510 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl
parent9926eda21c748e96d67bce341a76dac3114002af (diff)
DRILL-5730: Mock testing improvements and interface improvements
closes #1045
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java19
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java27
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java48
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java18
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java42
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java31
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java19
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java4
77 files changed, 292 insertions, 392 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index d01e29454..82887ec3b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -21,11 +21,11 @@ import java.util.List;
import org.apache.drill.common.DeferredException;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OpProfileDef;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.ops.OperatorUtilities;
+import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.CloseableRecordBatch;
@@ -39,10 +39,10 @@ public abstract class BaseRootExec implements RootExec {
protected OperatorStats stats = null;
protected OperatorContext oContext = null;
- protected FragmentContext fragmentContext = null;
+ protected RootFragmentContext fragmentContext = null;
private List<CloseableRecordBatch> operators;
- public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
+ public BaseRootExec(final RootFragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = fragmentContext.newOperatorContext(config, stats);
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorUtilities.getChildCount(config)),
@@ -51,8 +51,8 @@ public abstract class BaseRootExec implements RootExec {
this.fragmentContext = fragmentContext;
}
- public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext,
- final PhysicalOperator config) throws OutOfMemoryException {
+ public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorContext oContext,
+ final PhysicalOperator config) throws OutOfMemoryException {
this.oContext = oContext;
stats = new OperatorStats(new OpProfileDef(config.getOperatorId(),
config.getOperatorType(), OperatorUtilities.getChildCount(config)),
@@ -87,7 +87,7 @@ public abstract class BaseRootExec implements RootExec {
public final boolean next() {
// Stats should have been initialized
assert stats != null;
- if (!fragmentContext.shouldContinue()) {
+ if (!fragmentContext.getExecutorState().shouldContinue()) {
return false;
}
try {
@@ -156,7 +156,7 @@ public abstract class BaseRootExec implements RootExec {
try {
df.close();
} catch (Exception e) {
- fragmentContext.fail(e);
+ fragmentContext.getExecutorState().fail(e);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
index af99b5ea1..e9113ce67 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java
@@ -20,14 +20,12 @@ package org.apache.drill.exec.physical.impl;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.CloseableRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
public interface BatchCreator<T extends PhysicalOperator> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class);
-
- public CloseableRecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children)
+ CloseableRecordBatch getBatch(ExecutorFragmentContext context, T config, List<RecordBatch> children)
throws ExecutionSetupException;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
index b418fd480..2e0d14e11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java
@@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector;
@@ -66,7 +66,7 @@ public class ImplCreator {
* @return RootExec of fragment.
* @throws ExecutionSetupException
*/
- public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException {
+ public static RootExec getExec(ExecutorFragmentContext context, FragmentRoot root) throws ExecutionSetupException {
Preconditions.checkNotNull(root);
Preconditions.checkNotNull(context);
@@ -99,14 +99,14 @@ public class ImplCreator {
return rootExec;
} catch(Exception e) {
AutoCloseables.close(e, creator.getOperators());
- context.fail(e);
+ context.getExecutorState().fail(e);
}
return null;
}
/** Create RootExec and its children (RecordBatches) for given FragmentRoot */
- @SuppressWarnings("unchecked")
- private RootExec getRootExec(final FragmentRoot root, final FragmentContext context) throws ExecutionSetupException {
+
+ private RootExec getRootExec(final FragmentRoot root, final ExecutorFragmentContext context) throws ExecutionSetupException {
final List<RecordBatch> childRecordBatches = getChildren(root, context);
if (context.isImpersonationEnabled()) {
@@ -131,7 +131,7 @@ public class ImplCreator {
/** Create a RecordBatch and its children for given PhysicalOperator */
@VisibleForTesting
- public RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+ public RecordBatch getRecordBatch(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
Preconditions.checkNotNull(op);
final List<RecordBatch> childRecordBatches = getChildren(op, context);
@@ -164,9 +164,9 @@ public class ImplCreator {
}
/** Helper method to get OperatorCreator (RootCreator or BatchCreator) for given PhysicalOperator (root or non-root) */
- private Object getOpCreator(PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+ private Object getOpCreator(PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
final Class<? extends PhysicalOperator> opClass = op.getClass();
- Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(opClass);
+ Object opCreator = context.getOperatorCreatorRegistry().getOperatorCreator(opClass);
if (opCreator == null) {
throw new UnsupportedOperationException(
String.format("BatchCreator for PhysicalOperator type '%s' not found.", opClass.getCanonicalName()));
@@ -176,7 +176,7 @@ public class ImplCreator {
}
/** Helper method to traverse the children of given PhysicalOperator and create RecordBatches for children recursively */
- private List<RecordBatch> getChildren(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException {
+ private List<RecordBatch> getChildren(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException {
List<RecordBatch> children = Lists.newArrayList();
for (PhysicalOperator child : op) {
children.add(getRecordBatch(child, context));
@@ -184,5 +184,4 @@ public class ImplCreator {
return children;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
index 690a6624e..0ef84b960 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.MergingReceiverPOP;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
@@ -33,7 +33,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP>
@SuppressWarnings("resource")
@Override
- public MergingRecordBatch getBatch(FragmentContext context,
+ public MergingRecordBatch getBatch(ExecutorFragmentContext context,
MergingReceiverPOP receiver,
List<RecordBatch> children)
throws ExecutionSetupException, OutOfMemoryException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
index f3d95243c..17e402765 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java
@@ -20,12 +20,10 @@ package org.apache.drill.exec.physical.impl;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.record.RecordBatch;
public interface RootCreator<T extends PhysicalOperator> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootCreator.class);
-
- public RootExec getRoot(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
+ RootExec getRoot(ExecutorFragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index e0d1545b0..f6a486397 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -82,14 +82,13 @@ public class ScanBatch implements CloseableRecordBatch {
private String currentReaderClassName;
/**
*
- * @param subScanConfig
* @param context
* @param oContext
* @param readerList
* @param implicitColumnList : either an emptylist when all the readers do not have implicit
* columns, or there is a one-to-one mapping between reader and implicitColumns.
*/
- public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
+ public ScanBatch(FragmentContext context,
OperatorContext oContext, List<RecordReader> readerList,
List<Map<String, String>> implicitColumnList) {
this.context = context;
@@ -126,8 +125,7 @@ public class ScanBatch implements CloseableRecordBatch {
public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context,
List<RecordReader> readers)
throws ExecutionSetupException {
- this(subScanConfig, context,
- context.newOperatorContext(subScanConfig),
+ this(context, context.newOperatorContext(subScanConfig),
readers, Collections.<Map<String, String>> emptyList());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index d9abf406c..46dc45038 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -22,8 +22,9 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingUserConnection;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer;
@@ -38,11 +39,10 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
import com.google.common.base.Preconditions;
public class ScreenCreator implements RootCreator<Screen> {
- //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class);
private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScreenCreator.class);
@Override
- public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children)
+ public RootExec getRoot(ExecutorFragmentContext context, Screen config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkNotNull(children);
Preconditions.checkArgument(children.size() == 1);
@@ -52,7 +52,7 @@ public class ScreenCreator implements RootCreator<Screen> {
public static class ScreenRoot extends BaseRootExec {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
private final RecordBatch incoming;
- private final FragmentContext context;
+ private final RootFragmentContext context;
private final AccountingUserConnection userConnection;
private RecordMaterializer materializer;
@@ -67,13 +67,17 @@ public class ScreenCreator implements RootCreator<Screen> {
}
}
- public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
+ public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException {
super(context, config);
this.context = context;
this.incoming = incoming;
userConnection = context.getUserDataTunnel();
}
+ public RootFragmentContext getContext() {
+ return context;
+ }
+
@Override
public boolean innerNext() {
IterOutcome outcome = next(incoming);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 2f33193a3..9231aef0d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -22,8 +22,9 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.BatchSchema;
@@ -36,7 +37,7 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
- public RootExec getRoot(FragmentContext context, SingleSender config, List<RecordBatch> children)
+ public RootExec getRoot(ExecutorFragmentContext context, SingleSender config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children != null && children.size() == 1;
return new SingleSenderRootExec(context, children.iterator().next(), config);
@@ -64,7 +65,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
}
}
- public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
+ public SingleSenderRootExec(RootFragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
super(context, context.newOperatorContext(config, null), config);
this.incoming = batch;
assert incoming != null;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
index 7f9aca4a7..97e26b662 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java
@@ -26,7 +26,6 @@ import javax.inject.Named;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.ExpandableHyperContainer;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 442a753be..16832867b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -281,7 +281,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
} catch(SchemaChangeException | ClassTransformationException | IOException ex) {
kill(false);
logger.error("Failure during query", ex);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
}
@@ -295,7 +295,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
if (copier == null) {
- copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null);
+ copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null);
} else {
for (VectorWrapper<?> i : batch) {
@@ -323,7 +323,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
selectionVector4.clear();
c.clear();
VectorContainer newQueue = new VectorContainer();
- builder.build(context, newQueue);
+ builder.build(newQueue);
priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent());
builder.getSv4().clear();
selectionVector4.clear();
@@ -336,7 +336,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit)
throws SchemaChangeException, ClassTransformationException, IOException {
return createNewPriorityQueue(
- mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getDrillbitContext().getCompiler(),
+ mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getCompiler(),
config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode());
}
@@ -415,7 +415,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
final SelectionVector4 selectionVector4 = priorityQueue.getSv4();
final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context);
final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context);
- copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null);
+ copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null);
@SuppressWarnings("resource")
SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator());
try {
@@ -434,7 +434,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
selectionVector4.clear();
c.clear();
final VectorContainer oldSchemaContainer = new VectorContainer(oContext);
- builder.build(context, oldSchemaContainer);
+ builder.build(oldSchemaContainer);
oldSchemaContainer.setRecordCount(builder.getSv4().getCount());
final VectorContainer newSchemaContainer = SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext);
newSchemaContainer.buildSchema(SelectionVectorMode.FOUR_BYTE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
index e815bff31..d4777449a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java
@@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.TopN;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.TopN;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-public class TopNSortBatchCreator implements BatchCreator<TopN>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNSortBatchCreator.class);
-
+public class TopNSortBatchCreator implements BatchCreator<TopN> {
@Override
- public TopNBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children)
+ public TopNBatch getBatch(ExecutorFragmentContext context, TopN config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new TopNBatch(config, context, children.iterator().next());
}
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
index 0ea17d699..e98a7c6ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java
@@ -118,7 +118,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
} catch(IOException ex) {
logger.error("Failure during query", ex);
kill(false);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
@@ -185,7 +185,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> {
try {
recordWriter.cleanup();
} catch(IOException ex) {
- context.fail(ex);
+ context.getExecutorState().fail(ex);
} finally {
try {
if (!processed) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index b3d68d3fb..47f1017b3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -186,7 +186,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
return aggregator.getOutcome();
case UPDATE_AGGREGATOR:
- context.fail(UserException.unsupportedError()
+ context.getExecutorState().fail(UserException.unsupportedError()
.message(SchemaChangeException.schemaChanged(
"Hash aggregate does not support schema change",
incomingSchema,
@@ -212,7 +212,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
this.aggregator = createAggregatorInternal();
return true;
} catch (SchemaChangeException | ClassTransformationException | IOException ex) {
- context.fail(ex);
+ context.getExecutorState().fail(ex);
container.clear();
incoming.kill(false);
return false;
@@ -227,9 +227,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
ClassGenerator<HashAggregator> cg = top.getRoot();
ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder");
top.plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
- // top.saveCodeForDebugging(true);
-
container.clear();
int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().size() : 0;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
index 1397342f6..ed203e56a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -27,15 +28,11 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-public class HashAggBatchCreator implements BatchCreator<HashAggregate>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatchCreator.class);
-
+public class HashAggBatchCreator implements BatchCreator<HashAggregate> {
@Override
- public HashAggBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children)
+ public HashAggBatch getBatch(ExecutorFragmentContext context, HashAggregate config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new HashAggBatch(config, children.iterator().next(), context);
}
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 89ba59b4b..ef8d9d90b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -300,7 +300,9 @@ public abstract class HashAggTemplate implements HashAggregator {
}
@Override
- public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
+ public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext,
+ RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds,
+ TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException {
if (valueExprs == null || valueFieldIds == null) {
throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables.");
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 16b54997b..3384e671d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -35,34 +35,35 @@ import org.apache.drill.exec.record.VectorContainer;
public interface HashAggregator {
- public static TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION =
+ TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION =
new TemplateClassDefinition<HashAggregator>(HashAggregator.class, HashAggTemplate.class);
- public static enum AggOutcome {
+ enum AggOutcome {
RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, CALL_WORK_AGAIN
}
// For returning results from outputCurrentBatch
// OK - batch returned, NONE - end of data, RESTART - call again
- public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
+ enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART }
- public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
+ void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing,
+ LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException;
- public abstract IterOutcome getOutcome();
+ IterOutcome getOutcome();
- public abstract int getOutputCount();
+ int getOutputCount();
- public abstract AggOutcome doWork();
+ AggOutcome doWork();
- public abstract void cleanup();
+ void cleanup();
- public abstract boolean allFlushed();
+ boolean allFlushed();
- public abstract boolean buildComplete();
+ boolean buildComplete();
- public abstract AggIterOutcome outputCurrentBatch();
+ AggIterOutcome outputCurrentBatch();
- public abstract boolean earlyOutput();
+ boolean earlyOutput();
- public abstract RecordBatch getNewIncoming();
+ RecordBatch getNewIncoming();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
index ac4b29d16..c473b94e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java
@@ -45,19 +45,15 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
private int spilledBatches;
private FragmentContext context;
private BatchSchema schema;
- private OperatorContext oContext;
private SpillSet spillSet;
- // Path spillStreamPath;
private String spillFile;
VectorAccessibleSerializable vas;
- public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
+ public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) {
this.context = context;
this.schema = schema;
this.spilledBatches = spilledBatches;
- this.oContext = oContext;
this.spillSet = spillSet;
- //this.spillStreamPath = spillStreamPath;
this.spillFile = spillFile;
vas = new VectorAccessibleSerializable(oContext.getAllocator());
container = vas.get();
@@ -126,7 +122,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch {
@Override
public IterOutcome next() {
- if ( ! context.shouldContinue() ) { return IterOutcome.STOP; }
+ if ( ! context.getExecutorState().shouldContinue() ) { return IterOutcome.STOP; }
if ( spilledBatches <= 0 ) { // no more batches to read in this partition
this.close();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index b33dbd605..34ab97e2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -202,7 +202,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
}
return outcome;
case UPDATE_AGGREGATOR:
- context.fail(UserException.unsupportedError()
+ context.getExecutorState().fail(UserException.unsupportedError()
.message(SchemaChangeException.schemaChanged("Streaming aggregate does not support schema changes", incomingSchema, incoming.getSchema()).getMessage())
.build(logger));
close();
@@ -263,7 +263,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
this.aggregator = createAggregatorInternal();
return true;
} catch (SchemaChangeException | ClassTransformationException | IOException ex) {
- context.fail(ex);
+ context.getExecutorState().fail(ex);
container.clear();
incoming.kill(false);
return false;
@@ -275,8 +275,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{
ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
- // cg.getCodeGenerator().saveCodeForDebugging(true);
container.clear();
LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()];
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
index cac5b06d8..864271ee6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -27,15 +28,11 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatchCreator.class);
-
+public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate> {
@Override
- public StreamingAggBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children)
+ public StreamingAggBatch getBatch(ExecutorFragmentContext context, StreamingAggregate config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new StreamingAggBatch(config, children.iterator().next(), context);
}
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 61c82d8e2..7e13eb346 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.aggregate;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
index 01122bebd..6c42a1ac9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.broadcastsender;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.BroadcastSender;
import org.apache.drill.exec.physical.impl.RootCreator;
import org.apache.drill.exec.physical.impl.RootExec;
@@ -30,7 +30,7 @@ import com.google.common.collect.Iterators;
public class BroadcastSenderCreator implements RootCreator<BroadcastSender> {
@Override
- public RootExec getRoot(FragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException {
+ public RootExec getRoot(ExecutorFragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException {
assert children != null && children.size() == 1;
return new BroadcastSenderRootExec(context, Iterators.getOnlyElement(children.iterator()), config);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
index 80d774483..bd4a1ec42 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java
@@ -21,8 +21,8 @@ import java.util.List;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
+import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.BroadcastSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
@@ -57,7 +57,7 @@ public class BroadcastSenderRootExec extends BaseRootExec {
}
}
- public BroadcastSenderRootExec(FragmentContext context,
+ public BroadcastSenderRootExec(RootFragmentContext context,
RecordBatch incoming,
BroadcastSender config) throws OutOfMemoryException {
super(context, context.newOperatorContext(config, null), config);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 9bab67d29..703868e94 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -225,12 +225,11 @@ public class ChainedHashTable {
setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true);
HashTable ht = context.getImplementationClass(top);
- ht.setup(htConfig, context, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig);
+ ht.setup(htConfig, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig);
return ht;
}
-
private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping,
LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds)
throws SchemaChangeException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 3749e3e57..d28fe498e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -20,64 +20,58 @@ package org.apache.drill.exec.physical.impl.common;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
public interface HashTable {
-
- public static TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION =
- new TemplateClassDefinition<HashTable>(HashTable.class, HashTableTemplate.class);
-
- /**
- * The initial default capacity of the hash table (in terms of number of buckets).
- */
- static final public int DEFAULT_INITIAL_CAPACITY = 1 << 16;
+ TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION =
+ new TemplateClassDefinition<>(HashTable.class, HashTableTemplate.class);
/**
* The maximum capacity of the hash table (in terms of number of buckets).
*/
- static final public int MAXIMUM_CAPACITY = 1 << 30;
+ int MAXIMUM_CAPACITY = 1 << 30;
/**
* The default load factor of a hash table.
*/
- static final public float DEFAULT_LOAD_FACTOR = 0.75f;
+ float DEFAULT_LOAD_FACTOR = 0.75f;
- static public enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;}
+ enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;}
/**
* The batch size used for internal batch holders
*/
- static final public int BATCH_SIZE = Character.MAX_VALUE + 1;
- static final public int BATCH_MASK = 0x0000FFFF;
+ int BATCH_SIZE = Character.MAX_VALUE + 1;
+ int BATCH_MASK = 0x0000FFFF;
- public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig);
+ void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
+ VectorContainer htContainerOrig);
- public void updateBatches() throws SchemaChangeException;
+ void updateBatches() throws SchemaChangeException;
- public int getHashCode(int incomingRowIdx) throws SchemaChangeException;
+ int getHashCode(int incomingRowIdx) throws SchemaChangeException;
- public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
+ PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException;
- public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
+ int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException;
- public void getStats(HashTableStats stats);
+ void getStats(HashTableStats stats);
- public int size();
+ int size();
- public boolean isEmpty();
+ boolean isEmpty();
- public void clear();
+ void clear();
- public void reinit(RecordBatch newIncoming);
+ void reinit(RecordBatch newIncoming);
- public void reset();
+ void reset();
- public void setMaxVarcharSize(int size);
+ void setMaxVarcharSize(int size);
- public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords);
+ boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 6cbbdcbb5..272d782e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
@@ -48,7 +47,6 @@ public abstract class HashTableTemplate implements HashTable {
private static final boolean EXTRA_DEBUG = false;
private static final int EMPTY_SLOT = -1;
- // private final int MISSING_VALUE = 65544;
// A hash 'bucket' consists of the start index to indicate start of a hash chain
@@ -78,8 +76,6 @@ public abstract class HashTableTemplate implements HashTable {
// Placeholder for the current index while probing the hash table
private IndexPointer currentIdxHolder;
-// private FragmentContext context;
-
private BufferAllocator allocator;
// The incoming build side record batch
@@ -451,7 +447,7 @@ public abstract class HashTableTemplate implements HashTable {
@Override
- public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
+ public void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) {
float loadf = htConfig.getLoadFactor();
int initialCap = htConfig.getInitialCapacity();
@@ -470,7 +466,6 @@ public abstract class HashTableTemplate implements HashTable {
}
this.htConfig = htConfig;
-// this.context = context;
this.allocator = allocator;
this.incomingBuild = incomingBuild;
this.incomingProbe = incomingProbe;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
index e9b305169..ace4f24f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java
@@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.filter;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.Filter;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-public class FilterBatchCreator implements BatchCreator<Filter>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class);
-
+public class FilterBatchCreator implements BatchCreator<Filter> {
@Override
- public FilterRecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children)
+ public FilterRecordBatch getBatch(ExecutorFragmentContext context, Filter config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new FilterRecordBatch(config, children.iterator().next(), context);
}
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index 1bdd09743..f0b832a78 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -42,9 +42,7 @@ import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.Lists;
-public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
- //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class);
-
+public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> {
private SelectionVector2 sv2;
private SelectionVector4 sv4;
private Filterer filter;
@@ -120,16 +118,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
* logic that handles SV4 + filter should always be pushed beyond sort so disabling
* it in FilterPrel.
*
-
- // set up the multi-batch selection vector
- this.svAllocator = oContext.getAllocator().getNewPreAllocator();
- if (!svAllocator.preAllocate(incoming.getRecordCount()*4))
- throw new SchemaChangeException("Attempted to filter an SV4 which exceeds allowed memory (" +
- incoming.getRecordCount() * 4 + " bytes)");
- sv4 = new SelectionVector4(svAllocator.getAllocation(), incoming.getRecordCount(), Character.MAX_VALUE);
- this.filter = generateSV4Filterer();
- break;
- */
+ */
default:
throw new UnsupportedOperationException();
}
@@ -197,8 +186,6 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]);
CodeGenerator<Filterer> codeGen = cg.getCodeGenerator();
codeGen.plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
-// cg.saveCodeForDebugging(true);
final Filterer filter = context.getImplementationClass(codeGen);
filter.setup(context, incoming, this, tx);
return filter;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java
index 74a5d1671..db62d3693 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java
@@ -20,12 +20,12 @@ package org.apache.drill.exec.physical.impl.filter;
import javax.inject.Named;
import org.apache.drill.exec.compile.sig.CodeGeneratorSignature;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.FragmentContextImpl;
import org.apache.drill.exec.record.RecordBatch;
public interface FilterSignature extends CodeGeneratorSignature{
- public void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
+ public void doSetup(@Named("context") FragmentContextImpl context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing);
public boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
index d014a2efe..52533bd50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java
@@ -27,9 +27,7 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.selection.SelectionVector2;
-public abstract class FilterTemplate2 implements Filterer{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate2.class);
-
+public abstract class FilterTemplate2 implements Filterer {
private SelectionVector2 outgoingSelectionVector;
private SelectionVector2 incomingSelectionVector;
private SelectionVectorMode svMode;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
index fd1f9e68c..4850cff41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java
@@ -26,8 +26,6 @@ import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.record.selection.SelectionVector4;
public abstract class FilterTemplate4 implements Filterer {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate4.class);
-
private SelectionVector4 outgoingSelectionVector;
private SelectionVector4 incomingSelectionVector;
private TransferPair[] transfers;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
index aa45f54ff..a3d03c2d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java
@@ -24,11 +24,9 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TransferPair;
public interface Filterer {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filterer.class);
+ TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class);
+ TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class);
- public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException;
- public void filterBatch(int recordCount) throws SchemaChangeException;
-
- public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class);
- public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class);
+ void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException;
+ void filterBatch(int recordCount) throws SchemaChangeException;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
index 94203d81b..bfda4f43a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java
@@ -20,21 +20,18 @@ package org.apache.drill.exec.physical.impl.flatten;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.FlattenPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-public class FlattenBatchCreator implements BatchCreator<FlattenPOP>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class);
-
+public class FlattenBatchCreator implements BatchCreator<FlattenPOP> {
@Override
- public FlattenRecordBatch getBatch(FragmentContext context, FlattenPOP config, List<RecordBatch> children)
+ public FlattenRecordBatch getBatch(ExecutorFragmentContext context, FlattenPOP config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new FlattenRecordBatch(config, children.iterator().next(), context);
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
index 2aa841b38..8be16ad4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java
@@ -303,8 +303,6 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> {
final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
- // cg.getCodeGenerator().saveCodeForDebugging(true);
final IntHashSet transferFieldIds = new IntHashSet();
final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn()));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
index 5293060df..392757e17 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java
@@ -27,9 +27,11 @@ import org.apache.drill.exec.record.TransferPair;
import org.apache.drill.exec.vector.complex.RepeatedValueVector;
public interface Flattener {
- public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException;
+ TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class);
- public interface Monitor {
+ void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException;
+
+ interface Monitor {
/**
* Get the required buffer size for the specified number of records.
* {@see ValueVector#getBufferSizeFor(int)} for the meaning of this.
@@ -37,14 +39,14 @@ public interface Flattener {
* @param recordCount the number of records processed so far
* @return the buffer size the vectors report as being in use
*/
- public int getBufferSizeFor(int recordCount);
- };
+ int getBufferSizeFor(int recordCount);
+ }
+
+ int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor);
- public int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor);
+ void setFlattenField(RepeatedValueVector repeatedColumn);
- public void setFlattenField(RepeatedValueVector repeatedColumn);
- public RepeatedValueVector getFlattenField();
- public void resetGroupIndex();
+ RepeatedValueVector getFlattenField();
- public static final TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class);
+ void resetGroupIndex();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 7b679c0d6..e087bc84b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -256,12 +256,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// No more output records, clean up and return
state = BatchState.DONE;
- // if (first) {
- // return IterOutcome.OK_NEW_SCHEMA;
- // }
return IterOutcome.NONE;
} catch (ClassTransformationException | SchemaChangeException | IOException e) {
- context.fail(e);
+ context.getExecutorState().fail(e);
killIncoming(false);
return IterOutcome.STOP;
}
@@ -405,8 +402,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions());
cg.plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
- // cg.saveCodeForDebugging(true);
final ClassGenerator<HashJoinProbe> g = cg.getRoot();
// Generate the code to project build side records
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
index 140276943..a005559cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.join;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -30,7 +30,7 @@ import com.google.common.base.Preconditions;
public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> {
@Override
- public HashJoinBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children)
+ public HashJoinBatch getBatch(ExecutorFragmentContext context, HashJoinPOP config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 2);
return new HashJoinBatch(config, context, children.get(0), children.get(1));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
index cc6bd5578..4ef28e667 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.VectorContainer;
import org.apache.calcite.rel.core.JoinRelType;
public interface HashJoinProbe {
- public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
+ TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class);
/* The probe side of the hash join can be in the following two states
* 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a
@@ -40,15 +40,15 @@ public interface HashJoinProbe {
* case we handle it internally by projecting the record if there isn't a match on the build side
* 3. DONE: Once we have projected all possible records we are done
*/
- public static enum ProbeState {
+ enum ProbeState {
PROBE_PROJECT, PROJECT_RIGHT, DONE
}
- public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
- int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
- JoinRelType joinRelType);
- public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
- public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
- public abstract void projectBuildRecord(int buildIndex, int outIndex);
- public abstract void projectProbeRecord(int probeIndex, int outIndex);
+ void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch,
+ int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper,
+ JoinRelType joinRelType);
+ void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing);
+ int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException;
+ void projectBuildRecord(int buildIndex, int outIndex);
+ void projectProbeRecord(int probeIndex, int outIndex);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
index 55322f849..95f7c3d84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java
@@ -24,15 +24,12 @@ import org.apache.drill.exec.record.VectorContainer;
public interface JoinWorker {
+ TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class);
- public static enum JoinOutcome {
+ enum JoinOutcome {
NO_MORE_DATA, BATCH_RETURNED, SCHEMA_CHANGED, WAITING, FAILURE;
}
- public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
- public boolean doJoin(JoinStatus status);
-
- public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(
- JoinWorker.class, JoinTemplate.class);
-
+ void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException;
+ boolean doJoin(JoinStatus status);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 8ad3f84db..1ed4722bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -199,7 +199,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
this.worker = generateNewWorker();
first = true;
} catch (ClassTransformationException | IOException | SchemaChangeException e) {
- context.fail(new SchemaChangeException(e));
+ context.getExecutorState().fail(new SchemaChangeException(e));
kill(false);
return IterOutcome.STOP;
} finally {
@@ -269,12 +269,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
right.kill(sendUpstream);
}
- private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{
-
+ private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException {
final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
- // cg.getCodeGenerator().saveCodeForDebugging(true);
final ErrorCollector collector = new ErrorCollectorImpl();
// Generate members and initialization code
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
index 24f5533db..b24624eb9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.join;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -29,10 +29,8 @@ import org.apache.calcite.rel.core.JoinRelType;
import com.google.common.base.Preconditions;
public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class);
-
@Override
- public MergeJoinBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children)
+ public MergeJoinBatch getBatch(ExecutorFragmentContext context, MergeJoinPOP config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 2);
if(config.getJoinType() == JoinRelType.RIGHT){
@@ -40,6 +38,5 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> {
}else{
return new MergeJoinBatch(config, context, children.get(0), children.get(1));
}
-
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
index 2e708a6ea..ef4cab10a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java
@@ -20,14 +20,14 @@ package org.apache.drill.exec.physical.impl.join;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.NestedLoopJoinPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
public class NestedLoopJoinBatchCreator implements BatchCreator<NestedLoopJoinPOP> {
@Override
- public NestedLoopJoinBatch getBatch(FragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children)
+ public NestedLoopJoinBatch getBatch(ExecutorFragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children)
throws ExecutionSetupException {
return new NestedLoopJoinBatch(config, context, children.get(0), children.get(1));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
index f954e7202..15e52758f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.limit;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -29,7 +29,7 @@ import com.google.common.collect.Iterables;
public class LimitBatchCreator implements BatchCreator<Limit> {
@Override
- public LimitRecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children)
+ public LimitRecordBatch getBatch(ExecutorFragmentContext context, Limit config, List<RecordBatch> children)
throws ExecutionSetupException {
return new LimitRecordBatch(config, context, Iterables.getOnlyElement(children));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index f9ceff268..7e5ff2126 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
@@ -96,7 +97,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private RecordBatchLoader[] batchLoaders;
private final RawFragmentBatchProvider[] fragProviders;
- private final FragmentContext context;
+ private final ExchangeFragmentContext context;
private VectorContainer outgoingContainer;
private MergingReceiverGeneratorBase merger;
private final MergingReceiverPOP config;
@@ -109,12 +110,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
private RawFragmentBatch[] incomingBatches;
private int[] batchOffsets;
private PriorityQueue <Node> pqueue;
- private RawFragmentBatch emptyBatch = null;
private RawFragmentBatch[] tempBatchHolder;
private long[] inputCounts;
private long[] outputCounts;
- public static enum Metric implements MetricDef{
+ public enum Metric implements MetricDef {
BYTES_RECEIVED,
NUM_SENDERS,
NEXT_WAIT_NANOS;
@@ -125,7 +125,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
}
- public MergingRecordBatch(final FragmentContext context,
+ public MergingRecordBatch(final ExchangeFragmentContext context,
final MergingReceiverPOP config,
final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException {
super(config, context, true, context.newOperatorContext(config));
@@ -210,11 +210,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
try {
rawBatch = getNext(p);
} catch (final IOException e) {
- context.fail(e);
+ context.getExecutorState().fail(e);
return IterOutcome.STOP;
}
}
- if (rawBatch == null && !context.shouldContinue()) {
+ if (rawBatch == null && !context.getExecutorState().shouldContinue()) {
clearBatches(rawBatches);
return IterOutcome.STOP;
}
@@ -241,12 +241,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) {
// Do nothing
}
- if (rawBatch == null && !context.shouldContinue()) {
+ if (rawBatch == null && !context.getExecutorState().shouldContinue()) {
clearBatches(rawBatches);
return IterOutcome.STOP;
}
} catch (final IOException e) {
- context.fail(e);
+ context.getExecutorState().fail(e);
clearBatches(rawBatches);
return IterOutcome.STOP;
}
@@ -315,7 +315,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// SchemaChangeException, so check/clean catch clause below.
} catch(final SchemaChangeException e) {
logger.error("MergingReceiver failed to load record batch from remote host. {}", e);
- context.fail(e);
+ context.getExecutorState().fail(e);
return IterOutcome.STOP;
}
batch.release();
@@ -328,7 +328,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// Ensure all the incoming batches have the identical schema.
// Note: RecordBatchLoader permutes the columns to obtain the same columns order for all batches.
if (!isSameSchemaAmongBatches(batchLoaders)) {
- context.fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!"));
+ context.getExecutorState().fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!"));
return IterOutcome.STOP;
}
@@ -351,7 +351,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
merger = createMerger();
} catch (final SchemaChangeException e) {
logger.error("Failed to generate code for MergingReceiver. {}", e);
- context.fail(e);
+ context.getExecutorState().fail(e);
return IterOutcome.STOP;
}
@@ -380,12 +380,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
} else {
batchLoaders[b].clear();
batchLoaders[b] = null;
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
return IterOutcome.STOP;
}
}
} catch (IOException | SchemaChangeException e) {
- context.fail(e);
+ context.getExecutorState().fail(e);
return IterOutcome.STOP;
}
}
@@ -418,11 +418,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId]
: String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]);
- if (nextBatch == null && !context.shouldContinue()) {
+ if (nextBatch == null && !context.getExecutorState().shouldContinue()) {
return IterOutcome.STOP;
}
} catch (final IOException e) {
- context.fail(e);
+ context.getExecutorState().fail(e);
return IterOutcome.STOP;
}
@@ -456,7 +456,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
// TODO: Clean: DRILL-2933: That load(...) no longer throws
// SchemaChangeException, so check/clean catch clause below.
} catch(final SchemaChangeException ex) {
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
incomingBatches[node.batchId].release();
@@ -548,7 +548,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
final RawFragmentBatch batch = getNext(i);
if (batch == null) {
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
state = BatchState.STOP;
} else {
state = BatchState.DONE;
@@ -605,7 +605,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
.setReceiver(context.getHandle())
.setSender(sender)
.build();
- context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver);
+ context.getController()
+ .getTunnel(providingEndpoint.getEndpoint())
+ .informReceiverFinished(new OutcomeListener(), finishedReceiver);
}
}
@@ -624,10 +626,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
@Override
public void interrupted(final InterruptedException e) {
- if (context.shouldContinue()) {
+ if (context.getExecutorState().shouldContinue()) {
final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
logger.error(errMsg, e);
- context.fail(new RpcException(errMsg, e));
+ context.getExecutorState().fail(new RpcException(errMsg, e));
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 7f662ae22..9e82af8bb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -196,7 +196,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
}
}
VectorContainer sortedSamples = new VectorContainer();
- builder.build(context, sortedSamples);
+ builder.build(sortedSamples);
// Sort the records according the orderings given in the configuration
@@ -262,7 +262,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
Thread.sleep(timeout);
return true;
} catch (final InterruptedException e) {
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
return false;
}
}
@@ -329,7 +329,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
} catch (final ClassTransformationException | IOException | SchemaChangeException ex) {
kill(false);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return false;
// TODO InterruptedException
}
@@ -349,7 +349,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
for (CachedVectorContainer w : mmap.get(mapKey)) {
containerBuilder.add(w.get());
}
- containerBuilder.build(context, allSamplesContainer);
+ containerBuilder.build(allSamplesContainer);
List<Ordering> orderDefs = Lists.newArrayList();
int i = 0;
@@ -390,7 +390,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
candidatePartitionTable.setRecordCount(copier.getOutputRecords());
@SuppressWarnings("resource")
WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false);
- wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator());
+ wrap = new CachedVectorContainer(batch, context.getAllocator());
tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES);
} finally {
candidatePartitionTable.clear();
@@ -486,7 +486,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
doWork(vc);
@@ -519,7 +519,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
doWork(vc);
@@ -550,7 +550,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
} catch (SchemaChangeException ex) {
kill(false);
logger.error("Failure during query", ex);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
// fall through.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
index d2e07e7b6..5705aca0a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.orderedpartitioner;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.config.OrderedPartitionSender;
import org.apache.drill.exec.physical.impl.RootCreator;
@@ -61,7 +61,7 @@ public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartiti
@SuppressWarnings("resource")
@Override
- public RootExec getRoot(FragmentContext context, OrderedPartitionSender config,
+ public RootExec getRoot(ExecutorFragmentContext context, OrderedPartitionSender config,
List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java
index 06fd1155a..e0b7b9a91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.partitionsender;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.RootCreator;
import org.apache.drill.exec.physical.impl.RootExec;
@@ -29,7 +29,7 @@ import org.apache.drill.exec.record.RecordBatch;
public class PartitionSenderCreator implements RootCreator<HashPartitionSender> {
@Override
- public RootExec getRoot(FragmentContext context,
+ public RootExec getRoot(ExecutorFragmentContext context,
HashPartitionSender config,
List<RecordBatch> children) throws ExecutionSetupException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 108d539f8..25be50a06 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -33,9 +33,10 @@ import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
@@ -63,7 +64,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
private HashPartitionSender operator;
private PartitionerDecorator partitioner;
- private FragmentContext context;
+ private ExchangeFragmentContext context;
private boolean ok = true;
private final int outGoingBatchCount;
private final HashPartitionSender popConfig;
@@ -98,13 +99,13 @@ public class PartitionSenderRootExec extends BaseRootExec {
}
}
- public PartitionSenderRootExec(FragmentContext context,
+ public PartitionSenderRootExec(RootFragmentContext context,
RecordBatch incoming,
HashPartitionSender operator) throws OutOfMemoryException {
this(context, incoming, operator, false);
}
- public PartitionSenderRootExec(FragmentContext context,
+ public PartitionSenderRootExec(RootFragmentContext context,
RecordBatch incoming,
HashPartitionSender operator,
boolean closeIncoming) throws OutOfMemoryException {
@@ -173,7 +174,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
} catch (IOException e) {
incoming.kill(false);
logger.error("Error while creating partitioning sender or flushing outgoing batches", e);
- context.fail(e);
+ context.getExecutorState().fail(e);
}
return false;
@@ -203,19 +204,19 @@ public class PartitionSenderRootExec extends BaseRootExec {
} catch (IOException e) {
incoming.kill(false);
logger.error("Error while flushing outgoing batches", e);
- context.fail(e);
+ context.getExecutorState().fail(e);
return false;
} catch (SchemaChangeException e) {
incoming.kill(false);
logger.error("Error while setting up partitioner", e);
- context.fail(e);
+ context.getExecutorState().fail(e);
return false;
}
case OK:
try {
partitioner.partitionBatch(incoming);
} catch (IOException e) {
- context.fail(e);
+ context.getExecutorState().fail(e);
incoming.kill(false);
return false;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 95a48135d..5d1b08cc5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -22,34 +22,33 @@ import java.util.List;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.physical.config.HashPartitionSender;
import org.apache.drill.exec.record.RecordBatch;
public interface Partitioner {
+ void setup(ExchangeFragmentContext context,
+ RecordBatch incoming,
+ HashPartitionSender popConfig,
+ OperatorStats stats,
+ OperatorContext oContext,
+ int start, int count) throws SchemaChangeException;
- public abstract void setup(FragmentContext context,
- RecordBatch incoming,
- HashPartitionSender popConfig,
- OperatorStats stats,
- OperatorContext oContext,
- int start, int count) throws SchemaChangeException;
-
- public abstract void partitionBatch(RecordBatch incoming) throws IOException;
- public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException;
- public abstract void initialize();
- public abstract void clear();
- public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches();
+ void partitionBatch(RecordBatch incoming) throws IOException;
+ void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException;
+ void initialize();
+ void clear();
+ List<? extends PartitionOutgoingBatch> getOutgoingBatches();
/**
* Method to get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner
* @param index
* @return PartitionOutgoingBatch that matches index within Partitioner. This method can
* return null if index does not fall within boundary of this Partitioner
*/
- public abstract PartitionOutgoingBatch getOutgoingBatch(int index);
- public abstract OperatorStats getStats();
+ PartitionOutgoingBatch getOutgoingBatch(int index);
+ OperatorStats getStats();
- public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
+ TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class);
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
index 042222a2d..78b8d033e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java
@@ -57,7 +57,7 @@ public class PartitionerDecorator {
this.partitioners = partitioners;
this.stats = stats;
this.context = context;
- this.executor = context.getDrillbitContext().getExecutor();
+ this.executor = context.getExecutor();
this.tName = Thread.currentThread().getName();
this.childThreadPrefix = "Partitioner-" + tName + "-";
}
@@ -177,7 +177,7 @@ public class PartitionerDecorator {
break;
} catch (final InterruptedException e) {
// If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
logger.debug("Interrupting partioner threads. Fragment thread {}", tName);
for(Future<?> f : taskFutures) {
f.cancel(true);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index aa72c44d8..0d52b53ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.ops.OperatorStats;
@@ -81,7 +82,7 @@ public abstract class PartitionerTemplate implements Partitioner {
}
@Override
- public final void setup(FragmentContext context,
+ public final void setup(ExchangeFragmentContext context,
RecordBatch incoming,
HashPartitionSender popConfig,
OperatorStats stats,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 3afa8527e..bbcb75831 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -68,8 +68,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer>
wrapper = queue.take();
logger.debug("Got batch from queue");
} catch (final InterruptedException e) {
- if (context.shouldContinue()) {
- context.fail(e);
+ if (context.getExecutorState().shouldContinue()) {
+ context.getExecutorState().fail(e);
}
return IterOutcome.STOP;
// TODO InterruptedException
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
index 6542576b5..779728a91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.producer;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.ProducerConsumer;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -29,7 +29,7 @@ import com.google.common.collect.Iterables;
public class ProducerConsumerBatchCreator implements BatchCreator<ProducerConsumer> {
@Override
- public ProducerConsumerBatch getBatch(FragmentContext context, ProducerConsumer config, List<RecordBatch> children)
+ public ProducerConsumerBatch getBatch(ExecutorFragmentContext context, ProducerConsumer config, List<RecordBatch> children)
throws ExecutionSetupException {
return new ProducerConsumerBatch(config, context, Iterables.getOnlyElement(children));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
index f2495402f..73ab44127 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.project;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.ComplexToJson;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.impl.BatchCreator;
@@ -29,15 +29,12 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
public class ComplexToJsonBatchCreator implements BatchCreator<ComplexToJson> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexToJsonBatchCreator.class);
-
@Override
- public ProjectRecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children)
+ public ProjectRecordBatch getBatch(ExecutorFragmentContext context, ComplexToJson flatten, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new ProjectRecordBatch(new Project(null, flatten.getChild()),
children.iterator().next(),
context);
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
index e7a6b0542..37753cd81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.project;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.Project;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -28,13 +28,10 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
public class ProjectBatchCreator implements BatchCreator<Project>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectBatchCreator.class);
-
@Override
- public ProjectRecordBatch getBatch(FragmentContext context, Project config, List<RecordBatch> children)
+ public ProjectRecordBatch getBatch(ExecutorFragmentContext context, Project config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new ProjectRecordBatch(config, children.iterator().next(), context);
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index be0f61fa7..89e0ee9e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -804,7 +804,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
} catch (SchemaChangeException e) {
kill(false);
logger.error("Failure during query", e);
- context.fail(e);
+ context.getExecutorState().fail(e);
return IterOutcome.STOP;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
index d71159231..f38b62e3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java
@@ -132,7 +132,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
return IterOutcome.NONE;
}
- builder.build(context, container);
+ builder.build(container);
sorter = createNewSorter();
sorter.setup(context, getSelectionVector4(), this.container);
sorter.sort(getSelectionVector4(), this.container);
@@ -142,7 +142,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> {
} catch(SchemaChangeException | ClassTransformationException | IOException ex) {
kill(false);
logger.error("Failure during query", ex);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
index 559558f49..ccd55619e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.sort;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -28,14 +28,10 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
public class SortBatchCreator implements BatchCreator<Sort>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatchCreator.class);
-
@Override
- public SortBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children)
+ public SortBatch getBatch(ExecutorFragmentContext context, Sort config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new SortBatch(config, context, children.iterator().next());
}
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
index 6b3de25fc..6c66c01b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java
@@ -27,7 +27,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.AllocationReservation;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.MaterializedField;
@@ -136,10 +135,6 @@ public class SortRecordBatchBuilder implements AutoCloseable {
return batches.isEmpty();
}
- public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException {
- build(outputContainer);
- }
-
@SuppressWarnings("resource")
public void build(VectorContainer outputContainer) throws SchemaChangeException {
outputContainer.clear();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 4304c2cb0..66fe2610e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -24,7 +24,6 @@ import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.CodeGenerator;
-import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
@@ -235,10 +234,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
private Copier getGenerated4Copier() throws SchemaChangeException {
Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE);
- return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this, callBack);
+ return getGenerated4Copier(incoming, context, container, this, callBack);
}
- public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing, SchemaChangeCallBack callBack) throws SchemaChangeException{
+ public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, VectorContainer container, RecordBatch outgoing,
+ SchemaChangeCallBack callBack) throws SchemaChangeException{
for(VectorWrapper<?> vv : batch){
@SuppressWarnings("resource")
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
index 9ab39a399..4bf5b5cbc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java
@@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.svremover;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SVRemoverCreator.class);
-
+public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover> {
@Override
- public RemovingRecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children)
+ public RemovingRecordBatch getBatch(ExecutorFragmentContext context, SelectionVectorRemover config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new RemovingRecordBatch(config, context, children.iterator().next());
}
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
index 40ef2bb12..dd2f6db9f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java
@@ -21,19 +21,15 @@ package org.apache.drill.exec.physical.impl.trace;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.Trace;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
public class TraceBatchCreator implements BatchCreator<Trace> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class);
-
@Override
- public TraceRecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children)
+ public TraceRecordBatch getBatch(ExecutorFragmentContext context, Trace config, List<RecordBatch> children)
throws ExecutionSetupException {
- // Preconditions.checkArgument(children.size() == 1);
return new TraceRecordBatch(config, children.iterator().next(), context);
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
index 1ef3142b1..bdc1a3d26 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java
@@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.union;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
-public class UnionAllBatchCreator implements BatchCreator<UnionAll>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllBatchCreator.class);
-
+public class UnionAllBatchCreator implements BatchCreator<UnionAll> {
@Override
- public UnionAllRecordBatch getBatch(FragmentContext context, UnionAll config, List<RecordBatch> children)
+ public UnionAllRecordBatch getBatch(ExecutorFragmentContext context, UnionAll config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() >= 1);
return new UnionAllRecordBatch(config, children, context);
}
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
index 1d1ecb00c..b4d0e7726 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java
@@ -129,7 +129,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
}
}
} catch (ClassTransformationException | IOException | SchemaChangeException ex) {
- context.fail(ex);
+ context.getExecutorState().fail(ex);
killIncoming(false);
return IterOutcome.STOP;
}
@@ -168,8 +168,6 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> {
final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getOptions());
cg.getCodeGenerator().plainJavaCapable(true);
- // Uncomment out this line to debug the generated code.
- // cg.getCodeGenerator().saveCodeForDebugging(true);
int index = 0;
for(VectorWrapper<?> vw : inputBatch) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index cfdc06d94..9da8a4b6f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.ExchangeFragmentContext;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.OperatorContext;
@@ -57,7 +58,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
private final RecordBatchLoader batchLoader;
private final RawFragmentBatchProvider fragProvider;
- private final FragmentContext context;
+ private final ExchangeFragmentContext context;
private BatchSchema schema;
private final OperatorStats stats;
private boolean first = true;
@@ -74,7 +75,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
}
}
- public UnorderedReceiverBatch(final FragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException {
+ public UnorderedReceiverBatch(final ExchangeFragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException {
this.fragProvider = fragProvider;
this.context = context;
// In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector,
@@ -171,13 +172,13 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
if (batch == null) {
batchLoader.clear();
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
return IterOutcome.STOP;
}
return IterOutcome.NONE;
}
- if (context.isOverMemoryLimit()) {
+ if (context.getAllocator().isOverLimit()) {
return IterOutcome.OUT_OF_MEMORY;
}
@@ -197,7 +198,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
return IterOutcome.OK;
}
} catch(SchemaChangeException | IOException ex) {
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
} finally {
stats.stopProcessing();
@@ -233,7 +234,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
.setReceiver(context.getHandle())
.setSender(sender)
.build();
- context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver);
+ context.getController()
+ .getTunnel(providingEndpoint.getEndpoint())
+ .informReceiverFinished(new OutcomeListener(), finishedReceiver);
}
}
@@ -252,12 +255,11 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch {
@Override
public void interrupted(final InterruptedException e) {
- if (context.shouldContinue()) {
+ if (context.getExecutorState().shouldContinue()) {
final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message";
logger.error(errMsg, e);
- context.fail(new RpcException(errMsg, e));
+ context.getExecutorState().fail(new RpcException(errMsg, e));
}
}
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
index 6d4f1d7d6..01a458890 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java
@@ -20,18 +20,18 @@ package org.apache.drill.exec.physical.impl.unorderedreceiver;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.UnorderedReceiver;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import org.apache.drill.exec.work.batch.RawBatchBuffer;
-public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver>{
+public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver> {
@SuppressWarnings("resource")
@Override
- public UnorderedReceiverBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children)
+ public UnorderedReceiverBatch getBatch(ExecutorFragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children)
throws ExecutionSetupException {
assert children == null || children.isEmpty();
IncomingBuffers bufHolder = context.getBuffers();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
index 4199191ed..e27f88127 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java
@@ -21,7 +21,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.IteratorValidator;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -32,8 +32,8 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator>
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorCreator.class);
@Override
- public IteratorValidatorBatchIterator getBatch(FragmentContext context, IteratorValidator config,
- List<RecordBatch> children)
+ public IteratorValidatorBatchIterator getBatch(ExecutorFragmentContext context, IteratorValidator config,
+ List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
RecordBatch child = children.iterator().next();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
index a8eddbc62..c2bcab00d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.Values;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
@@ -30,11 +30,9 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.easy.json.JSONRecordReader;
-import com.google.common.collect.Iterators;
-
public class ValuesBatchCreator implements BatchCreator<Values> {
@Override
- public ScanBatch getBatch(FragmentContext context, Values config, List<RecordBatch> children)
+ public ScanBatch getBatch(ExecutorFragmentContext context, Values config, List<RecordBatch> children)
throws ExecutionSetupException {
assert children.isEmpty();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
index 59bc1159c..6ca9652c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java
@@ -21,7 +21,7 @@ package org.apache.drill.exec.physical.impl.window;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.WindowPOP;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.RecordBatch;
@@ -29,9 +29,8 @@ import org.apache.drill.exec.record.RecordBatch;
import com.google.common.base.Preconditions;
public class WindowFrameBatchCreator implements BatchCreator<WindowPOP> {
-
@Override
- public WindowFrameRecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children)
+ public WindowFrameRecordBatch getBatch(ExecutorFragmentContext context, WindowPOP config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new WindowFrameRecordBatch(config, context, children.iterator().next());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
index f4a9825cd..aa067cf18 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java
@@ -145,7 +145,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> {
try {
doWork();
} catch (DrillException e) {
- context.fail(e);
+ context.getExecutorState().fail(e);
cleanup();
return IterOutcome.STOP;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index c212593d9..9cde1a552 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -201,7 +201,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// where it would have been passed to context.fail()
// passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping
// right away, this will also make sure we collect any additional exception we may get while cleaning up
- context.fail(e);
+ context.getExecutorState().fail(e);
}
}
}
@@ -483,7 +483,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
mSorter.sort(this.container);
// sort may have prematurely exited due to should continue returning false.
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
return IterOutcome.STOP;
}
@@ -522,12 +522,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
} catch (SchemaChangeException ex) {
kill(false);
- context.fail(UserException.unsupportedError(ex)
+ context.getExecutorState().fail(UserException.unsupportedError(ex)
.message("Sort doesn't currently support sorts with changing schemas").build(logger));
return IterOutcome.STOP;
} catch(ClassTransformationException | IOException ex) {
kill(false);
- context.fail(ex);
+ context.getExecutorState().fail(ex);
return IterOutcome.STOP;
} catch (UnsupportedOperationException e) {
throw new RuntimeException(e);
@@ -650,7 +650,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
try {
Thread.sleep(waitTime * 1000);
} catch(final InterruptedException e) {
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
throw e;
}
}
@@ -688,11 +688,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
private MSorter createNewMSorter() throws ClassTransformationException, IOException, SchemaChangeException {
- return createNewMSorter(this.context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
+ return createNewMSorter(context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING);
}
- private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping)
- throws ClassTransformationException, IOException, SchemaChangeException{
+ private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet
+ rightMapping)
+ throws ClassTransformationException, IOException, SchemaChangeException {
CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getOptions());
ClassGenerator<MSorter> g = cg.getRoot();
g.setMappingSet(mainMapping);
@@ -735,7 +736,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
public SingleBatchSorter createNewSorter(FragmentContext context, VectorAccessible batch)
- throws ClassTransformationException, IOException, SchemaChangeException{
+ throws ClassTransformationException, IOException, SchemaChangeException {
CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(SingleBatchSorter.TEMPLATE_DEFINITION, context.getOptions());
cg.plainJavaCapable(true); // This class can generate plain-old Java.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
index e579fc2a3..6a601963b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java
@@ -22,19 +22,19 @@ import java.util.List;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.physical.config.ExternalSort;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.RecordBatch;
-import org.apache.drill.exec.server.options.OptionManager;
import com.google.common.base.Preconditions;
+import org.apache.drill.exec.server.options.OptionManager;
public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{
@Override
- public AbstractRecordBatch<ExternalSort> getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children)
+ public AbstractRecordBatch<ExternalSort> getBatch(ExecutorFragmentContext context, ExternalSort config, List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 2f3d2f624..9b691705b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -130,7 +130,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
while (runStarts.size() > 1) {
// check if we're cancelled/failed frequently
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
return;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
index 733ea5e1f..4dbee3e45 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java
@@ -19,12 +19,12 @@ package org.apache.drill.exec.physical.impl.xsort;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.selection.SelectionVector2;
public interface SingleBatchSorter {
- public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException;
+ public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException;
public void sort(SelectionVector2 vector2) throws SchemaChangeException;
public static TemplateClassDefinition<SingleBatchSorter> TEMPLATE_DEFINITION =
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
index 0f4680d72..4a1af4e56 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.selection.SelectionVector2;
@@ -38,7 +38,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
private SelectionVector2 vector2;
@Override
- public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{
+ public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{
Preconditions.checkNotNull(vector2);
this.vector2 = vector2;
try {
@@ -76,7 +76,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In
}
}
- public abstract void doSetup(@Named("context") FragmentContextInterface context,
+ public abstract void doSetup(@Named("context") FragmentContext context,
@Named("incoming") VectorAccessible incoming,
@Named("outgoing") RecordBatch outgoing)
throws SchemaChangeException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
index 9150fe316..23e66a030 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java
@@ -135,8 +135,7 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
* into new batches of a size determined by that operator.</li>
* <li>A series of batches, without a selection vector, if the sort spills to
* disk. In this case, the downstream operator will still be a selection vector
- * remover, but there is nothing for that operator to remove. Each batch is
- * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li>
+ * remover, but there is nothing for that operator to remove.
* </ul>
* Note that, even in the in-memory sort case, this operator could do the copying
* to eliminate the extra selection vector remover. That is left as an exercise
@@ -375,7 +374,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// sort may have prematurely exited due to shouldContinue() returning false.
- if (! context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
sortState = SortState.DONE;
return IterOutcome.STOP;
}
@@ -440,8 +439,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
/**
* Handle a new schema from upstream. The ESB is quite limited in its ability
* to handle schema changes.
- *
- * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA
*/
private void setupSchema() {
@@ -482,6 +479,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
* <p>
* Some Drill code ends up calling close() two or more times. The code
* here protects itself from these undesirable semantics.
+ * </p>
*/
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
index 698e32fd2..625d3603d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java
@@ -24,7 +24,7 @@ import javax.inject.Named;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -49,7 +49,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
*/
private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue();
- private FragmentContextInterface context;
+ private FragmentContext context;
/**
* Controls the maximum size of batches exposed to downstream
@@ -57,7 +57,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
private int desiredRecordBatchCount;
@Override
- public void setup(final FragmentContextInterface context, final BufferAllocator allocator, final SelectionVector4 vector4,
+ public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4,
final VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException{
// we pass in the local hyperBatch since that is where we'll be reading data.
Preconditions.checkNotNull(vector4);
@@ -162,7 +162,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
final int totalCount = this.vector4.getTotalCount();
// check if we're cancelled/failed recently
- if (!context.shouldContinue()) {
+ if (!context.getExecutorState().shouldContinue()) {
return; }
int outIndex = 0;
@@ -233,7 +233,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable {
}
}
- public abstract void doSetup(@Named("context") FragmentContextInterface context,
+ public abstract void doSetup(@Named("context") FragmentContext context,
@Named("incoming") VectorContainer incoming,
@Named("outgoing") RecordBatch outgoing)
throws SchemaChangeException;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
index 428f6f87a..8eae8b71d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
import org.apache.drill.exec.compile.TemplateClassDefinition;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.ops.FragmentContextInterface;
+import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.selection.SelectionVector4;
@@ -30,7 +30,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4;
*/
public interface MSorter {
- public void setup(FragmentContextInterface context, BufferAllocator allocator, SelectionVector4 vector4,
+ public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4,
VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException;
public void sort();
public SelectionVector4 getSV4();