diff options
author | Timothy Farkas <timothyfarkas@apache.org> | 2018-01-11 14:59:41 -0800 |
---|---|---|
committer | Arina Ielchiieva <arina.yelchiyeva@gmail.com> | 2018-01-26 13:42:27 +0200 |
commit | 186536d544d02ffc01339a4645e2a533545a2f86 (patch) | |
tree | 24ddcb013b243ee501d5dfa1d73eb9c0a1511510 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl | |
parent | 9926eda21c748e96d67bce341a76dac3114002af (diff) |
DRILL-5730: Mock testing improvements and interface improvements
closes #1045
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl')
77 files changed, 292 insertions, 392 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index d01e29454..82887ec3b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -21,11 +21,11 @@ import java.util.List; import org.apache.drill.common.DeferredException; import org.apache.drill.exec.exception.OutOfMemoryException; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OpProfileDef; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.ops.OperatorUtilities; +import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.CloseableRecordBatch; @@ -39,10 +39,10 @@ public abstract class BaseRootExec implements RootExec { protected OperatorStats stats = null; protected OperatorContext oContext = null; - protected FragmentContext fragmentContext = null; + protected RootFragmentContext fragmentContext = null; private List<CloseableRecordBatch> operators; - public BaseRootExec(final FragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException { + public BaseRootExec(final RootFragmentContext fragmentContext, final PhysicalOperator config) throws OutOfMemoryException { this.oContext = fragmentContext.newOperatorContext(config, stats); stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), OperatorUtilities.getChildCount(config)), @@ -51,8 +51,8 @@ public abstract class BaseRootExec implements RootExec { this.fragmentContext = fragmentContext; } - public BaseRootExec(final FragmentContext fragmentContext, final OperatorContext oContext, - final PhysicalOperator config) throws OutOfMemoryException { + public BaseRootExec(final RootFragmentContext fragmentContext, final OperatorContext oContext, + final PhysicalOperator config) throws OutOfMemoryException { this.oContext = oContext; stats = new OperatorStats(new OpProfileDef(config.getOperatorId(), config.getOperatorType(), OperatorUtilities.getChildCount(config)), @@ -87,7 +87,7 @@ public abstract class BaseRootExec implements RootExec { public final boolean next() { // Stats should have been initialized assert stats != null; - if (!fragmentContext.shouldContinue()) { + if (!fragmentContext.getExecutorState().shouldContinue()) { return false; } try { @@ -156,7 +156,7 @@ public abstract class BaseRootExec implements RootExec { try { df.close(); } catch (Exception e) { - fragmentContext.fail(e); + fragmentContext.getExecutorState().fail(e); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java index af99b5ea1..e9113ce67 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BatchCreator.java @@ -20,14 +20,12 @@ package org.apache.drill.exec.physical.impl; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.CloseableRecordBatch; import org.apache.drill.exec.record.RecordBatch; public interface BatchCreator<T extends PhysicalOperator> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchCreator.class); - - public CloseableRecordBatch getBatch(FragmentContext context, T config, List<RecordBatch> children) + CloseableRecordBatch getBatch(ExecutorFragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java index b418fd480..2e0d14e11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ImplCreator.java @@ -27,7 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.impl.validate.IteratorValidatorInjector; @@ -66,7 +66,7 @@ public class ImplCreator { * @return RootExec of fragment. * @throws ExecutionSetupException */ - public static RootExec getExec(FragmentContext context, FragmentRoot root) throws ExecutionSetupException { + public static RootExec getExec(ExecutorFragmentContext context, FragmentRoot root) throws ExecutionSetupException { Preconditions.checkNotNull(root); Preconditions.checkNotNull(context); @@ -99,14 +99,14 @@ public class ImplCreator { return rootExec; } catch(Exception e) { AutoCloseables.close(e, creator.getOperators()); - context.fail(e); + context.getExecutorState().fail(e); } return null; } /** Create RootExec and its children (RecordBatches) for given FragmentRoot */ - @SuppressWarnings("unchecked") - private RootExec getRootExec(final FragmentRoot root, final FragmentContext context) throws ExecutionSetupException { + + private RootExec getRootExec(final FragmentRoot root, final ExecutorFragmentContext context) throws ExecutionSetupException { final List<RecordBatch> childRecordBatches = getChildren(root, context); if (context.isImpersonationEnabled()) { @@ -131,7 +131,7 @@ public class ImplCreator { /** Create a RecordBatch and its children for given PhysicalOperator */ @VisibleForTesting - public RecordBatch getRecordBatch(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException { + public RecordBatch getRecordBatch(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException { Preconditions.checkNotNull(op); final List<RecordBatch> childRecordBatches = getChildren(op, context); @@ -164,9 +164,9 @@ public class ImplCreator { } /** Helper method to get OperatorCreator (RootCreator or BatchCreator) for given PhysicalOperator (root or non-root) */ - private Object getOpCreator(PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException { + private Object getOpCreator(PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException { final Class<? extends PhysicalOperator> opClass = op.getClass(); - Object opCreator = context.getDrillbitContext().getOperatorCreatorRegistry().getOperatorCreator(opClass); + Object opCreator = context.getOperatorCreatorRegistry().getOperatorCreator(opClass); if (opCreator == null) { throw new UnsupportedOperationException( String.format("BatchCreator for PhysicalOperator type '%s' not found.", opClass.getCanonicalName())); @@ -176,7 +176,7 @@ public class ImplCreator { } /** Helper method to traverse the children of given PhysicalOperator and create RecordBatches for children recursively */ - private List<RecordBatch> getChildren(final PhysicalOperator op, final FragmentContext context) throws ExecutionSetupException { + private List<RecordBatch> getChildren(final PhysicalOperator op, final ExecutorFragmentContext context) throws ExecutionSetupException { List<RecordBatch> children = Lists.newArrayList(); for (PhysicalOperator child : op) { children.add(getRecordBatch(child, context)); @@ -184,5 +184,4 @@ public class ImplCreator { return children; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java index 690a6624e..0ef84b960 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/MergingReceiverCreator.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.OutOfMemoryException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.MergingReceiverPOP; import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch; import org.apache.drill.exec.record.RecordBatch; @@ -33,7 +33,7 @@ public class MergingReceiverCreator implements BatchCreator<MergingReceiverPOP> @SuppressWarnings("resource") @Override - public MergingRecordBatch getBatch(FragmentContext context, + public MergingRecordBatch getBatch(ExecutorFragmentContext context, MergingReceiverPOP receiver, List<RecordBatch> children) throws ExecutionSetupException, OutOfMemoryException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java index f3d95243c..17e402765 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootCreator.java @@ -20,12 +20,10 @@ package org.apache.drill.exec.physical.impl; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.record.RecordBatch; public interface RootCreator<T extends PhysicalOperator> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootCreator.class); - - public RootExec getRoot(FragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException; + RootExec getRoot(ExecutorFragmentContext context, T config, List<RecordBatch> children) throws ExecutionSetupException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index e0d1545b0..f6a486397 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -82,14 +82,13 @@ public class ScanBatch implements CloseableRecordBatch { private String currentReaderClassName; /** * - * @param subScanConfig * @param context * @param oContext * @param readerList * @param implicitColumnList : either an emptylist when all the readers do not have implicit * columns, or there is a one-to-one mapping between reader and implicitColumns. */ - public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, + public ScanBatch(FragmentContext context, OperatorContext oContext, List<RecordReader> readerList, List<Map<String, String>> implicitColumnList) { this.context = context; @@ -126,8 +125,7 @@ public class ScanBatch implements CloseableRecordBatch { public ScanBatch(PhysicalOperator subScanConfig, FragmentContext context, List<RecordReader> readers) throws ExecutionSetupException { - this(subScanConfig, context, - context.newOperatorContext(subScanConfig), + this(context, context.newOperatorContext(subScanConfig), readers, Collections.<Map<String, String>> emptyList()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index d9abf406c..46dc45038 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -22,8 +22,9 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingUserConnection; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.physical.impl.materialize.RecordMaterializer; @@ -38,11 +39,10 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory; import com.google.common.base.Preconditions; public class ScreenCreator implements RootCreator<Screen> { - //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenCreator.class); private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(ScreenCreator.class); @Override - public RootExec getRoot(FragmentContext context, Screen config, List<RecordBatch> children) + public RootExec getRoot(ExecutorFragmentContext context, Screen config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkNotNull(children); Preconditions.checkArgument(children.size() == 1); @@ -52,7 +52,7 @@ public class ScreenCreator implements RootCreator<Screen> { public static class ScreenRoot extends BaseRootExec { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class); private final RecordBatch incoming; - private final FragmentContext context; + private final RootFragmentContext context; private final AccountingUserConnection userConnection; private RecordMaterializer materializer; @@ -67,13 +67,17 @@ public class ScreenCreator implements RootCreator<Screen> { } } - public ScreenRoot(FragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException { + public ScreenRoot(RootFragmentContext context, RecordBatch incoming, Screen config) throws OutOfMemoryException { super(context, config); this.context = context; this.incoming = incoming; userConnection = context.getUserDataTunnel(); } + public RootFragmentContext getContext() { + return context; + } + @Override public boolean innerNext() { IterOutcome outcome = next(incoming); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 2f33193a3..9231aef0d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -22,8 +22,9 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingDataTunnel; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.BatchSchema; @@ -36,7 +37,7 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory; public class SingleSenderCreator implements RootCreator<SingleSender>{ @Override - public RootExec getRoot(FragmentContext context, SingleSender config, List<RecordBatch> children) + public RootExec getRoot(ExecutorFragmentContext context, SingleSender config, List<RecordBatch> children) throws ExecutionSetupException { assert children != null && children.size() == 1; return new SingleSenderRootExec(context, children.iterator().next(), config); @@ -64,7 +65,7 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ } } - public SingleSenderRootExec(FragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException { + public SingleSenderRootExec(RootFragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException { super(context, context.newOperatorContext(config, null), config); this.incoming = batch; assert incoming != null; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java index 7f9aca4a7..97e26b662 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/PriorityQueueTemplate.java @@ -26,7 +26,6 @@ import javax.inject.Named; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.ExpandableHyperContainer; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java index 442a753be..16832867b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java @@ -281,7 +281,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { } catch(SchemaChangeException | ClassTransformationException | IOException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } } @@ -295,7 +295,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context); if (copier == null) { - copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null); + copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null); } else { for (VectorWrapper<?> i : batch) { @@ -323,7 +323,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { selectionVector4.clear(); c.clear(); VectorContainer newQueue = new VectorContainer(); - builder.build(context, newQueue); + builder.build(newQueue); priorityQueue.resetQueue(newQueue, builder.getSv4().createNewWrapperCurrent()); builder.getSv4().clear(); selectionVector4.clear(); @@ -336,7 +336,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { private PriorityQueue createNewPriorityQueue(VectorAccessible batch, int limit) throws SchemaChangeException, ClassTransformationException, IOException { return createNewPriorityQueue( - mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getDrillbitContext().getCompiler(), + mainMapping, leftMapping, rightMapping, context.getOptions(), context.getFunctionRegistry(), context.getCompiler(), config.getOrderings(), batch, unionTypeEnabled, codegenDump, limit, oContext.getAllocator(), schema.getSelectionVectorMode()); } @@ -415,7 +415,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { final SelectionVector4 selectionVector4 = priorityQueue.getSv4(); final SimpleSV4RecordBatch batch = new SimpleSV4RecordBatch(c, selectionVector4, context); final SimpleSV4RecordBatch newBatch = new SimpleSV4RecordBatch(newContainer, null, context); - copier = RemovingRecordBatch.getGenerated4Copier(batch, context, oContext.getAllocator(), newContainer, newBatch, null); + copier = RemovingRecordBatch.getGenerated4Copier(batch, context, newContainer, newBatch, null); @SuppressWarnings("resource") SortRecordBatchBuilder builder = new SortRecordBatchBuilder(oContext.getAllocator()); try { @@ -434,7 +434,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> { selectionVector4.clear(); c.clear(); final VectorContainer oldSchemaContainer = new VectorContainer(oContext); - builder.build(context, oldSchemaContainer); + builder.build(oldSchemaContainer); oldSchemaContainer.setRecordCount(builder.getSv4().getCount()); final VectorContainer newSchemaContainer = SchemaUtil.coerceContainer(oldSchemaContainer, this.schema, oContext); newSchemaContainer.buildSchema(SelectionVectorMode.FOUR_BYTE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java index e815bff31..d4777449a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNSortBatchCreator.java @@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.TopN; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.TopN; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class TopNSortBatchCreator implements BatchCreator<TopN>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TopNSortBatchCreator.class); - +public class TopNSortBatchCreator implements BatchCreator<TopN> { @Override - public TopNBatch getBatch(FragmentContext context, TopN config, List<RecordBatch> children) + public TopNBatch getBatch(ExecutorFragmentContext context, TopN config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new TopNBatch(config, context, children.iterator().next()); } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java index 0ea17d699..e98a7c6ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/WriterRecordBatch.java @@ -118,7 +118,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { } catch(IOException ex) { logger.error("Failure during query", ex); kill(false); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } @@ -185,7 +185,7 @@ public class WriterRecordBatch extends AbstractRecordBatch<Writer> { try { recordWriter.cleanup(); } catch(IOException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); } finally { try { if (!processed) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index b3d68d3fb..47f1017b3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -186,7 +186,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { return aggregator.getOutcome(); case UPDATE_AGGREGATOR: - context.fail(UserException.unsupportedError() + context.getExecutorState().fail(UserException.unsupportedError() .message(SchemaChangeException.schemaChanged( "Hash aggregate does not support schema change", incomingSchema, @@ -212,7 +212,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { this.aggregator = createAggregatorInternal(); return true; } catch (SchemaChangeException | ClassTransformationException | IOException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); container.clear(); incoming.kill(false); return false; @@ -227,9 +227,6 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { ClassGenerator<HashAggregator> cg = top.getRoot(); ClassGenerator<HashAggregator> cgInner = cg.getInnerGenerator("BatchHolder"); top.plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // top.saveCodeForDebugging(true); - container.clear(); int numGroupByExprs = (popConfig.getGroupByExprs() != null) ? popConfig.getGroupByExprs().size() : 0; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java index 1397342f6..ed203e56a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatchCreator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.BatchCreator; @@ -27,15 +28,11 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class HashAggBatchCreator implements BatchCreator<HashAggregate>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashAggBatchCreator.class); - +public class HashAggBatchCreator implements BatchCreator<HashAggregate> { @Override - public HashAggBatch getBatch(FragmentContext context, HashAggregate config, List<RecordBatch> children) + public HashAggBatch getBatch(ExecutorFragmentContext context, HashAggregate config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new HashAggBatch(config, children.iterator().next(), context); } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 89ba59b4b..ef8d9d90b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -300,7 +300,9 @@ public abstract class HashAggTemplate implements HashAggregator { } @Override - public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException { + public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, + RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, + TypedFieldId[] groupByOutFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException { if (valueExprs == null || valueFieldIds == null) { throw new IllegalArgumentException("Invalid aggr value exprs or workspace variables."); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java index 16b54997b..3384e671d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java @@ -35,34 +35,35 @@ import org.apache.drill.exec.record.VectorContainer; public interface HashAggregator { - public static TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION = + TemplateClassDefinition<HashAggregator> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashAggregator>(HashAggregator.class, HashAggTemplate.class); - public static enum AggOutcome { + enum AggOutcome { RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR, CALL_WORK_AGAIN } // For returning results from outputCurrentBatch // OK - batch returned, NONE - end of data, RESTART - call again - public enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART } + enum AggIterOutcome { AGG_OK, AGG_NONE, AGG_RESTART } - public abstract void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException; + void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, FragmentContext context, OperatorContext oContext, RecordBatch incoming, HashAggBatch outgoing, + LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, TypedFieldId[] keyFieldIds, VectorContainer outContainer, int extraRowBytes) throws SchemaChangeException, IOException, ClassTransformationException; - public abstract IterOutcome getOutcome(); + IterOutcome getOutcome(); - public abstract int getOutputCount(); + int getOutputCount(); - public abstract AggOutcome doWork(); + AggOutcome doWork(); - public abstract void cleanup(); + void cleanup(); - public abstract boolean allFlushed(); + boolean allFlushed(); - public abstract boolean buildComplete(); + boolean buildComplete(); - public abstract AggIterOutcome outputCurrentBatch(); + AggIterOutcome outputCurrentBatch(); - public abstract boolean earlyOutput(); + boolean earlyOutput(); - public abstract RecordBatch getNewIncoming(); + RecordBatch getNewIncoming(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java index ac4b29d16..c473b94e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/SpilledRecordbatch.java @@ -45,19 +45,15 @@ public class SpilledRecordbatch implements CloseableRecordBatch { private int spilledBatches; private FragmentContext context; private BatchSchema schema; - private OperatorContext oContext; private SpillSet spillSet; - // Path spillStreamPath; private String spillFile; VectorAccessibleSerializable vas; - public SpilledRecordbatch(String spillFile,/* Path spillStreamPath,*/ int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) { + public SpilledRecordbatch(String spillFile, int spilledBatches, FragmentContext context, BatchSchema schema, OperatorContext oContext, SpillSet spillSet) { this.context = context; this.schema = schema; this.spilledBatches = spilledBatches; - this.oContext = oContext; this.spillSet = spillSet; - //this.spillStreamPath = spillStreamPath; this.spillFile = spillFile; vas = new VectorAccessibleSerializable(oContext.getAllocator()); container = vas.get(); @@ -126,7 +122,7 @@ public class SpilledRecordbatch implements CloseableRecordBatch { @Override public IterOutcome next() { - if ( ! context.shouldContinue() ) { return IterOutcome.STOP; } + if ( ! context.getExecutorState().shouldContinue() ) { return IterOutcome.STOP; } if ( spilledBatches <= 0 ) { // no more batches to read in this partition this.close(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index b33dbd605..34ab97e2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -202,7 +202,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } return outcome; case UPDATE_AGGREGATOR: - context.fail(UserException.unsupportedError() + context.getExecutorState().fail(UserException.unsupportedError() .message(SchemaChangeException.schemaChanged("Streaming aggregate does not support schema changes", incomingSchema, incoming.getSchema()).getMessage()) .build(logger)); close(); @@ -263,7 +263,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { this.aggregator = createAggregatorInternal(); return true; } catch (SchemaChangeException | ClassTransformationException | IOException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); container.clear(); incoming.kill(false); return false; @@ -275,8 +275,6 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private StreamingAggregator createAggregatorInternal() throws SchemaChangeException, ClassTransformationException, IOException{ ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // cg.getCodeGenerator().saveCodeForDebugging(true); container.clear(); LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getKeys().size()]; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java index cac5b06d8..864271ee6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatchCreator.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.impl.BatchCreator; @@ -27,15 +28,11 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatchCreator.class); - +public class StreamingAggBatchCreator implements BatchCreator<StreamingAggregate> { @Override - public StreamingAggBatch getBatch(FragmentContext context, StreamingAggregate config, List<RecordBatch> children) + public StreamingAggBatch getBatch(ExecutorFragmentContext context, StreamingAggregate config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new StreamingAggBatch(config, children.iterator().next(), context); } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java index 61c82d8e2..7e13eb346 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.aggregate; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java index 01122bebd..6c42a1ac9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.broadcastsender; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.impl.RootCreator; import org.apache.drill.exec.physical.impl.RootExec; @@ -30,7 +30,7 @@ import com.google.common.collect.Iterators; public class BroadcastSenderCreator implements RootCreator<BroadcastSender> { @Override - public RootExec getRoot(FragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException { + public RootExec getRoot(ExecutorFragmentContext context, BroadcastSender config, List<RecordBatch> children) throws ExecutionSetupException { assert children != null && children.size() == 1; return new BroadcastSenderRootExec(context, Iterators.getOnlyElement(children.iterator()), config); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java index 80d774483..bd4a1ec42 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/broadcastsender/BroadcastSenderRootExec.java @@ -21,8 +21,8 @@ import java.util.List; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.AccountingDataTunnel; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; +import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.BroadcastSender; import org.apache.drill.exec.physical.impl.BaseRootExec; @@ -57,7 +57,7 @@ public class BroadcastSenderRootExec extends BaseRootExec { } } - public BroadcastSenderRootExec(FragmentContext context, + public BroadcastSenderRootExec(RootFragmentContext context, RecordBatch incoming, BroadcastSender config) throws OutOfMemoryException { super(context, context.newOperatorContext(config, null), config); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 9bab67d29..703868e94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -225,12 +225,11 @@ public class ChainedHashTable { setupGetHash(cg /* use top level code generator for getHash */, GetHashIncomingProbeMapping, incomingProbe, keyExprsProbe, true); HashTable ht = context.getImplementationClass(top); - ht.setup(htConfig, context, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig); + ht.setup(htConfig, allocator, incomingBuild, incomingProbe, outgoing, htContainerOrig); return ht; } - private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping, LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds) throws SchemaChangeException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index 3749e3e57..d28fe498e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -20,64 +20,58 @@ package org.apache.drill.exec.physical.impl.common; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.common.exceptions.RetryAfterSpillException; public interface HashTable { - - public static TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION = - new TemplateClassDefinition<HashTable>(HashTable.class, HashTableTemplate.class); - - /** - * The initial default capacity of the hash table (in terms of number of buckets). - */ - static final public int DEFAULT_INITIAL_CAPACITY = 1 << 16; + TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION = + new TemplateClassDefinition<>(HashTable.class, HashTableTemplate.class); /** * The maximum capacity of the hash table (in terms of number of buckets). */ - static final public int MAXIMUM_CAPACITY = 1 << 30; + int MAXIMUM_CAPACITY = 1 << 30; /** * The default load factor of a hash table. */ - static final public float DEFAULT_LOAD_FACTOR = 0.75f; + float DEFAULT_LOAD_FACTOR = 0.75f; - static public enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;} + enum PutStatus {KEY_PRESENT, KEY_ADDED, NEW_BATCH_ADDED, KEY_ADDED_LAST, PUT_FAILED;} /** * The batch size used for internal batch holders */ - static final public int BATCH_SIZE = Character.MAX_VALUE + 1; - static final public int BATCH_MASK = 0x0000FFFF; + int BATCH_SIZE = Character.MAX_VALUE + 1; + int BATCH_MASK = 0x0000FFFF; - public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig); + void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, + VectorContainer htContainerOrig); - public void updateBatches() throws SchemaChangeException; + void updateBatches() throws SchemaChangeException; - public int getHashCode(int incomingRowIdx) throws SchemaChangeException; + int getHashCode(int incomingRowIdx) throws SchemaChangeException; - public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException; + PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int hashCode) throws SchemaChangeException, RetryAfterSpillException; - public int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException; + int containsKey(int incomingRowIdx, boolean isProbe) throws SchemaChangeException; - public void getStats(HashTableStats stats); + void getStats(HashTableStats stats); - public int size(); + int size(); - public boolean isEmpty(); + boolean isEmpty(); - public void clear(); + void clear(); - public void reinit(RecordBatch newIncoming); + void reinit(RecordBatch newIncoming); - public void reset(); + void reset(); - public void setMaxVarcharSize(int size); + void setMaxVarcharSize(int size); - public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords); + boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords, int numExpectedRecords); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index 6cbbdcbb5..272d782e5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -29,7 +29,6 @@ import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; @@ -48,7 +47,6 @@ public abstract class HashTableTemplate implements HashTable { private static final boolean EXTRA_DEBUG = false; private static final int EMPTY_SLOT = -1; - // private final int MISSING_VALUE = 65544; // A hash 'bucket' consists of the start index to indicate start of a hash chain @@ -78,8 +76,6 @@ public abstract class HashTableTemplate implements HashTable { // Placeholder for the current index while probing the hash table private IndexPointer currentIdxHolder; -// private FragmentContext context; - private BufferAllocator allocator; // The incoming build side record batch @@ -451,7 +447,7 @@ public abstract class HashTableTemplate implements HashTable { @Override - public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) { + public void setup(HashTableConfig htConfig, BufferAllocator allocator, RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig) { float loadf = htConfig.getLoadFactor(); int initialCap = htConfig.getInitialCapacity(); @@ -470,7 +466,6 @@ public abstract class HashTableTemplate implements HashTable { } this.htConfig = htConfig; -// this.context = context; this.allocator = allocator; this.incomingBuild = incomingBuild; this.incomingProbe = incomingProbe; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java index e9b305169..ace4f24f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterBatchCreator.java @@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.filter; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Filter; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class FilterBatchCreator implements BatchCreator<Filter>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterBatchCreator.class); - +public class FilterBatchCreator implements BatchCreator<Filter> { @Override - public FilterRecordBatch getBatch(FragmentContext context, Filter config, List<RecordBatch> children) + public FilterRecordBatch getBatch(ExecutorFragmentContext context, Filter config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new FilterRecordBatch(config, children.iterator().next(), context); } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index 1bdd09743..f0b832a78 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -42,9 +42,7 @@ import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Lists; -public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ - //private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterRecordBatch.class); - +public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter> { private SelectionVector2 sv2; private SelectionVector4 sv4; private Filterer filter; @@ -120,16 +118,7 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ * logic that handles SV4 + filter should always be pushed beyond sort so disabling * it in FilterPrel. * - - // set up the multi-batch selection vector - this.svAllocator = oContext.getAllocator().getNewPreAllocator(); - if (!svAllocator.preAllocate(incoming.getRecordCount()*4)) - throw new SchemaChangeException("Attempted to filter an SV4 which exceeds allowed memory (" + - incoming.getRecordCount() * 4 + " bytes)"); - sv4 = new SelectionVector4(svAllocator.getAllocation(), incoming.getRecordCount(), Character.MAX_VALUE); - this.filter = generateSV4Filterer(); - break; - */ + */ default: throw new UnsupportedOperationException(); } @@ -197,8 +186,6 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ final TransferPair[] tx = transfers.toArray(new TransferPair[transfers.size()]); CodeGenerator<Filterer> codeGen = cg.getCodeGenerator(); codeGen.plainJavaCapable(true); - // Uncomment out this line to debug the generated code. -// cg.saveCodeForDebugging(true); final Filterer filter = context.getImplementationClass(codeGen); filter.setup(context, incoming, this, tx); return filter; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java index 74a5d1671..db62d3693 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterSignature.java @@ -20,12 +20,12 @@ package org.apache.drill.exec.physical.impl.filter; import javax.inject.Named; import org.apache.drill.exec.compile.sig.CodeGeneratorSignature; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.FragmentContextImpl; import org.apache.drill.exec.record.RecordBatch; public interface FilterSignature extends CodeGeneratorSignature{ - public void doSetup(@Named("context") FragmentContext context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); + public void doSetup(@Named("context") FragmentContextImpl context, @Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing); public boolean doEval(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java index d014a2efe..52533bd50 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate2.java @@ -27,9 +27,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.selection.SelectionVector2; -public abstract class FilterTemplate2 implements Filterer{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate2.class); - +public abstract class FilterTemplate2 implements Filterer { private SelectionVector2 outgoingSelectionVector; private SelectionVector2 incomingSelectionVector; private SelectionVectorMode svMode; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java index fd1f9e68c..4850cff41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterTemplate4.java @@ -26,8 +26,6 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.record.selection.SelectionVector4; public abstract class FilterTemplate4 implements Filterer { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FilterTemplate4.class); - private SelectionVector4 outgoingSelectionVector; private SelectionVector4 incomingSelectionVector; private TransferPair[] transfers; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java index aa45f54ff..a3d03c2d8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/Filterer.java @@ -24,11 +24,9 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TransferPair; public interface Filterer { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Filterer.class); + TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class); + TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class); - public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; - public void filterBatch(int recordCount) throws SchemaChangeException; - - public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION2 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate2.class); - public static TemplateClassDefinition<Filterer> TEMPLATE_DEFINITION4 = new TemplateClassDefinition<Filterer>(Filterer.class, FilterTemplate4.class); + void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, TransferPair[] transfers) throws SchemaChangeException; + void filterBatch(int recordCount) throws SchemaChangeException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java index 94203d81b..bfda4f43a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenBatchCreator.java @@ -20,21 +20,18 @@ package org.apache.drill.exec.physical.impl.flatten; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.FlattenPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class FlattenBatchCreator implements BatchCreator<FlattenPOP>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FlattenBatchCreator.class); - +public class FlattenBatchCreator implements BatchCreator<FlattenPOP> { @Override - public FlattenRecordBatch getBatch(FragmentContext context, FlattenPOP config, List<RecordBatch> children) + public FlattenRecordBatch getBatch(ExecutorFragmentContext context, FlattenPOP config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new FlattenRecordBatch(config, children.iterator().next(), context); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 2aa841b38..8be16ad4a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -303,8 +303,6 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { final ClassGenerator<Flattener> cg = CodeGenerator.getRoot(Flattener.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // cg.getCodeGenerator().saveCodeForDebugging(true); final IntHashSet transferFieldIds = new IntHashSet(); final NamedExpression flattenExpr = new NamedExpression(popConfig.getColumn(), new FieldReference(popConfig.getColumn())); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java index 5293060df..392757e17 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/Flattener.java @@ -27,9 +27,11 @@ import org.apache.drill.exec.record.TransferPair; import org.apache.drill.exec.vector.complex.RepeatedValueVector; public interface Flattener { - public void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException; + TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class); - public interface Monitor { + void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, List<TransferPair> transfers) throws SchemaChangeException; + + interface Monitor { /** * Get the required buffer size for the specified number of records. * {@see ValueVector#getBufferSizeFor(int)} for the meaning of this. @@ -37,14 +39,14 @@ public interface Flattener { * @param recordCount the number of records processed so far * @return the buffer size the vectors report as being in use */ - public int getBufferSizeFor(int recordCount); - }; + int getBufferSizeFor(int recordCount); + } + + int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor); - public int flattenRecords(int recordCount, int firstOutputIndex, Monitor monitor); + void setFlattenField(RepeatedValueVector repeatedColumn); - public void setFlattenField(RepeatedValueVector repeatedColumn); - public RepeatedValueVector getFlattenField(); - public void resetGroupIndex(); + RepeatedValueVector getFlattenField(); - public static final TemplateClassDefinition<Flattener> TEMPLATE_DEFINITION = new TemplateClassDefinition<Flattener>(Flattener.class, FlattenTemplate.class); + void resetGroupIndex(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 7b679c0d6..e087bc84b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -256,12 +256,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // No more output records, clean up and return state = BatchState.DONE; - // if (first) { - // return IterOutcome.OK_NEW_SCHEMA; - // } return IterOutcome.NONE; } catch (ClassTransformationException | SchemaChangeException | IOException e) { - context.fail(e); + context.getExecutorState().fail(e); killIncoming(false); return IterOutcome.STOP; } @@ -405,8 +402,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException { final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions()); cg.plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // cg.saveCodeForDebugging(true); final ClassGenerator<HashJoinProbe> g = cg.getRoot(); // Generate the code to project build side records diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java index 140276943..a005559cd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.join; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.HashJoinPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -30,7 +30,7 @@ import com.google.common.base.Preconditions; public class HashJoinBatchCreator implements BatchCreator<HashJoinPOP> { @Override - public HashJoinBatch getBatch(FragmentContext context, HashJoinPOP config, List<RecordBatch> children) + public HashJoinBatch getBatch(ExecutorFragmentContext context, HashJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 2); return new HashJoinBatch(config, context, children.get(0), children.get(1)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java index cc6bd5578..4ef28e667 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbe.java @@ -30,7 +30,7 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.calcite.rel.core.JoinRelType; public interface HashJoinProbe { - public static TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class); + TemplateClassDefinition<HashJoinProbe> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashJoinProbe>(HashJoinProbe.class, HashJoinProbeTemplate.class); /* The probe side of the hash join can be in the following two states * 1. PROBE_PROJECT: Inner join case, we probe our hash table to see if we have a @@ -40,15 +40,15 @@ public interface HashJoinProbe { * case we handle it internally by projecting the record if there isn't a match on the build side * 3. DONE: Once we have projected all possible records we are done */ - public static enum ProbeState { + enum ProbeState { PROBE_PROJECT, PROJECT_RIGHT, DONE } - public abstract void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, - int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, - JoinRelType joinRelType); - public abstract void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing); - public abstract int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException; - public abstract void projectBuildRecord(int buildIndex, int outIndex); - public abstract void projectProbeRecord(int probeIndex, int outIndex); + void setupHashJoinProbe(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, + int probeRecordCount, HashJoinBatch outgoing, HashTable hashTable, HashJoinHelper hjHelper, + JoinRelType joinRelType); + void doSetup(FragmentContext context, VectorContainer buildBatch, RecordBatch probeBatch, RecordBatch outgoing); + int probeAndProject() throws SchemaChangeException, ClassTransformationException, IOException; + void projectBuildRecord(int buildIndex, int outIndex); + void projectProbeRecord(int probeIndex, int outIndex); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java index 55322f849..95f7c3d84 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinWorker.java @@ -24,15 +24,12 @@ import org.apache.drill.exec.record.VectorContainer; public interface JoinWorker { + TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(JoinWorker.class, JoinTemplate.class); - public static enum JoinOutcome { + enum JoinOutcome { NO_MORE_DATA, BATCH_RETURNED, SCHEMA_CHANGED, WAITING, FAILURE; } - public void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException; - public boolean doJoin(JoinStatus status); - - public static TemplateClassDefinition<JoinWorker> TEMPLATE_DEFINITION = new TemplateClassDefinition<>( - JoinWorker.class, JoinTemplate.class); - + void setupJoin(FragmentContext context, JoinStatus status, VectorContainer outgoing) throws SchemaChangeException; + boolean doJoin(JoinStatus status); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 8ad3f84db..1ed4722bc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -199,7 +199,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { this.worker = generateNewWorker(); first = true; } catch (ClassTransformationException | IOException | SchemaChangeException e) { - context.fail(new SchemaChangeException(e)); + context.getExecutorState().fail(new SchemaChangeException(e)); kill(false); return IterOutcome.STOP; } finally { @@ -269,12 +269,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { right.kill(sendUpstream); } - private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException{ - + private JoinWorker generateNewWorker() throws ClassTransformationException, IOException, SchemaChangeException { final ClassGenerator<JoinWorker> cg = CodeGenerator.getRoot(JoinWorker.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // cg.getCodeGenerator().saveCodeForDebugging(true); final ErrorCollector collector = new ErrorCollectorImpl(); // Generate members and initialization code diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java index 24f5533db..b24624eb9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.join; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -29,10 +29,8 @@ import org.apache.calcite.rel.core.JoinRelType; import com.google.common.base.Preconditions; public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MergeJoinCreator.class); - @Override - public MergeJoinBatch getBatch(FragmentContext context, MergeJoinPOP config, List<RecordBatch> children) + public MergeJoinBatch getBatch(ExecutorFragmentContext context, MergeJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 2); if(config.getJoinType() == JoinRelType.RIGHT){ @@ -40,6 +38,5 @@ public class MergeJoinCreator implements BatchCreator<MergeJoinPOP> { }else{ return new MergeJoinBatch(config, context, children.get(0), children.get(1)); } - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java index 2e708a6ea..ef4cab10a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinBatchCreator.java @@ -20,14 +20,14 @@ package org.apache.drill.exec.physical.impl.join; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.NestedLoopJoinPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; public class NestedLoopJoinBatchCreator implements BatchCreator<NestedLoopJoinPOP> { @Override - public NestedLoopJoinBatch getBatch(FragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children) + public NestedLoopJoinBatch getBatch(ExecutorFragmentContext context, NestedLoopJoinPOP config, List<RecordBatch> children) throws ExecutionSetupException { return new NestedLoopJoinBatch(config, context, children.get(0), children.get(1)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java index f954e7202..15e52758f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.limit; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -29,7 +29,7 @@ import com.google.common.collect.Iterables; public class LimitBatchCreator implements BatchCreator<Limit> { @Override - public LimitRecordBatch getBatch(FragmentContext context, Limit config, List<RecordBatch> children) + public LimitRecordBatch getBatch(ExecutorFragmentContext context, Limit config, List<RecordBatch> children) throws ExecutionSetupException { return new LimitRecordBatch(config, context, Iterables.getOnlyElement(children)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index f9ceff268..7e5ff2126 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -41,6 +41,7 @@ import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.ops.ExchangeFragmentContext; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.physical.MinorFragmentEndpoint; @@ -96,7 +97,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private RecordBatchLoader[] batchLoaders; private final RawFragmentBatchProvider[] fragProviders; - private final FragmentContext context; + private final ExchangeFragmentContext context; private VectorContainer outgoingContainer; private MergingReceiverGeneratorBase merger; private final MergingReceiverPOP config; @@ -109,12 +110,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> private RawFragmentBatch[] incomingBatches; private int[] batchOffsets; private PriorityQueue <Node> pqueue; - private RawFragmentBatch emptyBatch = null; private RawFragmentBatch[] tempBatchHolder; private long[] inputCounts; private long[] outputCounts; - public static enum Metric implements MetricDef{ + public enum Metric implements MetricDef { BYTES_RECEIVED, NUM_SENDERS, NEXT_WAIT_NANOS; @@ -125,7 +125,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } } - public MergingRecordBatch(final FragmentContext context, + public MergingRecordBatch(final ExchangeFragmentContext context, final MergingReceiverPOP config, final RawFragmentBatchProvider[] fragProviders) throws OutOfMemoryException { super(config, context, true, context.newOperatorContext(config)); @@ -210,11 +210,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> try { rawBatch = getNext(p); } catch (final IOException e) { - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } } - if (rawBatch == null && !context.shouldContinue()) { + if (rawBatch == null && !context.getExecutorState().shouldContinue()) { clearBatches(rawBatches); return IterOutcome.STOP; } @@ -241,12 +241,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> while ((rawBatch = getNext(p)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0) { // Do nothing } - if (rawBatch == null && !context.shouldContinue()) { + if (rawBatch == null && !context.getExecutorState().shouldContinue()) { clearBatches(rawBatches); return IterOutcome.STOP; } } catch (final IOException e) { - context.fail(e); + context.getExecutorState().fail(e); clearBatches(rawBatches); return IterOutcome.STOP; } @@ -315,7 +315,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // SchemaChangeException, so check/clean catch clause below. } catch(final SchemaChangeException e) { logger.error("MergingReceiver failed to load record batch from remote host. {}", e); - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } batch.release(); @@ -328,7 +328,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // Ensure all the incoming batches have the identical schema. // Note: RecordBatchLoader permutes the columns to obtain the same columns order for all batches. if (!isSameSchemaAmongBatches(batchLoaders)) { - context.fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!")); + context.getExecutorState().fail(new SchemaChangeException("Incoming batches for merging receiver have different schemas!")); return IterOutcome.STOP; } @@ -351,7 +351,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> merger = createMerger(); } catch (final SchemaChangeException e) { logger.error("Failed to generate code for MergingReceiver. {}", e); - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } @@ -380,12 +380,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } else { batchLoaders[b].clear(); batchLoaders[b] = null; - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return IterOutcome.STOP; } } } catch (IOException | SchemaChangeException e) { - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } } @@ -418,11 +418,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> assert nextBatch != null || inputCounts[node.batchId] == outputCounts[node.batchId] : String.format("Stream %d input count: %d output count %d", node.batchId, inputCounts[node.batchId], outputCounts[node.batchId]); - if (nextBatch == null && !context.shouldContinue()) { + if (nextBatch == null && !context.getExecutorState().shouldContinue()) { return IterOutcome.STOP; } } catch (final IOException e) { - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } @@ -456,7 +456,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> // TODO: Clean: DRILL-2933: That load(...) no longer throws // SchemaChangeException, so check/clean catch clause below. } catch(final SchemaChangeException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } incomingBatches[node.batchId].release(); @@ -548,7 +548,7 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } final RawFragmentBatch batch = getNext(i); if (batch == null) { - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { state = BatchState.STOP; } else { state = BatchState.DONE; @@ -605,7 +605,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> .setReceiver(context.getHandle()) .setSender(sender) .build(); - context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver); + context.getController() + .getTunnel(providingEndpoint.getEndpoint()) + .informReceiverFinished(new OutcomeListener(), finishedReceiver); } } @@ -624,10 +626,10 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> @Override public void interrupted(final InterruptedException e) { - if (context.shouldContinue()) { + if (context.getExecutorState().shouldContinue()) { final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message"; logger.error(errMsg, e); - context.fail(new RpcException(errMsg, e)); + context.getExecutorState().fail(new RpcException(errMsg, e)); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 7f662ae22..9e82af8bb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -196,7 +196,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } } VectorContainer sortedSamples = new VectorContainer(); - builder.build(context, sortedSamples); + builder.build(sortedSamples); // Sort the records according the orderings given in the configuration @@ -262,7 +262,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart Thread.sleep(timeout); return true; } catch (final InterruptedException e) { - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return false; } } @@ -329,7 +329,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } catch (final ClassTransformationException | IOException | SchemaChangeException ex) { kill(false); - context.fail(ex); + context.getExecutorState().fail(ex); return false; // TODO InterruptedException } @@ -349,7 +349,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart for (CachedVectorContainer w : mmap.get(mapKey)) { containerBuilder.add(w.get()); } - containerBuilder.build(context, allSamplesContainer); + containerBuilder.build(allSamplesContainer); List<Ordering> orderDefs = Lists.newArrayList(); int i = 0; @@ -390,7 +390,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart candidatePartitionTable.setRecordCount(copier.getOutputRecords()); @SuppressWarnings("resource") WritableBatch batch = WritableBatch.getBatchNoHVWrap(candidatePartitionTable.getRecordCount(), candidatePartitionTable, false); - wrap = new CachedVectorContainer(batch, context.getDrillbitContext().getAllocator()); + wrap = new CachedVectorContainer(batch, context.getAllocator()); tableMap.putIfAbsent(mapKey + "final", wrap, 1, TimeUnit.MINUTES); } finally { candidatePartitionTable.clear(); @@ -486,7 +486,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } catch (SchemaChangeException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } doWork(vc); @@ -519,7 +519,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } catch (SchemaChangeException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } doWork(vc); @@ -550,7 +550,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart } catch (SchemaChangeException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } // fall through. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java index d2e07e7b6..5705aca0a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionSenderCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.orderedpartitioner; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.config.OrderedPartitionSender; import org.apache.drill.exec.physical.impl.RootCreator; @@ -61,7 +61,7 @@ public class OrderedPartitionSenderCreator implements RootCreator<OrderedPartiti @SuppressWarnings("resource") @Override - public RootExec getRoot(FragmentContext context, OrderedPartitionSender config, + public RootExec getRoot(ExecutorFragmentContext context, OrderedPartitionSender config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java index 06fd1155a..e0b7b9a91 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.RootCreator; import org.apache.drill.exec.physical.impl.RootExec; @@ -29,7 +29,7 @@ import org.apache.drill.exec.record.RecordBatch; public class PartitionSenderCreator implements RootCreator<HashPartitionSender> { @Override - public RootExec getRoot(FragmentContext context, + public RootExec getRoot(ExecutorFragmentContext context, HashPartitionSender config, List<RecordBatch> children) throws ExecutionSetupException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 108d539f8..25be50a06 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -33,9 +33,10 @@ import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.ops.AccountingDataTunnel; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExchangeFragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorStats; +import org.apache.drill.exec.ops.RootFragmentContext; import org.apache.drill.exec.physical.MinorFragmentEndpoint; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.physical.impl.BaseRootExec; @@ -63,7 +64,7 @@ public class PartitionSenderRootExec extends BaseRootExec { private HashPartitionSender operator; private PartitionerDecorator partitioner; - private FragmentContext context; + private ExchangeFragmentContext context; private boolean ok = true; private final int outGoingBatchCount; private final HashPartitionSender popConfig; @@ -98,13 +99,13 @@ public class PartitionSenderRootExec extends BaseRootExec { } } - public PartitionSenderRootExec(FragmentContext context, + public PartitionSenderRootExec(RootFragmentContext context, RecordBatch incoming, HashPartitionSender operator) throws OutOfMemoryException { this(context, incoming, operator, false); } - public PartitionSenderRootExec(FragmentContext context, + public PartitionSenderRootExec(RootFragmentContext context, RecordBatch incoming, HashPartitionSender operator, boolean closeIncoming) throws OutOfMemoryException { @@ -173,7 +174,7 @@ public class PartitionSenderRootExec extends BaseRootExec { } catch (IOException e) { incoming.kill(false); logger.error("Error while creating partitioning sender or flushing outgoing batches", e); - context.fail(e); + context.getExecutorState().fail(e); } return false; @@ -203,19 +204,19 @@ public class PartitionSenderRootExec extends BaseRootExec { } catch (IOException e) { incoming.kill(false); logger.error("Error while flushing outgoing batches", e); - context.fail(e); + context.getExecutorState().fail(e); return false; } catch (SchemaChangeException e) { incoming.kill(false); logger.error("Error while setting up partitioner", e); - context.fail(e); + context.getExecutorState().fail(e); return false; } case OK: try { partitioner.partitionBatch(incoming); } catch (IOException e) { - context.fail(e); + context.getExecutorState().fail(e); incoming.kill(false); return false; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java index 95a48135d..5d1b08cc5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java @@ -22,34 +22,33 @@ import java.util.List; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExchangeFragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; import org.apache.drill.exec.physical.config.HashPartitionSender; import org.apache.drill.exec.record.RecordBatch; public interface Partitioner { + void setup(ExchangeFragmentContext context, + RecordBatch incoming, + HashPartitionSender popConfig, + OperatorStats stats, + OperatorContext oContext, + int start, int count) throws SchemaChangeException; - public abstract void setup(FragmentContext context, - RecordBatch incoming, - HashPartitionSender popConfig, - OperatorStats stats, - OperatorContext oContext, - int start, int count) throws SchemaChangeException; - - public abstract void partitionBatch(RecordBatch incoming) throws IOException; - public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; - public abstract void initialize(); - public abstract void clear(); - public abstract List<? extends PartitionOutgoingBatch> getOutgoingBatches(); + void partitionBatch(RecordBatch incoming) throws IOException; + void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException; + void initialize(); + void clear(); + List<? extends PartitionOutgoingBatch> getOutgoingBatches(); /** * Method to get PartitionOutgoingBatch based on the fact that there can be > 1 Partitioner * @param index * @return PartitionOutgoingBatch that matches index within Partitioner. This method can * return null if index does not fall within boundary of this Partitioner */ - public abstract PartitionOutgoingBatch getOutgoingBatch(int index); - public abstract OperatorStats getStats(); + PartitionOutgoingBatch getOutgoingBatch(int index); + OperatorStats getStats(); - public static TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); + TemplateClassDefinition<Partitioner> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(Partitioner.class, PartitionerTemplate.class); }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java index 042222a2d..78b8d033e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java @@ -57,7 +57,7 @@ public class PartitionerDecorator { this.partitioners = partitioners; this.stats = stats; this.context = context; - this.executor = context.getDrillbitContext().getExecutor(); + this.executor = context.getExecutor(); this.tName = Thread.currentThread().getName(); this.childThreadPrefix = "Partitioner-" + tName + "-"; } @@ -177,7 +177,7 @@ public class PartitionerDecorator { break; } catch (final InterruptedException e) { // If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { logger.debug("Interrupting partioner threads. Fragment thread {}", tName); for(Future<?> f : taskFutures) { f.cancel(true); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java index aa72c44d8..0d52b53ef 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.ExchangeFragmentContext; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.ops.OperatorStats; @@ -81,7 +82,7 @@ public abstract class PartitionerTemplate implements Partitioner { } @Override - public final void setup(FragmentContext context, + public final void setup(ExchangeFragmentContext context, RecordBatch incoming, HashPartitionSender popConfig, OperatorStats stats, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 3afa8527e..bbcb75831 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -68,8 +68,8 @@ public class ProducerConsumerBatch extends AbstractRecordBatch<ProducerConsumer> wrapper = queue.take(); logger.debug("Got batch from queue"); } catch (final InterruptedException e) { - if (context.shouldContinue()) { - context.fail(e); + if (context.getExecutorState().shouldContinue()) { + context.getExecutorState().fail(e); } return IterOutcome.STOP; // TODO InterruptedException diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java index 6542576b5..779728a91 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.producer; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.ProducerConsumer; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -29,7 +29,7 @@ import com.google.common.collect.Iterables; public class ProducerConsumerBatchCreator implements BatchCreator<ProducerConsumer> { @Override - public ProducerConsumerBatch getBatch(FragmentContext context, ProducerConsumer config, List<RecordBatch> children) + public ProducerConsumerBatch getBatch(ExecutorFragmentContext context, ProducerConsumer config, List<RecordBatch> children) throws ExecutionSetupException { return new ProducerConsumerBatch(config, context, Iterables.getOnlyElement(children)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java index f2495402f..73ab44127 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ComplexToJsonBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.project; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.ComplexToJson; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.impl.BatchCreator; @@ -29,15 +29,12 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; public class ComplexToJsonBatchCreator implements BatchCreator<ComplexToJson> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ComplexToJsonBatchCreator.class); - @Override - public ProjectRecordBatch getBatch(FragmentContext context, ComplexToJson flatten, List<RecordBatch> children) + public ProjectRecordBatch getBatch(ExecutorFragmentContext context, ComplexToJson flatten, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new ProjectRecordBatch(new Project(null, flatten.getChild()), children.iterator().next(), context); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java index e7a6b0542..37753cd81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.project; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Project; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -28,13 +28,10 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; public class ProjectBatchCreator implements BatchCreator<Project>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectBatchCreator.class); - @Override - public ProjectRecordBatch getBatch(FragmentContext context, Project config, List<RecordBatch> children) + public ProjectRecordBatch getBatch(ExecutorFragmentContext context, Project config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new ProjectRecordBatch(config, children.iterator().next(), context); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index be0f61fa7..89e0ee9e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -804,7 +804,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { } catch (SchemaChangeException e) { kill(false); logger.error("Failure during query", e); - context.fail(e); + context.getExecutorState().fail(e); return IterOutcome.STOP; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java index d71159231..f38b62e3a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatch.java @@ -132,7 +132,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { return IterOutcome.NONE; } - builder.build(context, container); + builder.build(container); sorter = createNewSorter(); sorter.setup(context, getSelectionVector4(), this.container); sorter.sort(getSelectionVector4(), this.container); @@ -142,7 +142,7 @@ public class SortBatch extends AbstractRecordBatch<Sort> { } catch(SchemaChangeException | ClassTransformationException | IOException ex) { kill(false); logger.error("Failure during query", ex); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java index 559558f49..ccd55619e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortBatchCreator.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.sort; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -28,14 +28,10 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; public class SortBatchCreator implements BatchCreator<Sort>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SortBatchCreator.class); - @Override - public SortBatch getBatch(FragmentContext context, Sort config, List<RecordBatch> children) + public SortBatch getBatch(ExecutorFragmentContext context, Sort config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new SortBatch(config, context, children.iterator().next()); } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java index 6b3de25fc..6c66c01b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/sort/SortRecordBatchBuilder.java @@ -27,7 +27,6 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.AllocationReservation; import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.exec.record.MaterializedField; @@ -136,10 +135,6 @@ public class SortRecordBatchBuilder implements AutoCloseable { return batches.isEmpty(); } - public void build(FragmentContext context, VectorContainer outputContainer) throws SchemaChangeException { - build(outputContainer); - } - @SuppressWarnings("resource") public void build(VectorContainer outputContainer) throws SchemaChangeException { outputContainer.clear(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 4304c2cb0..66fe2610e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -24,7 +24,6 @@ import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.CodeGenerator; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.record.AbstractSingleRecordBatch; @@ -235,10 +234,11 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private Copier getGenerated4Copier() throws SchemaChangeException { Preconditions.checkArgument(incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.FOUR_BYTE); - return getGenerated4Copier(incoming, context, oContext.getAllocator(), container, this, callBack); + return getGenerated4Copier(incoming, context, container, this, callBack); } - public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, BufferAllocator allocator, VectorContainer container, RecordBatch outgoing, SchemaChangeCallBack callBack) throws SchemaChangeException{ + public static Copier getGenerated4Copier(RecordBatch batch, FragmentContext context, VectorContainer container, RecordBatch outgoing, + SchemaChangeCallBack callBack) throws SchemaChangeException{ for(VectorWrapper<?> vv : batch){ @SuppressWarnings("resource") diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java index 9ab39a399..4bf5b5cbc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/SVRemoverCreator.java @@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.svremover; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SVRemoverCreator.class); - +public class SVRemoverCreator implements BatchCreator<SelectionVectorRemover> { @Override - public RemovingRecordBatch getBatch(FragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) + public RemovingRecordBatch getBatch(ExecutorFragmentContext context, SelectionVectorRemover config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new RemovingRecordBatch(config, context, children.iterator().next()); } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java index 40ef2bb12..dd2f6db9f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceBatchCreator.java @@ -21,19 +21,15 @@ package org.apache.drill.exec.physical.impl.trace; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; public class TraceBatchCreator implements BatchCreator<Trace> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TraceBatchCreator.class); - @Override - public TraceRecordBatch getBatch(FragmentContext context, Trace config, List<RecordBatch> children) + public TraceRecordBatch getBatch(ExecutorFragmentContext context, Trace config, List<RecordBatch> children) throws ExecutionSetupException { - // Preconditions.checkArgument(children.size() == 1); return new TraceRecordBatch(config, children.iterator().next(), context); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java index 1ef3142b1..bdc1a3d26 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllBatchCreator.java @@ -20,22 +20,18 @@ package org.apache.drill.exec.physical.impl.union; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; -public class UnionAllBatchCreator implements BatchCreator<UnionAll>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnionAllBatchCreator.class); - +public class UnionAllBatchCreator implements BatchCreator<UnionAll> { @Override - public UnionAllRecordBatch getBatch(FragmentContext context, UnionAll config, List<RecordBatch> children) + public UnionAllRecordBatch getBatch(ExecutorFragmentContext context, UnionAll config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() >= 1); return new UnionAllRecordBatch(config, children, context); } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java index 1d1ecb00c..b4d0e7726 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/union/UnionAllRecordBatch.java @@ -129,7 +129,7 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { } } } catch (ClassTransformationException | IOException | SchemaChangeException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); killIncoming(false); return IterOutcome.STOP; } @@ -168,8 +168,6 @@ public class UnionAllRecordBatch extends AbstractBinaryRecordBatch<UnionAll> { final ClassGenerator<UnionAller> cg = CodeGenerator.getRoot(UnionAller.TEMPLATE_DEFINITION, context.getOptions()); cg.getCodeGenerator().plainJavaCapable(true); - // Uncomment out this line to debug the generated code. - // cg.getCodeGenerator().saveCodeForDebugging(true); int index = 0; for(VectorWrapper<?> vw : inputBatch) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index cfdc06d94..9da8a4b6f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -25,6 +25,7 @@ import java.util.Iterator; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.ExchangeFragmentContext; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; @@ -57,7 +58,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { private final RecordBatchLoader batchLoader; private final RawFragmentBatchProvider fragProvider; - private final FragmentContext context; + private final ExchangeFragmentContext context; private BatchSchema schema; private final OperatorStats stats; private boolean first = true; @@ -74,7 +75,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { } } - public UnorderedReceiverBatch(final FragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException { + public UnorderedReceiverBatch(final ExchangeFragmentContext context, final RawFragmentBatchProvider fragProvider, final UnorderedReceiver config) throws OutOfMemoryException { this.fragProvider = fragProvider; this.context = context; // In normal case, batchLoader does not require an allocator. However, in case of splitAndTransfer of a value vector, @@ -171,13 +172,13 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { if (batch == null) { batchLoader.clear(); - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return IterOutcome.STOP; } return IterOutcome.NONE; } - if (context.isOverMemoryLimit()) { + if (context.getAllocator().isOverLimit()) { return IterOutcome.OUT_OF_MEMORY; } @@ -197,7 +198,7 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { return IterOutcome.OK; } } catch(SchemaChangeException | IOException ex) { - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } finally { stats.stopProcessing(); @@ -233,7 +234,9 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { .setReceiver(context.getHandle()) .setSender(sender) .build(); - context.getControlTunnel(providingEndpoint.getEndpoint()).informReceiverFinished(new OutcomeListener(), finishedReceiver); + context.getController() + .getTunnel(providingEndpoint.getEndpoint()) + .informReceiverFinished(new OutcomeListener(), finishedReceiver); } } @@ -252,12 +255,11 @@ public class UnorderedReceiverBatch implements CloseableRecordBatch { @Override public void interrupted(final InterruptedException e) { - if (context.shouldContinue()) { + if (context.getExecutorState().shouldContinue()) { final String errMsg = "Received an interrupt RPC outcome while sending ReceiverFinished message"; logger.error(errMsg, e); - context.fail(new RpcException(errMsg, e)); + context.getExecutorState().fail(new RpcException(errMsg, e)); } } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java index 6d4f1d7d6..01a458890 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverCreator.java @@ -20,18 +20,18 @@ package org.apache.drill.exec.physical.impl.unorderedreceiver; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.UnorderedReceiver; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.work.batch.IncomingBuffers; import org.apache.drill.exec.work.batch.RawBatchBuffer; -public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver>{ +public class UnorderedReceiverCreator implements BatchCreator<UnorderedReceiver> { @SuppressWarnings("resource") @Override - public UnorderedReceiverBatch getBatch(FragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children) + public UnorderedReceiverBatch getBatch(ExecutorFragmentContext context, UnorderedReceiver receiver, List<RecordBatch> children) throws ExecutionSetupException { assert children == null || children.isEmpty(); IncomingBuffers bufHolder = context.getBuffers(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java index 4199191ed..e27f88127 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/validate/IteratorValidatorCreator.java @@ -21,7 +21,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.IteratorValidator; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -32,8 +32,8 @@ public class IteratorValidatorCreator implements BatchCreator<IteratorValidator> static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IteratorValidatorCreator.class); @Override - public IteratorValidatorBatchIterator getBatch(FragmentContext context, IteratorValidator config, - List<RecordBatch> children) + public IteratorValidatorBatchIterator getBatch(ExecutorFragmentContext context, IteratorValidator config, + List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); RecordBatch child = children.iterator().next(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java index a8eddbc62..c2bcab00d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/values/ValuesBatchCreator.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.Values; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; @@ -30,11 +30,9 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.easy.json.JSONRecordReader; -import com.google.common.collect.Iterators; - public class ValuesBatchCreator implements BatchCreator<Values> { @Override - public ScanBatch getBatch(FragmentContext context, Values config, List<RecordBatch> children) + public ScanBatch getBatch(ExecutorFragmentContext context, Values config, List<RecordBatch> children) throws ExecutionSetupException { assert children.isEmpty(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java index 59bc1159c..6ca9652c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameBatchCreator.java @@ -21,7 +21,7 @@ package org.apache.drill.exec.physical.impl.window; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.RecordBatch; @@ -29,9 +29,8 @@ import org.apache.drill.exec.record.RecordBatch; import com.google.common.base.Preconditions; public class WindowFrameBatchCreator implements BatchCreator<WindowPOP> { - @Override - public WindowFrameRecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) + public WindowFrameRecordBatch getBatch(ExecutorFragmentContext context, WindowPOP config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); return new WindowFrameRecordBatch(config, context, children.iterator().next()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java index f4a9825cd..aa067cf18 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/WindowFrameRecordBatch.java @@ -145,7 +145,7 @@ public class WindowFrameRecordBatch extends AbstractRecordBatch<WindowPOP> { try { doWork(); } catch (DrillException e) { - context.fail(e); + context.getExecutorState().fail(e); cleanup(); return IterOutcome.STOP; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java index c212593d9..9cde1a552 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java @@ -201,7 +201,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // where it would have been passed to context.fail() // passing the exception directly to context.fail(e) will let the cleanup process continue instead of stopping // right away, this will also make sure we collect any additional exception we may get while cleaning up - context.fail(e); + context.getExecutorState().fail(e); } } } @@ -483,7 +483,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { mSorter.sort(this.container); // sort may have prematurely exited due to should continue returning false. - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return IterOutcome.STOP; } @@ -522,12 +522,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } catch (SchemaChangeException ex) { kill(false); - context.fail(UserException.unsupportedError(ex) + context.getExecutorState().fail(UserException.unsupportedError(ex) .message("Sort doesn't currently support sorts with changing schemas").build(logger)); return IterOutcome.STOP; } catch(ClassTransformationException | IOException ex) { kill(false); - context.fail(ex); + context.getExecutorState().fail(ex); return IterOutcome.STOP; } catch (UnsupportedOperationException e) { throw new RuntimeException(e); @@ -650,7 +650,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { try { Thread.sleep(waitTime * 1000); } catch(final InterruptedException e) { - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { throw e; } } @@ -688,11 +688,12 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } private MSorter createNewMSorter() throws ClassTransformationException, IOException, SchemaChangeException { - return createNewMSorter(this.context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); + return createNewMSorter(context, this.popConfig.getOrderings(), this, MAIN_MAPPING, LEFT_MAPPING, RIGHT_MAPPING); } - private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet rightMapping) - throws ClassTransformationException, IOException, SchemaChangeException{ + private MSorter createNewMSorter(FragmentContext context, List<Ordering> orderings, VectorAccessible batch, MappingSet mainMapping, MappingSet leftMapping, MappingSet + rightMapping) + throws ClassTransformationException, IOException, SchemaChangeException { CodeGenerator<MSorter> cg = CodeGenerator.get(MSorter.TEMPLATE_DEFINITION, context.getOptions()); ClassGenerator<MSorter> g = cg.getRoot(); g.setMappingSet(mainMapping); @@ -735,7 +736,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { } public SingleBatchSorter createNewSorter(FragmentContext context, VectorAccessible batch) - throws ClassTransformationException, IOException, SchemaChangeException{ + throws ClassTransformationException, IOException, SchemaChangeException { CodeGenerator<SingleBatchSorter> cg = CodeGenerator.get(SingleBatchSorter.TEMPLATE_DEFINITION, context.getOptions()); cg.plainJavaCapable(true); // This class can generate plain-old Java. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java index e579fc2a3..6a601963b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatchCreator.java @@ -22,19 +22,19 @@ import java.util.List; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.ExecutorFragmentContext; import org.apache.drill.exec.physical.config.ExternalSort; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.server.options.OptionManager; import com.google.common.base.Preconditions; +import org.apache.drill.exec.server.options.OptionManager; public class ExternalSortBatchCreator implements BatchCreator<ExternalSort>{ @Override - public AbstractRecordBatch<ExternalSort> getBatch(FragmentContext context, ExternalSort config, List<RecordBatch> children) + public AbstractRecordBatch<ExternalSort> getBatch(ExecutorFragmentContext context, ExternalSort config, List<RecordBatch> children) throws ExecutionSetupException { Preconditions.checkArgument(children.size() == 1); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java index 2f3d2f624..9b691705b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java @@ -130,7 +130,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { while (runStarts.size() > 1) { // check if we're cancelled/failed frequently - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java index 733ea5e1f..4dbee3e45 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorter.java @@ -19,12 +19,12 @@ package org.apache.drill.exec.physical.impl.xsort; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContextInterface; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector2; public interface SingleBatchSorter { - public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException; + public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException; public void sort(SelectionVector2 vector2) throws SchemaChangeException; public static TemplateClassDefinition<SingleBatchSorter> TEMPLATE_DEFINITION = diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java index 0f4680d72..4a1af4e56 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/SingleBatchSorterTemplate.java @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit; import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.ops.FragmentContextInterface; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.selection.SelectionVector2; @@ -38,7 +38,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In private SelectionVector2 vector2; @Override - public void setup(FragmentContextInterface context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{ + public void setup(FragmentContext context, SelectionVector2 vector2, VectorAccessible incoming) throws SchemaChangeException{ Preconditions.checkNotNull(vector2); this.vector2 = vector2; try { @@ -76,7 +76,7 @@ public abstract class SingleBatchSorterTemplate implements SingleBatchSorter, In } } - public abstract void doSetup(@Named("context") FragmentContextInterface context, + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorAccessible incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java index 9150fe316..23e66a030 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/ExternalSortBatch.java @@ -135,8 +135,7 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector; * into new batches of a size determined by that operator.</li> * <li>A series of batches, without a selection vector, if the sort spills to * disk. In this case, the downstream operator will still be a selection vector - * remover, but there is nothing for that operator to remove. Each batch is - * of the size set by {@link #MAX_MERGED_BATCH_SIZE}.</li> + * remover, but there is nothing for that operator to remove. * </ul> * Note that, even in the in-memory sort case, this operator could do the copying * to eliminate the extra selection vector remover. That is left as an exercise @@ -375,7 +374,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { // sort may have prematurely exited due to shouldContinue() returning false. - if (! context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { sortState = SortState.DONE; return IterOutcome.STOP; } @@ -440,8 +439,6 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { /** * Handle a new schema from upstream. The ESB is quite limited in its ability * to handle schema changes. - * - * @param upstream the status code from upstream: either OK or OK_NEW_SCHEMA */ private void setupSchema() { @@ -482,6 +479,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> { * <p> * Some Drill code ends up calling close() two or more times. The code * here protects itself from these undesirable semantics. + * </p> */ @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java index 698e32fd2..625d3603d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSortTemplate.java @@ -24,7 +24,7 @@ import javax.inject.Named; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContextInterface; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -49,7 +49,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { */ private Queue<Integer> runStarts = Queues.newLinkedBlockingQueue(); - private FragmentContextInterface context; + private FragmentContext context; /** * Controls the maximum size of batches exposed to downstream @@ -57,7 +57,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { private int desiredRecordBatchCount; @Override - public void setup(final FragmentContextInterface context, final BufferAllocator allocator, final SelectionVector4 vector4, + public void setup(final FragmentContext context, final BufferAllocator allocator, final SelectionVector4 vector4, final VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException{ // we pass in the local hyperBatch since that is where we'll be reading data. Preconditions.checkNotNull(vector4); @@ -162,7 +162,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { final int totalCount = this.vector4.getTotalCount(); // check if we're cancelled/failed recently - if (!context.shouldContinue()) { + if (!context.getExecutorState().shouldContinue()) { return; } int outIndex = 0; @@ -233,7 +233,7 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable { } } - public abstract void doSetup(@Named("context") FragmentContextInterface context, + public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java index 428f6f87a..8eae8b71d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/MSorter.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.physical.impl.xsort.managed; import org.apache.drill.exec.compile.TemplateClassDefinition; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.ops.FragmentContextInterface; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.selection.SelectionVector4; @@ -30,7 +30,7 @@ import org.apache.drill.exec.record.selection.SelectionVector4; */ public interface MSorter { - public void setup(FragmentContextInterface context, BufferAllocator allocator, SelectionVector4 vector4, + public void setup(FragmentContext context, BufferAllocator allocator, SelectionVector4 vector4, VectorContainer hyperBatch, int outputBatchSize, int desiredBatchSize) throws SchemaChangeException; public void sort(); public SelectionVector4 getSV4(); |