author    Timothy Chen <tnachen@gmail.com>          2014-09-21 23:54:40 -0700
committer Steven Phillips <sphillips@maprtech.com>  2014-09-23 22:24:21 -0700
commit    8def6e91489455c0ae670f49ef5ba51ef71b31bd (patch)
tree      1e53ac7654039b05b6a0531039aa1b317665f45a /exec/java-exec/src/main/java/org/apache/drill/exec/physical
parent    9bc71fc54b97b52ac5c7335247b6ca7887045fd2 (diff)
Patch for DRILL-705
Currently supports only partitioning/ordering; frame offsets (preceding/following) are not yet supported.
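
For orientation, here is a minimal sketch (not part of this commit) of how the new WindowPOP operator added below could be instantiated, assuming the child operator and the NamedExpression arrays for the partition keys ("withins") and aggregations were built elsewhere:

    package org.apache.drill.exec.physical.config;

    import org.apache.drill.common.logical.data.NamedExpression;
    import org.apache.drill.exec.physical.base.PhysicalOperator;

    // Hypothetical helper, not part of the patch: wires up the "window" physical operator
    // introduced in this commit.
    public class WindowPopExample {
      public static WindowPOP unboundedToCurrentRow(PhysicalOperator child,
                                                    NamedExpression[] withins,        // partition keys
                                                    NamedExpression[] aggregations) { // per-frame aggregates
        // start = -1 and end = 0 mirror StreamingWindowFrameTemplate.UNBOUNDED and CURRENT_ROW;
        // in this patch the framer is always set up with an UNBOUNDED PRECEDING .. CURRENT ROW
        // frame regardless of these values (see StreamingWindowFrameRecordBatch.createFramer).
        return new WindowPOP(child, withins, aggregations, -1, 0);
      }
    }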
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java | 6
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java | 79
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java | 12
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java | 36
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java | 268
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java | 286
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java | 44
16 files changed, 746 insertions(+), 15 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 48b38011f..031ab10c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace;
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.physical.config.UnionExchange;
import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.physical.config.WindowPOP;
public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class);
@@ -64,6 +65,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
}
@Override
+ public T visitWindowFrame(WindowPOP windowFrame, X value) throws E {
+ return visitOp(windowFrame, value);
+ }
+
+ @Override
public T visitProject(Project project, X value) throws E{
return visitOp(project, value);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
index 2b10e6d0a..6d6c5912b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Iterators;
* Describes an operator that expects a single child operator as its input.
* @param <T> The type of Exec model supported.
*/
-public abstract class AbstractSingle extends AbstractBase{
+public abstract class AbstractSingle extends AbstractBase {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSingle.class);
protected final PhysicalOperator child;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 8f5139029..a5518caf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -37,7 +37,7 @@ import com.fasterxml.jackson.annotation.ObjectIdGenerators;
@JsonPropertyOrder({ "@id" })
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "@id")
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop")
-public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
+public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
/**
* Describes whether or not a particular physical operator can actually be executed. Most physical operators can be
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 8da06cbb8..f9a9c21af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace;
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.physical.config.UnionExchange;
import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.physical.config.WindowPOP;
/**
* Visitor class designed to traversal of a operator tree. Basis for a number of operator manipulations including fragmentation and materialization.
@@ -80,6 +81,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP;
+ public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP;
public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws EXCEP;
public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
new file mode 100644
index 000000000..17738eecd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared;
+
+@JsonTypeName("window")
+public class WindowPOP extends AbstractSingle {
+
+ private final NamedExpression[] withins;
+ private final NamedExpression[] aggregations;
+ private final long start;
+ private final long end;
+
+ public WindowPOP(@JsonProperty("child") PhysicalOperator child,
+ @JsonProperty("within") NamedExpression[] withins,
+ @JsonProperty("aggregations") NamedExpression[] aggregations,
+ @JsonProperty("start") long start,
+ @JsonProperty("end") long end) {
+ super(child);
+ this.withins = withins;
+ this.aggregations = aggregations;
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new WindowPOP(child, withins, aggregations, start, end);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitWindowFrame(this, value);
+ }
+
+ @Override
+ public int getOperatorType() {
+ return UserBitShared.CoreOperatorType.WINDOW_VALUE;
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public NamedExpression[] getAggregations() {
+ return aggregations;
+ }
+
+ public NamedExpression[] getWithins() {
+ return withins;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index e6900605f..dae9eaedb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -35,7 +35,11 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
private final SelectionVector4 sv4;
public InternalBatch(RecordBatch incoming) {
- switch(incoming.getSchema().getSelectionVectorMode()) {
+ this(incoming, null);
+ }
+
+ public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers){
+ switch(incoming.getSchema().getSelectionVectorMode()){
case FOUR_BYTE:
this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
this.sv2 = null;
@@ -49,7 +53,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
this.sv2 = null;
}
this.schema = incoming.getSchema();
- this.container = VectorContainer.getTransferClone(incoming);
+ this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers);
}
public BatchSchema getSchema() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ced51798f..4d3925ea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -325,9 +325,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
default:
throw new IllegalStateException();
-
}
-
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index f1fcce0d6..85f664cfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -76,13 +76,15 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
}
@Override
- protected void doWork() {
+ protected IterOutcome doWork() {
int recordCount = incoming.getRecordCount();
filter.filterBatch(recordCount);
// for (VectorWrapper<?> v : container) {
// ValueVector.Mutator m = v.getValueVector().getMutator();
// m.setValueCount(recordCount);
// }
+
+ return IterOutcome.OK;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index f5bc9f91b..8f4d90f3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -122,7 +122,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
}
@Override
- protected void doWork() {
+ protected IterOutcome doWork() {
for(TransferPair tp : transfers) {
tp.transfer();
}
@@ -139,6 +139,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
limitWithNoSV(recordCount);
}
}
+
+ return IterOutcome.OK;
}
private IterOutcome produceEmptyFirstBatch() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index a1a834052..224753e08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -70,7 +70,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sun.codemodel.JExpr;
-public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
+public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
private Projector projector;
@@ -133,13 +133,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
@Override
- protected void doWork() {
+ protected IterOutcome doWork() {
// VectorUtil.showVectorAccessibleContent(incoming, ",");
int incomingRecordCount = incoming.getRecordCount();
if (!doAlloc()) {
outOfMemory = true;
- return;
+ return IterOutcome.OUT_OF_MEMORY;
}
int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
@@ -160,6 +160,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
if (complexWriters != null) {
container.buildSchema(SelectionVectorMode.NONE);
}
+
+ return IterOutcome.OK;
}
private void handleRemainder() {
@@ -177,7 +179,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
setValueCount(remainingRecordCount);
hasRemainder = false;
remainderIndex = 0;
- for(VectorWrapper<?> v: incoming) {
+ for (VectorWrapper<?> v : incoming) {
v.clear();
}
this.recordCount = remainingRecordCount;
@@ -259,7 +261,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
@Override
- protected void setupNewSchema() throws SchemaChangeException{
+ protected void setupNewSchema() throws SchemaChangeException {
this.allocationVectors = Lists.newArrayList();
container.clear();
final List<NamedExpression> exprs = getExpressionList();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 97f3608b2..7178d4c23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -91,7 +91,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}
@Override
- protected void doWork() {
+ protected IterOutcome doWork() {
int incomingRecordCount = incoming.getRecordCount();
int copiedRecords = copier.copyRecords(0, incomingRecordCount);
@@ -125,6 +125,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
incomingRecordCount,
incomingRecordCount - remainderIndex,
incoming.getSchema());
+ return IterOutcome.OK;
}
private void handleRemainder() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 6d909623a..4e644dfed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -100,7 +100,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
* this record batch to a log file.
*/
@Override
- protected void doWork() {
+ protected IterOutcome doWork() {
boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
if (incomingHasSv2) {
@@ -121,6 +121,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
if (incomingHasSv2) {
sv = wrap.getSv2();
}
+ return IterOutcome.OK;
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java
new file mode 100644
index 000000000..9b8929f7b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class StreamingWindowFrameBatchCreator implements BatchCreator<WindowPOP> {
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ return new StreamingWindowFrameRecordBatch(config, context, Iterables.getOnlyElement(children));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
new file mode 100644
index 000000000..2a9208965
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JVar;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<WindowPOP> {
+ private StreamingWindowFramer framer;
+
+ public StreamingWindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+ super(popConfig, context, incoming);
+ }
+
+ @Override
+ protected void setupNewSchema() throws SchemaChangeException {
+ container.clear();
+
+ try {
+ this.framer = createFramer();
+ } catch (ClassTransformationException | IOException ex) {
+ throw new SchemaChangeException("Failed to create framer: " + ex);
+ }
+ }
+
+ private void getIndex(ClassGenerator<StreamingWindowFramer> g) {
+ switch (incoming.getSchema().getSelectionVectorMode()) {
+ case FOUR_BYTE: {
+ JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class));
+ g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));
+ g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex")));
+ return;
+ }
+ case NONE: {
+ g.getBlock("getVectorIndex")._return(JExpr.direct("recordIndex"));
+ return;
+ }
+ case TWO_BYTE: {
+ JVar var = g.declareClassField("sv2_", g.getModel()._ref(SelectionVector2.class));
+ g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector2"));
+ g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex")));
+ return;
+ }
+
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private StreamingWindowFramer createFramer() throws SchemaChangeException, IOException, ClassTransformationException {
+ int configLength = popConfig.getAggregations().length;
+ List<LogicalExpression> valueExprs = Lists.newArrayList();
+ LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length];
+
+ ErrorCollector collector = new ErrorCollectorImpl();
+
+ for (int i = 0; i < configLength; i++) {
+ NamedExpression ne = popConfig.getAggregations()[i];
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
+ if (expr == null) {
+ continue;
+ }
+
+ final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ TypedFieldId id = container.add(vector);
+ valueExprs.add(new ValueVectorWriteExpression(id, expr, true));
+ }
+
+ int j = 0;
+ LogicalExpression[] windowExprs = new LogicalExpression[incoming.getSchema().getFieldCount()];
+ // TODO: Should transfer all existing columns instead of copy. Currently this is not easily doable because
+ // we are not processing one entire batch in one iteration, so cannot simply transfer.
+ for (VectorWrapper wrapper : incoming) {
+ ValueVector vv = wrapper.isHyper() ? wrapper.getValueVectors()[0] : wrapper.getValueVector();
+ ValueVector vector = TypeHelper.getNewVector(vv.getField(), oContext.getAllocator());
+ TypedFieldId id = container.add(vector);
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(
+ new ValueVectorReadExpression(new TypedFieldId(vv.getField().getType(), wrapper.isHyper(), j)),
+ incoming,
+ collector,
+ context.getFunctionRegistry());
+ windowExprs[j] = new ValueVectorWriteExpression(id, expr, true);
+ j++;
+ }
+
+ for (int i = 0; i < keyExprs.length; i++) {
+ NamedExpression ne = popConfig.getWithins()[i];
+
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
+ if (expr == null) {
+ continue;
+ }
+
+ keyExprs[i] = expr;
+ }
+
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
+
+ final ClassGenerator<StreamingWindowFramer> cg = CodeGenerator.getRoot(StreamingWindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ setupIsSame(cg, keyExprs);
+ setupIsSameFromBatch(cg, keyExprs);
+ addRecordValues(cg, valueExprs.toArray(new LogicalExpression[valueExprs.size()]));
+ outputWindowValues(cg, windowExprs);
+
+ cg.getBlock("resetValues")._return(JExpr.TRUE);
+
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ getIndex(cg);
+ StreamingWindowFramer agg = context.getImplementationClass(cg);
+ agg.setup(
+ context,
+ incoming,
+ this,
+ StreamingWindowFrameTemplate.UNBOUNDED, StreamingWindowFrameTemplate.CURRENT_ROW
+ );
+ return agg;
+ }
+
+ private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSameFromBatch", "isSameFromBatch", null, null); // the internal batch changes each time so we need to redo setup.
+ private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSameFromBatch", null, null);
+ private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
+ private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
+
+ private void setupIsSameFromBatch(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) {
+ cg.setMappingSet(ISA_B1);
+ for (LogicalExpression expr : keyExprs) {
+ // first, we rewrite the evaluation stack for each side of the comparison.
+ cg.setMappingSet(ISA_B1);
+ ClassGenerator.HoldingContainer first = cg.addExpr(expr, false);
+ cg.setMappingSet(ISA_B2);
+ ClassGenerator.HoldingContainer second = cg.addExpr(expr, false);
+
+ LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry());
+ ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
+ cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ }
+ cg.getEvalBlock()._return(JExpr.TRUE);
+ }
+
+ private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null);
+ private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
+ private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);
+
+ private void setupIsSame(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) {
+ cg.setMappingSet(IS_SAME_I1);
+ for (LogicalExpression expr : keyExprs) {
+ // first, we rewrite the evaluation stack for each side of the comparison.
+ cg.setMappingSet(IS_SAME_I1);
+ ClassGenerator.HoldingContainer first = cg.addExpr(expr, false);
+ cg.setMappingSet(IS_SAME_I2);
+ ClassGenerator.HoldingContainer second = cg.addExpr(expr, false);
+
+ LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry());
+ ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
+ cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ }
+ cg.getEvalBlock()._return(JExpr.TRUE);
+ }
+
+ private final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupInterior", "addRecord", null, null);
+ private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
+ private final MappingSet EVAL = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
+
+ private void addRecordValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) {
+ cg.setMappingSet(EVAL);
+ for (LogicalExpression ex : valueExprs) {
+ ClassGenerator.HoldingContainer hc = cg.addExpr(ex);
+ cg.getBlock(ClassGenerator.BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ }
+ cg.getBlock(ClassGenerator.BlockType.EVAL)._return(JExpr.TRUE);
+ }
+
+ private final GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupInterior", "outputWindowValues", null, null);
+ private final MappingSet WINDOW_VALUES = new MappingSet("inIndex" /* read index */, "outIndex" /* write index */, "incoming", "outgoing", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES);
+
+ private void outputWindowValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) {
+ cg.setMappingSet(WINDOW_VALUES);
+ for (int i = 0; i < valueExprs.length; i++) {
+ ClassGenerator.HoldingContainer hc = cg.addExpr(valueExprs[i]);
+ cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ }
+ cg.getEvalBlock()._return(JExpr.TRUE);
+ }
+
+ @Override
+ protected IterOutcome doWork() {
+ StreamingWindowFramer.AggOutcome out = framer.doWork();
+
+ while (out == StreamingWindowFramer.AggOutcome.UPDATE_AGGREGATOR) {
+ framer = null;
+ try {
+ setupNewSchema();
+ } catch (SchemaChangeException e) {
+ return IterOutcome.STOP;
+ }
+ out = framer.doWork();
+ }
+
+ if (out == StreamingWindowFramer.AggOutcome.RETURN_AND_COMPLETE) {
+ done = true;
+ }
+
+ return framer.getOutcome();
+ }
+
+ @Override
+ public int getRecordCount() {
+ return framer.getOutputCount();
+ }
+
+ @Override
+ public void cleanup() {
+ if (framer != null) {
+ framer.cleanup();
+ }
+ super.cleanup();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
new file mode 100644
index 000000000..b4e3fed5e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.aggregate.InternalBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorWrapper;
+
+import javax.inject.Named;
+
+public abstract class StreamingWindowFrameTemplate implements StreamingWindowFramer {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingWindowFramer.class);
+ private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field.";
+ private static final boolean EXTRA_DEBUG = false;
+ public static final int UNBOUNDED = -1;
+ public static final int CURRENT_ROW = 0;
+ private boolean first = true;
+ private int previousIndex = 0;
+ private int underlyingIndex = -1;
+ private int currentIndex;
+ private boolean pendingOutput = false;
+ private RecordBatch.IterOutcome outcome;
+ private int outputCount = 0;
+ private RecordBatch incoming;
+ private RecordBatch outgoing;
+ private FragmentContext context;
+ private InternalBatch previousBatch = null;
+ private int precedingConfig = UNBOUNDED;
+ private int followingConfig = CURRENT_ROW;
+
+
+ @Override
+ public void setup(FragmentContext context,
+ RecordBatch incoming,
+ RecordBatch outgoing,
+ int precedingConfig,
+ int followingConfig) throws SchemaChangeException {
+ this.context = context;
+ this.incoming = incoming;
+ this.outgoing = outgoing;
+ this.precedingConfig = precedingConfig;
+ this.followingConfig = followingConfig;
+
+ setupInterior(incoming, outgoing);
+ }
+
+
+ private void allocateOutgoing() {
+ for (VectorWrapper<?> w : outgoing){
+ w.getValueVector().allocateNew();
+ }
+ }
+
+ @Override
+ public RecordBatch.IterOutcome getOutcome() {
+ return outcome;
+ }
+
+ @Override
+ public int getOutputCount() {
+ return outputCount;
+ }
+
+ private AggOutcome tooBigFailure() {
+ context.fail(new Exception(TOO_BIG_ERROR));
+ this.outcome = RecordBatch.IterOutcome.STOP;
+ return AggOutcome.RETURN_OUTCOME;
+ }
+
+ @Override
+ public AggOutcome doWork() {
+ // if we're in the first state, allocate outgoing.
+ try {
+ if (first) {
+ allocateOutgoing();
+ }
+
+ // setup for new output and pick any remainder.
+ if (pendingOutput) {
+ allocateOutgoing();
+ pendingOutput = false;
+ outputToBatch(previousIndex);
+ }
+
+ boolean recordsAdded = false;
+
+ outside: while (true) {
+ if (EXTRA_DEBUG) {
+ logger.trace("Looping from underlying index {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex);
+ logger.debug("Processing {} records in window framer", incoming.getRecordCount());
+ }
+ // loop through existing records, adding as necessary.
+ while(incIndex()) {
+ if (previousBatch != null) {
+ boolean isSameFromBatch = isSameFromBatch(previousIndex, previousBatch, currentIndex);
+ if (EXTRA_DEBUG) {
+ logger.trace("Same as previous batch: {}, previous index {}, current index {}", isSameFromBatch, previousIndex, currentIndex);
+ }
+
+ if(!isSameFromBatch) {
+ resetValues();
+ }
+ previousBatch.clear();
+ previousBatch = null;
+ } else if (!isSame(previousIndex, currentIndex)) {
+ resetValues();
+ }
+
+ addRecord(currentIndex);
+
+ if (!outputToBatch(currentIndex)) {
+ if (outputCount == 0) {
+ return tooBigFailure();
+ }
+
+ // mark the pending output but move forward for the next cycle.
+ pendingOutput = true;
+ incIndex();
+ return setOkAndReturn();
+ }
+
+ recordsAdded = true;
+ }
+
+ if (EXTRA_DEBUG) {
+ logger.debug("Exit Loop from underlying index {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex);
+ }
+
+ previousBatch = new InternalBatch(incoming);
+
+ while (true) {
+ RecordBatch.IterOutcome out = incoming.next();
+ switch (out) {
+ case NONE:
+ outcome = innerOutcome(out, recordsAdded);
+ if (EXTRA_DEBUG) {
+ logger.trace("Received IterOutcome of {}, assigning {} outcome", out, outcome);
+ }
+ return AggOutcome.RETURN_AND_COMPLETE;
+ case NOT_YET:
+ outcome = innerOutcome(out, recordsAdded);
+ if (EXTRA_DEBUG) {
+ logger.trace("Received IterOutcome of {}, assigning {} outcome", out, outcome);
+ }
+ return AggOutcome.RETURN_OUTCOME;
+
+ case OK_NEW_SCHEMA:
+ if (EXTRA_DEBUG) {
+ logger.trace("Received new schema. Batch has {} records.", incoming.getRecordCount());
+ }
+ resetIndex();
+ return AggOutcome.UPDATE_AGGREGATOR;
+
+ case OK:
+ if (EXTRA_DEBUG) {
+ logger.trace("Received OK with {} records.", incoming.getRecordCount());
+ }
+ resetIndex();
+ if (incoming.getRecordCount() == 0) {
+ continue;
+ } else {
+ continue outside;
+ }
+ case STOP:
+ default:
+ outcome = out;
+ if (EXTRA_DEBUG) {
+ logger.trace("Stop received.", incoming.getRecordCount());
+ }
+ return AggOutcome.RETURN_OUTCOME;
+ }
+ }
+ }
+ } finally {
+ first = false;
+ }
+ }
+
+ private RecordBatch.IterOutcome innerOutcome(RecordBatch.IterOutcome innerOutcome, boolean newRecordsAdded) {
+ if(newRecordsAdded) {
+ setOkAndReturn();
+ return outcome;
+ }
+ return innerOutcome;
+ }
+
+
+ private final boolean incIndex() {
+ underlyingIndex++;
+
+ if(currentIndex != -1) {
+ previousIndex = currentIndex;
+ }
+
+ if (underlyingIndex >= incoming.getRecordCount()) {
+ return false;
+ }
+
+ currentIndex = getVectorIndex(underlyingIndex);
+ return true;
+ }
+
+ private final void resetIndex() {
+ underlyingIndex = -1;
+ currentIndex = getVectorIndex(underlyingIndex);
+ if (EXTRA_DEBUG) {
+ logger.trace("Reset new indexes: underlying {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex);
+ }
+ }
+
+ private final AggOutcome setOkAndReturn() {
+ if (first) {
+ this.outcome = RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ this.outcome = RecordBatch.IterOutcome.OK;
+ }
+
+ if (EXTRA_DEBUG) {
+ logger.debug("Setting output count {}", outputCount);
+ }
+ for (VectorWrapper<?> v : outgoing) {
+ v.getValueVector().getMutator().setValueCount(outputCount);
+ }
+ return AggOutcome.RETURN_OUTCOME;
+ }
+
+ private final boolean outputToBatch(int inIndex) {
+ boolean success = outputRecordValues(outputCount)
+ && outputWindowValues(inIndex, outputCount);
+
+ if (success) {
+ outputCount++;
+ }
+
+ return success;
+ }
+
+ @Override
+ public void cleanup() {
+ if(previousBatch != null) {
+ previousBatch.clear();
+ previousBatch = null;
+ }
+ }
+
+ public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
+
+ /**
+ * Compares withins from two indexes in the same batch
+ * @param index1 First record index
+ * @param index2 Second record index
+ * @return does within value match
+ */
+ public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
+ /**
+ * Compares withins from one index of given batch (typically previous completed batch), and one index from current batch
+ * @param b1Index First record index
+ * @param index2 Second record index
+ * @return does within value match
+ */
+ public abstract boolean isSameFromBatch(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int index2);
+ public abstract void addRecord(@Named("index") int index);
+ public abstract boolean outputRecordValues(@Named("outIndex") int outIndex);
+ public abstract boolean outputWindowValues(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
+
+ public abstract boolean resetValues();
+}
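For illustration only, a hand-written stand-in for what the generated isSame() boils down to, assuming a single INT partition key held in a vector named deptVector (neither the class nor the vector name appears in the patch); the real method is emitted by ClassGenerator from the materialized "withins" expressions:

    package org.apache.drill.exec.physical.impl.window;

    import org.apache.drill.exec.vector.IntVector;

    // Hypothetical sketch, not generated code: compares one INT partition key at two
    // row positions, which is the contract StreamingWindowFrameTemplate.isSame() relies on.
    public class ExampleIsSame {
      private final IntVector deptVector; // assumed partition-key ("within") vector

      public ExampleIsSame(IntVector deptVector) {
        this.deptVector = deptVector;
      }

      public boolean isSame(int index1, int index2) {
        // Any mismatch means a new partition started, so the framer calls resetValues().
        return deptVector.getAccessor().get(index1) == deptVector.getAccessor().get(index2);
      }
    }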
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java
new file mode 100644
index 000000000..9588ceffa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface StreamingWindowFramer {
+ public static TemplateClassDefinition<StreamingWindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(StreamingWindowFramer.class, StreamingWindowFrameTemplate.class);
+
+
+ public static enum AggOutcome {
+ RETURN_OUTCOME, UPDATE_AGGREGATOR, RETURN_AND_COMPLETE;
+ }
+
+ public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
+ int precedingConfig, int followingConfig) throws SchemaChangeException;
+
+ public abstract RecordBatch.IterOutcome getOutcome();
+
+ public abstract int getOutputCount();
+
+ public abstract AggOutcome doWork();
+
+ public abstract void cleanup();
+}