aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec
diff options
context:
space:
mode:
authorTimothy Chen <tnachen@gmail.com>2014-09-21 23:54:40 -0700
committerSteven Phillips <sphillips@maprtech.com>2014-09-23 22:24:21 -0700
commit8def6e91489455c0ae670f49ef5ba51ef71b31bd (patch)
tree1e53ac7654039b05b6a0531039aa1b317665f45a /exec/java-exec
parent9bc71fc54b97b52ac5c7335247b6ca7887045fd2 (diff)
Patch for DRILL-705
Currently only supports partitioning/ordering, not yet preceding or after offsets
Diffstat (limited to 'exec/java-exec')
-rw-r--r--exec/java-exec/src/main/codegen/data/AggrTypes1.tdd6
-rw-r--r--exec/java-exec/src/main/codegen/templates/TypeHelper.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java27
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java79
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java36
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java268
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java286
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java44
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java41
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java111
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRule.java52
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java19
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrel.java136
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java133
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlAggOperator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java47
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java17
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java2
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java1
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java202
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java29
-rw-r--r--exec/java-exec/src/test/resources/window/mediumData.json1000
-rw-r--r--exec/java-exec/src/test/resources/window/oneKeyCount.json43
-rw-r--r--exec/java-exec/src/test/resources/window/oneKeyCountData.json4
-rw-r--r--exec/java-exec/src/test/resources/window/oneKeyCountMultiBatch.json72
-rw-r--r--exec/java-exec/src/test/resources/window/twoKeys.json44
-rw-r--r--exec/java-exec/src/test/resources/window/twoKeysData.json8
66 files changed, 2847 insertions, 124 deletions
diff --git a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd
index 812c289e6..5b4041c80 100644
--- a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd
+++ b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd
@@ -78,7 +78,7 @@
{inputType: "NullableVarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes"}
]
},
- {className: "Sum", funcName: "sum", types: [
+ {className: "Sum", funcName: "sum", types: [
{inputType: "Bit", outputType: "Bit", runningType: "Bit", major: "Numeric"},
{inputType: "Int", outputType: "BigInt", runningType: "BigInt", major: "Numeric"},
{inputType: "BigInt", outputType: "BigInt", runningType: "BigInt", major: "Numeric"},
@@ -96,7 +96,7 @@
{inputType: "Interval", outputType: "Interval", runningType: "Interval", major: "Date", initialValue: "0"},
{inputType: "NullableInterval", outputType: "Interval", runningType: "Interval", major: "Date", initialValue: "0"}
]
- },
+ },
{className: "Count", funcName: "count", types: [
{inputType: "Bit", outputType: "Bit", runningType: "Bit", major: "Numeric"},
{inputType: "Int", outputType: "BigInt", runningType: "BigInt", major: "Numeric"},
@@ -129,4 +129,4 @@
]
}
]
-}
+}
diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
index 3c0d9d3af..cb6a030d6 100644
--- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java
+++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java
@@ -73,8 +73,7 @@ public class TypeHelper {
case LIST:
return new GenericAccessor(vector);
}
-
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Unable to find sql accessor for minor type [" + vector.getField().getType().getMinorType() + "] and mode [" + vector.getField().getType().getMode() + "]");
}
public static ValueVector getNewVector(SchemaPath parentPath, String name, BufferAllocator allocator, MajorType type){
@@ -255,7 +254,7 @@ public class TypeHelper {
default:
break;
}
- throw new UnsupportedOperationException();
+ throw new UnsupportedOperationException("Unable to find holder type for minorType: " + type);
}
public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
index a5b7beeb3..6280c40b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java
@@ -71,6 +71,9 @@ import com.sun.codemodel.JLabel;
import com.sun.codemodel.JType;
import com.sun.codemodel.JVar;
+/**
+ * Visitor that generates code for eval
+ */
public class EvaluationVisitor {
private final FunctionImplementationRegistry registry;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
index 6ef46de2c..18c507273 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java
@@ -17,6 +17,9 @@
*/
package org.apache.drill.exec.memory;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
@@ -31,9 +34,9 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.util.AssertionUtil;
-import com.google.common.collect.LinkedListMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multimap;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ConcurrentMap;
public class Accountor {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountor.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 876ba37af..98202aca7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -17,12 +17,7 @@
*/
package org.apache.drill.exec.opt;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
+import com.google.common.collect.Lists;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -40,6 +35,7 @@ import org.apache.drill.common.logical.data.Project;
import org.apache.drill.common.logical.data.Scan;
import org.apache.drill.common.logical.data.SinkOperator;
import org.apache.drill.common.logical.data.Store;
+import org.apache.drill.common.logical.data.Window;
import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
@@ -54,13 +50,18 @@ import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StreamingAggregate;
+import org.apache.drill.exec.physical.config.WindowPOP;
import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.StoragePlugin;
import org.eigenbase.rel.RelFieldCollation.Direction;
import org.eigenbase.rel.RelFieldCollation.NullDirection;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
public class BasicOptimizer extends Optimizer{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
@@ -183,13 +184,22 @@ public class BasicOptimizer extends Optimizer{
return sa;
}
+ @Override
+ public PhysicalOperator visitWindow(Window window, Object value) throws OptimizerException {
+ PhysicalOperator input = window.getInput().accept(this, value);
+ List<Ordering> ods = Lists.newArrayList();
+
+ input = new Sort(input, ods, false);
+
+ return new WindowPOP(input, window.getWithins(), window.getAggregations(), window.getStart(), window.getEnd());
+ }
@Override
public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerException {
PhysicalOperator input = order.getInput().accept(this, value);
List<Ordering> ods = Lists.newArrayList();
- for(Ordering o : order.getOrderings()){
+ for (Ordering o : order.getOrderings()){
ods.add(o);
}
return new SelectionVectorRemover(new Sort(input, ods, false));
@@ -250,7 +260,6 @@ public class BasicOptimizer extends Optimizer{
@Override
public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException {
-// return project.getInput().accept(this, obj);
return new org.apache.drill.exec.physical.config.Project(Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
index 48b38011f..031ab10c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace;
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.physical.config.UnionExchange;
import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.physical.config.WindowPOP;
public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class);
@@ -64,6 +65,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme
}
@Override
+ public T visitWindowFrame(WindowPOP windowFrame, X value) throws E {
+ return visitOp(windowFrame, value);
+ }
+
+ @Override
public T visitProject(Project project, X value) throws E{
return visitOp(project, value);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
index 2b10e6d0a..6d6c5912b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java
@@ -28,7 +28,7 @@ import com.google.common.collect.Iterators;
* Describes an operator that expects a single child operator as its input.
* @param <T> The type of Exec model supported.
*/
-public abstract class AbstractSingle extends AbstractBase{
+public abstract class AbstractSingle extends AbstractBase {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSingle.class);
protected final PhysicalOperator child;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 8f5139029..a5518caf6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -37,7 +37,7 @@ import com.fasterxml.jackson.annotation.ObjectIdGenerators;
@JsonPropertyOrder({ "@id" })
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "@id")
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop")
-public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
+public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
/**
* Describes whether or not a particular physical operator can actually be executed. Most physical operators can be
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
index 8da06cbb8..f9a9c21af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java
@@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace;
import org.apache.drill.exec.physical.config.UnionAll;
import org.apache.drill.exec.physical.config.UnionExchange;
import org.apache.drill.exec.physical.config.UnorderedReceiver;
+import org.apache.drill.exec.physical.config.WindowPOP;
/**
* Visitor class designed to traversal of a operator tree. Basis for a number of operator manipulations including fragmentation and materialization.
@@ -80,6 +81,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP;
public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP;
public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP;
+ public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP;
public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws EXCEP;
public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
new file mode 100644
index 000000000..17738eecd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.config;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.base.AbstractSingle;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.base.PhysicalVisitor;
+import org.apache.drill.exec.proto.UserBitShared;
+
+@JsonTypeName("window")
+public class WindowPOP extends AbstractSingle {
+
+ private final NamedExpression[] withins;
+ private final NamedExpression[] aggregations;
+ private final long start;
+ private final long end;
+
+ public WindowPOP(@JsonProperty("child") PhysicalOperator child,
+ @JsonProperty("within") NamedExpression[] withins,
+ @JsonProperty("aggregations") NamedExpression[] aggregations,
+ @JsonProperty("start") long start,
+ @JsonProperty("end") long end) {
+ super(child);
+ this.withins = withins;
+ this.aggregations = aggregations;
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
+ return new WindowPOP(child, withins, aggregations, start, end);
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E {
+ return physicalVisitor.visitWindowFrame(this, value);
+ }
+
+ @Override
+ public int getOperatorType() {
+ return UserBitShared.CoreOperatorType.WINDOW_VALUE;
+ }
+
+ public long getStart() {
+ return start;
+ }
+
+ public long getEnd() {
+ return end;
+ }
+
+ public NamedExpression[] getAggregations() {
+ return aggregations;
+ }
+
+ public NamedExpression[] getWithins() {
+ return withins;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
index e6900605f..dae9eaedb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java
@@ -35,7 +35,11 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
private final SelectionVector4 sv4;
public InternalBatch(RecordBatch incoming) {
- switch(incoming.getSchema().getSelectionVectorMode()) {
+ this(incoming, null);
+ }
+
+ public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers){
+ switch(incoming.getSchema().getSelectionVectorMode()){
case FOUR_BYTE:
this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent();
this.sv2 = null;
@@ -49,7 +53,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{
this.sv2 = null;
}
this.schema = incoming.getSchema();
- this.container = VectorContainer.getTransferClone(incoming);
+ this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers);
}
public BatchSchema getSchema() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index ced51798f..4d3925ea5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -325,9 +325,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> {
default:
throw new IllegalStateException();
-
}
-
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index f1fcce0d6..85f664cfc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -76,13 +76,15 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{
}
@Override
- protected void doWork() {
+ protected IterOutcome doWork() {
int recordCount = incoming.getRecordCount();
filter.filterBatch(recordCount);
// for (VectorWrapper<?> v : container) {
// ValueVector.Mutator m = v.getValueVector().getMutator();
// m.setValueCount(recordCount);
// }
+
+ return IterOutcome.OK;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
index f5bc9f91b..8f4d90f3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java
@@ -122,7 +122,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
}
@Override
- protected void doWork() {
+ protected IterOutcome doWork() {
for(TransferPair tp : transfers) {
tp.transfer();
}
@@ -139,6 +139,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> {
limitWithNoSV(recordCount);
}
}
+
+ return IterOutcome.OK;
}
private IterOutcome produceEmptyFirstBatch() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index a1a834052..224753e08 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -70,7 +70,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sun.codemodel.JExpr;
-public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
+public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class);
private Projector projector;
@@ -133,13 +133,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
@Override
- protected void doWork() {
+ protected IterOutcome doWork() {
// VectorUtil.showVectorAccessibleContent(incoming, ",");
int incomingRecordCount = incoming.getRecordCount();
if (!doAlloc()) {
outOfMemory = true;
- return;
+ return IterOutcome.OUT_OF_MEMORY;
}
int outputRecords = projector.projectRecords(0, incomingRecordCount, 0);
@@ -160,6 +160,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
if (complexWriters != null) {
container.buildSchema(SelectionVectorMode.NONE);
}
+
+ return IterOutcome.OK;
}
private void handleRemainder() {
@@ -177,7 +179,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
setValueCount(remainingRecordCount);
hasRemainder = false;
remainderIndex = 0;
- for(VectorWrapper<?> v: incoming) {
+ for (VectorWrapper<?> v : incoming) {
v.clear();
}
this.recordCount = remainingRecordCount;
@@ -259,7 +261,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
}
@Override
- protected void setupNewSchema() throws SchemaChangeException{
+ protected void setupNewSchema() throws SchemaChangeException {
this.allocationVectors = Lists.newArrayList();
container.clear();
final List<NamedExpression> exprs = getExpressionList();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 97f3608b2..7178d4c23 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -91,7 +91,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
}
@Override
- protected void doWork() {
+ protected IterOutcome doWork() {
int incomingRecordCount = incoming.getRecordCount();
int copiedRecords = copier.copyRecords(0, incomingRecordCount);
@@ -125,6 +125,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
incomingRecordCount,
incomingRecordCount - remainderIndex,
incoming.getSchema());
+ return IterOutcome.OK;
}
private void handleRemainder() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
index 6d909623a..4e644dfed 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java
@@ -100,7 +100,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
* this record batch to a log file.
*/
@Override
- protected void doWork() {
+ protected IterOutcome doWork() {
boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE;
if (incomingHasSv2) {
@@ -121,6 +121,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> {
if (incomingHasSv2) {
sv = wrap.getSv2();
}
+ return IterOutcome.OK;
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java
new file mode 100644
index 000000000..9b8929f7b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.physical.impl.BatchCreator;
+import org.apache.drill.exec.record.RecordBatch;
+
+import java.util.List;
+
+public class StreamingWindowFrameBatchCreator implements BatchCreator<WindowPOP> {
+
+ @Override
+ public RecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) throws ExecutionSetupException {
+ return new StreamingWindowFrameRecordBatch(config, context, Iterables.getOnlyElement(children));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
new file mode 100644
index 000000000..2a9208965
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java
@@ -0,0 +1,268 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import com.google.common.collect.Lists;
+import com.sun.codemodel.JExpr;
+import com.sun.codemodel.JVar;
+import org.apache.drill.common.expression.ErrorCollector;
+import org.apache.drill.common.expression.ErrorCollectorImpl;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.compile.sig.GeneratorMapping;
+import org.apache.drill.exec.compile.sig.MappingSet;
+import org.apache.drill.exec.exception.ClassTransformationException;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.ClassGenerator;
+import org.apache.drill.exec.expr.CodeGenerator;
+import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.expr.ValueVectorReadExpression;
+import org.apache.drill.exec.expr.ValueVectorWriteExpression;
+import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
+import org.apache.drill.exec.memory.OutOfMemoryException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.record.AbstractSingleRecordBatch;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.TypedFieldId;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.exec.record.selection.SelectionVector4;
+import org.apache.drill.exec.vector.ValueVector;
+
+import java.io.IOException;
+import java.util.List;
+
+public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<WindowPOP> {
+ private StreamingWindowFramer framer;
+
+ public StreamingWindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
+ super(popConfig, context, incoming);
+ }
+
+ @Override
+ protected void setupNewSchema() throws SchemaChangeException {
+ container.clear();
+
+ try {
+ this.framer = createFramer();
+ } catch (ClassTransformationException | IOException ex) {
+ throw new SchemaChangeException("Failed to create framer: " + ex);
+ }
+ }
+
+ private void getIndex(ClassGenerator<StreamingWindowFramer> g) {
+ switch (incoming.getSchema().getSelectionVectorMode()) {
+ case FOUR_BYTE: {
+ JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class));
+ g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4"));
+ g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex")));
+ return;
+ }
+ case NONE: {
+ g.getBlock("getVectorIndex")._return(JExpr.direct("recordIndex"));
+ return;
+ }
+ case TWO_BYTE: {
+ JVar var = g.declareClassField("sv2_", g.getModel()._ref(SelectionVector2.class));
+ g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector2"));
+ g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex")));
+ return;
+ }
+
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private StreamingWindowFramer createFramer() throws SchemaChangeException, IOException, ClassTransformationException {
+ int configLength = popConfig.getAggregations().length;
+ List<LogicalExpression> valueExprs = Lists.newArrayList();
+ LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length];
+
+ ErrorCollector collector = new ErrorCollectorImpl();
+
+ for (int i = 0; i < configLength; i++) {
+ NamedExpression ne = popConfig.getAggregations()[i];
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
+ if (expr == null) {
+ continue;
+ }
+
+ final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType());
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ TypedFieldId id = container.add(vector);
+ valueExprs.add(new ValueVectorWriteExpression(id, expr, true));
+ }
+
+ int j = 0;
+ LogicalExpression[] windowExprs = new LogicalExpression[incoming.getSchema().getFieldCount()];
+ // TODO: Should transfer all existing columns instead of copy. Currently this is not easily doable because
+ // we are not processing one entire batch in one iteration, so cannot simply transfer.
+ for (VectorWrapper wrapper : incoming) {
+ ValueVector vv = wrapper.isHyper() ? wrapper.getValueVectors()[0] : wrapper.getValueVector();
+ ValueVector vector = TypeHelper.getNewVector(vv.getField(), oContext.getAllocator());
+ TypedFieldId id = container.add(vector);
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(
+ new ValueVectorReadExpression(new TypedFieldId(vv.getField().getType(), wrapper.isHyper(), j)),
+ incoming,
+ collector,
+ context.getFunctionRegistry());
+ windowExprs[j] = new ValueVectorWriteExpression(id, expr, true);
+ j++;
+ }
+
+ for (int i = 0; i < keyExprs.length; i++) {
+ NamedExpression ne = popConfig.getWithins()[i];
+
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry());
+ if (expr == null) {
+ continue;
+ }
+
+ keyExprs[i] = expr;
+ }
+
+ if (collector.hasErrors()) {
+ throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString());
+ }
+
+ final ClassGenerator<StreamingWindowFramer> cg = CodeGenerator.getRoot(StreamingWindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry());
+ setupIsSame(cg, keyExprs);
+ setupIsSameFromBatch(cg, keyExprs);
+ addRecordValues(cg, valueExprs.toArray(new LogicalExpression[valueExprs.size()]));
+ outputWindowValues(cg, windowExprs);
+
+ cg.getBlock("resetValues")._return(JExpr.TRUE);
+
+ container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
+ getIndex(cg);
+ StreamingWindowFramer agg = context.getImplementationClass(cg);
+ agg.setup(
+ context,
+ incoming,
+ this,
+ StreamingWindowFrameTemplate.UNBOUNDED, StreamingWindowFrameTemplate.CURRENT_ROW
+ );
+ return agg;
+ }
+
+ private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSameFromBatch", "isSameFromBatch", null, null); // the internal batch changes each time so we need to redo setup.
+ private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSameFromBatch", null, null);
+ private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ);
+ private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV);
+
+ private void setupIsSameFromBatch(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) {
+ cg.setMappingSet(ISA_B1);
+ for (LogicalExpression expr : keyExprs) {
+ // first, we rewrite the evaluation stack for each side of the comparison.
+ cg.setMappingSet(ISA_B1);
+ ClassGenerator.HoldingContainer first = cg.addExpr(expr, false);
+ cg.setMappingSet(ISA_B2);
+ ClassGenerator.HoldingContainer second = cg.addExpr(expr, false);
+
+ LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry());
+ ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
+ cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ }
+ cg.getEvalBlock()._return(JExpr.TRUE);
+ }
+
+ private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null);
+ private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME);
+ private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME);
+
+ private void setupIsSame(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) {
+ cg.setMappingSet(IS_SAME_I1);
+ for (LogicalExpression expr : keyExprs) {
+ // first, we rewrite the evaluation stack for each side of the comparison.
+ cg.setMappingSet(IS_SAME_I1);
+ ClassGenerator.HoldingContainer first = cg.addExpr(expr, false);
+ cg.setMappingSet(IS_SAME_I2);
+ ClassGenerator.HoldingContainer second = cg.addExpr(expr, false);
+
+ LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry());
+ ClassGenerator.HoldingContainer out = cg.addExpr(fh, false);
+ cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ }
+ cg.getEvalBlock()._return(JExpr.TRUE);
+ }
+
+ private final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupInterior", "addRecord", null, null);
+ private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup");
+ private final MappingSet EVAL = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE);
+
+ private void addRecordValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) {
+ cg.setMappingSet(EVAL);
+ for (LogicalExpression ex : valueExprs) {
+ ClassGenerator.HoldingContainer hc = cg.addExpr(ex);
+ cg.getBlock(ClassGenerator.BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ }
+ cg.getBlock(ClassGenerator.BlockType.EVAL)._return(JExpr.TRUE);
+ }
+
+ private final GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupInterior", "outputWindowValues", null, null);
+ private final MappingSet WINDOW_VALUES = new MappingSet("inIndex" /* read index */, "outIndex" /* write index */, "incoming", "outgoing", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES);
+
+ private void outputWindowValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) {
+ cg.setMappingSet(WINDOW_VALUES);
+ for (int i = 0; i < valueExprs.length; i++) {
+ ClassGenerator.HoldingContainer hc = cg.addExpr(valueExprs[i]);
+ cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ }
+ cg.getEvalBlock()._return(JExpr.TRUE);
+ }
+
+ @Override
+ protected IterOutcome doWork() {
+ StreamingWindowFramer.AggOutcome out = framer.doWork();
+
+ while (out == StreamingWindowFramer.AggOutcome.UPDATE_AGGREGATOR) {
+ framer = null;
+ try {
+ setupNewSchema();
+ } catch (SchemaChangeException e) {
+ return IterOutcome.STOP;
+ }
+ out = framer.doWork();
+ }
+
+ if (out == StreamingWindowFramer.AggOutcome.RETURN_AND_COMPLETE) {
+ done = true;
+ }
+
+ return framer.getOutcome();
+ }
+
+ @Override
+ public int getRecordCount() {
+ return framer.getOutputCount();
+ }
+
+ @Override
+ public void cleanup() {
+ if (framer != null) {
+ framer.cleanup();
+ }
+ super.cleanup();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
new file mode 100644
index 000000000..b4e3fed5e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.impl.aggregate.InternalBatch;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.VectorWrapper;
+
+import javax.inject.Named;
+
+public abstract class StreamingWindowFrameTemplate implements StreamingWindowFramer {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingWindowFramer.class);
+ private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field.";
+ private static final boolean EXTRA_DEBUG = false;
+ public static final int UNBOUNDED = -1;
+ public static final int CURRENT_ROW = 0;
+ private boolean first = true;
+ private int previousIndex = 0;
+ private int underlyingIndex = -1;
+ private int currentIndex;
+ private boolean pendingOutput = false;
+ private RecordBatch.IterOutcome outcome;
+ private int outputCount = 0;
+ private RecordBatch incoming;
+ private RecordBatch outgoing;
+ private FragmentContext context;
+ private InternalBatch previousBatch = null;
+ private int precedingConfig = UNBOUNDED;
+ private int followingConfig = CURRENT_ROW;
+
+
+ @Override
+ public void setup(FragmentContext context,
+ RecordBatch incoming,
+ RecordBatch outgoing,
+ int precedingConfig,
+ int followingConfig) throws SchemaChangeException {
+ this.context = context;
+ this.incoming = incoming;
+ this.outgoing = outgoing;
+ this.precedingConfig = precedingConfig;
+ this.followingConfig = followingConfig;
+
+ setupInterior(incoming, outgoing);
+ }
+
+
+ private void allocateOutgoing() {
+ for (VectorWrapper<?> w : outgoing){
+ w.getValueVector().allocateNew();
+ }
+ }
+
+ @Override
+ public RecordBatch.IterOutcome getOutcome() {
+ return outcome;
+ }
+
+ @Override
+ public int getOutputCount() {
+ return outputCount;
+ }
+
+ private AggOutcome tooBigFailure() {
+ context.fail(new Exception(TOO_BIG_ERROR));
+ this.outcome = RecordBatch.IterOutcome.STOP;
+ return AggOutcome.RETURN_OUTCOME;
+ }
+
+ @Override
+ public AggOutcome doWork() {
+ // if we're in the first state, allocate outgoing.
+ try {
+ if (first) {
+ allocateOutgoing();
+ }
+
+ // setup for new output and pick any remainder.
+ if (pendingOutput) {
+ allocateOutgoing();
+ pendingOutput = false;
+ outputToBatch(previousIndex);
+ }
+
+ boolean recordsAdded = false;
+
+ outside: while (true) {
+ if (EXTRA_DEBUG) {
+ logger.trace("Looping from underlying index {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex);
+ logger.debug("Processing {} records in window framer", incoming.getRecordCount());
+ }
+ // loop through existing records, adding as necessary.
+ while(incIndex()) {
+ if (previousBatch != null) {
+ boolean isSameFromBatch = isSameFromBatch(previousIndex, previousBatch, currentIndex);
+ if (EXTRA_DEBUG) {
+ logger.trace("Same as previous batch: {}, previous index {}, current index {}", isSameFromBatch, previousIndex, currentIndex);
+ }
+
+ if(!isSameFromBatch) {
+ resetValues();
+ }
+ previousBatch.clear();
+ previousBatch = null;
+ } else if (!isSame(previousIndex, currentIndex)) {
+ resetValues();
+ }
+
+ addRecord(currentIndex);
+
+ if (!outputToBatch(currentIndex)) {
+ if (outputCount == 0) {
+ return tooBigFailure();
+ }
+
+ // mark the pending output but move forward for the next cycle.
+ pendingOutput = true;
+ incIndex();
+ return setOkAndReturn();
+ }
+
+ recordsAdded = true;
+ }
+
+ if (EXTRA_DEBUG) {
+ logger.debug("Exit Loop from underlying index {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex);
+ }
+
+ previousBatch = new InternalBatch(incoming);
+
+ while (true) {
+ RecordBatch.IterOutcome out = incoming.next();
+ switch (out) {
+ case NONE:
+ outcome = innerOutcome(out, recordsAdded);
+ if (EXTRA_DEBUG) {
+ logger.trace("Received IterOutcome of {}, assigning {} outcome", out, outcome);
+ }
+ return AggOutcome.RETURN_AND_COMPLETE;
+ case NOT_YET:
+ outcome = innerOutcome(out, recordsAdded);
+ if (EXTRA_DEBUG) {
+ logger.trace("Received IterOutcome of {}, assigning {} outcome", out, outcome);
+ }
+ return AggOutcome.RETURN_OUTCOME;
+
+ case OK_NEW_SCHEMA:
+ if (EXTRA_DEBUG) {
+ logger.trace("Received new schema. Batch has {} records.", incoming.getRecordCount());
+ }
+ resetIndex();
+ return AggOutcome.UPDATE_AGGREGATOR;
+
+ case OK:
+ if (EXTRA_DEBUG) {
+ logger.trace("Received OK with {} records.", incoming.getRecordCount());
+ }
+ resetIndex();
+ if (incoming.getRecordCount() == 0) {
+ continue;
+ } else {
+ continue outside;
+ }
+ case STOP:
+ default:
+ outcome = out;
+ if (EXTRA_DEBUG) {
+ logger.trace("Stop received.", incoming.getRecordCount());
+ }
+ return AggOutcome.RETURN_OUTCOME;
+ }
+ }
+ }
+ } finally {
+ first = false;
+ }
+ }
+
+ private RecordBatch.IterOutcome innerOutcome(RecordBatch.IterOutcome innerOutcome, boolean newRecordsAdded) {
+ if(newRecordsAdded) {
+ setOkAndReturn();
+ return outcome;
+ }
+ return innerOutcome;
+ }
+
+
+ private final boolean incIndex() {
+ underlyingIndex++;
+
+ if(currentIndex != -1) {
+ previousIndex = currentIndex;
+ }
+
+ if (underlyingIndex >= incoming.getRecordCount()) {
+ return false;
+ }
+
+ currentIndex = getVectorIndex(underlyingIndex);
+ return true;
+ }
+
+ private final void resetIndex() {
+ underlyingIndex = -1;
+ currentIndex = getVectorIndex(underlyingIndex);
+ if (EXTRA_DEBUG) {
+ logger.trace("Reset new indexes: underlying {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex);
+ }
+ }
+
+ private final AggOutcome setOkAndReturn() {
+ if (first) {
+ this.outcome = RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+ } else {
+ this.outcome = RecordBatch.IterOutcome.OK;
+ }
+
+ if (EXTRA_DEBUG) {
+ logger.debug("Setting output count {}", outputCount);
+ }
+ for (VectorWrapper<?> v : outgoing) {
+ v.getValueVector().getMutator().setValueCount(outputCount);
+ }
+ return AggOutcome.RETURN_OUTCOME;
+ }
+
+ private final boolean outputToBatch(int inIndex) {
+ boolean success = outputRecordValues(outputCount)
+ && outputWindowValues(inIndex, outputCount);
+
+ if (success) {
+ outputCount++;
+ }
+
+ return success;
+ }
+
+ @Override
+ public void cleanup() {
+ if(previousBatch != null) {
+ previousBatch.clear();
+ previousBatch = null;
+ }
+ }
+
+ public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException;
+
+ /**
+ * Compares withins from two indexes in the same batch
+ * @param index1 First record index
+ * @param index2 Second record index
+ * @return does within value match
+ */
+ public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2);
+ /**
+ * Compares withins from one index of given batch (typically previous completed batch), and one index from current batch
+ * @param b1Index First record index
+ * @param index2 Second record index
+ * @return does within value match
+ */
+ public abstract boolean isSameFromBatch(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int index2);
+ public abstract void addRecord(@Named("index") int index);
+ public abstract boolean outputRecordValues(@Named("outIndex") int outIndex);
+ public abstract boolean outputWindowValues(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex);
+ public abstract int getVectorIndex(@Named("recordIndex") int recordIndex);
+
+ public abstract boolean resetValues();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java
new file mode 100644
index 000000000..9588ceffa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.physical.impl.window;
+
+import org.apache.drill.exec.compile.TemplateClassDefinition;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RecordBatch;
+
+public interface StreamingWindowFramer {
+ public static TemplateClassDefinition<StreamingWindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(StreamingWindowFramer.class, StreamingWindowFrameTemplate.class);
+
+
+ public static enum AggOutcome {
+ RETURN_OUTCOME, UPDATE_AGGREGATOR, RETURN_AND_COMPLETE;
+ }
+
+ public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing,
+ int precedingConfig, int followingConfig) throws SchemaChangeException;
+
+ public abstract RecordBatch.IterOutcome getOutcome();
+
+ public abstract int getOutputCount();
+
+ public abstract AggOutcome doWork();
+
+ public abstract void cleanup();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java
new file mode 100644
index 000000000..fcf52ee62
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.common;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.WindowRelBase;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.rex.RexLiteral;
+
+import java.util.List;
+
+public class DrillWindowRelBase extends WindowRelBase implements DrillRelNode {
+
+ public DrillWindowRelBase(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode child,
+ List<RexLiteral> constants,
+ RelDataType rowType,
+ List<Window> windows) {
+ super(cluster, traits, child, constants, rowType, windows);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index 3fc3b89d1..a2685ada5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -24,6 +24,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Store;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.physical.config.Limit;
+import org.apache.drill.exec.physical.config.WindowPOP;
import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
@@ -113,6 +114,11 @@ public class StatsCollector {
return null;
}
+ @Override
+ public Void visitWindowFrame(WindowPOP window, Wrapper value) throws RuntimeException {
+ return visitOp(window, value);
+ }
+
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
index 6b0c3b4b4..ee035c635 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java
@@ -82,7 +82,7 @@ public class DrillAggregateRel extends DrillAggregateRelBase implements DrillRel
return builder.build();
}
- private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) {
+ public static LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) {
List<LogicalExpression> args = Lists.newArrayList();
for(Integer i : call.getArgList()) {
args.add(new FieldReference(fn.get(i)));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
index c3b0d00c6..f6c910e34 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java
@@ -25,7 +25,7 @@ import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelTraitSet;
/**
- * This rule converts a SortRel that has either a offset and fetch into a Drill Sort and Limit Rel
+ * This rule converts a SortRel that has either a offset and fetch into a Drill Sort and LimitPOP Rel
*/
public class DrillLimitRule extends RelOptRule {
public static DrillLimitRule INSTANCE = new DrillLimitRule();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
index 082daccbf..fcfced25c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
@@ -27,6 +27,9 @@ import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.physical.PrelUtil.ProjectPushInfo;
import org.eigenbase.rel.ProjectRel;
+import org.eigenbase.rel.ProjectRelBase;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.rules.PushProjector;
import org.eigenbase.rel.rules.RemoveTrivialProjectRule;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
index 7eca54e9b..7ed7885ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java
@@ -27,7 +27,7 @@ import org.eigenbase.relopt.Convention;
public interface DrillRel extends DrillRelNode {
/** Calling convention for relational expressions that are "implemented" by
* generating Drill logical plans. */
- Convention DRILL_LOGICAL = new Convention.Impl("LOGICAL", DrillRel.class);
+ public static final Convention DRILL_LOGICAL = new Convention.Impl("LOGICAL", DrillRel.class);
LogicalOperator implement(DrillImplementor implementor);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index dbb85b22d..1d3ce9a2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -38,6 +38,8 @@ import org.apache.drill.exec.planner.physical.ScreenPrule;
import org.apache.drill.exec.planner.physical.SortConvertPrule;
import org.apache.drill.exec.planner.physical.SortPrule;
import org.apache.drill.exec.planner.physical.StreamAggPrule;
+import org.apache.drill.exec.planner.physical.StreamingWindowPrule;
+import org.apache.drill.exec.planner.physical.StreamingWindowPrule;
import org.apache.drill.exec.planner.physical.UnionAllPrule;
import org.apache.drill.exec.planner.physical.WriterPrule;
import org.eigenbase.rel.RelFactories;
@@ -100,6 +102,7 @@ public class DrillRuleSets {
DrillScanRule.INSTANCE,
DrillFilterRule.INSTANCE,
DrillProjectRule.INSTANCE,
+ DrillWindowRule.INSTANCE,
DrillAggregateRule.INSTANCE,
DrillLimitRule.INSTANCE,
@@ -139,6 +142,8 @@ public class DrillRuleSets {
HashJoinPrule.INSTANCE,
FilterPrule.INSTANCE,
LimitPrule.INSTANCE,
+ WindowPrule.INSTANCE,
+
WriterPrule.INSTANCE,
PushLimitToTopN.INSTANCE
@@ -182,6 +187,7 @@ public class DrillRuleSets {
ruleList.add(FilterPrule.INSTANCE);
ruleList.add(LimitPrule.INSTANCE);
ruleList.add(WriterPrule.INSTANCE);
+ ruleList.add(StreamingWindowPrule.INSTANCE);
ruleList.add(PushLimitToTopN.INSTANCE);
ruleList.add(UnionAllPrule.INSTANCE);
// ruleList.add(UnionDistinctPrule.INSTANCE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java
new file mode 100644
index 000000000..113f98c45
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.logical;
+
+import com.google.common.collect.Lists;
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.Order;
+import org.apache.drill.exec.planner.common.DrillWindowRelBase;
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.rex.RexLiteral;
+
+import java.util.List;
+
+public class DrillWindowRel extends DrillWindowRelBase implements DrillRel {
+ /**
+ * Creates a window relational expression.
+ *
+ * @param cluster Cluster
+ * @param traits
+ * @param child Input relational expression
+ * @param rowType Output row type
+ * @param windows Windows
+ */
+ public DrillWindowRel(
+ RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode child,
+ List<RexLiteral> constants,
+ RelDataType rowType,
+ List<Window> windows) {
+ super(cluster, traits, child, constants, rowType, windows);
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new DrillWindowRel(getCluster(), traitSet, sole(inputs), constants, getRowType(), windows);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ final LogicalOperator inputOp = implementor.visitChild(this, 0, getChild());
+ org.apache.drill.common.logical.data.Window.Builder builder = new org.apache.drill.common.logical.data.Window.Builder();
+ final List<String> fields = getRowType().getFieldNames();
+ final List<String> childFields = getChild().getRowType().getFieldNames();
+ for (Window window : windows) {
+
+ for(RelFieldCollation orderKey : window.orderKeys.getFieldCollations()) {
+ builder.addOrdering(new Order.Ordering(orderKey.getDirection(), new FieldReference(fields.get(orderKey.getFieldIndex()))));
+ }
+
+ for (int group : BitSets.toIter(window.groupSet)) {
+ FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+ builder.addWithin(fr, fr);
+ }
+
+ int groupCardinality = window.groupSet.cardinality();
+ for (Ord<AggregateCall> aggCall : Ord.zip(window.getAggregateCalls(this))) {
+ FieldReference ref = new FieldReference(fields.get(groupCardinality + aggCall.i));
+ LogicalExpression expr = toDrill(aggCall.e, childFields);
+ builder.addAggregation(ref, expr);
+ }
+ }
+ builder.setInput(inputOp);
+ org.apache.drill.common.logical.data.Window frame = builder.build();
+ return frame;
+ }
+
+ protected LogicalExpression toDrill(AggregateCall call, List<String> fn) {
+ List<LogicalExpression> args = Lists.newArrayList();
+ for (Integer i : call.getArgList()) {
+ args.add(new FieldReference(fn.get(i)));
+ }
+
+ // for count(1).
+ if (args.isEmpty()) {
+ args.add(new ValueExpressions.LongExpression(1l));
+ }
+ LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN);
+ return expr;
+ }
+}
+
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRule.java
new file mode 100644
index 000000000..847e87aa1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRule.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.logical;
+
+import com.google.common.collect.Lists;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.WindowRel;
+import org.eigenbase.relopt.Convention;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.rex.RexLiteral;
+
+public class DrillWindowRule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillWindowRule();
+
+ private DrillWindowRule() {
+ super(RelOptHelper.some(WindowRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillWindowRule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final WindowRel window = call.rel(0);
+ final RelNode input = call.rel(1);
+ final RelTraitSet traits = window.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+ final RelNode convertedInput = convert(input, traits);
+ call.transformTo(
+ new DrillWindowRel(
+ window.getCluster(),
+ traits,
+ convertedInput,
+ Lists.<RexLiteral>newArrayList(),
+ window.getRowType(),
+ window.windows));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index 05fb64a9c..a69188b43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -17,21 +17,16 @@
*/
package org.apache.drill.exec.planner.physical;
-import java.util.BitSet;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import net.hydromatic.linq4j.Ord;
import net.hydromatic.optiq.util.BitSets;
-
import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.eigenbase.rel.AggregateCall;
import org.eigenbase.rel.AggregateRelBase;
@@ -48,8 +43,10 @@ import org.eigenbase.sql.SqlKind;
import org.eigenbase.sql.type.OperandTypes;
import org.eigenbase.sql.type.ReturnTypes;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
public abstract class AggPrelBase extends AggregateRelBase implements Prel {
@@ -130,7 +127,7 @@ public abstract class AggPrelBase extends AggregateRelBase implements Prel {
for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) {
int aggExprOrdinal = groupSet.cardinality() + aggCall.i;
FieldReference ref = new FieldReference(fields.get(aggExprOrdinal));
- LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext());
+ LogicalExpression expr = toDrill(aggCall.e, childFields);
NamedExpression ne = new NamedExpression(expr, ref);
aggExprs.add(ne);
@@ -162,7 +159,7 @@ public abstract class AggPrelBase extends AggregateRelBase implements Prel {
}
}
- protected LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) {
+ protected LogicalExpression toDrill(AggregateCall call, List<String> fn) {
List<LogicalExpression> args = Lists.newArrayList();
for (Integer i : call.getArgList()) {
args.add(new FieldReference(fn.get(i)));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
index 5060195bd..6012a5a7a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java
@@ -17,10 +17,6 @@
*/
package org.apache.drill.exec.planner.physical;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.planner.common.DrillLimitRelBase;
@@ -32,6 +28,10 @@ import org.eigenbase.relopt.RelTraitSet;
import org.eigenbase.rex.RexLiteral;
import org.eigenbase.rex.RexNode;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
public class LimitPrel extends DrillLimitRelBase implements Prel {
public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrel.java
new file mode 100644
index 000000000..f1a8bc0b6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrel.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical;
+
+import com.google.common.collect.Lists;
+import net.hydromatic.optiq.util.BitSets;
+import org.apache.drill.common.expression.ExpressionPosition;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.FunctionCall;
+import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.ValueExpressions;
+import org.apache.drill.common.logical.data.NamedExpression;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.WindowPOP;
+import org.apache.drill.exec.planner.common.DrillWindowRelBase;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema;
+import org.eigenbase.rel.AggregateCall;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.rex.RexLiteral;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class StreamingWindowPrel extends DrillWindowRelBase implements Prel {
+ public StreamingWindowPrel(RelOptCluster cluster,
+ RelTraitSet traits,
+ RelNode child,
+ List<RexLiteral> constants,
+ RelDataType rowType,
+ Window window) {
+ super(cluster, traits, child, constants, rowType, Collections.singletonList(window));
+ }
+
+ @Override
+ public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+ return new StreamingWindowPrel(getCluster(), traitSet, sole(inputs), constants, getRowType(), windows.get(0));
+ }
+
+ @Override
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+ Prel child = (Prel) this.getChild();
+
+ PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+ final List<String> childFields = getChild().getRowType().getFieldNames();
+
+ checkState(windows.size() == 1, "Only one window is expected in WindowPrel");
+
+ Window window = windows.get(0);
+ List<NamedExpression> withins = Lists.newArrayList();
+ List<NamedExpression> aggs = Lists.newArrayList();
+ for (int group : BitSets.toIter(window.groupSet)) {
+ FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN);
+ withins.add(new NamedExpression(fr, fr));
+ }
+
+ for (AggregateCall aggCall : window.getAggregateCalls(this)) {
+ FieldReference ref = new FieldReference(aggCall.getName());
+ LogicalExpression expr = toDrill(aggCall, childFields);
+ aggs.add(new NamedExpression(expr, ref));
+ }
+
+ WindowPOP windowPOP = new WindowPOP(
+ childPOP,
+ withins.toArray(new NamedExpression[withins.size()]),
+ aggs.toArray(new NamedExpression[aggs.size()]),
+ Long.MIN_VALUE, //TODO: Get first/last to work
+ Long.MIN_VALUE);
+
+ creator.addMetadata(this, windowPOP);
+ return windowPOP;
+ }
+
+ protected LogicalExpression toDrill(AggregateCall call, List<String> fn) {
+ List<LogicalExpression> args = Lists.newArrayList();
+ for (Integer i : call.getArgList()) {
+ args.add(new FieldReference(fn.get(i)));
+ }
+
+ // for count(1).
+ if (args.isEmpty()) {
+ args.add(new ValueExpressions.LongExpression(1l));
+ }
+ LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN);
+ return expr;
+ }
+
+ @Override
+ public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E {
+ return logicalVisitor.visitPrel(this, value);
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode[] getSupportedEncodings() {
+ return BatchSchema.SelectionVectorMode.ALL;
+ }
+
+ @Override
+ public BatchSchema.SelectionVectorMode getEncoding() {
+ return BatchSchema.SelectionVectorMode.NONE;
+ }
+
+ @Override
+ public boolean needsFinalColumnReordering() {
+ return false;
+ }
+
+ @Override
+ public Iterator<Prel> iterator() {
+ return PrelUtil.iter(getChild());
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java
new file mode 100644
index 000000000..00c20b23e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.physical;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import net.hydromatic.linq4j.Ord;
+import net.hydromatic.optiq.util.BitSets;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillWindowRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+import org.eigenbase.rel.RelCollation;
+import org.eigenbase.rel.RelCollationImpl;
+import org.eigenbase.rel.RelFieldCollation;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.WindowRelBase;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.reltype.RelRecordType;
+import org.eigenbase.sql.SqlAggFunction;
+
+import java.util.List;
+
+public class StreamingWindowPrule extends RelOptRule {
+ public static final RelOptRule INSTANCE = new StreamingWindowPrule();
+
+ private StreamingWindowPrule() {
+ super(RelOptHelper.some(DrillWindowRel.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(RelNode.class)), "Prel.WindowPrule");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final DrillWindowRel window = call.rel(0);
+ RelNode input = call.rel(1);
+
+ // TODO: Order window based on existing partition by
+ //input.getTraitSet().subsumes()
+
+ for (final Ord<WindowRelBase.Window> w : Ord.zip(window.windows)) {
+ WindowRelBase.Window windowBase = w.getValue();
+ DrillDistributionTrait distOnAllKeys =
+ new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionFields(windowBase)));
+
+ RelCollation collation = getCollation(windowBase);
+ RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(distOnAllKeys);
+ final RelNode convertedInput = convert(input, traits);
+
+ List<RelDataTypeField> newRowFields = Lists.newArrayList();
+ for(RelDataTypeField field : convertedInput.getRowType().getFieldList()) {
+ newRowFields.add(field);
+ }
+
+ Iterable<RelDataTypeField> newWindowFields = Iterables.filter(window.getRowType().getFieldList(), new Predicate<RelDataTypeField>() {
+ @Override
+ public boolean apply(RelDataTypeField relDataTypeField) {
+ return relDataTypeField.getName().startsWith("w" + w.i + "$");
+ }
+ });
+
+ for(RelDataTypeField newField : newWindowFields) {
+ newRowFields.add(newField);
+ }
+
+ RelDataType rowType = new RelRecordType(newRowFields);
+
+ List<WindowRelBase.RexWinAggCall> newWinAggCalls = Lists.newArrayList();
+ for(Ord<WindowRelBase.RexWinAggCall> aggOrd : Ord.zip(windowBase.aggCalls)) {
+ WindowRelBase.RexWinAggCall aggCall = aggOrd.getValue();
+ newWinAggCalls.add(new WindowRelBase.RexWinAggCall(
+ (SqlAggFunction)aggCall.getOperator(), aggCall.getType(), aggCall.getOperands(), aggOrd.i)
+ );
+ }
+
+ windowBase = new WindowRelBase.Window(
+ windowBase.groupSet,
+ windowBase.isRows,
+ windowBase.lowerBound,
+ windowBase.upperBound,
+ windowBase.orderKeys,
+ newWinAggCalls
+ );
+
+ input = new StreamingWindowPrel(
+ window.getCluster(),
+ window.getTraitSet().merge(traits),
+ convertedInput,
+ window.getConstants(),
+ rowType,
+ windowBase);
+ }
+
+ call.transformTo(input);
+ }
+
+ private RelCollation getCollation(WindowRelBase.Window window) {
+ List<RelFieldCollation> fields = Lists.newArrayList();
+ for (int group : BitSets.toIter(window.groupSet)) {
+ fields.add(new RelFieldCollation(group));
+ }
+ return RelCollationImpl.of(fields);
+ }
+
+ private List<DrillDistributionTrait.DistributionField> getDistributionFields(WindowRelBase.Window window) {
+ List<DrillDistributionTrait.DistributionField> groupByFields = Lists.newArrayList();
+ for (int group : BitSets.toIter(window.groupSet)) {
+ DrillDistributionTrait.DistributionField field = new DrillDistributionTrait.DistributionField(group);
+ groupByFields.add(field);
+ }
+ return groupByFields;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index 85a5734f2..97d873c50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -17,8 +17,8 @@
*/
package org.apache.drill.exec.planner.sql;
-import java.util.List;
-
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.eigenbase.sql.SqlFunctionCategory;
import org.eigenbase.sql.SqlIdentifier;
@@ -27,8 +27,7 @@ import org.eigenbase.sql.SqlOperatorTable;
import org.eigenbase.sql.SqlSyntax;
import org.eigenbase.sql.fun.SqlStdOperatorTable;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
+import java.util.List;
public class DrillOperatorTable extends SqlStdOperatorTable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOperatorTable.class);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlAggOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlAggOperator.java
index 0b8668b38..7ab2e9fb7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlAggOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlAggOperator.java
@@ -60,4 +60,4 @@ public class DrillSqlAggOperator extends SqlAggFunction {
public RelDataType getReturnType(RelDataTypeFactory typeFactory) {
return getAny(typeFactory);
}
-} \ No newline at end of file
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 2238155a9..2de46ee9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -28,7 +28,6 @@ import net.hydromatic.optiq.tools.Planner;
import net.hydromatic.optiq.tools.RelConversionException;
import net.hydromatic.optiq.tools.RuleSet;
import net.hydromatic.optiq.tools.ValidationException;
-
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.cost.DrillCostBase;
@@ -38,14 +37,19 @@ import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.ExplainHandler;
import org.apache.drill.exec.planner.sql.handlers.SetOptionHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
import org.apache.drill.exec.planner.sql.parser.DrillSqlCall;
import org.apache.drill.exec.planner.sql.parser.impl.DrillParserWithCompoundIdConverter;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.util.Pointer;
import org.eigenbase.rel.RelCollationTraitDef;
+import org.eigenbase.rel.rules.ReduceExpressionsRule;
+import org.eigenbase.rel.rules.WindowedAggSplitterRule;
import org.eigenbase.relopt.ConventionTraitDef;
import org.eigenbase.relopt.RelOptCostFactory;
import org.eigenbase.relopt.RelTraitDef;
+import org.eigenbase.relopt.hep.HepPlanner;
+import org.eigenbase.relopt.hep.HepProgramBuilder;
import org.eigenbase.sql.SqlNode;
import org.eigenbase.sql.parser.SqlParseException;
@@ -53,6 +57,7 @@ public class DrillSqlWorker {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class);
private final Planner planner;
+ private final HepPlanner hepPlanner;
public final static int LOGICAL_RULES = 0;
public final static int PHYSICAL_MEM_RULES = 1;
private final QueryContext context;
@@ -79,7 +84,12 @@ public class DrillSqlWorker {
.costFactory(costFactory) //
.build();
this.planner = Frameworks.getPlanner(config);
-
+ HepProgramBuilder builder = new HepProgramBuilder();
+ builder.addRuleClass(ReduceExpressionsRule.class);
+ builder.addRuleClass(WindowedAggSplitterRule.class);
+ this.hepPlanner = new HepPlanner(builder.build());
+ hepPlanner.addRule(ReduceExpressionsRule.CALC_INSTANCE);
+ hepPlanner.addRule(WindowedAggSplitterRule.PROJECT);
}
private RuleSet[] getRules(QueryContext context) {
@@ -99,23 +109,24 @@ public class DrillSqlWorker {
SqlNode sqlNode = planner.parse(sql);
AbstractSqlHandler handler;
+ SqlHandlerConfig config = new SqlHandlerConfig(hepPlanner, planner, context);
// TODO: make this use path scanning or something similar.
switch(sqlNode.getKind()){
case EXPLAIN:
- handler = new ExplainHandler(planner, context);
+ handler = new ExplainHandler(config);
break;
case SET_OPTION:
handler = new SetOptionHandler(context);
break;
case OTHER:
if (sqlNode instanceof DrillSqlCall) {
- handler = ((DrillSqlCall)sqlNode).getSqlHandler(planner, context);
+ handler = ((DrillSqlCall)sqlNode).getSqlHandler(config);
break;
}
// fallthrough
default:
- handler = new DefaultSqlHandler(planner, context, textPlan);
+ handler = new DefaultSqlHandler(config, textPlan);
}
return handler.getPlan(sqlNode);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
index 708951a27..df2f8076d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java
@@ -40,13 +40,14 @@ import org.apache.drill.exec.planner.types.DrillFixedRelDataTypeImpl;
import org.apache.drill.exec.store.AbstractSchema;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptUtil;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.reltype.RelDataType;
import org.eigenbase.sql.SqlNode;
public class CreateTableHandler extends DefaultSqlHandler {
- public CreateTableHandler(Planner planner, QueryContext context) {
- super(planner, context);
+ public CreateTableHandler(SqlHandlerConfig config) {
+ super(config);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index e63474f13..0bb59bf6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -60,6 +60,7 @@ import org.apache.drill.exec.util.Pointer;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptUtil;
import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlExplainLevel;
import org.eigenbase.sql.SqlNode;
@@ -70,19 +71,23 @@ import com.google.common.collect.Lists;
public class DefaultSqlHandler extends AbstractSqlHandler {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultSqlHandler.class);
- protected final Planner planner;
+ protected final SqlHandlerConfig config;
protected final QueryContext context;
+ protected final HepPlanner hepPlanner;
+ protected final Planner planner;
private Pointer<String> textPlan;
private final long targetSliceSize;
- public DefaultSqlHandler(Planner planner, QueryContext context) {
- this(planner, context, null);
+ public DefaultSqlHandler(SqlHandlerConfig config) {
+ this(config, null);
}
- public DefaultSqlHandler(Planner planner, QueryContext context, Pointer<String> textPlan) {
+ public DefaultSqlHandler(SqlHandlerConfig config, Pointer<String> textPlan) {
super();
- this.planner = planner;
- this.context = context;
+ this.planner = config.getPlanner();
+ this.context = config.getContext();
+ this.hepPlanner = config.getHepPlanner();
+ this.config = config;
this.textPlan = textPlan;
targetSliceSize = context.getOptions().getOption(ExecConstants.SLICE_TARGET).num_val;
}
@@ -139,7 +144,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
}
protected RelNode convertToRel(SqlNode node) throws RelConversionException {
- return planner.convert(node);
+ RelNode convertedNode = planner.convert(node);
+ hepPlanner.setRoot(convertedNode);
+ return hepPlanner.findBestExp();
}
protected DrillRel convertToDrel(RelNode relNode) throws RelConversionException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
index e6f1fe1c4..84082e3e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
import org.apache.drill.exec.planner.sql.parser.SqlDescribeTable;
import org.apache.drill.exec.store.AbstractSchema;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlIdentifier;
import org.eigenbase.sql.SqlLiteral;
import org.eigenbase.sql.SqlNode;
@@ -43,7 +44,7 @@ import com.google.common.collect.ImmutableList;
public class DescribeTableHandler extends DefaultSqlHandler {
- public DescribeTableHandler(Planner planner, QueryContext context) { super(planner, context); }
+ public DescribeTableHandler(SqlHandlerConfig config) { super(config); }
/** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.COLUMNS ... */
@Override
@@ -104,4 +105,3 @@ public class DescribeTableHandler extends DefaultSqlHandler {
}
}
}
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
index f3243212e..8beed3478 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
@@ -17,12 +17,8 @@
*/
package org.apache.drill.exec.planner.sql.handlers;
-import java.io.IOException;
-
-import net.hydromatic.optiq.tools.Planner;
import net.hydromatic.optiq.tools.RelConversionException;
import net.hydromatic.optiq.tools.ValidationException;
-
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.exec.ops.QueryContext;
@@ -41,13 +37,15 @@ import org.eigenbase.sql.SqlExplainLevel;
import org.eigenbase.sql.SqlLiteral;
import org.eigenbase.sql.SqlNode;
+import java.io.IOException;
+
public class ExplainHandler extends DefaultSqlHandler {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplainHandler.class);
private ResultMode mode;
private SqlExplainLevel level = SqlExplainLevel.ALL_ATTRIBUTES;
- public ExplainHandler(Planner planner, QueryContext context) {
- super(planner, context);
+ public ExplainHandler(SqlHandlerConfig config) {
+ super(config);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
index 3627a7b05..ff3542da8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java
@@ -35,6 +35,7 @@ import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlIdentifier;
import org.eigenbase.sql.SqlNode;
@@ -42,8 +43,8 @@ import org.eigenbase.sql.SqlNode;
public class ShowFileHandler extends DefaultSqlHandler {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
- public ShowFileHandler(Planner planner, QueryContext context) {
- super(planner, context);
+ public ShowFileHandler(SqlHandlerConfig config) {
+ super(config);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
index 5e77628f0..b05521868 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java
@@ -26,6 +26,7 @@ import net.hydromatic.optiq.tools.RelConversionException;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.parser.DrillParserUtil;
import org.apache.drill.exec.planner.sql.parser.SqlShowSchemas;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlIdentifier;
import org.eigenbase.sql.SqlNode;
import org.eigenbase.sql.SqlNodeList;
@@ -37,7 +38,7 @@ import com.google.common.collect.ImmutableList;
public class ShowSchemasHandler extends DefaultSqlHandler {
- public ShowSchemasHandler(Planner planner, QueryContext context) { super(planner, context); }
+ public ShowSchemasHandler(SqlHandlerConfig config) { super(config); }
/** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.SCHEMATA ... */
@Override
@@ -61,4 +62,3 @@ public class ShowSchemasHandler extends DefaultSqlHandler {
fromClause, where, null, null, null, null, null, null);
}
}
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
index a1c5aee92..0a029f733 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java
@@ -35,6 +35,7 @@ import org.eigenbase.sql.SqlLiteral;
import org.eigenbase.sql.SqlNode;
import org.eigenbase.sql.SqlNodeList;
import org.eigenbase.sql.SqlSelect;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.fun.SqlStdOperatorTable;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -43,7 +44,7 @@ import com.google.common.collect.Lists;
public class ShowTablesHandler extends DefaultSqlHandler {
- public ShowTablesHandler(Planner planner, QueryContext context) { super(planner, context); }
+ public ShowTablesHandler(SqlHandlerConfig config) { super(config); }
/** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.`TABLES` ... */
@Override
@@ -105,4 +106,3 @@ public class ShowTablesHandler extends DefaultSqlHandler {
fromClause, where, null, null, null, null, null, null);
}
}
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java
new file mode 100644
index 000000000..132a2c97b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.sql.handlers;
+
+import net.hydromatic.optiq.tools.Planner;
+import org.apache.drill.exec.ops.QueryContext;
+import org.eigenbase.relopt.hep.HepPlanner;
+
+public class SqlHandlerConfig {
+ private final QueryContext context;
+ private final HepPlanner hepPlanner;
+ private final Planner planner;
+
+ public SqlHandlerConfig(HepPlanner hepPlanner, Planner planner, QueryContext context) {
+ this.hepPlanner = hepPlanner;
+ this.planner = planner;
+ this.context = context;
+ }
+
+ public Planner getPlanner() {
+ return planner;
+ }
+
+ public HepPlanner getHepPlanner() {
+ return hepPlanner;
+ }
+
+ public QueryContext getContext() {
+ return context;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
index 4005b8164..a6bd8b725 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java
@@ -22,6 +22,8 @@ import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.parser.SqlParserPos;
@@ -34,7 +36,7 @@ public abstract class DrillSqlCall extends SqlCall {
super(pos);
}
- public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
- return new DefaultSqlHandler(planner, context);
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new DefaultSqlHandler(config);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
index 10db4c49b..5e3c21560 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java
@@ -24,6 +24,8 @@ import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.CreateTableHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.SqlIdentifier;
import org.eigenbase.sql.SqlKind;
@@ -90,8 +92,8 @@ public class SqlCreateTable extends DrillSqlCall {
}
@Override
- public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
- return new CreateTableHandler(planner, context);
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new CreateTableHandler(config);
}
public List<String> getSchemaPath() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
index ccd08e14c..b7352b4b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java
@@ -17,12 +17,10 @@
*/
package org.apache.drill.exec.planner.sql.parser;
-import java.util.List;
-
-import net.hydromatic.optiq.tools.Planner;
-
-import org.apache.drill.exec.ops.QueryContext;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
import org.apache.drill.exec.planner.sql.handlers.ViewHandler;
import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.SqlIdentifier;
@@ -35,8 +33,7 @@ import org.eigenbase.sql.SqlSpecialOperator;
import org.eigenbase.sql.SqlWriter;
import org.eigenbase.sql.parser.SqlParserPos;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
+import java.util.List;
public class SqlCreateView extends DrillSqlCall {
public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_VIEW", SqlKind.OTHER) {
@@ -103,8 +100,8 @@ public class SqlCreateView extends DrillSqlCall {
}
@Override
- public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
- return new ViewHandler.CreateView(planner, context);
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new ViewHandler.CreateView(config.getPlanner(), config.getContext());
}
public List<String> getSchemaPath() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
index 29275d753..7d464e144 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java
@@ -24,6 +24,8 @@ import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.DescribeTableHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.SqlIdentifier;
import org.eigenbase.sql.SqlKind;
@@ -89,8 +91,8 @@ public class SqlDescribeTable extends DrillSqlCall {
}
@Override
- public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
- return new DescribeTableHandler(planner, context);
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new DescribeTableHandler(config);
}
public SqlIdentifier getTable() { return table; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
index 33b71b74c..a0d6f7b2d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java
@@ -24,7 +24,9 @@ import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
import org.apache.drill.exec.planner.sql.handlers.ViewHandler.DropView;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.SqlIdentifier;
import org.eigenbase.sql.SqlKind;
@@ -70,8 +72,8 @@ public class SqlDropView extends DrillSqlCall {
}
@Override
- public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
- return new DropView(context);
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new DropView(config.getContext());
}
public List<String> getSchemaPath() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
index 8779969e9..38abfeb83 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java
@@ -25,6 +25,8 @@ import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.ShowFileHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.SqlIdentifier;
import org.eigenbase.sql.SqlKind;
@@ -76,8 +78,8 @@ public class SqlShowFiles extends DrillSqlCall {
}
@Override
- public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
- return new ShowFileHandler(planner, context);
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new ShowFileHandler(config);
}
public SqlIdentifier getDb() { return db; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
index 9b4229548..9d8771a11 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java
@@ -24,6 +24,8 @@ import net.hydromatic.optiq.tools.Planner;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.ShowSchemasHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+import org.eigenbase.relopt.hep.HepPlanner;
import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.SqlKind;
import org.eigenbase.sql.SqlLiteral;
@@ -85,8 +87,8 @@ public class SqlShowSchemas extends DrillSqlCall {
}
@Override
- public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
- return new ShowSchemasHandler(planner, context);
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new ShowSchemasHandler(config);
}
public SqlNode getLikePattern() { return likePattern; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
index 33d20aa4d..da3f0fd1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java
@@ -17,13 +17,10 @@
*/
package org.apache.drill.exec.planner.sql.parser;
-import java.util.List;
-
-import net.hydromatic.optiq.tools.Planner;
-
-import org.apache.drill.exec.ops.QueryContext;
+import com.google.common.collect.Lists;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.ShowTablesHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.SqlIdentifier;
import org.eigenbase.sql.SqlKind;
@@ -34,7 +31,7 @@ import org.eigenbase.sql.SqlSpecialOperator;
import org.eigenbase.sql.SqlWriter;
import org.eigenbase.sql.parser.SqlParserPos;
-import com.google.common.collect.Lists;
+import java.util.List;
/**
* Sql parse tree node to represent statement:
@@ -92,8 +89,8 @@ public class SqlShowTables extends DrillSqlCall {
}
@Override
- public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
- return new ShowTablesHandler(planner, context);
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new ShowTablesHandler(config);
}
public SqlIdentifier getDb() { return db; }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
index ed4695ed6..c8af0021d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java
@@ -17,13 +17,8 @@
*/
package org.apache.drill.exec.planner.sql.parser;
-import java.util.Collections;
-import java.util.List;
-
-import net.hydromatic.optiq.tools.Planner;
-
-import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
import org.apache.drill.exec.planner.sql.handlers.UseSchemaHandler;
import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.SqlIdentifier;
@@ -35,6 +30,9 @@ import org.eigenbase.sql.SqlSpecialOperator;
import org.eigenbase.sql.SqlWriter;
import org.eigenbase.sql.parser.SqlParserPos;
+import java.util.Collections;
+import java.util.List;
+
/**
* Sql parser tree node to represent <code>USE SCHEMA</code> statement.
*/
@@ -73,8 +71,8 @@ public class SqlUseSchema extends DrillSqlCall {
}
@Override
- public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) {
- return new UseSchemaHandler(context);
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new UseSchemaHandler(config.getContext());
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
index 0adc09ec1..f05243dcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java
@@ -27,6 +27,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
protected final RecordBatch incoming;
private boolean first = true;
+ protected boolean done = false;
protected boolean outOfMemory = false;
public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
@@ -41,6 +42,11 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
@Override
public IterOutcome innerNext() {
+ // Short circuit if record batch has already sent all data and is done
+ if (done) {
+ return IterOutcome.NONE;
+ }
+
IterOutcome upstream = next(incoming);
if (!first && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) {
do {
@@ -100,6 +106,5 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte
}
protected abstract void setupNewSchema() throws SchemaChangeException;
- protected abstract void doWork();
-
+ protected abstract IterOutcome doWork();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index e2f4a954b..b1b7c7627 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -105,6 +105,23 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
return vc;
}
+ public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper[] ignoreWrappers) {
+ VectorContainer vc = new VectorContainer();
+ for (VectorWrapper<?> w : incoming) {
+ if(ignoreWrappers != null) {
+ for(VectorWrapper wrapper : ignoreWrappers) {
+ if (w == wrapper) {
+ continue;
+ }
+ }
+ }
+
+ vc.cloneAndTransfer(w);
+ }
+
+ return vc;
+ }
+
public static VectorContainer canonicalize(VectorContainer original) {
VectorContainer vc = new VectorContainer();
List<VectorWrapper<?>> canonicalWrappers = new ArrayList<VectorWrapper<?>>(original.wrappers);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
index e7c6dc06f..0272b2310 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java
@@ -21,7 +21,7 @@ import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.test.DrillTest;
import org.junit.After;
-public class ExecTest extends DrillTest{
+public class ExecTest extends DrillTest {
@After
public void clear(){
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
index 3ba6cb175..7cdb41a31 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java
@@ -117,6 +117,7 @@ public class TestSimpleLimit extends ExecTest {
if(context.getFailureCause() != null){
throw context.getFailureCause();
}
+
assertTrue(!context.isFailed());
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
new file mode 100644
index 000000000..ac7b035e2
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.window;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.FileUtils;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.exec.vector.BigIntVector;
+import org.apache.drill.exec.vector.NullableBigIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.junit.Test;
+
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TestWindowFrame extends PopUnitTestBase {
+
+ @Test
+ public void testWindowFrameWithOneKeyCount() throws Throwable {
+ try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ Drillbit bit = new Drillbit(CONFIG, serviceSet);
+ DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+ // run query.
+ bit.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(UserBitShared.QueryType.PHYSICAL,
+ Files.toString(FileUtils.getResourceAsFile("/window/oneKeyCount.json"), Charsets.UTF_8)
+ .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/window/oneKeyCountData.json").toURI().toString())
+ );
+
+ long[] cntArr = {1, 2, 1, 2};
+ long[] sumArr = {100, 150, 25, 75};
+
+ // look at records
+ RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+ int recordCount = 0;
+
+ assertEquals(2, results.size());
+
+ QueryResultBatch batch = results.get(0);
+ assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+ batchLoader.load(batch.getHeader().getDef(), batch.getData());
+
+ for (int r = 0; r < batchLoader.getRecordCount(); r++) {
+ recordCount++;
+ VectorWrapper<?> wrapper = batchLoader.getValueAccessorById(
+ BigIntVector.class,
+ batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0]
+ );
+ assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r));
+ wrapper = batchLoader.getValueAccessorById(
+ NullableBigIntVector.class,
+ batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0]
+ );
+ assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r));
+ }
+ batchLoader.clear();
+ batch.release();
+
+ assertEquals(4, recordCount);
+ }
+ }
+
+ @Test
+ public void testWindowFrameWithOneKeyMultipleBatches() throws Throwable {
+ try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ Drillbit bit = new Drillbit(CONFIG, serviceSet);
+ DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+ // run query.
+ bit.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(UserBitShared.QueryType.PHYSICAL,
+ Files.toString(FileUtils.getResourceAsFile("/window/oneKeyCountMultiBatch.json"), Charsets.UTF_8)
+ .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/window/mediumData.json").toURI().toString()));
+
+ // look at records
+ RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+ int recordCount = 0;
+
+ assertEquals(2, results.size());
+
+ QueryResultBatch batch = results.get(0);
+ assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+ batchLoader.load(batch.getHeader().getDef(), batch.getData());
+ ValueVector.Accessor output = batchLoader.getValueAccessorById(NullableBigIntVector.class,
+ batchLoader.getValueVectorId(
+ new SchemaPath(new PathSegment.NameSegment("output"))).getFieldIds()[0]
+ ).getValueVector().getAccessor();
+ ValueVector.Accessor sum = batchLoader.getValueAccessorById(
+ BigIntVector.class,
+ batchLoader.getValueVectorId(
+ new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0]
+ ).getValueVector().getAccessor();
+ ValueVector.Accessor cnt = batchLoader.getValueAccessorById(
+ BigIntVector.class,
+ batchLoader.getValueVectorId(
+ new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0]
+ ).getValueVector().getAccessor();
+ int lastGroup = -1;
+ long groupCounter = 0;
+ long s = 0;
+ for (int r = 1; r <= batchLoader.getRecordCount(); r++) {
+ recordCount++;
+ int group = r / 4;
+ if(lastGroup != group) {
+ lastGroup = group;
+ groupCounter = 1;
+ s = 0;
+ } else {
+ groupCounter++;
+ }
+
+ s += group * 8 + r % 4;
+
+ assertEquals("Count, Row " + r, groupCounter, cnt.getObject(r - 1));
+ assertEquals("Sum, Row " + r, s, sum.getObject(r - 1));
+ assertEquals("Output, Row " + r, s, output.getObject(r - 1));
+ }
+ batchLoader.clear();
+ batch.release();
+
+ assertEquals(1000, recordCount);
+ }
+ }
+
+ @Test
+ public void testWindowFrameWithTwoKeys() throws Throwable {
+ try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet();
+ Drillbit bit = new Drillbit(CONFIG, serviceSet);
+ DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) {
+
+ // run query.
+ bit.run();
+ client.connect();
+ List<QueryResultBatch> results = client.runQuery(UserBitShared.QueryType.PHYSICAL,
+ Files.toString(FileUtils.getResourceAsFile("/window/twoKeys.json"), Charsets.UTF_8)
+ .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/window/twoKeysData.json").toURI().toString())
+ );
+
+ long[] cntArr = {1, 2, 1, 2, 1, 2, 1, 2};
+ long[] sumArr = {5, 15, 15, 35, 25, 55, 35, 75};
+
+ // look at records
+ RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator());
+ int recordCount = 0;
+
+ assertEquals(2, results.size());
+
+ QueryResultBatch batch = results.get(0);
+ assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData()));
+ batchLoader.load(batch.getHeader().getDef(), batch.getData());
+
+ for (int r = 0; r < batchLoader.getRecordCount(); r++) {
+ recordCount++;
+ VectorWrapper<?> wrapper = batchLoader.getValueAccessorById(
+ BigIntVector.class,
+ batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0]
+ );
+ assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r));
+ wrapper = batchLoader.getValueAccessorById(
+ NullableBigIntVector.class,
+ batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0]
+ );
+ assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r));
+ }
+ batchLoader.clear();
+ batch.release();
+
+ assertEquals(8, recordCount);
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java
new file mode 100644
index 000000000..780a7ce4c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.sql;
+
+import org.apache.drill.BaseTestQuery;
+import org.junit.Test;
+
+public class TestWindowFunctions extends BaseTestQuery {
+ @Test
+ public void testWindowSum() throws Exception {
+ test("select sum(position_id) over w from cp.`employee.json` window w as ( partition by position_id order by position_id)");
+ }
+}
diff --git a/exec/java-exec/src/test/resources/window/mediumData.json b/exec/java-exec/src/test/resources/window/mediumData.json
new file mode 100644
index 000000000..ad866271b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/mediumData.json
@@ -0,0 +1,1000 @@
+{"id":814, "a": 1626, "group": 203}
+{"id":425, "a": 849, "group": 106}
+{"id":900, "a": 1800, "group": 225}
+{"id":156, "a": 312, "group": 39}
+{"id":348, "a": 696, "group": 87}
+{"id":987, "a": 1971, "group": 246}
+{"id":255, "a": 507, "group": 63}
+{"id":4, "a": 8, "group": 1}
+{"id":512, "a": 1024, "group": 128}
+{"id":341, "a": 681, "group": 85}
+{"id":113, "a": 225, "group": 28}
+{"id":311, "a": 619, "group": 77}
+{"id":906, "a": 1810, "group": 226}
+{"id":889, "a": 1777, "group": 222}
+{"id":611, "a": 1219, "group": 152}
+{"id":963, "a": 1923, "group": 240}
+{"id":522, "a": 1042, "group": 130}
+{"id":615, "a": 1227, "group": 153}
+{"id":227, "a": 451, "group": 56}
+{"id":365, "a": 729, "group": 91}
+{"id":73, "a": 145, "group": 18}
+{"id":747, "a": 1491, "group": 186}
+{"id":580, "a": 1160, "group": 145}
+{"id":552, "a": 1104, "group": 138}
+{"id":716, "a": 1432, "group": 179}
+{"id":982, "a": 1962, "group": 245}
+{"id":118, "a": 234, "group": 29}
+{"id":639, "a": 1275, "group": 159}
+{"id":273, "a": 545, "group": 68}
+{"id":679, "a": 1355, "group": 169}
+{"id":338, "a": 674, "group": 84}
+{"id":402, "a": 802, "group": 100}
+{"id":476, "a": 952, "group": 119}
+{"id":628, "a": 1256, "group": 157}
+{"id":325, "a": 649, "group": 81}
+{"id":749, "a": 1497, "group": 187}
+{"id":912, "a": 1824, "group": 228}
+{"id":995, "a": 1987, "group": 248}
+{"id":605, "a": 1209, "group": 151}
+{"id":141, "a": 281, "group": 35}
+{"id":700, "a": 1400, "group": 175}
+{"id":61, "a": 121, "group": 15}
+{"id":478, "a": 954, "group": 119}
+{"id":556, "a": 1112, "group": 139}
+{"id":229, "a": 457, "group": 57}
+{"id":487, "a": 971, "group": 121}
+{"id":824, "a": 1648, "group": 206}
+{"id":431, "a": 859, "group": 107}
+{"id":443, "a": 883, "group": 110}
+{"id":135, "a": 267, "group": 33}
+{"id":417, "a": 833, "group": 104}
+{"id":980, "a": 1960, "group": 245}
+{"id":785, "a": 1569, "group": 196}
+{"id":917, "a": 1833, "group": 229}
+{"id":656, "a": 1312, "group": 164}
+{"id":210, "a": 418, "group": 52}
+{"id":196, "a": 392, "group": 49}
+{"id":361, "a": 721, "group": 90}
+{"id":281, "a": 561, "group": 70}
+{"id":550, "a": 1098, "group": 137}
+{"id":558, "a": 1114, "group": 139}
+{"id":677, "a": 1353, "group": 169}
+{"id":604, "a": 1208, "group": 151}
+{"id":8, "a": 16, "group": 2}
+{"id":290, "a": 578, "group": 72}
+{"id":932, "a": 1864, "group": 233}
+{"id":731, "a": 1459, "group": 182}
+{"id":477, "a": 953, "group": 119}
+{"id":859, "a": 1715, "group": 214}
+{"id":291, "a": 579, "group": 72}
+{"id":531, "a": 1059, "group": 132}
+{"id":499, "a": 995, "group": 124}
+{"id":389, "a": 777, "group": 97}
+{"id":182, "a": 362, "group": 45}
+{"id":959, "a": 1915, "group": 239}
+{"id":523, "a": 1043, "group": 130}
+{"id":81, "a": 161, "group": 20}
+{"id":439, "a": 875, "group": 109}
+{"id":228, "a": 456, "group": 57}
+{"id":301, "a": 601, "group": 75}
+{"id":208, "a": 416, "group": 52}
+{"id":370, "a": 738, "group": 92}
+{"id":383, "a": 763, "group": 95}
+{"id":209, "a": 417, "group": 52}
+{"id":462, "a": 922, "group": 115}
+{"id":729, "a": 1457, "group": 182}
+{"id":602, "a": 1202, "group": 150}
+{"id":936, "a": 1872, "group": 234}
+{"id":750, "a": 1498, "group": 187}
+{"id":871, "a": 1739, "group": 217}
+{"id":120, "a": 240, "group": 30}
+{"id":843, "a": 1683, "group": 210}
+{"id":260, "a": 520, "group": 65}
+{"id":240, "a": 480, "group": 60}
+{"id":976, "a": 1952, "group": 244}
+{"id":344, "a": 688, "group": 86}
+{"id":385, "a": 769, "group": 96}
+{"id":410, "a": 818, "group": 102}
+{"id":931, "a": 1859, "group": 232}
+{"id":891, "a": 1779, "group": 222}
+{"id":745, "a": 1489, "group": 186}
+{"id":813, "a": 1625, "group": 203}
+{"id":129, "a": 257, "group": 32}
+{"id":596, "a": 1192, "group": 149}
+{"id":517, "a": 1033, "group": 129}
+{"id":755, "a": 1507, "group": 188}
+{"id":663, "a": 1323, "group": 165}
+{"id":233, "a": 465, "group": 58}
+{"id":401, "a": 801, "group": 100}
+{"id":473, "a": 945, "group": 118}
+{"id":990, "a": 1978, "group": 247}
+{"id":384, "a": 768, "group": 96}
+{"id":178, "a": 354, "group": 44}
+{"id":446, "a": 890, "group": 111}
+{"id":828, "a": 1656, "group": 207}
+{"id":356, "a": 712, "group": 89}
+{"id":249, "a": 497, "group": 62}
+{"id":553, "a": 1105, "group": 138}
+{"id":378, "a": 754, "group": 94}
+{"id":126, "a": 250, "group": 31}
+{"id":806, "a": 1610, "group": 201}
+{"id":540, "a": 1080, "group": 135}
+{"id":545, "a": 1089, "group": 136}
+{"id":398, "a": 794, "group": 99}
+{"id":848, "a": 1696, "group": 212}
+{"id":493, "a": 985, "group": 123}
+{"id":928, "a": 1856, "group": 232}
+{"id":408, "a": 816, "group": 102}
+{"id":285, "a": 569, "group": 71}
+{"id":795, "a": 1587, "group": 198}
+{"id":74, "a": 146, "group": 18}
+{"id":332, "a": 664, "group": 83}
+{"id":712, "a": 1424, "group": 178}
+{"id":858, "a": 1714, "group": 214}
+{"id":961, "a": 1921, "group": 240}
+{"id":212, "a": 424, "group": 53}
+{"id":11, "a": 19, "group": 2}
+{"id":839, "a": 1675, "group": 209}
+{"id":302, "a": 602, "group": 75}
+{"id":117, "a": 233, "group": 29}
+{"id":852, "a": 1704, "group": 213}
+{"id":528, "a": 1056, "group": 132}
+{"id":829, "a": 1657, "group": 207}
+{"id":563, "a": 1123, "group": 140}
+{"id":968, "a": 1936, "group": 242}
+{"id":658, "a": 1314, "group": 164}
+{"id":49, "a": 97, "group": 12}
+{"id":52, "a": 104, "group": 13}
+{"id":186, "a": 370, "group": 46}
+{"id":407, "a": 811, "group": 101}
+{"id":98, "a": 194, "group": 24}
+{"id":377, "a": 753, "group": 94}
+{"id":195, "a": 387, "group": 48}
+{"id":826, "a": 1650, "group": 206}
+{"id":783, "a": 1563, "group": 195}
+{"id":284, "a": 568, "group": 71}
+{"id":34, "a": 66, "group": 8}
+{"id":752, "a": 1504, "group": 188}
+{"id":472, "a": 944, "group": 118}
+{"id":500, "a": 1000, "group": 125}
+{"id":812, "a": 1624, "group": 203}
+{"id":300, "a": 600, "group": 75}
+{"id":691, "a": 1379, "group": 172}
+{"id":435, "a": 867, "group": 108}
+{"id":693, "a": 1385, "group": 173}
+{"id":847, "a": 1691, "group": 211}
+{"id":235, "a": 467, "group": 58}
+{"id":45, "a": 89, "group": 11}
+{"id":947, "a": 1891, "group": 236}
+{"id":184, "a": 368, "group": 46}
+{"id":996, "a": 1992, "group": 249}
+{"id":150, "a": 298, "group": 37}
+{"id":413, "a": 825, "group": 103}
+{"id":952, "a": 1904, "group": 238}
+{"id":594, "a": 1186, "group": 148}
+{"id":133, "a": 265, "group": 33}
+{"id":587, "a": 1171, "group": 146}
+{"id":612, "a": 1224, "group": 153}
+{"id":515, "a": 1027, "group": 128}
+{"id":718, "a": 1434, "group": 179}
+{"id":884, "a": 1768, "group": 221}
+{"id":887, "a": 1771, "group": 221}
+{"id":585, "a": 1169, "group": 146}
+{"id":695, "a": 1387, "group": 173}
+{"id":965, "a": 1929, "group": 241}
+{"id":591, "a": 1179, "group": 147}
+{"id":374, "a": 746, "group": 93}
+{"id":780, "a": 1560, "group": 195}
+{"id":305, "a": 609, "group": 76}
+{"id":71, "a": 139, "group": 17}
+{"id":84, "a": 168, "group": 21}
+{"id":58, "a": 114, "group": 14}
+{"id":12, "a": 24, "group": 3}
+{"id":315, "a": 627, "group": 78}
+{"id":131, "a": 259, "group": 32}
+{"id":362, "a": 722, "group": 90}
+{"id":490, "a": 978, "group": 122}
+{"id":234, "a": 466, "group": 58}
+{"id":349, "a": 697, "group": 87}
+{"id":688, "a": 1376, "group": 172}
+{"id":379, "a": 755, "group": 94}
+{"id":561, "a": 1121, "group": 140}
+{"id":363, "a": 723, "group": 90}
+{"id":287, "a": 571, "group": 71}
+{"id":770, "a": 1538, "group": 192}
+{"id":127, "a": 251, "group": 31}
+{"id":583, "a": 1163, "group": 145}
+{"id":471, "a": 939, "group": 117}
+{"id":788, "a": 1576, "group": 197}
+{"id":897, "a": 1793, "group": 224}
+{"id":916, "a": 1832, "group": 229}
+{"id":956, "a": 1912, "group": 239}
+{"id":224, "a": 448, "group": 56}
+{"id":787, "a": 1571, "group": 196}
+{"id":173, "a": 345, "group": 43}
+{"id":47, "a": 91, "group": 11}
+{"id":180, "a": 360, "group": 45}
+{"id":488, "a": 976, "group": 122}
+{"id":764, "a": 1528, "group": 191}
+{"id":112, "a": 224, "group": 28}
+{"id":781, "a": 1561, "group": 195}
+{"id":14, "a": 26, "group": 3}
+{"id":204, "a": 408, "group": 51}
+{"id":317, "a": 633, "group": 79}
+{"id":784, "a": 1568, "group": 196}
+{"id":796, "a": 1592, "group": 199}
+{"id":375, "a": 747, "group": 93}
+{"id":618, "a": 1234, "group": 154}
+{"id":207, "a": 411, "group": 51}
+{"id":179, "a": 355, "group": 44}
+{"id":297, "a": 593, "group": 74}
+{"id":838, "a": 1674, "group": 209}
+{"id":699, "a": 1395, "group": 174}
+{"id":320, "a": 640, "group": 80}
+{"id":675, "a": 1347, "group": 168}
+{"id":925, "a": 1849, "group": 231}
+{"id":684, "a": 1368, "group": 171}
+{"id":986, "a": 1970, "group": 246}
+{"id":930, "a": 1858, "group": 232}
+{"id":911, "a": 1819, "group": 227}
+{"id":977, "a": 1953, "group": 244}
+{"id":48, "a": 96, "group": 12}
+{"id":496, "a": 992, "group": 124}
+{"id":794, "a": 1586, "group": 198}
+{"id":867, "a": 1731, "group": 216}
+{"id":520, "a": 1040, "group": 130}
+{"id":621, "a": 1241, "group": 155}
+{"id":475, "a": 947, "group": 118}
+{"id":270, "a": 538, "group": 67}
+{"id":648, "a": 1296, "group": 162}
+{"id":842, "a": 1682, "group": 210}
+{"id":200, "a": 400, "group": 50}
+{"id":924, "a": 1848, "group": 231}
+{"id":466, "a": 930, "group": 116}
+{"id":40, "a": 80, "group": 10}
+{"id":600, "a": 1200, "group": 150}
+{"id":883, "a": 1763, "group": 220}
+{"id":221, "a": 441, "group": 55}
+{"id":106, "a": 210, "group": 26}
+{"id":313, "a": 625, "group": 78}
+{"id":761, "a": 1521, "group": 190}
+{"id":800, "a": 1600, "group": 200}
+{"id":241, "a": 481, "group": 60}
+{"id":640, "a": 1280, "group": 160}
+{"id":358, "a": 714, "group": 89}
+{"id":960, "a": 1920, "group": 240}
+{"id":347, "a": 691, "group": 86}
+{"id":646, "a": 1290, "group": 161}
+{"id":236, "a": 472, "group": 59}
+{"id":920, "a": 1840, "group": 230}
+{"id":586, "a": 1170, "group": 146}
+{"id":175, "a": 347, "group": 43}
+{"id":371, "a": 739, "group": 92}
+{"id":741, "a": 1481, "group": 185}
+{"id":652, "a": 1304, "group": 163}
+{"id":164, "a": 328, "group": 41}
+{"id":444, "a": 888, "group": 111}
+{"id":949, "a": 1897, "group": 237}
+{"id":115, "a": 227, "group": 28}
+{"id":893, "a": 1785, "group": 223}
+{"id":940, "a": 1880, "group": 235}
+{"id":261, "a": 521, "group": 65}
+{"id":105, "a": 209, "group": 26}
+{"id":449, "a": 897, "group": 112}
+{"id":94, "a": 186, "group": 23}
+{"id":810, "a": 1618, "group": 202}
+{"id":252, "a": 504, "group": 63}
+{"id":946, "a": 1890, "group": 236}
+{"id":136, "a": 272, "group": 34}
+{"id":70, "a": 138, "group": 17}
+{"id":203, "a": 403, "group": 50}
+{"id":276, "a": 552, "group": 69}
+{"id":703, "a": 1403, "group": 175}
+{"id":714, "a": 1426, "group": 178}
+{"id":144, "a": 288, "group": 36}
+{"id":763, "a": 1523, "group": 190}
+{"id":142, "a": 282, "group": 35}
+{"id":406, "a": 810, "group": 101}
+{"id":225, "a": 449, "group": 56}
+{"id":93, "a": 185, "group": 23}
+{"id":622, "a": 1242, "group": 155}
+{"id":461, "a": 921, "group": 115}
+{"id":923, "a": 1843, "group": 230}
+{"id":971, "a": 1939, "group": 242}
+{"id":748, "a": 1496, "group": 187}
+{"id":687, "a": 1371, "group": 171}
+{"id":340, "a": 680, "group": 85}
+{"id":223, "a": 443, "group": 55}
+{"id":625, "a": 1249, "group": 156}
+{"id":895, "a": 1787, "group": 223}
+{"id":738, "a": 1474, "group": 184}
+{"id":35, "a": 67, "group": 8}
+{"id":159, "a": 315, "group": 39}
+{"id":981, "a": 1961, "group": 245}
+{"id":521, "a": 1041, "group": 130}
+{"id":36, "a": 72, "group": 9}
+{"id":360, "a": 720, "group": 90}
+{"id":194, "a": 386, "group": 48}
+{"id":333, "a": 665, "group": 83}
+{"id":816, "a": 1632, "group": 204}
+{"id":805, "a": 1609, "group": 201}
+{"id":122, "a": 242, "group": 30}
+{"id":67, "a": 131, "group": 16}
+{"id":866, "a": 1730, "group": 216}
+{"id":219, "a": 435, "group": 54}
+{"id":274, "a": 546, "group": 68}
+{"id":102, "a": 202, "group": 25}
+{"id":951, "a": 1899, "group": 237}
+{"id":836, "a": 1672, "group": 209}
+{"id":191, "a": 379, "group": 47}
+{"id":337, "a": 673, "group": 84}
+{"id":841, "a": 1681, "group": 210}
+{"id":92, "a": 184, "group": 23}
+{"id":481, "a": 961, "group": 120}
+{"id":970, "a": 1938, "group": 242}
+{"id":878, "a": 1754, "group": 219}
+{"id":294, "a": 586, "group": 73}
+{"id":386, "a": 770, "group": 96}
+{"id":484, "a": 968, "group": 121}
+{"id":789, "a": 1577, "group": 197}
+{"id":492, "a": 984, "group": 123}
+{"id":19, "a": 35, "group": 4}
+{"id":263, "a": 523, "group": 65}
+{"id":514, "a": 1026, "group": 128}
+{"id":352, "a": 704, "group": 88}
+{"id":503, "a": 1003, "group": 125}
+{"id":726, "a": 1450, "group": 181}
+{"id":890, "a": 1778, "group": 222}
+{"id":926, "a": 1850, "group": 231}
+{"id":707, "a": 1411, "group": 176}
+{"id":216, "a": 432, "group": 54}
+{"id":807, "a": 1611, "group": 201}
+{"id":942, "a": 1882, "group": 235}
+{"id":678, "a": 1354, "group": 169}
+{"id":354, "a": 706, "group": 88}
+{"id":77, "a": 153, "group": 19}
+{"id":75, "a": 147, "group": 18}
+{"id":830, "a": 1658, "group": 207}
+{"id":215, "a": 427, "group": 53}
+{"id":966, "a": 1930, "group": 241}
+{"id":603, "a": 1203, "group": 150}
+{"id":137, "a": 273, "group": 34}
+{"id":17, "a": 33, "group": 4}
+{"id":991, "a": 1979, "group": 247}
+{"id":299, "a": 595, "group": 74}
+{"id":643, "a": 1283, "group": 160}
+{"id":190, "a": 378, "group": 47}
+{"id":967, "a": 1931, "group": 241}
+{"id":169, "a": 337, "group": 42}
+{"id":460, "a": 920, "group": 115}
+{"id":330, "a": 658, "group": 82}
+{"id":436, "a": 872, "group": 109}
+{"id":393, "a": 785, "group": 98}
+{"id":329, "a": 657, "group": 82}
+{"id":80, "a": 160, "group": 20}
+{"id":395, "a": 787, "group": 98}
+{"id":623, "a": 1243, "group": 155}
+{"id":110, "a": 218, "group": 27}
+{"id":213, "a": 425, "group": 53}
+{"id":448, "a": 896, "group": 112}
+{"id":671, "a": 1339, "group": 167}
+{"id":751, "a": 1499, "group": 187}
+{"id":606, "a": 1210, "group": 151}
+{"id":624, "a": 1248, "group": 156}
+{"id":766, "a": 1530, "group": 191}
+{"id":31, "a": 59, "group": 7}
+{"id":649, "a": 1297, "group": 162}
+{"id":863, "a": 1723, "group": 215}
+{"id":328, "a": 656, "group": 82}
+{"id":686, "a": 1370, "group": 171}
+{"id":343, "a": 683, "group": 85}
+{"id":418, "a": 834, "group": 104}
+{"id":850, "a": 1698, "group": 212}
+{"id":892, "a": 1784, "group": 223}
+{"id":657, "a": 1313, "group": 164}
+{"id":880, "a": 1760, "group": 220}
+{"id":988, "a": 1976, "group": 247}
+{"id":772, "a": 1544, "group": 193}
+{"id":909, "a": 1817, "group": 227}
+{"id":394, "a": 786, "group": 98}
+{"id":999, "a": 1995, "group": 249}
+{"id":161, "a": 321, "group": 40}
+{"id":754, "a": 1506, "group": 188}
+{"id":56, "a": 112, "group": 14}
+{"id":733, "a": 1465, "group": 183}
+{"id":870, "a": 1738, "group": 217}
+{"id":456, "a": 912, "group": 114}
+{"id":114, "a": 226, "group": 28}
+{"id":571, "a": 1139, "group": 142}
+{"id":567, "a": 1131, "group": 141}
+{"id":827, "a": 1651, "group": 206}
+{"id":757, "a": 1513, "group": 189}
+{"id":720, "a": 1440, "group": 180}
+{"id":709, "a": 1417, "group": 177}
+{"id":831, "a": 1659, "group": 207}
+{"id":773, "a": 1545, "group": 193}
+{"id":201, "a": 401, "group": 50}
+{"id":23, "a": 43, "group": 5}
+{"id":421, "a": 841, "group": 105}
+{"id":516, "a": 1032, "group": 129}
+{"id":22, "a": 42, "group": 5}
+{"id":538, "a": 1074, "group": 134}
+{"id":588, "a": 1176, "group": 147}
+{"id":326, "a": 650, "group": 81}
+{"id":815, "a": 1627, "group": 203}
+{"id":319, "a": 635, "group": 79}
+{"id":440, "a": 880, "group": 110}
+{"id":875, "a": 1747, "group": 218}
+{"id":634, "a": 1266, "group": 158}
+{"id":172, "a": 344, "group": 43}
+{"id":694, "a": 1386, "group": 173}
+{"id":767, "a": 1531, "group": 191}
+{"id":324, "a": 648, "group": 81}
+{"id":33, "a": 65, "group": 8}
+{"id":935, "a": 1867, "group": 233}
+{"id":667, "a": 1331, "group": 166}
+{"id":91, "a": 179, "group": 22}
+{"id":719, "a": 1435, "group": 179}
+{"id":582, "a": 1162, "group": 145}
+{"id":739, "a": 1475, "group": 184}
+{"id":635, "a": 1267, "group": 158}
+{"id":367, "a": 731, "group": 91}
+{"id":636, "a": 1272, "group": 159}
+{"id":743, "a": 1483, "group": 185}
+{"id":463, "a": 923, "group": 115}
+{"id":834, "a": 1666, "group": 208}
+{"id":532, "a": 1064, "group": 133}
+{"id":704, "a": 1408, "group": 176}
+{"id":387, "a": 771, "group": 96}
+{"id":57, "a": 113, "group": 14}
+{"id":153, "a": 305, "group": 38}
+{"id":364, "a": 728, "group": 91}
+{"id":905, "a": 1809, "group": 226}
+{"id":578, "a": 1154, "group": 144}
+{"id":265, "a": 529, "group": 66}
+{"id":642, "a": 1282, "group": 160}
+{"id":689, "a": 1377, "group": 172}
+{"id":574, "a": 1146, "group": 143}
+{"id":318, "a": 634, "group": 79}
+{"id":519, "a": 1035, "group": 129}
+{"id":411, "a": 819, "group": 102}
+{"id":465, "a": 929, "group": 116}
+{"id":174, "a": 346, "group": 43}
+{"id":286, "a": 570, "group": 71}
+{"id":162, "a": 322, "group": 40}
+{"id":894, "a": 1786, "group": 223}
+{"id":445, "a": 889, "group": 111}
+{"id":295, "a": 587, "group": 73}
+{"id":599, "a": 1195, "group": 149}
+{"id":1000, "a": 2000, "group": 250}
+{"id":491, "a": 979, "group": 122}
+{"id":539, "a": 1075, "group": 134}
+{"id":664, "a": 1328, "group": 166}
+{"id":771, "a": 1539, "group": 192}
+{"id":244, "a": 488, "group": 61}
+{"id":123, "a": 243, "group": 30}
+{"id":230, "a": 458, "group": 57}
+{"id":149, "a": 297, "group": 37}
+{"id":467, "a": 931, "group": 116}
+{"id":372, "a": 744, "group": 93}
+{"id":921, "a": 1841, "group": 230}
+{"id":388, "a": 776, "group": 97}
+{"id":898, "a": 1794, "group": 224}
+{"id":239, "a": 475, "group": 59}
+{"id":390, "a": 778, "group": 97}
+{"id":903, "a": 1803, "group": 225}
+{"id":382, "a": 762, "group": 95}
+{"id":715, "a": 1427, "group": 178}
+{"id":774, "a": 1546, "group": 193}
+{"id":259, "a": 515, "group": 64}
+{"id":419, "a": 835, "group": 104}
+{"id":51, "a": 99, "group": 12}
+{"id":929, "a": 1857, "group": 232}
+{"id":455, "a": 907, "group": 113}
+{"id":404, "a": 808, "group": 101}
+{"id":526, "a": 1050, "group": 131}
+{"id":985, "a": 1969, "group": 246}
+{"id":518, "a": 1034, "group": 129}
+{"id":323, "a": 643, "group": 80}
+{"id":821, "a": 1641, "group": 205}
+{"id":427, "a": 851, "group": 106}
+{"id":833, "a": 1665, "group": 208}
+{"id":723, "a": 1443, "group": 180}
+{"id":973, "a": 1945, "group": 243}
+{"id":555, "a": 1107, "group": 138}
+{"id":513, "a": 1025, "group": 128}
+{"id":251, "a": 499, "group": 62}
+{"id":217, "a": 433, "group": 54}
+{"id":581, "a": 1161, "group": 145}
+{"id":345, "a": 689, "group": 86}
+{"id":498, "a": 994, "group": 124}
+{"id":637, "a": 1273, "group": 159}
+{"id":955, "a": 1907, "group": 238}
+{"id":680, "a": 1360, "group": 170}
+{"id":310, "a": 618, "group": 77}
+{"id":817, "a": 1633, "group": 204}
+{"id":346, "a": 690, "group": 86}
+{"id":958, "a": 1914, "group": 239}
+{"id":506, "a": 1010, "group": 126}
+{"id":403, "a": 803, "group": 100}
+{"id":865, "a": 1729, "group": 216}
+{"id":666, "a": 1330, "group": 166}
+{"id":264, "a": 528, "group": 66}
+{"id":258, "a": 514, "group": 64}
+{"id":944, "a": 1888, "group": 236}
+{"id":422, "a": 842, "group": 105}
+{"id":914, "a": 1826, "group": 228}
+{"id":862, "a": 1722, "group": 215}
+{"id":405, "a": 809, "group": 101}
+{"id":250, "a": 498, "group": 62}
+{"id":541, "a": 1081, "group": 135}
+{"id":644, "a": 1288, "group": 161}
+{"id":423, "a": 843, "group": 105}
+{"id":480, "a": 960, "group": 120}
+{"id":28, "a": 56, "group": 7}
+{"id":734, "a": 1466, "group": 183}
+{"id":452, "a": 904, "group": 113}
+{"id":268, "a": 536, "group": 67}
+{"id":708, "a": 1416, "group": 177}
+{"id":451, "a": 899, "group": 112}
+{"id":535, "a": 1067, "group": 133}
+{"id":1, "a": 1, "group": 0}
+{"id":943, "a": 1883, "group": 235}
+{"id":510, "a": 1018, "group": 127}
+{"id":464, "a": 928, "group": 116}
+{"id":705, "a": 1409, "group": 176}
+{"id":381, "a": 761, "group": 95}
+{"id":6, "a": 10, "group": 1}
+{"id":257, "a": 513, "group": 64}
+{"id":851, "a": 1699, "group": 212}
+{"id":938, "a": 1874, "group": 234}
+{"id":835, "a": 1667, "group": 208}
+{"id":501, "a": 1001, "group": 125}
+{"id":296, "a": 592, "group": 74}
+{"id":818, "a": 1634, "group": 204}
+{"id":577, "a": 1153, "group": 144}
+{"id":730, "a": 1458, "group": 182}
+{"id":450, "a": 898, "group": 112}
+{"id":391, "a": 779, "group": 97}
+{"id":256, "a": 512, "group": 64}
+{"id":544, "a": 1088, "group": 136}
+{"id":629, "a": 1257, "group": 157}
+{"id":189, "a": 377, "group": 47}
+{"id":304, "a": 608, "group": 76}
+{"id":508, "a": 1016, "group": 127}
+{"id":681, "a": 1361, "group": 170}
+{"id":86, "a": 170, "group": 21}
+{"id":901, "a": 1801, "group": 225}
+{"id":55, "a": 107, "group": 13}
+{"id":647, "a": 1291, "group": 161}
+{"id":737, "a": 1473, "group": 184}
+{"id":5, "a": 9, "group": 1}
+{"id":879, "a": 1755, "group": 219}
+{"id":913, "a": 1825, "group": 228}
+{"id":557, "a": 1113, "group": 139}
+{"id":430, "a": 858, "group": 107}
+{"id":30, "a": 58, "group": 7}
+{"id":779, "a": 1555, "group": 194}
+{"id":237, "a": 473, "group": 59}
+{"id":238, "a": 474, "group": 59}
+{"id":69, "a": 137, "group": 17}
+{"id":165, "a": 329, "group": 41}
+{"id":804, "a": 1608, "group": 201}
+{"id":672, "a": 1344, "group": 168}
+{"id":904, "a": 1808, "group": 226}
+{"id":20, "a": 40, "group": 5}
+{"id":650, "a": 1298, "group": 162}
+{"id":124, "a": 248, "group": 31}
+{"id":819, "a": 1635, "group": 204}
+{"id":76, "a": 152, "group": 19}
+{"id":918, "a": 1834, "group": 229}
+{"id":855, "a": 1707, "group": 213}
+{"id":922, "a": 1842, "group": 230}
+{"id":562, "a": 1122, "group": 140}
+{"id":101, "a": 201, "group": 25}
+{"id":96, "a": 192, "group": 24}
+{"id":357, "a": 713, "group": 89}
+{"id":279, "a": 555, "group": 69}
+{"id":759, "a": 1515, "group": 189}
+{"id":293, "a": 585, "group": 73}
+{"id":653, "a": 1305, "group": 163}
+{"id":108, "a": 216, "group": 27}
+{"id":254, "a": 506, "group": 63}
+{"id":655, "a": 1307, "group": 163}
+{"id":945, "a": 1889, "group": 236}
+{"id":572, "a": 1144, "group": 143}
+{"id":756, "a": 1512, "group": 189}
+{"id":822, "a": 1642, "group": 205}
+{"id":288, "a": 576, "group": 72}
+{"id":641, "a": 1281, "group": 160}
+{"id":275, "a": 547, "group": 68}
+{"id":654, "a": 1306, "group": 163}
+{"id":896, "a": 1792, "group": 224}
+{"id":192, "a": 384, "group": 48}
+{"id":885, "a": 1769, "group": 221}
+{"id":660, "a": 1320, "group": 165}
+{"id":573, "a": 1145, "group": 143}
+{"id":163, "a": 323, "group": 40}
+{"id":802, "a": 1602, "group": 200}
+{"id":874, "a": 1746, "group": 218}
+{"id":791, "a": 1579, "group": 197}
+{"id":303, "a": 603, "group": 75}
+{"id":267, "a": 531, "group": 66}
+{"id":529, "a": 1057, "group": 132}
+{"id":811, "a": 1619, "group": 202}
+{"id":713, "a": 1425, "group": 178}
+{"id":193, "a": 385, "group": 48}
+{"id":886, "a": 1770, "group": 221}
+{"id":416, "a": 832, "group": 104}
+{"id":786, "a": 1570, "group": 196}
+{"id":15, "a": 27, "group": 3}
+{"id":626, "a": 1250, "group": 156}
+{"id":83, "a": 163, "group": 20}
+{"id":231, "a": 459, "group": 57}
+{"id":777, "a": 1553, "group": 194}
+{"id":78, "a": 154, "group": 19}
+{"id":877, "a": 1753, "group": 219}
+{"id":232, "a": 464, "group": 58}
+{"id":607, "a": 1211, "group": 151}
+{"id":525, "a": 1049, "group": 131}
+{"id":322, "a": 642, "group": 80}
+{"id":41, "a": 81, "group": 10}
+{"id":882, "a": 1762, "group": 220}
+{"id":957, "a": 1913, "group": 239}
+{"id":21, "a": 41, "group": 5}
+{"id":728, "a": 1456, "group": 182}
+{"id":206, "a": 410, "group": 51}
+{"id":775, "a": 1547, "group": 193}
+{"id":2, "a": 2, "group": 0}
+{"id":673, "a": 1345, "group": 168}
+{"id":64, "a": 128, "group": 16}
+{"id":309, "a": 617, "group": 77}
+{"id":415, "a": 827, "group": 103}
+{"id":537, "a": 1073, "group": 134}
+{"id":597, "a": 1193, "group": 149}
+{"id":458, "a": 914, "group": 114}
+{"id":872, "a": 1744, "group": 218}
+{"id":355, "a": 707, "group": 88}
+{"id":638, "a": 1274, "group": 159}
+{"id":546, "a": 1090, "group": 136}
+{"id":140, "a": 280, "group": 35}
+{"id":331, "a": 659, "group": 82}
+{"id":697, "a": 1393, "group": 174}
+{"id":9, "a": 17, "group": 2}
+{"id":60, "a": 120, "group": 15}
+{"id":849, "a": 1697, "group": 212}
+{"id":119, "a": 235, "group": 29}
+{"id":316, "a": 632, "group": 79}
+{"id":782, "a": 1562, "group": 195}
+{"id":565, "a": 1129, "group": 141}
+{"id":494, "a": 986, "group": 123}
+{"id":437, "a": 873, "group": 109}
+{"id":856, "a": 1712, "group": 214}
+{"id":397, "a": 793, "group": 99}
+{"id":742, "a": 1482, "group": 185}
+{"id":692, "a": 1384, "group": 173}
+{"id":854, "a": 1706, "group": 213}
+{"id":68, "a": 136, "group": 17}
+{"id":869, "a": 1737, "group": 217}
+{"id":280, "a": 560, "group": 70}
+{"id":242, "a": 482, "group": 60}
+{"id":66, "a": 130, "group": 16}
+{"id":823, "a": 1643, "group": 205}
+{"id":964, "a": 1928, "group": 241}
+{"id":158, "a": 314, "group": 39}
+{"id":690, "a": 1378, "group": 172}
+{"id":185, "a": 369, "group": 46}
+{"id":619, "a": 1235, "group": 154}
+{"id":400, "a": 800, "group": 100}
+{"id":908, "a": 1816, "group": 227}
+{"id":109, "a": 217, "group": 27}
+{"id":54, "a": 106, "group": 13}
+{"id":511, "a": 1019, "group": 127}
+{"id":111, "a": 219, "group": 27}
+{"id":125, "a": 249, "group": 31}
+{"id":85, "a": 169, "group": 21}
+{"id":617, "a": 1233, "group": 154}
+{"id":798, "a": 1594, "group": 199}
+{"id":399, "a": 795, "group": 99}
+{"id":470, "a": 938, "group": 117}
+{"id":645, "a": 1289, "group": 161}
+{"id":187, "a": 371, "group": 46}
+{"id":474, "a": 946, "group": 118}
+{"id":134, "a": 266, "group": 33}
+{"id":335, "a": 667, "group": 83}
+{"id":711, "a": 1419, "group": 177}
+{"id":145, "a": 289, "group": 36}
+{"id":157, "a": 313, "group": 39}
+{"id":177, "a": 353, "group": 44}
+{"id":808, "a": 1616, "group": 202}
+{"id":662, "a": 1322, "group": 165}
+{"id":420, "a": 840, "group": 105}
+{"id":568, "a": 1136, "group": 142}
+{"id":130, "a": 258, "group": 32}
+{"id":864, "a": 1728, "group": 216}
+{"id":542, "a": 1082, "group": 135}
+{"id":89, "a": 177, "group": 22}
+{"id":26, "a": 50, "group": 6}
+{"id":969, "a": 1937, "group": 242}
+{"id":366, "a": 730, "group": 91}
+{"id":575, "a": 1147, "group": 143}
+{"id":368, "a": 736, "group": 92}
+{"id":308, "a": 616, "group": 77}
+{"id":941, "a": 1881, "group": 235}
+{"id":590, "a": 1178, "group": 147}
+{"id":825, "a": 1649, "group": 206}
+{"id":732, "a": 1464, "group": 183}
+{"id":569, "a": 1137, "group": 142}
+{"id":601, "a": 1201, "group": 150}
+{"id":746, "a": 1490, "group": 186}
+{"id":246, "a": 490, "group": 61}
+{"id":116, "a": 232, "group": 29}
+{"id":873, "a": 1745, "group": 218}
+{"id":181, "a": 361, "group": 45}
+{"id":876, "a": 1752, "group": 219}
+{"id":632, "a": 1264, "group": 158}
+{"id":336, "a": 672, "group": 84}
+{"id":128, "a": 256, "group": 32}
+{"id":292, "a": 584, "group": 73}
+{"id":205, "a": 409, "group": 51}
+{"id":429, "a": 857, "group": 107}
+{"id":845, "a": 1689, "group": 211}
+{"id":143, "a": 283, "group": 35}
+{"id":668, "a": 1336, "group": 167}
+{"id":744, "a": 1488, "group": 186}
+{"id":630, "a": 1258, "group": 157}
+{"id":53, "a": 105, "group": 13}
+{"id":792, "a": 1584, "group": 198}
+{"id":593, "a": 1185, "group": 148}
+{"id":803, "a": 1603, "group": 200}
+{"id":899, "a": 1795, "group": 224}
+{"id":253, "a": 505, "group": 63}
+{"id":243, "a": 483, "group": 60}
+{"id":245, "a": 489, "group": 61}
+{"id":82, "a": 162, "group": 20}
+{"id":497, "a": 993, "group": 124}
+{"id":95, "a": 187, "group": 23}
+{"id":7, "a": 11, "group": 1}
+{"id":919, "a": 1835, "group": 229}
+{"id":710, "a": 1418, "group": 177}
+{"id":351, "a": 699, "group": 87}
+{"id":651, "a": 1299, "group": 162}
+{"id":954, "a": 1906, "group": 238}
+{"id":170, "a": 338, "group": 42}
+{"id":32, "a": 64, "group": 8}
+{"id":222, "a": 442, "group": 55}
+{"id":479, "a": 955, "group": 119}
+{"id":706, "a": 1410, "group": 176}
+{"id":564, "a": 1128, "group": 141}
+{"id":769, "a": 1537, "group": 192}
+{"id":524, "a": 1048, "group": 131}
+{"id":46, "a": 90, "group": 11}
+{"id":793, "a": 1585, "group": 198}
+{"id":837, "a": 1673, "group": 209}
+{"id":979, "a": 1955, "group": 244}
+{"id":962, "a": 1922, "group": 240}
+{"id":740, "a": 1480, "group": 185}
+{"id":282, "a": 562, "group": 70}
+{"id":724, "a": 1448, "group": 181}
+{"id":509, "a": 1017, "group": 127}
+{"id":266, "a": 530, "group": 66}
+{"id":271, "a": 539, "group": 67}
+{"id":155, "a": 307, "group": 38}
+{"id":18, "a": 34, "group": 4}
+{"id":339, "a": 675, "group": 84}
+{"id":598, "a": 1194, "group": 149}
+{"id":948, "a": 1896, "group": 237}
+{"id":910, "a": 1818, "group": 227}
+{"id":846, "a": 1690, "group": 211}
+{"id":138, "a": 274, "group": 34}
+{"id":760, "a": 1520, "group": 190}
+{"id":853, "a": 1705, "group": 213}
+{"id":860, "a": 1720, "group": 215}
+{"id":283, "a": 563, "group": 70}
+{"id":188, "a": 376, "group": 47}
+{"id":613, "a": 1225, "group": 153}
+{"id":321, "a": 641, "group": 80}
+{"id":148, "a": 296, "group": 37}
+{"id":674, "a": 1346, "group": 168}
+{"id":226, "a": 450, "group": 56}
+{"id":994, "a": 1986, "group": 248}
+{"id":998, "a": 1994, "group": 249}
+{"id":97, "a": 193, "group": 24}
+{"id":160, "a": 320, "group": 40}
+{"id":616, "a": 1232, "group": 154}
+{"id":424, "a": 848, "group": 106}
+{"id":937, "a": 1873, "group": 234}
+{"id":392, "a": 784, "group": 98}
+{"id":25, "a": 49, "group": 6}
+{"id":88, "a": 176, "group": 22}
+{"id":534, "a": 1066, "group": 133}
+{"id":536, "a": 1072, "group": 134}
+{"id":530, "a": 1058, "group": 132}
+{"id":289, "a": 577, "group": 72}
+{"id":861, "a": 1721, "group": 215}
+{"id":376, "a": 752, "group": 94}
+{"id":682, "a": 1362, "group": 170}
+{"id":327, "a": 651, "group": 81}
+{"id":566, "a": 1130, "group": 141}
+{"id":199, "a": 395, "group": 49}
+{"id":902, "a": 1802, "group": 225}
+{"id":433, "a": 865, "group": 108}
+{"id":13, "a": 25, "group": 3}
+{"id":103, "a": 203, "group": 25}
+{"id":907, "a": 1811, "group": 226}
+{"id":989, "a": 1977, "group": 247}
+{"id":974, "a": 1946, "group": 243}
+{"id":978, "a": 1954, "group": 244}
+{"id":107, "a": 211, "group": 26}
+{"id":722, "a": 1442, "group": 180}
+{"id":38, "a": 74, "group": 9}
+{"id":549, "a": 1097, "group": 137}
+{"id":485, "a": 969, "group": 121}
+{"id":218, "a": 434, "group": 54}
+{"id":927, "a": 1851, "group": 231}
+{"id":725, "a": 1449, "group": 181}
+{"id":702, "a": 1402, "group": 175}
+{"id":198, "a": 394, "group": 49}
+{"id":214, "a": 426, "group": 53}
+{"id":373, "a": 745, "group": 93}
+{"id":147, "a": 291, "group": 36}
+{"id":63, "a": 123, "group": 15}
+{"id":79, "a": 155, "group": 19}
+{"id":543, "a": 1083, "group": 135}
+{"id":334, "a": 666, "group": 83}
+{"id":59, "a": 115, "group": 14}
+{"id":459, "a": 915, "group": 114}
+{"id":495, "a": 987, "group": 123}
+{"id":211, "a": 419, "group": 52}
+{"id":554, "a": 1106, "group": 138}
+{"id":62, "a": 122, "group": 15}
+{"id":758, "a": 1514, "group": 189}
+{"id":272, "a": 544, "group": 68}
+{"id":727, "a": 1451, "group": 181}
+{"id":868, "a": 1736, "group": 217}
+{"id":631, "a": 1259, "group": 157}
+{"id":152, "a": 304, "group": 38}
+{"id":669, "a": 1337, "group": 167}
+{"id":840, "a": 1680, "group": 210}
+{"id":983, "a": 1963, "group": 245}
+{"id":595, "a": 1187, "group": 148}
+{"id":685, "a": 1369, "group": 171}
+{"id":441, "a": 881, "group": 110}
+{"id":527, "a": 1051, "group": 131}
+{"id":820, "a": 1640, "group": 205}
+{"id":434, "a": 866, "group": 108}
+{"id":277, "a": 553, "group": 69}
+{"id":90, "a": 178, "group": 22}
+{"id":676, "a": 1352, "group": 169}
+{"id":627, "a": 1251, "group": 156}
+{"id":614, "a": 1226, "group": 153}
+{"id":432, "a": 864, "group": 108}
+{"id":350, "a": 698, "group": 87}
+{"id":247, "a": 491, "group": 61}
+{"id":551, "a": 1099, "group": 137}
+{"id":29, "a": 57, "group": 7}
+{"id":104, "a": 208, "group": 26}
+{"id":801, "a": 1601, "group": 200}
+{"id":589, "a": 1177, "group": 147}
+{"id":409, "a": 817, "group": 102}
+{"id":248, "a": 496, "group": 62}
+{"id":39, "a": 75, "group": 9}
+{"id":953, "a": 1905, "group": 238}
+{"id":438, "a": 874, "group": 109}
+{"id":278, "a": 554, "group": 69}
+{"id":447, "a": 891, "group": 111}
+{"id":16, "a": 32, "group": 4}
+{"id":426, "a": 850, "group": 106}
+{"id":950, "a": 1898, "group": 237}
+{"id":533, "a": 1065, "group": 133}
+{"id":171, "a": 339, "group": 42}
+{"id":482, "a": 962, "group": 120}
+{"id":933, "a": 1865, "group": 233}
+{"id":701, "a": 1401, "group": 175}
+{"id":428, "a": 856, "group": 107}
+{"id":915, "a": 1827, "group": 228}
+{"id":972, "a": 1944, "group": 243}
+{"id":857, "a": 1713, "group": 214}
+{"id":844, "a": 1688, "group": 211}
+{"id":683, "a": 1363, "group": 170}
+{"id":3, "a": 3, "group": 0}
+{"id":65, "a": 129, "group": 16}
+{"id":121, "a": 241, "group": 30}
+{"id":202, "a": 402, "group": 50}
+{"id":753, "a": 1505, "group": 188}
+{"id":369, "a": 737, "group": 92}
+{"id":765, "a": 1529, "group": 191}
+{"id":661, "a": 1321, "group": 165}
+{"id":342, "a": 682, "group": 85}
+{"id":442, "a": 882, "group": 110}
+{"id":592, "a": 1184, "group": 148}
+{"id":717, "a": 1433, "group": 179}
+{"id":608, "a": 1216, "group": 152}
+{"id":72, "a": 144, "group": 18}
+{"id":698, "a": 1394, "group": 174}
+{"id":560, "a": 1120, "group": 140}
+{"id":809, "a": 1617, "group": 202}
+{"id":721, "a": 1441, "group": 180}
+{"id":176, "a": 352, "group": 44}
+{"id":87, "a": 171, "group": 21}
+{"id":10, "a": 18, "group": 2}
+{"id":414, "a": 826, "group": 103}
+{"id":548, "a": 1096, "group": 137}
+{"id":37, "a": 73, "group": 9}
+{"id":183, "a": 363, "group": 45}
+{"id":559, "a": 1115, "group": 139}
+{"id":736, "a": 1472, "group": 184}
+{"id":146, "a": 290, "group": 36}
+{"id":776, "a": 1552, "group": 194}
+{"id":505, "a": 1009, "group": 126}
+{"id":993, "a": 1985, "group": 248}
+{"id":992, "a": 1984, "group": 248}
+{"id":312, "a": 624, "group": 78}
+{"id":166, "a": 330, "group": 41}
+{"id":696, "a": 1392, "group": 174}
+{"id":27, "a": 51, "group": 6}
+{"id":269, "a": 537, "group": 67}
+{"id":139, "a": 275, "group": 34}
+{"id":504, "a": 1008, "group": 126}
+{"id":43, "a": 83, "group": 10}
+{"id":469, "a": 937, "group": 117}
+{"id":832, "a": 1664, "group": 208}
+{"id":380, "a": 760, "group": 95}
+{"id":168, "a": 336, "group": 42}
+{"id":768, "a": 1536, "group": 192}
+{"id":42, "a": 82, "group": 10}
+{"id":489, "a": 977, "group": 122}
+{"id":396, "a": 792, "group": 99}
+{"id":584, "a": 1168, "group": 146}
+{"id":975, "a": 1947, "group": 243}
+{"id":359, "a": 715, "group": 89}
+{"id":220, "a": 440, "group": 55}
+{"id":797, "a": 1593, "group": 199}
+{"id":298, "a": 594, "group": 74}
+{"id":486, "a": 970, "group": 121}
+{"id":997, "a": 1993, "group": 249}
+{"id":790, "a": 1578, "group": 197}
+{"id":453, "a": 905, "group": 113}
+{"id":735, "a": 1467, "group": 183}
+{"id":24, "a": 48, "group": 6}
+{"id":502, "a": 1002, "group": 125}
+{"id":939, "a": 1875, "group": 234}
+{"id":314, "a": 626, "group": 78}
+{"id":457, "a": 913, "group": 114}
+{"id":132, "a": 264, "group": 33}
+{"id":50, "a": 98, "group": 12}
+{"id":454, "a": 906, "group": 113}
+{"id":576, "a": 1152, "group": 144}
+{"id":881, "a": 1761, "group": 220}
+{"id":633, "a": 1265, "group": 158}
+{"id":353, "a": 705, "group": 88}
+{"id":934, "a": 1866, "group": 233}
+{"id":620, "a": 1240, "group": 155}
+{"id":167, "a": 331, "group": 41}
+{"id":579, "a": 1155, "group": 144}
+{"id":99, "a": 195, "group": 24}
+{"id":799, "a": 1595, "group": 199}
+{"id":762, "a": 1522, "group": 190}
+{"id":547, "a": 1091, "group": 136}
+{"id":100, "a": 200, "group": 25}
+{"id":154, "a": 306, "group": 38}
+{"id":778, "a": 1554, "group": 194}
+{"id":151, "a": 299, "group": 37}
+{"id":570, "a": 1138, "group": 142}
+{"id":888, "a": 1776, "group": 222}
+{"id":665, "a": 1329, "group": 166}
+{"id":44, "a": 88, "group": 11}
+{"id":670, "a": 1338, "group": 167}
+{"id":412, "a": 824, "group": 103}
+{"id":984, "a": 1968, "group": 246}
+{"id":659, "a": 1315, "group": 164}
+{"id":307, "a": 611, "group": 76}
+{"id":197, "a": 393, "group": 49}
+{"id":306, "a": 610, "group": 76}
+{"id":468, "a": 936, "group": 117}
+{"id":262, "a": 522, "group": 65}
+{"id":483, "a": 963, "group": 120}
+{"id":610, "a": 1218, "group": 152}
+{"id":507, "a": 1011, "group": 126}
+{"id":609, "a": 1217, "group": 152}
diff --git a/exec/java-exec/src/test/resources/window/oneKeyCount.json b/exec/java-exec/src/test/resources/window/oneKeyCount.json
new file mode 100644
index 000000000..d8965fbe1
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/oneKeyCount.json
@@ -0,0 +1,43 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"fs-scan",
+ format: {type: "json"},
+ storage:{type: "file", connection: "file:///"},
+ files:["#{DATA_FILE}"]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"sort",
+ orderings: [
+ {expr: "a"}
+ ]
+ },
+ {
+ @id:3,
+ child: 2,
+ pop:"window",
+ within: [
+ { ref: "a", expr: "a" }
+ ],
+ aggregations: [
+ { ref: "cnt", expr:"count(1)" },
+ { ref: "sum", expr:"sum(b)" }
+ ]
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+} \ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/window/oneKeyCountData.json b/exec/java-exec/src/test/resources/window/oneKeyCountData.json
new file mode 100644
index 000000000..3c0115e74
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/oneKeyCountData.json
@@ -0,0 +1,4 @@
+{"a": 1, "b": 100}
+ {"a": 1, "b": 50}
+ {"a": 2, "b": 25}
+ {"a": 2, "b": 50} \ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/window/oneKeyCountMultiBatch.json b/exec/java-exec/src/test/resources/window/oneKeyCountMultiBatch.json
new file mode 100644
index 000000000..069bc1f4f
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/oneKeyCountMultiBatch.json
@@ -0,0 +1,72 @@
+{
+ "head" : {
+ "version" : 1,
+ "generator" : {
+ "type" : "DefaultSqlHandler",
+ "info" : ""
+ },
+ "type" : "APACHE_DRILL_PHYSICAL",
+ "options" : [ ],
+ "resultMode" : "EXEC"
+ },
+ "graph" : [{
+ @id:1,
+ pop:"fs-scan",
+ format: {type: "json"},
+ storage:{type: "file", connection: "file:///"},
+ files:["#{DATA_FILE}"]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"sort",
+ orderings: [
+ {expr: "group"},
+ {expr: "a"}
+ ]
+ },
+ {
+ "pop" : "window",
+ "@id" : 3,
+ "child" : 2,
+ "aggregations" : [ {
+ "ref" : "`w0$o0`",
+ "expr" : "count(`a`) "
+ }, {
+ "ref" : "`w0$o1`",
+ "expr" : "$sum0(`a`) "
+ } ],
+ "start" : -9223372036854775808,
+ "end" : -9223372036854775808,
+ "initialAllocation" : 1000000,
+ "maxAllocation" : 10000000000,
+ "withins" : [ {
+ "ref" : "`group`",
+ "expr" : "`group`"
+ } ]
+ }, {
+ "pop" : "project",
+ "@id" : 4,
+ "exprs" : [ {
+ "ref" : "`output`",
+ "expr" : " ( if (greater_than(`w0$o0`, 0) ) then (`w0$o1` ) else (NULL) end ) "
+ },
+ {
+ "ref" : "cnt",
+ "expr": "w0$o0"
+ },
+ {
+ "ref" : "sum",
+ "expr": "w0$o1"
+ }],
+ "child" : 3,
+ "initialAllocation" : 1000000,
+ "maxAllocation" : 10000000000
+ }, {
+ "pop" : "screen",
+ "@id" : 5,
+ "child" : 4,
+ "initialAllocation" : 1000000,
+ "maxAllocation" : 10000000000
+ } ]
+} \ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/window/twoKeys.json b/exec/java-exec/src/test/resources/window/twoKeys.json
new file mode 100644
index 000000000..6282ad244
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/twoKeys.json
@@ -0,0 +1,44 @@
+{
+ head:{
+ type:"APACHE_DRILL_PHYSICAL",
+ version:"1",
+ generator:{
+ type:"manual"
+ }
+ },
+ graph:[
+ {
+ @id:1,
+ pop:"fs-scan",
+ format: {type: "json"},
+ storage:{type: "file", connection: "file:///"},
+ files:["#{DATA_FILE}"]
+ },
+ {
+ @id:2,
+ child: 1,
+ pop:"sort",
+ orderings: [
+ {expr: "a"}
+ ]
+ },
+ {
+ @id:3,
+ child: 2,
+ pop:"window",
+ within: [
+ { ref: "a", expr: "a" },
+ { ref: "b", expr: "b" }
+ ],
+ aggregations: [
+ { ref: "cnt", expr:"count(1)" },
+ { ref: "sum", expr:"sum(c)" }
+ ]
+ },
+ {
+ @id: 4,
+ child: 3,
+ pop: "screen"
+ }
+ ]
+} \ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/window/twoKeysData.json b/exec/java-exec/src/test/resources/window/twoKeysData.json
new file mode 100644
index 000000000..fd09236f3
--- /dev/null
+++ b/exec/java-exec/src/test/resources/window/twoKeysData.json
@@ -0,0 +1,8 @@
+{"a": 1, "b": "group1", "c": 5}
+{"a": 1, "b": "group1", "c": 10}
+{"a": 1, "b": "group2", "c": 15}
+{"a": 1, "b": "group2", "c": 20}
+{"a": 2, "b": "group3", "c": 25}
+{"a": 2, "b": "group3", "c": 30}
+{"a": 2, "b": "group4", "c": 35}
+{"a": 2, "b": "group4", "c": 40} \ No newline at end of file