diff options
author | Timothy Chen <tnachen@gmail.com> | 2014-09-21 23:54:40 -0700 |
---|---|---|
committer | Steven Phillips <sphillips@maprtech.com> | 2014-09-23 22:24:21 -0700 |
commit | 8def6e91489455c0ae670f49ef5ba51ef71b31bd (patch) | |
tree | 1e53ac7654039b05b6a0531039aa1b317665f45a /exec/java-exec/src/main/java/org/apache/drill/exec/physical | |
parent | 9bc71fc54b97b52ac5c7335247b6ca7887045fd2 (diff) |
Patch for DRILL-705
Currently supports only partitioning and ordering; PRECEDING and
FOLLOWING frame offsets are not yet supported.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical')
16 files changed, 746 insertions, 15 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 48b38011f..031ab10c5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.physical.config.WindowPOP; public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class); @@ -64,6 +65,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitWindowFrame(WindowPOP windowFrame, X value) throws E { + return visitOp(windowFrame, value); + } + + @Override public T visitProject(Project project, X value) throws E{ return visitOp(project, value); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java index 2b10e6d0a..6d6c5912b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java @@ -28,7 +28,7 @@ import com.google.common.collect.Iterators; * Describes an operator that expects a single child operator as its input. * @param <T> The type of Exec model supported. 
*/ -public abstract class AbstractSingle extends AbstractBase{ +public abstract class AbstractSingle extends AbstractBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSingle.class); protected final PhysicalOperator child; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index 8f5139029..a5518caf6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -37,7 +37,7 @@ import com.fasterxml.jackson.annotation.ObjectIdGenerators; @JsonPropertyOrder({ "@id" }) @JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "@id") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop") -public interface PhysicalOperator extends GraphValue<PhysicalOperator> { +public interface PhysicalOperator extends GraphValue<PhysicalOperator> { /** * Describes whether or not a particular physical operator can actually be executed. 
Most physical operators can be diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 8da06cbb8..f9a9c21af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.physical.config.WindowPOP; /** * Visitor class designed to traversal of a operator tree. Basis for a number of operator manipulations including fragmentation and materialization. @@ -80,6 +81,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP; public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP; public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP; + public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP; public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws EXCEP; public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java new file mode 100644 index 000000000..17738eecd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.physical.base.AbstractSingle; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.proto.UserBitShared; + +@JsonTypeName("window") +public class WindowPOP extends AbstractSingle { + + private final NamedExpression[] withins; + private final NamedExpression[] aggregations; + private final long start; + private final long end; + + public WindowPOP(@JsonProperty("child") PhysicalOperator child, + @JsonProperty("within") NamedExpression[] withins, + @JsonProperty("aggregations") NamedExpression[] aggregations, + @JsonProperty("start") long start, + @JsonProperty("end") long end) { + super(child); + this.withins = withins; + this.aggregations = aggregations; + this.start = start; + this.end = end; + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new WindowPOP(child, withins, aggregations, start, end); + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, 
X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitWindowFrame(this, value); + } + + @Override + public int getOperatorType() { + return UserBitShared.CoreOperatorType.WINDOW_VALUE; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + public NamedExpression[] getAggregations() { + return aggregations; + } + + public NamedExpression[] getWithins() { + return withins; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java index e6900605f..dae9eaedb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java @@ -35,7 +35,11 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ private final SelectionVector4 sv4; public InternalBatch(RecordBatch incoming) { - switch(incoming.getSchema().getSelectionVectorMode()) { + this(incoming, null); + } + + public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers){ + switch(incoming.getSchema().getSelectionVectorMode()){ case FOUR_BYTE: this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent(); this.sv2 = null; @@ -49,7 +53,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ this.sv2 = null; } this.schema = incoming.getSchema(); - this.container = VectorContainer.getTransferClone(incoming); + this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers); } public BatchSchema getSchema() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index ced51798f..4d3925ea5 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -325,9 +325,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { default: throw new IllegalStateException(); - } - } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index f1fcce0d6..85f664cfc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -76,13 +76,15 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ } @Override - protected void doWork() { + protected IterOutcome doWork() { int recordCount = incoming.getRecordCount(); filter.filterBatch(recordCount); // for (VectorWrapper<?> v : container) { // ValueVector.Mutator m = v.getValueVector().getMutator(); // m.setValueCount(recordCount); // } + + return IterOutcome.OK; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index f5bc9f91b..8f4d90f3f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -122,7 +122,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { } @Override - protected void doWork() { + protected IterOutcome doWork() { for(TransferPair tp : transfers) { tp.transfer(); } @@ -139,6 +139,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { limitWithNoSV(recordCount); } 
} + + return IterOutcome.OK; } private IterOutcome produceEmptyFirstBatch() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index a1a834052..224753e08 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -70,7 +70,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.sun.codemodel.JExpr; -public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ +public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class); private Projector projector; @@ -133,13 +133,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } @Override - protected void doWork() { + protected IterOutcome doWork() { // VectorUtil.showVectorAccessibleContent(incoming, ","); int incomingRecordCount = incoming.getRecordCount(); if (!doAlloc()) { outOfMemory = true; - return; + return IterOutcome.OUT_OF_MEMORY; } int outputRecords = projector.projectRecords(0, incomingRecordCount, 0); @@ -160,6 +160,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ if (complexWriters != null) { container.buildSchema(SelectionVectorMode.NONE); } + + return IterOutcome.OK; } private void handleRemainder() { @@ -177,7 +179,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ setValueCount(remainingRecordCount); hasRemainder = false; remainderIndex = 0; - for(VectorWrapper<?> v: incoming) { + for (VectorWrapper<?> v : incoming) { v.clear(); } this.recordCount = remainingRecordCount; @@ -259,7 +261,7 @@ public class ProjectRecordBatch extends 
AbstractSingleRecordBatch<Project>{ } @Override - protected void setupNewSchema() throws SchemaChangeException{ + protected void setupNewSchema() throws SchemaChangeException { this.allocationVectors = Lists.newArrayList(); container.clear(); final List<NamedExpression> exprs = getExpressionList(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 97f3608b2..7178d4c23 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -91,7 +91,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } @Override - protected void doWork() { + protected IterOutcome doWork() { int incomingRecordCount = incoming.getRecordCount(); int copiedRecords = copier.copyRecords(0, incomingRecordCount); @@ -125,6 +125,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect incomingRecordCount, incomingRecordCount - remainderIndex, incoming.getSchema()); + return IterOutcome.OK; } private void handleRemainder() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index 6d909623a..4e644dfed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -100,7 +100,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { * this record batch to a log file. 
*/ @Override - protected void doWork() { + protected IterOutcome doWork() { boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE; if (incomingHasSv2) { @@ -121,6 +121,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { if (incomingHasSv2) { sv = wrap.getSv2(); } + return IterOutcome.OK; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java new file mode 100644 index 000000000..9b8929f7b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.physical.impl.window; + +import com.google.common.collect.Iterables; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.WindowPOP; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import java.util.List; + +public class StreamingWindowFrameBatchCreator implements BatchCreator<WindowPOP> { + + @Override + public RecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) throws ExecutionSetupException { + return new StreamingWindowFrameRecordBatch(config, context, Iterables.getOnlyElement(children)); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java new file mode 100644 index 000000000..2a9208965 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill.exec.physical.impl.window; + +import com.google.common.collect.Lists; +import com.sun.codemodel.JExpr; +import com.sun.codemodel.JVar; +import org.apache.drill.common.expression.ErrorCollector; +import org.apache.drill.common.expression.ErrorCollectorImpl; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.compile.sig.GeneratorMapping; +import org.apache.drill.exec.compile.sig.MappingSet; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.ExpressionTreeMaterializer; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.ValueVectorReadExpression; +import org.apache.drill.exec.expr.ValueVectorWriteExpression; +import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.WindowPOP; +import org.apache.drill.exec.record.AbstractSingleRecordBatch; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.ValueVector; + +import java.io.IOException; +import java.util.List; + +public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<WindowPOP> { + private StreamingWindowFramer framer; + + public StreamingWindowFrameRecordBatch(WindowPOP popConfig, 
FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { + super(popConfig, context, incoming); + } + + @Override + protected void setupNewSchema() throws SchemaChangeException { + container.clear(); + + try { + this.framer = createFramer(); + } catch (ClassTransformationException | IOException ex) { + throw new SchemaChangeException("Failed to create framer: " + ex); + } + } + + private void getIndex(ClassGenerator<StreamingWindowFramer> g) { + switch (incoming.getSchema().getSelectionVectorMode()) { + case FOUR_BYTE: { + JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class)); + g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4")); + g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex"))); + return; + } + case NONE: { + g.getBlock("getVectorIndex")._return(JExpr.direct("recordIndex")); + return; + } + case TWO_BYTE: { + JVar var = g.declareClassField("sv2_", g.getModel()._ref(SelectionVector2.class)); + g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector2")); + g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex"))); + return; + } + + default: + throw new IllegalStateException(); + } + } + + private StreamingWindowFramer createFramer() throws SchemaChangeException, IOException, ClassTransformationException { + int configLength = popConfig.getAggregations().length; + List<LogicalExpression> valueExprs = Lists.newArrayList(); + LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length]; + + ErrorCollector collector = new ErrorCollectorImpl(); + + for (int i = 0; i < configLength; i++) { + NamedExpression ne = popConfig.getAggregations()[i]; + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry()); + if (expr == null) { + continue; + } + + final MaterializedField 
outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + TypedFieldId id = container.add(vector); + valueExprs.add(new ValueVectorWriteExpression(id, expr, true)); + } + + int j = 0; + LogicalExpression[] windowExprs = new LogicalExpression[incoming.getSchema().getFieldCount()]; + // TODO: Should transfer all existing columns instead of copy. Currently this is not easily doable because + // we are not processing one entire batch in one iteration, so cannot simply transfer. + for (VectorWrapper wrapper : incoming) { + ValueVector vv = wrapper.isHyper() ? wrapper.getValueVectors()[0] : wrapper.getValueVector(); + ValueVector vector = TypeHelper.getNewVector(vv.getField(), oContext.getAllocator()); + TypedFieldId id = container.add(vector); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize( + new ValueVectorReadExpression(new TypedFieldId(vv.getField().getType(), wrapper.isHyper(), j)), + incoming, + collector, + context.getFunctionRegistry()); + windowExprs[j] = new ValueVectorWriteExpression(id, expr, true); + j++; + } + + for (int i = 0; i < keyExprs.length; i++) { + NamedExpression ne = popConfig.getWithins()[i]; + + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry()); + if (expr == null) { + continue; + } + + keyExprs[i] = expr; + } + + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. 
" + collector.toErrorString()); + } + + final ClassGenerator<StreamingWindowFramer> cg = CodeGenerator.getRoot(StreamingWindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry()); + setupIsSame(cg, keyExprs); + setupIsSameFromBatch(cg, keyExprs); + addRecordValues(cg, valueExprs.toArray(new LogicalExpression[valueExprs.size()])); + outputWindowValues(cg, windowExprs); + + cg.getBlock("resetValues")._return(JExpr.TRUE); + + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + getIndex(cg); + StreamingWindowFramer agg = context.getImplementationClass(cg); + agg.setup( + context, + incoming, + this, + StreamingWindowFrameTemplate.UNBOUNDED, StreamingWindowFrameTemplate.CURRENT_ROW + ); + return agg; + } + + private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSameFromBatch", "isSameFromBatch", null, null); // the internal batch changes each time so we need to redo setup. + private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSameFromBatch", null, null); + private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ); + private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV); + + private void setupIsSameFromBatch(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) { + cg.setMappingSet(ISA_B1); + for (LogicalExpression expr : keyExprs) { + // first, we rewrite the evaluation stack for each side of the comparison. 
+ cg.setMappingSet(ISA_B1); + ClassGenerator.HoldingContainer first = cg.addExpr(expr, false); + cg.setMappingSet(ISA_B2); + ClassGenerator.HoldingContainer second = cg.addExpr(expr, false); + + LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry()); + ClassGenerator.HoldingContainer out = cg.addExpr(fh, false); + cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getEvalBlock()._return(JExpr.TRUE); + } + + private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null); + private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME); + private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME); + + private void setupIsSame(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) { + cg.setMappingSet(IS_SAME_I1); + for (LogicalExpression expr : keyExprs) { + // first, we rewrite the evaluation stack for each side of the comparison. 
+ cg.setMappingSet(IS_SAME_I1); + ClassGenerator.HoldingContainer first = cg.addExpr(expr, false); + cg.setMappingSet(IS_SAME_I2); + ClassGenerator.HoldingContainer second = cg.addExpr(expr, false); + + LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry()); + ClassGenerator.HoldingContainer out = cg.addExpr(fh, false); + cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getEvalBlock()._return(JExpr.TRUE); + } + + private final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupInterior", "addRecord", null, null); + private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup"); + private final MappingSet EVAL = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE); + + private void addRecordValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) { + cg.setMappingSet(EVAL); + for (LogicalExpression ex : valueExprs) { + ClassGenerator.HoldingContainer hc = cg.addExpr(ex); + cg.getBlock(ClassGenerator.BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getBlock(ClassGenerator.BlockType.EVAL)._return(JExpr.TRUE); + } + + private final GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupInterior", "outputWindowValues", null, null); + private final MappingSet WINDOW_VALUES = new MappingSet("inIndex" /* read index */, "outIndex" /* write index */, "incoming", "outgoing", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES); + + private void outputWindowValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) { + cg.setMappingSet(WINDOW_VALUES); + for (int i = 0; i < valueExprs.length; i++) { + ClassGenerator.HoldingContainer hc = cg.addExpr(valueExprs[i]); + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + 
cg.getEvalBlock()._return(JExpr.TRUE); + } + + @Override + protected IterOutcome doWork() { + StreamingWindowFramer.AggOutcome out = framer.doWork(); + + while (out == StreamingWindowFramer.AggOutcome.UPDATE_AGGREGATOR) { + framer = null; + try { + setupNewSchema(); + } catch (SchemaChangeException e) { + return IterOutcome.STOP; + } + out = framer.doWork(); + } + + if (out == StreamingWindowFramer.AggOutcome.RETURN_AND_COMPLETE) { + done = true; + } + + return framer.getOutcome(); + } + + @Override + public int getRecordCount() { + return framer.getOutputCount(); + } + + @Override + public void cleanup() { + if (framer != null) { + framer.cleanup(); + } + super.cleanup(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java new file mode 100644 index 000000000..b4e3fed5e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.aggregate.InternalBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorWrapper;
+
+import javax.inject.Named;
+
+/**
+ * Skeleton implementation of {@link StreamingWindowFramer}.
+ *
+ * <p>This class owns the driver loop ({@link #doWork()}) and the index bookkeeping over the
+ * incoming batch. Everything that depends on the actual columns (comparing records, accumulating
+ * a record, writing output values) is left abstract. It is registered as the template class in
+ * {@code StreamingWindowFramer.TEMPLATE_DEFINITION}; presumably the abstract methods are filled
+ * in by generated code (NOTE(review): confirm against the operator that instantiates it).
+ */
+public abstract class StreamingWindowFrameTemplate implements StreamingWindowFramer {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingWindowFramer.class);
+  private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field.";
+  /** Compile-time switch for the verbose tracing in this class; the guarded blocks are dead code while false. */
+  private static final boolean EXTRA_DEBUG = false;
+  /** Initial value of {@link #precedingConfig}. */
+  public static final int UNBOUNDED = -1;
+  /** Initial value of {@link #followingConfig}. */
+  public static final int CURRENT_ROW = 0;
+
+  /** True until the first {@link #doWork()} call has finished (cleared in its finally block). */
+  private boolean first = true;
+  /** Vector index of the record that was current before the latest {@link #incIndex()}. */
+  private int previousIndex = 0;
+  /** Position within the incoming batch; -1 means "before the first record". */
+  private int underlyingIndex = -1;
+  /** {@link #getVectorIndex(int)} of {@link #underlyingIndex}. */
+  private int currentIndex;
+  /** Set when a record could not be written to the outgoing batch and must be written on the next call. */
+  private boolean pendingOutput = false;
+  /** Outcome to hand to the caller through {@link #getOutcome()}. */
+  private RecordBatch.IterOutcome outcome;
+  /** Number of records successfully written through {@link #outputToBatch(int)}. */
+  private int outputCount = 0;
+  private RecordBatch incoming;
+  private RecordBatch outgoing;
+  private FragmentContext context;
+  /** Snapshot of the last exhausted incoming batch, kept to compare records across a batch boundary. */
+  private InternalBatch previousBatch = null;
+  // Stored by setup() but not read anywhere else in this class.
+  private int precedingConfig = UNBOUNDED;
+  private int followingConfig = CURRENT_ROW;
+
+
+  /**
+   * Stores the collaborators and frame configuration, then delegates to
+   * {@link #setupInterior(RecordBatch, RecordBatch)}.
+   *
+   * @param context fragment context, used to report failures
+   * @param incoming batch that records are read from
+   * @param outgoing batch that records are written to
+   * @param precedingConfig preceding frame setting (only stored here)
+   * @param followingConfig following frame setting (only stored here)
+   * @throws SchemaChangeException if {@code setupInterior} throws it
+   */
+  @Override
+  public void setup(FragmentContext context,
+                    RecordBatch incoming,
+                    RecordBatch outgoing,
+                    int precedingConfig,
+                    int followingConfig) throws SchemaChangeException {
+    this.context = context;
+    this.incoming = incoming;
+    this.outgoing = outgoing;
+    this.precedingConfig = precedingConfig;
+    this.followingConfig = followingConfig;
+
+    setupInterior(incoming, outgoing);
+  }
+
+
+  /** Calls {@code allocateNew()} on every value vector of the outgoing batch. */
+  private void allocateOutgoing() {
+    for (VectorWrapper<?> w : outgoing){
+      w.getValueVector().allocateNew();
+    }
+  }
+
+  /** @return the outcome recorded by the most recent {@link #doWork()} call */
+  @Override
+  public RecordBatch.IterOutcome getOutcome() {
+    return outcome;
+  }
+
+  /** @return the number of records written to the outgoing batch so far */
+  @Override
+  public int getOutputCount() {
+    return outputCount;
+  }
+
+  /** Fails the fragment with {@link #TOO_BIG_ERROR}, records STOP, and asks the caller to return it. */
+  private AggOutcome tooBigFailure() {
+    context.fail(new Exception(TOO_BIG_ERROR));
+    this.outcome = RecordBatch.IterOutcome.STOP;
+    return AggOutcome.RETURN_OUTCOME;
+  }
+
+  /**
+   * Drives the framer: consumes records from the incoming batch, resets the accumulated values
+   * whenever {@code isSame}/{@code isSameFromBatch} reports a change, accumulates each record and
+   * writes it to the outgoing batch, then pulls the next incoming batch and repeats.
+   *
+   * @return {@code RETURN_OUTCOME} when {@link #getOutcome()} should be returned to the caller,
+   *         {@code UPDATE_AGGREGATOR} when the incoming batch reported a new schema, or
+   *         {@code RETURN_AND_COMPLETE} when the incoming batch reported {@code NONE}
+   */
+  @Override
+  public AggOutcome doWork() {
+    // if we're in the first state, allocate outgoing.
+    try {
+      if (first) {
+        allocateOutgoing();
+      }
+
+      // setup for new output and pick any remainder.
+      // NOTE(review): outputCount is not reset anywhere in this class, including here where fresh
+      // outgoing vectors are allocated -- confirm that writing at the old offset is intended.
+      if (pendingOutput) {
+        allocateOutgoing();
+        pendingOutput = false;
+        outputToBatch(previousIndex);
+      }
+
+      boolean recordsAdded = false;
+
+      outside: while (true) {
+        if (EXTRA_DEBUG) {
+          logger.trace("Looping from underlying index {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex);
+          logger.debug("Processing {} records in window framer", incoming.getRecordCount());
+        }
+        // loop through existing records, adding as necessary.
+        while(incIndex()) {
+          if (previousBatch != null) {
+            // First record after a batch boundary: compare against the snapshot of the old batch,
+            // then release the snapshot since later comparisons stay inside the current batch.
+            boolean isSameFromBatch = isSameFromBatch(previousIndex, previousBatch, currentIndex);
+            if (EXTRA_DEBUG) {
+              logger.trace("Same as previous batch: {}, previous index {}, current index {}", isSameFromBatch, previousIndex, currentIndex);
+            }
+
+            if(!isSameFromBatch) {
+              resetValues();
+            }
+            previousBatch.clear();
+            previousBatch = null;
+          } else if (!isSame(previousIndex, currentIndex)) {
+            resetValues();
+          }
+
+          addRecord(currentIndex);
+
+          if (!outputToBatch(currentIndex)) {
+            // Nothing fit at all: a single record is too large for the outgoing batch.
+            if (outputCount == 0) {
+              return tooBigFailure();
+            }
+
+            // mark the pending output but move forward for the next cycle.
+            // NOTE(review): the loop header calls incIndex() again on re-entry; confirm the
+            // record this call advances onto is not skipped.
+            pendingOutput = true;
+            incIndex();
+            return setOkAndReturn();
+          }
+
+          recordsAdded = true;
+        }
+
+        if (EXTRA_DEBUG) {
+          logger.debug("Exit Loop from underlying index {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex);
+        }
+
+        // A snapshot can still be held here when doWork() is re-entered without any record having
+        // been processed since it was taken (e.g. after NOT_YET). Release it before replacing it,
+        // as cleanup() and the record loop above do, instead of silently dropping it.
+        if (previousBatch != null) {
+          previousBatch.clear();
+        }
+        previousBatch = new InternalBatch(incoming);
+
+        while (true) {
+          RecordBatch.IterOutcome out = incoming.next();
+          switch (out) {
+            case NONE:
+              outcome = innerOutcome(out, recordsAdded);
+              if (EXTRA_DEBUG) {
+                logger.trace("Received IterOutcome of {}, assigning {} outcome", out, outcome);
+              }
+              return AggOutcome.RETURN_AND_COMPLETE;
+            case NOT_YET:
+              outcome = innerOutcome(out, recordsAdded);
+              if (EXTRA_DEBUG) {
+                logger.trace("Received IterOutcome of {}, assigning {} outcome", out, outcome);
+              }
+              return AggOutcome.RETURN_OUTCOME;
+
+            case OK_NEW_SCHEMA:
+              if (EXTRA_DEBUG) {
+                logger.trace("Received new schema. Batch has {} records.", incoming.getRecordCount());
+              }
+              resetIndex();
+              return AggOutcome.UPDATE_AGGREGATOR;
+
+            case OK:
+              if (EXTRA_DEBUG) {
+                logger.trace("Received OK with {} records.", incoming.getRecordCount());
+              }
+              resetIndex();
+              if (incoming.getRecordCount() == 0) {
+                // empty batch: keep pulling without leaving the inner loop.
+                continue;
+              } else {
+                continue outside;
+              }
+            case STOP:
+            default:
+              outcome = out;
+              if (EXTRA_DEBUG) {
+                logger.trace("Stop received.");
+              }
+              return AggOutcome.RETURN_OUTCOME;
+          }
+        }
+      }
+    } finally {
+      first = false;
+    }
+  }
+
+  /**
+   * Chooses the outcome to report when the incoming batch has no data to offer right now.
+   *
+   * @param innerOutcome outcome received from the incoming batch
+   * @param newRecordsAdded whether this {@code doWork()} call wrote any record
+   * @return the OK / OK_NEW_SCHEMA outcome set by {@link #setOkAndReturn()} if records were
+   *         written (value counts are set as a side effect), otherwise {@code innerOutcome}
+   */
+  private RecordBatch.IterOutcome innerOutcome(RecordBatch.IterOutcome innerOutcome, boolean newRecordsAdded) {
+    if(newRecordsAdded) {
+      setOkAndReturn();
+      return outcome;
+    }
+    return innerOutcome;
+  }
+
+
+  /**
+   * Advances to the next record of the incoming batch.
+   *
+   * @return false when the incoming batch is exhausted; in that case {@code currentIndex} is left
+   *         unchanged, although {@code previousIndex} has already been updated
+   */
+  private final boolean incIndex() {
+    underlyingIndex++;
+
+    if(currentIndex != -1) {
+      previousIndex = currentIndex;
+    }
+
+    if (underlyingIndex >= incoming.getRecordCount()) {
+      return false;
+    }
+
+    currentIndex = getVectorIndex(underlyingIndex);
+    return true;
+  }
+
+  /**
+   * Rewinds to "before the first record" for a newly received incoming batch. {@code previousIndex}
+   * is deliberately left alone. Note that {@code getVectorIndex} is called with -1 here.
+   */
+  private final void resetIndex() {
+    underlyingIndex = -1;
+    currentIndex = getVectorIndex(underlyingIndex);
+    if (EXTRA_DEBUG) {
+      logger.trace("Reset new indexes: underlying {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex);
+    }
+  }
+
+  /**
+   * Records OK_NEW_SCHEMA (during the first {@code doWork()} call) or OK as the outcome and sets
+   * the value count of every outgoing vector to {@code outputCount}.
+   *
+   * @return always {@code RETURN_OUTCOME}
+   */
+  private final AggOutcome setOkAndReturn() {
+    if (first) {
+      this.outcome = RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+    } else {
+      this.outcome = RecordBatch.IterOutcome.OK;
+    }
+
+    if (EXTRA_DEBUG) {
+      logger.debug("Setting output count {}", outputCount);
+    }
+    for (VectorWrapper<?> v : outgoing) {
+      v.getValueVector().getMutator().setValueCount(outputCount);
+    }
+    return AggOutcome.RETURN_OUTCOME;
+  }
+
+  /**
+   * Writes one output record at position {@code outputCount}.
+   *
+   * @param inIndex index passed to {@link #outputWindowValues(int, int)}
+   * @return true if both the record values and the window values were written, in which case
+   *         {@code outputCount} has been incremented; the window values are not attempted when
+   *         the record values fail
+   */
+  private final boolean outputToBatch(int inIndex) {
+    boolean success = outputRecordValues(outputCount)
+        && outputWindowValues(inIndex, outputCount);
+
+    if (success) {
+      outputCount++;
+    }
+
+    return success;
+  }
+
+  /** Releases the batch snapshot, if one is still held. */
+  @Override
+  public void cleanup() {
+    if(previousBatch != null) {
+      previousBatch.clear();
+      previousBatch = null;
+    }
+  }
+
+  /** Hook called at the end of {@code setup} with the incoming and outgoing batches. */
+  public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
+
+  /**
+   * Compares withins from two indexes in the same batch
+   * @param index1 First record index
+   * @param index2 Second record index
+   * @return does within value match
+   */
+  public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
+  /**
+   * Compares withins from one index of given batch (typically previous completed batch), and one index from current batch
+   * @param b1Index Record index within {@code b1}
+   * @param b1 Batch that {@code b1Index} refers to
+   * @param index2 Record index within the current batch
+   * @return does within value match
+   */
+  public abstract boolean isSameFromBatch(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int index2);
+  /** Called once per incoming record, after any reset and before the record is written out. */
+  public abstract void addRecord(@Named("index") int index);
+  /** Writes the record values at {@code outIndex}; returns false if they could not be written. */
+  public abstract boolean outputRecordValues(@Named("outIndex") int outIndex);
+  /** Writes the window values for {@code inIndex} at {@code outIndex}; returns false if they could not be written. */
+  public abstract boolean outputWindowValues(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+  /** Maps a position in the incoming batch to the index used to address its vectors. */
+  public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
+
+  /** Called when the current record does not match the previous one; the return value is ignored by this class. */
+  public abstract boolean resetValues();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java
new file mode 100644
index 000000000..9588ceffa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * Contract between the window-frame operator and the framer that does the per-record work.
+ * {@link StreamingWindowFrameTemplate} supplies the template implementation.
+ */
+public interface StreamingWindowFramer {
+
+  /** Pairs this interface with its template class. */
+  TemplateClassDefinition<StreamingWindowFramer> TEMPLATE_DEFINITION =
+      new TemplateClassDefinition<>(StreamingWindowFramer.class, StreamingWindowFrameTemplate.class);
+
+  /** What the caller of {@link #doWork()} should do next. */
+  enum AggOutcome {
+    /** Return the value of {@link #getOutcome()}. */
+    RETURN_OUTCOME,
+    /** The incoming batch reported a new schema. */
+    UPDATE_AGGREGATOR,
+    /** Return the value of {@link #getOutcome()}; the incoming batch reported no more data. */
+    RETURN_AND_COMPLETE;
+  }
+
+  /**
+   * Prepares the framer for use.
+   *
+   * @param context fragment context
+   * @param incoming batch to read from
+   * @param outgoing batch to write to
+   * @param precedingConfig preceding frame setting
+   * @param followingConfig following frame setting
+   * @throws SchemaChangeException if the framer cannot be set up for the given batches
+   */
+  void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
+             int precedingConfig, int followingConfig) throws SchemaChangeException;
+
+  /** @return the outcome recorded by the last {@link #doWork()} call */
+  RecordBatch.IterOutcome getOutcome();
+
+  /** @return the number of records written to the outgoing batch */
+  int getOutputCount();
+
+  /** Processes incoming records and reports what the caller should do next. */
+  AggOutcome doWork();
+
+  /** Releases any resources still held by the framer. */
+  void cleanup();
+} |