diff options
author | Timothy Chen <tnachen@gmail.com> | 2014-09-21 23:54:40 -0700 |
---|---|---|
committer | Steven Phillips <sphillips@maprtech.com> | 2014-09-23 22:24:21 -0700 |
commit | 8def6e91489455c0ae670f49ef5ba51ef71b31bd (patch) | |
tree | 1e53ac7654039b05b6a0531039aa1b317665f45a /exec/java-exec | |
parent | 9bc71fc54b97b52ac5c7335247b6ca7887045fd2 (diff) |
Patch for DRILL-705
Currently only supports partitioning/ordering, not yet preceding or
after offsets
Diffstat (limited to 'exec/java-exec')
66 files changed, 2847 insertions, 124 deletions
diff --git a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd index 812c289e6..5b4041c80 100644 --- a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd +++ b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd @@ -78,7 +78,7 @@ {inputType: "NullableVarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes"} ] }, - {className: "Sum", funcName: "sum", types: [ + {className: "Sum", funcName: "sum", types: [ {inputType: "Bit", outputType: "Bit", runningType: "Bit", major: "Numeric"}, {inputType: "Int", outputType: "BigInt", runningType: "BigInt", major: "Numeric"}, {inputType: "BigInt", outputType: "BigInt", runningType: "BigInt", major: "Numeric"}, @@ -96,7 +96,7 @@ {inputType: "Interval", outputType: "Interval", runningType: "Interval", major: "Date", initialValue: "0"}, {inputType: "NullableInterval", outputType: "Interval", runningType: "Interval", major: "Date", initialValue: "0"} ] - }, + }, {className: "Count", funcName: "count", types: [ {inputType: "Bit", outputType: "Bit", runningType: "Bit", major: "Numeric"}, {inputType: "Int", outputType: "BigInt", runningType: "BigInt", major: "Numeric"}, @@ -129,4 +129,4 @@ ] } ] -} +} diff --git a/exec/java-exec/src/main/codegen/templates/TypeHelper.java b/exec/java-exec/src/main/codegen/templates/TypeHelper.java index 3c0d9d3af..cb6a030d6 100644 --- a/exec/java-exec/src/main/codegen/templates/TypeHelper.java +++ b/exec/java-exec/src/main/codegen/templates/TypeHelper.java @@ -73,8 +73,7 @@ public class TypeHelper { case LIST: return new GenericAccessor(vector); } - - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Unable to find sql accessor for minor type [" + vector.getField().getType().getMinorType() + "] and mode [" + vector.getField().getType().getMode() + "]"); } public static ValueVector getNewVector(SchemaPath parentPath, String name, BufferAllocator allocator, MajorType type){ @@ -255,7 +254,7 @@ public class TypeHelper { default: break; } - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Unable to find holder type for minorType: " + type); } public static ValueVector getNewVector(MaterializedField field, BufferAllocator allocator){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java index a5b7beeb3..6280c40b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/EvaluationVisitor.java @@ -71,6 +71,9 @@ import com.sun.codemodel.JLabel; import com.sun.codemodel.JType; import com.sun.codemodel.JVar; +/** + * Visitor that generates code for eval + */ public class EvaluationVisitor { private final FunctionImplementationRegistry registry; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java index 6ef46de2c..18c507273 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.memory; +import com.google.common.collect.LinkedListMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; import io.netty.buffer.ByteBuf; import io.netty.buffer.DrillBuf; @@ -31,9 +34,9 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.util.AssertionUtil; -import com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.Maps; -import com.google.common.collect.Multimap; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.ConcurrentMap; public class Accountor { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Accountor.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 876ba37af..98202aca7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -17,12 +17,7 @@ */ package org.apache.drill.exec.opt; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - +import com.google.common.collect.Lists; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -40,6 +35,7 @@ import org.apache.drill.common.logical.data.Project; import org.apache.drill.common.logical.data.Scan; import org.apache.drill.common.logical.data.SinkOperator; import org.apache.drill.common.logical.data.Store; +import org.apache.drill.common.logical.data.Window; import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.DataMode; @@ -54,13 +50,18 @@ import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.config.StreamingAggregate; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.rpc.user.UserServer; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.StoragePlugin; import org.eigenbase.rel.RelFieldCollation.Direction; import org.eigenbase.rel.RelFieldCollation.NullDirection; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; public class BasicOptimizer extends Optimizer{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class); @@ -183,13 +184,22 @@ public class BasicOptimizer extends Optimizer{ return sa; } + @Override + public PhysicalOperator visitWindow(Window window, Object value) throws OptimizerException { + PhysicalOperator input = window.getInput().accept(this, value); + List<Ordering> ods = Lists.newArrayList(); + + input = new Sort(input, ods, false); + + return new WindowPOP(input, window.getWithins(), window.getAggregations(), window.getStart(), window.getEnd()); + } @Override public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerException { PhysicalOperator input = order.getInput().accept(this, value); List<Ordering> ods = Lists.newArrayList(); - for(Ordering o : order.getOrderings()){ + for (Ordering o : order.getOrderings()){ ods.add(o); } return new SelectionVectorRemover(new Sort(input, ods, false)); @@ -250,7 +260,6 @@ public class BasicOptimizer extends Optimizer{ @Override public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException { -// return project.getInput().accept(this, obj); return new org.apache.drill.exec.physical.config.Project(Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj)); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 48b38011f..031ab10c5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.physical.config.WindowPOP; public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> implements PhysicalVisitor<T, X, E> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractPhysicalVisitor.class); @@ -64,6 +65,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitWindowFrame(WindowPOP windowFrame, X value) throws E { + return visitOp(windowFrame, value); + } + + @Override public T visitProject(Project project, X value) throws E{ return visitOp(project, value); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java index 2b10e6d0a..6d6c5912b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractSingle.java @@ -28,7 +28,7 @@ import com.google.common.collect.Iterators; * Describes an operator that expects a single child operator as its input. * @param <T> The type of Exec model supported. */ -public abstract class AbstractSingle extends AbstractBase{ +public abstract class AbstractSingle extends AbstractBase { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractSingle.class); protected final PhysicalOperator child; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index 8f5139029..a5518caf6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -37,7 +37,7 @@ import com.fasterxml.jackson.annotation.ObjectIdGenerators; @JsonPropertyOrder({ "@id" }) @JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "@id") @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "pop") -public interface PhysicalOperator extends GraphValue<PhysicalOperator> { +public interface PhysicalOperator extends GraphValue<PhysicalOperator> { /** * Describes whether or not a particular physical operator can actually be executed. Most physical operators can be diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 8da06cbb8..f9a9c21af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -39,6 +39,7 @@ import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.physical.config.WindowPOP; /** * Visitor class designed to traversal of a operator tree. Basis for a number of operator manipulations including fragmentation and materialization. @@ -80,6 +81,7 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitScreen(Screen op, EXTRA value) throws EXCEP; public RETURN visitSingleSender(SingleSender op, EXTRA value) throws EXCEP; public RETURN visitUnionExchange(UnionExchange op, EXTRA value) throws EXCEP; + public RETURN visitWindowFrame(WindowPOP op, EXTRA value) throws EXCEP; public RETURN visitProducerConsumer(ProducerConsumer op, EXTRA value) throws EXCEP; public RETURN visitIteratorValidator(IteratorValidator op, EXTRA value) throws EXCEP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java new file mode 100644 index 000000000..17738eecd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/WindowPOP.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.physical.base.AbstractSingle; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.proto.UserBitShared; + +@JsonTypeName("window") +public class WindowPOP extends AbstractSingle { + + private final NamedExpression[] withins; + private final NamedExpression[] aggregations; + private final long start; + private final long end; + + public WindowPOP(@JsonProperty("child") PhysicalOperator child, + @JsonProperty("within") NamedExpression[] withins, + @JsonProperty("aggregations") NamedExpression[] aggregations, + @JsonProperty("start") long start, + @JsonProperty("end") long end) { + super(child); + this.withins = withins; + this.aggregations = aggregations; + this.start = start; + this.end = end; + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new WindowPOP(child, withins, aggregations, start, end); + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitWindowFrame(this, value); + } + + @Override + public int getOperatorType() { + return UserBitShared.CoreOperatorType.WINDOW_VALUE; + } + + public long getStart() { + return start; + } + + public long getEnd() { + return end; + } + + public NamedExpression[] getAggregations() { + return aggregations; + } + + public NamedExpression[] getWithins() { + return withins; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java index e6900605f..dae9eaedb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java @@ -35,7 +35,11 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ private final SelectionVector4 sv4; public InternalBatch(RecordBatch incoming) { - switch(incoming.getSchema().getSelectionVectorMode()) { + this(incoming, null); + } + + public InternalBatch(RecordBatch incoming, VectorWrapper[] ignoreWrappers){ + switch(incoming.getSchema().getSelectionVectorMode()){ case FOUR_BYTE: this.sv4 = incoming.getSelectionVector4().createNewWrapperCurrent(); this.sv2 = null; @@ -49,7 +53,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ this.sv2 = null; } this.schema = incoming.getSchema(); - this.container = VectorContainer.getTransferClone(incoming); + this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers); } public BatchSchema getSchema() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index ced51798f..4d3925ea5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -325,9 +325,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { default: throw new IllegalStateException(); - } - } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java index f1fcce0d6..85f664cfc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java @@ -76,13 +76,15 @@ public class FilterRecordBatch extends AbstractSingleRecordBatch<Filter>{ } @Override - protected void doWork() { + protected IterOutcome doWork() { int recordCount = incoming.getRecordCount(); filter.filterBatch(recordCount); // for (VectorWrapper<?> v : container) { // ValueVector.Mutator m = v.getValueVector().getMutator(); // m.setValueCount(recordCount); // } + + return IterOutcome.OK; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java index f5bc9f91b..8f4d90f3f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/limit/LimitRecordBatch.java @@ -122,7 +122,7 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { } @Override - protected void doWork() { + protected IterOutcome doWork() { for(TransferPair tp : transfers) { tp.transfer(); } @@ -139,6 +139,8 @@ public class LimitRecordBatch extends AbstractSingleRecordBatch<Limit> { limitWithNoSV(recordCount); } } + + return IterOutcome.OK; } private IterOutcome produceEmptyFirstBatch() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index a1a834052..224753e08 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -70,7 +70,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.sun.codemodel.JExpr; -public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ +public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProjectRecordBatch.class); private Projector projector; @@ -133,13 +133,13 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } @Override - protected void doWork() { + protected IterOutcome doWork() { // VectorUtil.showVectorAccessibleContent(incoming, ","); int incomingRecordCount = incoming.getRecordCount(); if (!doAlloc()) { outOfMemory = true; - return; + return IterOutcome.OUT_OF_MEMORY; } int outputRecords = projector.projectRecords(0, incomingRecordCount, 0); @@ -160,6 +160,8 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ if (complexWriters != null) { container.buildSchema(SelectionVectorMode.NONE); } + + return IterOutcome.OK; } private void handleRemainder() { @@ -177,7 +179,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ setValueCount(remainingRecordCount); hasRemainder = false; remainderIndex = 0; - for(VectorWrapper<?> v: incoming) { + for (VectorWrapper<?> v : incoming) { v.clear(); } this.recordCount = remainingRecordCount; @@ -259,7 +261,7 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ } @Override - protected void setupNewSchema() throws SchemaChangeException{ + protected void setupNewSchema() throws SchemaChangeException { this.allocationVectors = Lists.newArrayList(); container.clear(); final List<NamedExpression> exprs = getExpressionList(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index 97f3608b2..7178d4c23 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -91,7 +91,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } @Override - protected void doWork() { + protected IterOutcome doWork() { int incomingRecordCount = incoming.getRecordCount(); int copiedRecords = copier.copyRecords(0, incomingRecordCount); @@ -125,6 +125,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect incomingRecordCount, incomingRecordCount - remainderIndex, incoming.getSchema()); + return IterOutcome.OK; } private void handleRemainder() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java index 6d909623a..4e644dfed 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/trace/TraceRecordBatch.java @@ -100,7 +100,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { * this record batch to a log file. */ @Override - protected void doWork() { + protected IterOutcome doWork() { boolean incomingHasSv2 = incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.TWO_BYTE; if (incomingHasSv2) { @@ -121,6 +121,7 @@ public class TraceRecordBatch extends AbstractSingleRecordBatch<Trace> { if (incomingHasSv2) { sv = wrap.getSv2(); } + return IterOutcome.OK; } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java new file mode 100644 index 000000000..9b8929f7b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameBatchCreator.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.impl.window; + +import com.google.common.collect.Iterables; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.WindowPOP; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.RecordBatch; + +import java.util.List; + +public class StreamingWindowFrameBatchCreator implements BatchCreator<WindowPOP> { + + @Override + public RecordBatch getBatch(FragmentContext context, WindowPOP config, List<RecordBatch> children) throws ExecutionSetupException { + return new StreamingWindowFrameRecordBatch(config, context, Iterables.getOnlyElement(children)); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java new file mode 100644 index 000000000..2a9208965 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameRecordBatch.java @@ -0,0 +1,268 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.impl.window; + +import com.google.common.collect.Lists; +import com.sun.codemodel.JExpr; +import com.sun.codemodel.JVar; +import org.apache.drill.common.expression.ErrorCollector; +import org.apache.drill.common.expression.ErrorCollectorImpl; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.compile.sig.GeneratorMapping; +import org.apache.drill.exec.compile.sig.MappingSet; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.ExpressionTreeMaterializer; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.ValueVectorReadExpression; +import org.apache.drill.exec.expr.ValueVectorWriteExpression; +import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; +import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.WindowPOP; +import org.apache.drill.exec.record.AbstractSingleRecordBatch; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.record.selection.SelectionVector2; +import org.apache.drill.exec.record.selection.SelectionVector4; +import org.apache.drill.exec.vector.ValueVector; + +import java.io.IOException; +import java.util.List; + +public class StreamingWindowFrameRecordBatch extends AbstractSingleRecordBatch<WindowPOP> { + private StreamingWindowFramer framer; + + public StreamingWindowFrameRecordBatch(WindowPOP popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { + super(popConfig, context, incoming); + } + + @Override + protected void setupNewSchema() throws SchemaChangeException { + container.clear(); + + try { + this.framer = createFramer(); + } catch (ClassTransformationException | IOException ex) { + throw new SchemaChangeException("Failed to create framer: " + ex); + } + } + + private void getIndex(ClassGenerator<StreamingWindowFramer> g) { + switch (incoming.getSchema().getSelectionVectorMode()) { + case FOUR_BYTE: { + JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class)); + g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector4")); + g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex"))); + return; + } + case NONE: { + g.getBlock("getVectorIndex")._return(JExpr.direct("recordIndex")); + return; + } + case TWO_BYTE: { + JVar var = g.declareClassField("sv2_", g.getModel()._ref(SelectionVector2.class)); + g.getBlock("setupInterior").assign(var, JExpr.direct("incoming").invoke("getSelectionVector2")); + g.getBlock("getVectorIndex")._return(var.invoke("get").arg(JExpr.direct("recordIndex"))); + return; + } + + default: + throw new IllegalStateException(); + } + } + + private StreamingWindowFramer createFramer() throws SchemaChangeException, IOException, ClassTransformationException { + int configLength = popConfig.getAggregations().length; + List<LogicalExpression> valueExprs = Lists.newArrayList(); + LogicalExpression[] keyExprs = new LogicalExpression[popConfig.getWithins().length]; + + ErrorCollector collector = new ErrorCollectorImpl(); + + for (int i = 0; i < configLength; i++) { + NamedExpression ne = popConfig.getAggregations()[i]; + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry()); + if (expr == null) { + continue; + } + + final MaterializedField outputField = MaterializedField.create(ne.getRef(), expr.getMajorType()); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + TypedFieldId id = container.add(vector); + valueExprs.add(new ValueVectorWriteExpression(id, expr, true)); + } + + int j = 0; + LogicalExpression[] windowExprs = new LogicalExpression[incoming.getSchema().getFieldCount()]; + // TODO: Should transfer all existing columns instead of copy. Currently this is not easily doable because + // we are not processing one entire batch in one iteration, so cannot simply transfer. + for (VectorWrapper wrapper : incoming) { + ValueVector vv = wrapper.isHyper() ? wrapper.getValueVectors()[0] : wrapper.getValueVector(); + ValueVector vector = TypeHelper.getNewVector(vv.getField(), oContext.getAllocator()); + TypedFieldId id = container.add(vector); + final LogicalExpression expr = ExpressionTreeMaterializer.materialize( + new ValueVectorReadExpression(new TypedFieldId(vv.getField().getType(), wrapper.isHyper(), j)), + incoming, + collector, + context.getFunctionRegistry()); + windowExprs[j] = new ValueVectorWriteExpression(id, expr, true); + j++; + } + + for (int i = 0; i < keyExprs.length; i++) { + NamedExpression ne = popConfig.getWithins()[i]; + + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(ne.getExpr(), incoming, collector, context.getFunctionRegistry()); + if (expr == null) { + continue; + } + + keyExprs[i] = expr; + } + + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + + final ClassGenerator<StreamingWindowFramer> cg = CodeGenerator.getRoot(StreamingWindowFramer.TEMPLATE_DEFINITION, context.getFunctionRegistry()); + setupIsSame(cg, keyExprs); + setupIsSameFromBatch(cg, keyExprs); + addRecordValues(cg, valueExprs.toArray(new LogicalExpression[valueExprs.size()])); + outputWindowValues(cg, windowExprs); + + cg.getBlock("resetValues")._return(JExpr.TRUE); + + container.buildSchema(BatchSchema.SelectionVectorMode.NONE); + getIndex(cg); + StreamingWindowFramer agg = context.getImplementationClass(cg); + agg.setup( + context, + incoming, + this, + StreamingWindowFrameTemplate.UNBOUNDED, StreamingWindowFrameTemplate.CURRENT_ROW + ); + return agg; + } + + private final GeneratorMapping IS_SAME_PREV_INTERNAL_BATCH_READ = GeneratorMapping.create("isSameFromBatch", "isSameFromBatch", null, null); // the internal batch changes each time so we need to redo setup. + private final GeneratorMapping IS_SAME_PREV = GeneratorMapping.create("setupInterior", "isSameFromBatch", null, null); + private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ); + private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV); + + private void setupIsSameFromBatch(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) { + cg.setMappingSet(ISA_B1); + for (LogicalExpression expr : keyExprs) { + // first, we rewrite the evaluation stack for each side of the comparison. + cg.setMappingSet(ISA_B1); + ClassGenerator.HoldingContainer first = cg.addExpr(expr, false); + cg.setMappingSet(ISA_B2); + ClassGenerator.HoldingContainer second = cg.addExpr(expr, false); + + LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry()); + ClassGenerator.HoldingContainer out = cg.addExpr(fh, false); + cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getEvalBlock()._return(JExpr.TRUE); + } + + private final GeneratorMapping IS_SAME = GeneratorMapping.create("setupInterior", "isSame", null, null); + private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME); + private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME); + + private void setupIsSame(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] keyExprs) { + cg.setMappingSet(IS_SAME_I1); + for (LogicalExpression expr : keyExprs) { + // first, we rewrite the evaluation stack for each side of the comparison. + cg.setMappingSet(IS_SAME_I1); + ClassGenerator.HoldingContainer first = cg.addExpr(expr, false); + cg.setMappingSet(IS_SAME_I2); + ClassGenerator.HoldingContainer second = cg.addExpr(expr, false); + + LogicalExpression fh = FunctionGenerationHelper.getComparator(first, second, context.getFunctionRegistry()); + ClassGenerator.HoldingContainer out = cg.addExpr(fh, false); + cg.getEvalBlock()._if(out.getValue().ne(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getEvalBlock()._return(JExpr.TRUE); + } + + private final GeneratorMapping EVAL_INSIDE = GeneratorMapping.create("setupInterior", "addRecord", null, null); + private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup"); + private final MappingSet EVAL = new MappingSet("index", "outIndex", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE); + + private void addRecordValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) { + cg.setMappingSet(EVAL); + for (LogicalExpression ex : valueExprs) { + ClassGenerator.HoldingContainer hc = cg.addExpr(ex); + cg.getBlock(ClassGenerator.BlockType.EVAL)._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getBlock(ClassGenerator.BlockType.EVAL)._return(JExpr.TRUE); + } + + private final GeneratorMapping OUTPUT_WINDOW_VALUES = GeneratorMapping.create("setupInterior", "outputWindowValues", null, null); + private final MappingSet WINDOW_VALUES = new MappingSet("inIndex" /* read index */, "outIndex" /* write index */, "incoming", "outgoing", OUTPUT_WINDOW_VALUES, OUTPUT_WINDOW_VALUES); + + private void outputWindowValues(ClassGenerator<StreamingWindowFramer> cg, LogicalExpression[] valueExprs) { + cg.setMappingSet(WINDOW_VALUES); + for (int i = 0; i < valueExprs.length; i++) { + ClassGenerator.HoldingContainer hc = cg.addExpr(valueExprs[i]); + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + cg.getEvalBlock()._return(JExpr.TRUE); + } + + @Override + protected IterOutcome doWork() { + StreamingWindowFramer.AggOutcome out = framer.doWork(); + + while (out == StreamingWindowFramer.AggOutcome.UPDATE_AGGREGATOR) { + framer = null; + try { + setupNewSchema(); + } catch (SchemaChangeException e) { + return IterOutcome.STOP; + } + out = framer.doWork(); + } + + if (out == StreamingWindowFramer.AggOutcome.RETURN_AND_COMPLETE) { + done = true; + } + + return framer.getOutcome(); + } + + @Override + public int getRecordCount() { + return framer.getOutputCount(); + } + + @Override + public void cleanup() { + if (framer != null) { + framer.cleanup(); + } + super.cleanup(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java new file mode 100644 index 000000000..b4e3fed5e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFrameTemplate.java @@ -0,0 +1,286 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.impl.window; + +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.impl.aggregate.InternalBatch; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.VectorWrapper; + +import javax.inject.Named; + +public abstract class StreamingWindowFrameTemplate implements StreamingWindowFramer { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingWindowFramer.class); + private static final String TOO_BIG_ERROR = "Couldn't add value to an empty batch. This likely means that a single value is too long for a varlen field."; + private static final boolean EXTRA_DEBUG = false; + public static final int UNBOUNDED = -1; + public static final int CURRENT_ROW = 0; + private boolean first = true; + private int previousIndex = 0; + private int underlyingIndex = -1; + private int currentIndex; + private boolean pendingOutput = false; + private RecordBatch.IterOutcome outcome; + private int outputCount = 0; + private RecordBatch incoming; + private RecordBatch outgoing; + private FragmentContext context; + private InternalBatch previousBatch = null; + private int precedingConfig = UNBOUNDED; + private int followingConfig = CURRENT_ROW; + + + @Override + public void setup(FragmentContext context, + RecordBatch incoming, + RecordBatch outgoing, + int precedingConfig, + int followingConfig) throws SchemaChangeException { + this.context = context; + this.incoming = incoming; + this.outgoing = outgoing; + this.precedingConfig = precedingConfig; + this.followingConfig = followingConfig; + + setupInterior(incoming, outgoing); + } + + + private void allocateOutgoing() { + for (VectorWrapper<?> w : outgoing){ + w.getValueVector().allocateNew(); + } + } + + @Override + public RecordBatch.IterOutcome getOutcome() { + return outcome; + } + + @Override + public int getOutputCount() { + return outputCount; + } + + private AggOutcome tooBigFailure() { + context.fail(new Exception(TOO_BIG_ERROR)); + this.outcome = RecordBatch.IterOutcome.STOP; + return AggOutcome.RETURN_OUTCOME; + } + + @Override + public AggOutcome doWork() { + // if we're in the first state, allocate outgoing. + try { + if (first) { + allocateOutgoing(); + } + + // setup for new output and pick any remainder. + if (pendingOutput) { + allocateOutgoing(); + pendingOutput = false; + outputToBatch(previousIndex); + } + + boolean recordsAdded = false; + + outside: while (true) { + if (EXTRA_DEBUG) { + logger.trace("Looping from underlying index {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex); + logger.debug("Processing {} records in window framer", incoming.getRecordCount()); + } + // loop through existing records, adding as necessary. + while(incIndex()) { + if (previousBatch != null) { + boolean isSameFromBatch = isSameFromBatch(previousIndex, previousBatch, currentIndex); + if (EXTRA_DEBUG) { + logger.trace("Same as previous batch: {}, previous index {}, current index {}", isSameFromBatch, previousIndex, currentIndex); + } + + if(!isSameFromBatch) { + resetValues(); + } + previousBatch.clear(); + previousBatch = null; + } else if (!isSame(previousIndex, currentIndex)) { + resetValues(); + } + + addRecord(currentIndex); + + if (!outputToBatch(currentIndex)) { + if (outputCount == 0) { + return tooBigFailure(); + } + + // mark the pending output but move forward for the next cycle. + pendingOutput = true; + incIndex(); + return setOkAndReturn(); + } + + recordsAdded = true; + } + + if (EXTRA_DEBUG) { + logger.debug("Exit Loop from underlying index {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex); + } + + previousBatch = new InternalBatch(incoming); + + while (true) { + RecordBatch.IterOutcome out = incoming.next(); + switch (out) { + case NONE: + outcome = innerOutcome(out, recordsAdded); + if (EXTRA_DEBUG) { + logger.trace("Received IterOutcome of {}, assigning {} outcome", out, outcome); + } + return AggOutcome.RETURN_AND_COMPLETE; + case NOT_YET: + outcome = innerOutcome(out, recordsAdded); + if (EXTRA_DEBUG) { + logger.trace("Received IterOutcome of {}, assigning {} outcome", out, outcome); + } + return AggOutcome.RETURN_OUTCOME; + + case OK_NEW_SCHEMA: + if (EXTRA_DEBUG) { + logger.trace("Received new schema. Batch has {} records.", incoming.getRecordCount()); + } + resetIndex(); + return AggOutcome.UPDATE_AGGREGATOR; + + case OK: + if (EXTRA_DEBUG) { + logger.trace("Received OK with {} records.", incoming.getRecordCount()); + } + resetIndex(); + if (incoming.getRecordCount() == 0) { + continue; + } else { + continue outside; + } + case STOP: + default: + outcome = out; + if (EXTRA_DEBUG) { + logger.trace("Stop received.", incoming.getRecordCount()); + } + return AggOutcome.RETURN_OUTCOME; + } + } + } + } finally { + first = false; + } + } + + private RecordBatch.IterOutcome innerOutcome(RecordBatch.IterOutcome innerOutcome, boolean newRecordsAdded) { + if(newRecordsAdded) { + setOkAndReturn(); + return outcome; + } + return innerOutcome; + } + + + private final boolean incIndex() { + underlyingIndex++; + + if(currentIndex != -1) { + previousIndex = currentIndex; + } + + if (underlyingIndex >= incoming.getRecordCount()) { + return false; + } + + currentIndex = getVectorIndex(underlyingIndex); + return true; + } + + private final void resetIndex() { + underlyingIndex = -1; + currentIndex = getVectorIndex(underlyingIndex); + if (EXTRA_DEBUG) { + logger.trace("Reset new indexes: underlying {}, previous {}, current {}", underlyingIndex, previousIndex, currentIndex); + } + } + + private final AggOutcome setOkAndReturn() { + if (first) { + this.outcome = RecordBatch.IterOutcome.OK_NEW_SCHEMA; + } else { + this.outcome = RecordBatch.IterOutcome.OK; + } + + if (EXTRA_DEBUG) { + logger.debug("Setting output count {}", outputCount); + } + for (VectorWrapper<?> v : outgoing) { + v.getValueVector().getMutator().setValueCount(outputCount); + } + return AggOutcome.RETURN_OUTCOME; + } + + private final boolean outputToBatch(int inIndex) { + boolean success = outputRecordValues(outputCount) + && outputWindowValues(inIndex, outputCount); + + if (success) { + outputCount++; + } + + return success; + } + + @Override + public void cleanup() { + if(previousBatch != null) { + previousBatch.clear(); + previousBatch = null; + } + } + + public abstract void setupInterior(@Named("incoming") RecordBatch incoming, @Named("outgoing") RecordBatch outgoing) throws SchemaChangeException; + + /** + * Compares withins from two indexes in the same batch + * @param index1 First record index + * @param index2 Second record index + * @return does within value match + */ + public abstract boolean isSame(@Named("index1") int index1, @Named("index2") int index2); + /** + * Compares withins from one index of given batch (typically previous completed batch), and one index from current batch + * @param b1Index First record index + * @param index2 Second record index + * @return does within value match + */ + public abstract boolean isSameFromBatch(@Named("b1Index") int b1Index, @Named("b1") InternalBatch b1, @Named("b2Index") int index2); + public abstract void addRecord(@Named("index") int index); + public abstract boolean outputRecordValues(@Named("outIndex") int outIndex); + public abstract boolean outputWindowValues(@Named("inIndex") int inIndex, @Named("outIndex") int outIndex); + public abstract int getVectorIndex(@Named("recordIndex") int recordIndex); + + public abstract boolean resetValues(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java new file mode 100644 index 000000000..9588ceffa --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/window/StreamingWindowFramer.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.physical.impl.window; + +import org.apache.drill.exec.compile.TemplateClassDefinition; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.record.RecordBatch; + +public interface StreamingWindowFramer { + public static TemplateClassDefinition<StreamingWindowFramer> TEMPLATE_DEFINITION = new TemplateClassDefinition<>(StreamingWindowFramer.class, StreamingWindowFrameTemplate.class); + + + public static enum AggOutcome { + RETURN_OUTCOME, UPDATE_AGGREGATOR, RETURN_AND_COMPLETE; + } + + public abstract void setup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing, + int precedingConfig, int followingConfig) throws SchemaChangeException; + + public abstract RecordBatch.IterOutcome getOutcome(); + + public abstract int getOutputCount(); + + public abstract AggOutcome doWork(); + + public abstract void cleanup(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java new file mode 100644 index 000000000..fcf52ee62 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillWindowRelBase.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner.common; + +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.WindowRelBase; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.rex.RexLiteral; + +import java.util.List; + +public class DrillWindowRelBase extends WindowRelBase implements DrillRelNode { + + public DrillWindowRelBase( + RelOptCluster cluster, + RelTraitSet traits, + RelNode child, + List<RexLiteral> constants, + RelDataType rowType, + List<Window> windows) { + super(cluster, traits, child, constants, rowType, windows); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java index 3fc3b89d1..a2685ada5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java @@ -24,6 +24,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Store; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.physical.config.Limit; +import org.apache.drill.exec.physical.config.WindowPOP; import org.apache.drill.exec.planner.AbstractOpWrapperVisitor; import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair; @@ -113,6 +114,11 @@ public class StatsCollector { return null; } + @Override + public Void visitWindowFrame(WindowPOP window, Wrapper value) throws RuntimeException { + return visitOp(window, value); + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java index 6b0c3b4b4..ee035c635 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAggregateRel.java @@ -82,7 +82,7 @@ public class DrillAggregateRel extends DrillAggregateRelBase implements DrillRel return builder.build(); } - private LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) { + public static LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillImplementor implementor) { List<LogicalExpression> args = Lists.newArrayList(); for(Integer i : call.getArgList()) { args.add(new FieldReference(fn.get(i))); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java index c3b0d00c6..f6c910e34 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLimitRule.java @@ -25,7 +25,7 @@ import org.eigenbase.relopt.RelOptRuleCall; import org.eigenbase.relopt.RelTraitSet; /** - * This rule converts a SortRel that has either a offset and fetch into a Drill Sort and Limit Rel + * This rule converts a SortRel that has either a offset and fetch into a Drill Sort and LimitPOP Rel */ public class DrillLimitRule extends RelOptRule { public static DrillLimitRule INSTANCE = new DrillLimitRule(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java index 082daccbf..fcfced25c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java @@ -27,6 +27,9 @@ import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.physical.PrelUtil.ProjectPushInfo; import org.eigenbase.rel.ProjectRel; +import org.eigenbase.rel.ProjectRelBase; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.rules.PushProjector; import org.eigenbase.rel.rules.RemoveTrivialProjectRule; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.RelOptRuleCall; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java index 7eca54e9b..7ed7885ff 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRel.java @@ -27,7 +27,7 @@ import org.eigenbase.relopt.Convention; public interface DrillRel extends DrillRelNode { /** Calling convention for relational expressions that are "implemented" by * generating Drill logical plans. */ - Convention DRILL_LOGICAL = new Convention.Impl("LOGICAL", DrillRel.class); + public static final Convention DRILL_LOGICAL = new Convention.Impl("LOGICAL", DrillRel.class); LogicalOperator implement(DrillImplementor implementor); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index dbb85b22d..1d3ce9a2e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -38,6 +38,8 @@ import org.apache.drill.exec.planner.physical.ScreenPrule; import org.apache.drill.exec.planner.physical.SortConvertPrule; import org.apache.drill.exec.planner.physical.SortPrule; import org.apache.drill.exec.planner.physical.StreamAggPrule; +import org.apache.drill.exec.planner.physical.StreamingWindowPrule; +import org.apache.drill.exec.planner.physical.StreamingWindowPrule; import org.apache.drill.exec.planner.physical.UnionAllPrule; import org.apache.drill.exec.planner.physical.WriterPrule; import org.eigenbase.rel.RelFactories; @@ -100,6 +102,7 @@ public class DrillRuleSets { DrillScanRule.INSTANCE, DrillFilterRule.INSTANCE, DrillProjectRule.INSTANCE, + DrillWindowRule.INSTANCE, DrillAggregateRule.INSTANCE, DrillLimitRule.INSTANCE, @@ -139,6 +142,8 @@ public class DrillRuleSets { HashJoinPrule.INSTANCE, FilterPrule.INSTANCE, LimitPrule.INSTANCE, + WindowPrule.INSTANCE, + WriterPrule.INSTANCE, PushLimitToTopN.INSTANCE @@ -182,6 +187,7 @@ public class DrillRuleSets { ruleList.add(FilterPrule.INSTANCE); ruleList.add(LimitPrule.INSTANCE); ruleList.add(WriterPrule.INSTANCE); + ruleList.add(StreamingWindowPrule.INSTANCE); ruleList.add(PushLimitToTopN.INSTANCE); ruleList.add(UnionAllPrule.INSTANCE); // ruleList.add(UnionDistinctPrule.INSTANCE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java new file mode 100644 index 000000000..113f98c45 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRel.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner.logical; + +import com.google.common.collect.Lists; +import net.hydromatic.linq4j.Ord; +import net.hydromatic.optiq.util.BitSets; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.Order; +import org.apache.drill.exec.planner.common.DrillWindowRelBase; +import org.eigenbase.rel.AggregateCall; +import org.eigenbase.rel.RelFieldCollation; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.rex.RexLiteral; + +import java.util.List; + +public class DrillWindowRel extends DrillWindowRelBase implements DrillRel { + /** + * Creates a window relational expression. + * + * @param cluster Cluster + * @param traits + * @param child Input relational expression + * @param rowType Output row type + * @param windows Windows + */ + public DrillWindowRel( + RelOptCluster cluster, + RelTraitSet traits, + RelNode child, + List<RexLiteral> constants, + RelDataType rowType, + List<Window> windows) { + super(cluster, traits, child, constants, rowType, windows); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new DrillWindowRel(getCluster(), traitSet, sole(inputs), constants, getRowType(), windows); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + final LogicalOperator inputOp = implementor.visitChild(this, 0, getChild()); + org.apache.drill.common.logical.data.Window.Builder builder = new org.apache.drill.common.logical.data.Window.Builder(); + final List<String> fields = getRowType().getFieldNames(); + final List<String> childFields = getChild().getRowType().getFieldNames(); + for (Window window : windows) { + + for(RelFieldCollation orderKey : window.orderKeys.getFieldCollations()) { + builder.addOrdering(new Order.Ordering(orderKey.getDirection(), new FieldReference(fields.get(orderKey.getFieldIndex())))); + } + + for (int group : BitSets.toIter(window.groupSet)) { + FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN); + builder.addWithin(fr, fr); + } + + int groupCardinality = window.groupSet.cardinality(); + for (Ord<AggregateCall> aggCall : Ord.zip(window.getAggregateCalls(this))) { + FieldReference ref = new FieldReference(fields.get(groupCardinality + aggCall.i)); + LogicalExpression expr = toDrill(aggCall.e, childFields); + builder.addAggregation(ref, expr); + } + } + builder.setInput(inputOp); + org.apache.drill.common.logical.data.Window frame = builder.build(); + return frame; + } + + protected LogicalExpression toDrill(AggregateCall call, List<String> fn) { + List<LogicalExpression> args = Lists.newArrayList(); + for (Integer i : call.getArgList()) { + args.add(new FieldReference(fn.get(i))); + } + + // for count(1). + if (args.isEmpty()) { + args.add(new ValueExpressions.LongExpression(1l)); + } + LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN); + return expr; + } +} + + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRule.java new file mode 100644 index 000000000..847e87aa1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillWindowRule.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner.logical; + +import com.google.common.collect.Lists; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.WindowRel; +import org.eigenbase.relopt.Convention; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.rex.RexLiteral; + +public class DrillWindowRule extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillWindowRule(); + + private DrillWindowRule() { + super(RelOptHelper.some(WindowRel.class, Convention.NONE, RelOptHelper.any(RelNode.class)), "DrillWindowRule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final WindowRel window = call.rel(0); + final RelNode input = call.rel(1); + final RelTraitSet traits = window.getTraitSet().plus(DrillRel.DRILL_LOGICAL); + final RelNode convertedInput = convert(input, traits); + call.transformTo( + new DrillWindowRel( + window.getCluster(), + traits, + convertedInput, + Lists.<RexLiteral>newArrayList(), + window.getRowType(), + window.windows)); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java index 05fb64a9c..a69188b43 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java @@ -17,21 +17,16 @@ */ package org.apache.drill.exec.planner.physical; -import java.util.BitSet; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; - +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import net.hydromatic.linq4j.Ord; import net.hydromatic.optiq.util.BitSets; - import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.logical.data.NamedExpression; -import org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; import org.eigenbase.rel.AggregateCall; import org.eigenbase.rel.AggregateRelBase; @@ -48,8 +43,10 @@ import org.eigenbase.sql.SqlKind; import org.eigenbase.sql.type.OperandTypes; import org.eigenbase.sql.type.ReturnTypes; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import java.util.BitSet; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; public abstract class AggPrelBase extends AggregateRelBase implements Prel { @@ -130,7 +127,7 @@ public abstract class AggPrelBase extends AggregateRelBase implements Prel { for (Ord<AggregateCall> aggCall : Ord.zip(aggCalls)) { int aggExprOrdinal = groupSet.cardinality() + aggCall.i; FieldReference ref = new FieldReference(fields.get(aggExprOrdinal)); - LogicalExpression expr = toDrill(aggCall.e, childFields, new DrillParseContext()); + LogicalExpression expr = toDrill(aggCall.e, childFields); NamedExpression ne = new NamedExpression(expr, ref); aggExprs.add(ne); @@ -162,7 +159,7 @@ public abstract class AggPrelBase extends AggregateRelBase implements Prel { } } - protected LogicalExpression toDrill(AggregateCall call, List<String> fn, DrillParseContext pContext) { + protected LogicalExpression toDrill(AggregateCall call, List<String> fn) { List<LogicalExpression> args = Lists.newArrayList(); for (Integer i : call.getArgList()) { args.add(new FieldReference(fn.get(i))); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java index 5060195bd..6012a5a7a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LimitPrel.java @@ -17,10 +17,6 @@ */ package org.apache.drill.exec.planner.physical; -import java.io.IOException; -import java.util.Iterator; -import java.util.List; - import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.Limit; import org.apache.drill.exec.planner.common.DrillLimitRelBase; @@ -32,6 +28,10 @@ import org.eigenbase.relopt.RelTraitSet; import org.eigenbase.rex.RexLiteral; import org.eigenbase.rex.RexNode; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + public class LimitPrel extends DrillLimitRelBase implements Prel { public LimitPrel(RelOptCluster cluster, RelTraitSet traitSet, RelNode child, RexNode offset, RexNode fetch) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrel.java new file mode 100644 index 000000000..f1a8bc0b6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrel.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner.physical; + +import com.google.common.collect.Lists; +import net.hydromatic.optiq.util.BitSets; +import org.apache.drill.common.expression.ExpressionPosition; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.FunctionCall; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.common.logical.data.NamedExpression; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.WindowPOP; +import org.apache.drill.exec.planner.common.DrillWindowRelBase; +import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; +import org.apache.drill.exec.record.BatchSchema; +import org.eigenbase.rel.AggregateCall; +import org.eigenbase.rel.RelNode; +import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.rex.RexLiteral; + +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import static com.google.common.base.Preconditions.checkState; + +public class StreamingWindowPrel extends DrillWindowRelBase implements Prel { + public StreamingWindowPrel(RelOptCluster cluster, + RelTraitSet traits, + RelNode child, + List<RexLiteral> constants, + RelDataType rowType, + Window window) { + super(cluster, traits, child, constants, rowType, Collections.singletonList(window)); + } + + @Override + public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { + return new StreamingWindowPrel(getCluster(), traitSet, sole(inputs), constants, getRowType(), windows.get(0)); + } + + @Override + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { + Prel child = (Prel) this.getChild(); + + PhysicalOperator childPOP = child.getPhysicalOperator(creator); + + final List<String> childFields = getChild().getRowType().getFieldNames(); + + checkState(windows.size() == 1, "Only one window is expected in WindowPrel"); + + Window window = windows.get(0); + List<NamedExpression> withins = Lists.newArrayList(); + List<NamedExpression> aggs = Lists.newArrayList(); + for (int group : BitSets.toIter(window.groupSet)) { + FieldReference fr = new FieldReference(childFields.get(group), ExpressionPosition.UNKNOWN); + withins.add(new NamedExpression(fr, fr)); + } + + for (AggregateCall aggCall : window.getAggregateCalls(this)) { + FieldReference ref = new FieldReference(aggCall.getName()); + LogicalExpression expr = toDrill(aggCall, childFields); + aggs.add(new NamedExpression(expr, ref)); + } + + WindowPOP windowPOP = new WindowPOP( + childPOP, + withins.toArray(new NamedExpression[withins.size()]), + aggs.toArray(new NamedExpression[aggs.size()]), + Long.MIN_VALUE, //TODO: Get first/last to work + Long.MIN_VALUE); + + creator.addMetadata(this, windowPOP); + return windowPOP; + } + + protected LogicalExpression toDrill(AggregateCall call, List<String> fn) { + List<LogicalExpression> args = Lists.newArrayList(); + for (Integer i : call.getArgList()) { + args.add(new FieldReference(fn.get(i))); + } + + // for count(1). + if (args.isEmpty()) { + args.add(new ValueExpressions.LongExpression(1l)); + } + LogicalExpression expr = new FunctionCall(call.getAggregation().getName().toLowerCase(), args, ExpressionPosition.UNKNOWN); + return expr; + } + + @Override + public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value) throws E { + return logicalVisitor.visitPrel(this, value); + } + + @Override + public BatchSchema.SelectionVectorMode[] getSupportedEncodings() { + return BatchSchema.SelectionVectorMode.ALL; + } + + @Override + public BatchSchema.SelectionVectorMode getEncoding() { + return BatchSchema.SelectionVectorMode.NONE; + } + + @Override + public boolean needsFinalColumnReordering() { + return false; + } + + @Override + public Iterator<Prel> iterator() { + return PrelUtil.iter(getChild()); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java new file mode 100644 index 000000000..00c20b23e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamingWindowPrule.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner.physical; + +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import net.hydromatic.linq4j.Ord; +import net.hydromatic.optiq.util.BitSets; +import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.planner.logical.DrillWindowRel; +import org.apache.drill.exec.planner.logical.RelOptHelper; +import org.eigenbase.rel.RelCollation; +import org.eigenbase.rel.RelCollationImpl; +import org.eigenbase.rel.RelFieldCollation; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.WindowRelBase; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.reltype.RelDataTypeField; +import org.eigenbase.reltype.RelRecordType; +import org.eigenbase.sql.SqlAggFunction; + +import java.util.List; + +public class StreamingWindowPrule extends RelOptRule { + public static final RelOptRule INSTANCE = new StreamingWindowPrule(); + + private StreamingWindowPrule() { + super(RelOptHelper.some(DrillWindowRel.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(RelNode.class)), "Prel.WindowPrule"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final DrillWindowRel window = call.rel(0); + RelNode input = call.rel(1); + + // TODO: Order window based on existing partition by + //input.getTraitSet().subsumes() + + for (final Ord<WindowRelBase.Window> w : Ord.zip(window.windows)) { + WindowRelBase.Window windowBase = w.getValue(); + DrillDistributionTrait distOnAllKeys = + new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, + ImmutableList.copyOf(getDistributionFields(windowBase))); + + RelCollation collation = getCollation(windowBase); + RelTraitSet traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(distOnAllKeys); + final RelNode convertedInput = convert(input, traits); + + List<RelDataTypeField> newRowFields = Lists.newArrayList(); + for(RelDataTypeField field : convertedInput.getRowType().getFieldList()) { + newRowFields.add(field); + } + + Iterable<RelDataTypeField> newWindowFields = Iterables.filter(window.getRowType().getFieldList(), new Predicate<RelDataTypeField>() { + @Override + public boolean apply(RelDataTypeField relDataTypeField) { + return relDataTypeField.getName().startsWith("w" + w.i + "$"); + } + }); + + for(RelDataTypeField newField : newWindowFields) { + newRowFields.add(newField); + } + + RelDataType rowType = new RelRecordType(newRowFields); + + List<WindowRelBase.RexWinAggCall> newWinAggCalls = Lists.newArrayList(); + for(Ord<WindowRelBase.RexWinAggCall> aggOrd : Ord.zip(windowBase.aggCalls)) { + WindowRelBase.RexWinAggCall aggCall = aggOrd.getValue(); + newWinAggCalls.add(new WindowRelBase.RexWinAggCall( + (SqlAggFunction)aggCall.getOperator(), aggCall.getType(), aggCall.getOperands(), aggOrd.i) + ); + } + + windowBase = new WindowRelBase.Window( + windowBase.groupSet, + windowBase.isRows, + windowBase.lowerBound, + windowBase.upperBound, + windowBase.orderKeys, + newWinAggCalls + ); + + input = new StreamingWindowPrel( + window.getCluster(), + window.getTraitSet().merge(traits), + convertedInput, + window.getConstants(), + rowType, + windowBase); + } + + call.transformTo(input); + } + + private RelCollation getCollation(WindowRelBase.Window window) { + List<RelFieldCollation> fields = Lists.newArrayList(); + for (int group : BitSets.toIter(window.groupSet)) { + fields.add(new RelFieldCollation(group)); + } + return RelCollationImpl.of(fields); + } + + private List<DrillDistributionTrait.DistributionField> getDistributionFields(WindowRelBase.Window window) { + List<DrillDistributionTrait.DistributionField> groupByFields = Lists.newArrayList(); + for (int group : BitSets.toIter(window.groupSet)) { + DrillDistributionTrait.DistributionField field = new DrillDistributionTrait.DistributionField(group); + groupByFields.add(field); + } + return groupByFields; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java index 85a5734f2..97d873c50 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java @@ -17,8 +17,8 @@ */ package org.apache.drill.exec.planner.sql; -import java.util.List; - +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.eigenbase.sql.SqlFunctionCategory; import org.eigenbase.sql.SqlIdentifier; @@ -27,8 +27,7 @@ import org.eigenbase.sql.SqlOperatorTable; import org.eigenbase.sql.SqlSyntax; import org.eigenbase.sql.fun.SqlStdOperatorTable; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Lists; +import java.util.List; public class DrillOperatorTable extends SqlStdOperatorTable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillOperatorTable.class); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlAggOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlAggOperator.java index 0b8668b38..7ab2e9fb7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlAggOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlAggOperator.java @@ -60,4 +60,4 @@ public class DrillSqlAggOperator extends SqlAggFunction { public RelDataType getReturnType(RelDataTypeFactory typeFactory) { return getAny(typeFactory); } -}
\ No newline at end of file +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index 2238155a9..2de46ee9d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -28,7 +28,6 @@ import net.hydromatic.optiq.tools.Planner; import net.hydromatic.optiq.tools.RelConversionException; import net.hydromatic.optiq.tools.RuleSet; import net.hydromatic.optiq.tools.ValidationException; - import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.planner.cost.DrillCostBase; @@ -38,14 +37,19 @@ import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler; import org.apache.drill.exec.planner.sql.handlers.ExplainHandler; import org.apache.drill.exec.planner.sql.handlers.SetOptionHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; import org.apache.drill.exec.planner.sql.parser.DrillSqlCall; import org.apache.drill.exec.planner.sql.parser.impl.DrillParserWithCompoundIdConverter; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.util.Pointer; import org.eigenbase.rel.RelCollationTraitDef; +import org.eigenbase.rel.rules.ReduceExpressionsRule; +import org.eigenbase.rel.rules.WindowedAggSplitterRule; import org.eigenbase.relopt.ConventionTraitDef; import org.eigenbase.relopt.RelOptCostFactory; import org.eigenbase.relopt.RelTraitDef; +import org.eigenbase.relopt.hep.HepPlanner; +import org.eigenbase.relopt.hep.HepProgramBuilder; import org.eigenbase.sql.SqlNode; import org.eigenbase.sql.parser.SqlParseException; @@ -53,6 +57,7 @@ public class DrillSqlWorker { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSqlWorker.class); private final Planner planner; + private final HepPlanner hepPlanner; public final static int LOGICAL_RULES = 0; public final static int PHYSICAL_MEM_RULES = 1; private final QueryContext context; @@ -79,7 +84,12 @@ public class DrillSqlWorker { .costFactory(costFactory) // .build(); this.planner = Frameworks.getPlanner(config); - + HepProgramBuilder builder = new HepProgramBuilder(); + builder.addRuleClass(ReduceExpressionsRule.class); + builder.addRuleClass(WindowedAggSplitterRule.class); + this.hepPlanner = new HepPlanner(builder.build()); + hepPlanner.addRule(ReduceExpressionsRule.CALC_INSTANCE); + hepPlanner.addRule(WindowedAggSplitterRule.PROJECT); } private RuleSet[] getRules(QueryContext context) { @@ -99,23 +109,24 @@ public class DrillSqlWorker { SqlNode sqlNode = planner.parse(sql); AbstractSqlHandler handler; + SqlHandlerConfig config = new SqlHandlerConfig(hepPlanner, planner, context); // TODO: make this use path scanning or something similar. switch(sqlNode.getKind()){ case EXPLAIN: - handler = new ExplainHandler(planner, context); + handler = new ExplainHandler(config); break; case SET_OPTION: handler = new SetOptionHandler(context); break; case OTHER: if (sqlNode instanceof DrillSqlCall) { - handler = ((DrillSqlCall)sqlNode).getSqlHandler(planner, context); + handler = ((DrillSqlCall)sqlNode).getSqlHandler(config); break; } // fallthrough default: - handler = new DefaultSqlHandler(planner, context, textPlan); + handler = new DefaultSqlHandler(config, textPlan); } return handler.getPlan(sqlNode); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java index 708951a27..df2f8076d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateTableHandler.java @@ -40,13 +40,14 @@ import org.apache.drill.exec.planner.types.DrillFixedRelDataTypeImpl; import org.apache.drill.exec.store.AbstractSchema; import org.eigenbase.rel.RelNode; import org.eigenbase.relopt.RelOptUtil; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.reltype.RelDataType; import org.eigenbase.sql.SqlNode; public class CreateTableHandler extends DefaultSqlHandler { - public CreateTableHandler(Planner planner, QueryContext context) { - super(planner, context); + public CreateTableHandler(SqlHandlerConfig config) { + super(config); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index e63474f13..0bb59bf6a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -60,6 +60,7 @@ import org.apache.drill.exec.util.Pointer; import org.eigenbase.rel.RelNode; import org.eigenbase.relopt.RelOptUtil; import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.SqlExplainLevel; import org.eigenbase.sql.SqlNode; @@ -70,19 +71,23 @@ import com.google.common.collect.Lists; public class DefaultSqlHandler extends AbstractSqlHandler { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultSqlHandler.class); - protected final Planner planner; + protected final SqlHandlerConfig config; protected final QueryContext context; + protected final HepPlanner hepPlanner; + protected final Planner planner; private Pointer<String> textPlan; private final long targetSliceSize; - public DefaultSqlHandler(Planner planner, QueryContext context) { - this(planner, context, null); + public DefaultSqlHandler(SqlHandlerConfig config) { + this(config, null); } - public DefaultSqlHandler(Planner planner, QueryContext context, Pointer<String> textPlan) { + public DefaultSqlHandler(SqlHandlerConfig config, Pointer<String> textPlan) { super(); - this.planner = planner; - this.context = context; + this.planner = config.getPlanner(); + this.context = config.getContext(); + this.hepPlanner = config.getHepPlanner(); + this.config = config; this.textPlan = textPlan; targetSliceSize = context.getOptions().getOption(ExecConstants.SLICE_TARGET).num_val; } @@ -139,7 +144,9 @@ public class DefaultSqlHandler extends AbstractSqlHandler { } protected RelNode convertToRel(SqlNode node) throws RelConversionException { - return planner.convert(node); + RelNode convertedNode = planner.convert(node); + hepPlanner.setRoot(convertedNode); + return hepPlanner.findBestExp(); } protected DrillRel convertToDrel(RelNode relNode) throws RelConversionException { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java index e6f1fe1c4..84082e3e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DescribeTableHandler.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.planner.sql.parser.DrillParserUtil; import org.apache.drill.exec.planner.sql.parser.SqlDescribeTable; import org.apache.drill.exec.store.AbstractSchema; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.SqlIdentifier; import org.eigenbase.sql.SqlLiteral; import org.eigenbase.sql.SqlNode; @@ -43,7 +44,7 @@ import com.google.common.collect.ImmutableList; public class DescribeTableHandler extends DefaultSqlHandler { - public DescribeTableHandler(Planner planner, QueryContext context) { super(planner, context); } + public DescribeTableHandler(SqlHandlerConfig config) { super(config); } /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.COLUMNS ... */ @Override @@ -104,4 +105,3 @@ public class DescribeTableHandler extends DefaultSqlHandler { } } } - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java index f3243212e..8beed3478 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java @@ -17,12 +17,8 @@ */ package org.apache.drill.exec.planner.sql.handlers; -import java.io.IOException; - -import net.hydromatic.optiq.tools.Planner; import net.hydromatic.optiq.tools.RelConversionException; import net.hydromatic.optiq.tools.ValidationException; - import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.exec.ops.QueryContext; @@ -41,13 +37,15 @@ import org.eigenbase.sql.SqlExplainLevel; import org.eigenbase.sql.SqlLiteral; import org.eigenbase.sql.SqlNode; +import java.io.IOException; + public class ExplainHandler extends DefaultSqlHandler { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplainHandler.class); private ResultMode mode; private SqlExplainLevel level = SqlExplainLevel.ALL_ATTRIBUTES; - public ExplainHandler(Planner planner, QueryContext context) { - super(planner, context); + public ExplainHandler(SqlHandlerConfig config) { + super(config); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java index 3627a7b05..ff3542da8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowFileHandler.java @@ -35,6 +35,7 @@ import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory.WorkspaceSchema; import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.SqlIdentifier; import org.eigenbase.sql.SqlNode; @@ -42,8 +43,8 @@ import org.eigenbase.sql.SqlNode; public class ShowFileHandler extends DefaultSqlHandler { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class); - public ShowFileHandler(Planner planner, QueryContext context) { - super(planner, context); + public ShowFileHandler(SqlHandlerConfig config) { + super(config); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java index 5e77628f0..b05521868 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowSchemasHandler.java @@ -26,6 +26,7 @@ import net.hydromatic.optiq.tools.RelConversionException; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.planner.sql.parser.DrillParserUtil; import org.apache.drill.exec.planner.sql.parser.SqlShowSchemas; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.SqlIdentifier; import org.eigenbase.sql.SqlNode; import org.eigenbase.sql.SqlNodeList; @@ -37,7 +38,7 @@ import com.google.common.collect.ImmutableList; public class ShowSchemasHandler extends DefaultSqlHandler { - public ShowSchemasHandler(Planner planner, QueryContext context) { super(planner, context); } + public ShowSchemasHandler(SqlHandlerConfig config) { super(config); } /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.SCHEMATA ... */ @Override @@ -61,4 +62,3 @@ public class ShowSchemasHandler extends DefaultSqlHandler { fromClause, where, null, null, null, null, null, null); } } - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java index a1c5aee92..0a029f733 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ShowTablesHandler.java @@ -35,6 +35,7 @@ import org.eigenbase.sql.SqlLiteral; import org.eigenbase.sql.SqlNode; import org.eigenbase.sql.SqlNodeList; import org.eigenbase.sql.SqlSelect; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.fun.SqlStdOperatorTable; import org.eigenbase.sql.parser.SqlParserPos; @@ -43,7 +44,7 @@ import com.google.common.collect.Lists; public class ShowTablesHandler extends DefaultSqlHandler { - public ShowTablesHandler(Planner planner, QueryContext context) { super(planner, context); } + public ShowTablesHandler(SqlHandlerConfig config) { super(config); } /** Rewrite the parse tree as SELECT ... FROM INFORMATION_SCHEMA.`TABLES` ... */ @Override @@ -105,4 +106,3 @@ public class ShowTablesHandler extends DefaultSqlHandler { fromClause, where, null, null, null, null, null, null); } } - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java new file mode 100644 index 000000000..132a2c97b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SqlHandlerConfig.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner.sql.handlers; + +import net.hydromatic.optiq.tools.Planner; +import org.apache.drill.exec.ops.QueryContext; +import org.eigenbase.relopt.hep.HepPlanner; + +public class SqlHandlerConfig { + private final QueryContext context; + private final HepPlanner hepPlanner; + private final Planner planner; + + public SqlHandlerConfig(HepPlanner hepPlanner, Planner planner, QueryContext context) { + this.hepPlanner = hepPlanner; + this.planner = planner; + this.context = context; + } + + public Planner getPlanner() { + return planner; + } + + public HepPlanner getHepPlanner() { + return hepPlanner; + } + + public QueryContext getContext() { + return context; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java index 4005b8164..a6bd8b725 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/DrillSqlCall.java @@ -22,6 +22,8 @@ import net.hydromatic.optiq.tools.Planner; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.SqlCall; import org.eigenbase.sql.parser.SqlParserPos; @@ -34,7 +36,7 @@ public abstract class DrillSqlCall extends SqlCall { super(pos); } - public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) { - return new DefaultSqlHandler(planner, context); + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new DefaultSqlHandler(config); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java index 10db4c49b..5e3c21560 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateTable.java @@ -24,6 +24,8 @@ import net.hydromatic.optiq.tools.Planner; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.CreateTableHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.SqlCall; import org.eigenbase.sql.SqlIdentifier; import org.eigenbase.sql.SqlKind; @@ -90,8 +92,8 @@ public class SqlCreateTable extends DrillSqlCall { } @Override - public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) { - return new CreateTableHandler(planner, context); + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new CreateTableHandler(config); } public List<String> getSchemaPath() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java index ccd08e14c..b7352b4b4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlCreateView.java @@ -17,12 +17,10 @@ */ package org.apache.drill.exec.planner.sql.parser; -import java.util.List; - -import net.hydromatic.optiq.tools.Planner; - -import org.apache.drill.exec.ops.QueryContext; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; import org.apache.drill.exec.planner.sql.handlers.ViewHandler; import org.eigenbase.sql.SqlCall; import org.eigenbase.sql.SqlIdentifier; @@ -35,8 +33,7 @@ import org.eigenbase.sql.SqlSpecialOperator; import org.eigenbase.sql.SqlWriter; import org.eigenbase.sql.parser.SqlParserPos; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; +import java.util.List; public class SqlCreateView extends DrillSqlCall { public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE_VIEW", SqlKind.OTHER) { @@ -103,8 +100,8 @@ public class SqlCreateView extends DrillSqlCall { } @Override - public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) { - return new ViewHandler.CreateView(planner, context); + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new ViewHandler.CreateView(config.getPlanner(), config.getContext()); } public List<String> getSchemaPath() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java index 29275d753..7d464e144 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDescribeTable.java @@ -24,6 +24,8 @@ import net.hydromatic.optiq.tools.Planner; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.DescribeTableHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.SqlCall; import org.eigenbase.sql.SqlIdentifier; import org.eigenbase.sql.SqlKind; @@ -89,8 +91,8 @@ public class SqlDescribeTable extends DrillSqlCall { } @Override - public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) { - return new DescribeTableHandler(planner, context); + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new DescribeTableHandler(config); } public SqlIdentifier getTable() { return table; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java index 33b71b74c..a0d6f7b2d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlDropView.java @@ -24,7 +24,9 @@ import net.hydromatic.optiq.tools.Planner; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; import org.apache.drill.exec.planner.sql.handlers.ViewHandler.DropView; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.SqlCall; import org.eigenbase.sql.SqlIdentifier; import org.eigenbase.sql.SqlKind; @@ -70,8 +72,8 @@ public class SqlDropView extends DrillSqlCall { } @Override - public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) { - return new DropView(context); + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new DropView(config.getContext()); } public List<String> getSchemaPath() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java index 8779969e9..38abfeb83 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowFiles.java @@ -25,6 +25,8 @@ import net.hydromatic.optiq.tools.Planner; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.ShowFileHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.SqlCall; import org.eigenbase.sql.SqlIdentifier; import org.eigenbase.sql.SqlKind; @@ -76,8 +78,8 @@ public class SqlShowFiles extends DrillSqlCall { } @Override - public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) { - return new ShowFileHandler(planner, context); + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new ShowFileHandler(config); } public SqlIdentifier getDb() { return db; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java index 9b4229548..9d8771a11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowSchemas.java @@ -24,6 +24,8 @@ import net.hydromatic.optiq.tools.Planner; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.ShowSchemasHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; +import org.eigenbase.relopt.hep.HepPlanner; import org.eigenbase.sql.SqlCall; import org.eigenbase.sql.SqlKind; import org.eigenbase.sql.SqlLiteral; @@ -85,8 +87,8 @@ public class SqlShowSchemas extends DrillSqlCall { } @Override - public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) { - return new ShowSchemasHandler(planner, context); + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new ShowSchemasHandler(config); } public SqlNode getLikePattern() { return likePattern; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java index 33d20aa4d..da3f0fd1b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlShowTables.java @@ -17,13 +17,10 @@ */ package org.apache.drill.exec.planner.sql.parser; -import java.util.List; - -import net.hydromatic.optiq.tools.Planner; - -import org.apache.drill.exec.ops.QueryContext; +import com.google.common.collect.Lists; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.ShowTablesHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; import org.eigenbase.sql.SqlCall; import org.eigenbase.sql.SqlIdentifier; import org.eigenbase.sql.SqlKind; @@ -34,7 +31,7 @@ import org.eigenbase.sql.SqlSpecialOperator; import org.eigenbase.sql.SqlWriter; import org.eigenbase.sql.parser.SqlParserPos; -import com.google.common.collect.Lists; +import java.util.List; /** * Sql parse tree node to represent statement: @@ -92,8 +89,8 @@ public class SqlShowTables extends DrillSqlCall { } @Override - public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) { - return new ShowTablesHandler(planner, context); + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new ShowTablesHandler(config); } public SqlIdentifier getDb() { return db; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java index ed4695ed6..c8af0021d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlUseSchema.java @@ -17,13 +17,8 @@ */ package org.apache.drill.exec.planner.sql.parser; -import java.util.Collections; -import java.util.List; - -import net.hydromatic.optiq.tools.Planner; - -import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; +import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig; import org.apache.drill.exec.planner.sql.handlers.UseSchemaHandler; import org.eigenbase.sql.SqlCall; import org.eigenbase.sql.SqlIdentifier; @@ -35,6 +30,9 @@ import org.eigenbase.sql.SqlSpecialOperator; import org.eigenbase.sql.SqlWriter; import org.eigenbase.sql.parser.SqlParserPos; +import java.util.Collections; +import java.util.List; + /** * Sql parser tree node to represent <code>USE SCHEMA</code> statement. */ @@ -73,8 +71,8 @@ public class SqlUseSchema extends DrillSqlCall { } @Override - public AbstractSqlHandler getSqlHandler(Planner planner, QueryContext context) { - return new UseSchemaHandler(context); + public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) { + return new UseSchemaHandler(config.getContext()); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index 0adc09ec1..f05243dcc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -27,6 +27,7 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte protected final RecordBatch incoming; private boolean first = true; + protected boolean done = false; protected boolean outOfMemory = false; public AbstractSingleRecordBatch(T popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { @@ -41,6 +42,11 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte @Override public IterOutcome innerNext() { + // Short circuit if record batch has already sent all data and is done + if (done) { + return IterOutcome.NONE; + } + IterOutcome upstream = next(incoming); if (!first && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) { do { @@ -100,6 +106,5 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte } protected abstract void setupNewSchema() throws SchemaChangeException; - protected abstract void doWork(); - + protected abstract IterOutcome doWork(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java index e2f4a954b..b1b7c7627 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java @@ -105,6 +105,23 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto return vc; } + public static VectorContainer getTransferClone(VectorAccessible incoming, VectorWrapper[] ignoreWrappers) { + VectorContainer vc = new VectorContainer(); + for (VectorWrapper<?> w : incoming) { + if(ignoreWrappers != null) { + for(VectorWrapper wrapper : ignoreWrappers) { + if (w == wrapper) { + continue; + } + } + } + + vc.cloneAndTransfer(w); + } + + return vc; + } + public static VectorContainer canonicalize(VectorContainer original) { VectorContainer vc = new VectorContainer(); List<VectorWrapper<?>> canonicalWrappers = new ArrayList<VectorWrapper<?>>(original.wrappers); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java index e7c6dc06f..0272b2310 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ExecTest.java @@ -21,7 +21,7 @@ import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.test.DrillTest; import org.junit.After; -public class ExecTest extends DrillTest{ +public class ExecTest extends DrillTest { @After public void clear(){ diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java index 3ba6cb175..7cdb41a31 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/limit/TestSimpleLimit.java @@ -117,6 +117,7 @@ public class TestSimpleLimit extends ExecTest { if(context.getFailureCause() != null){ throw context.getFailureCause(); } + assertTrue(!context.isFailed()); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java new file mode 100644 index 000000000..ac7b035e2 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/window/TestWindowFrame.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.window; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.expression.PathSegment; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.util.FileUtils; +import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.pop.PopUnitTestBase; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.server.RemoteServiceSet; +import org.apache.drill.exec.vector.BigIntVector; +import org.apache.drill.exec.vector.NullableBigIntVector; +import org.apache.drill.exec.vector.ValueVector; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class TestWindowFrame extends PopUnitTestBase { + + @Test + public void testWindowFrameWithOneKeyCount() throws Throwable { + try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + Drillbit bit = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + + // run query. + bit.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(UserBitShared.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/window/oneKeyCount.json"), Charsets.UTF_8) + .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/window/oneKeyCountData.json").toURI().toString()) + ); + + long[] cntArr = {1, 2, 1, 2}; + long[] sumArr = {100, 150, 25, 75}; + + // look at records + RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator()); + int recordCount = 0; + + assertEquals(2, results.size()); + + QueryResultBatch batch = results.get(0); + assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); + batchLoader.load(batch.getHeader().getDef(), batch.getData()); + + for (int r = 0; r < batchLoader.getRecordCount(); r++) { + recordCount++; + VectorWrapper<?> wrapper = batchLoader.getValueAccessorById( + BigIntVector.class, + batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0] + ); + assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r)); + wrapper = batchLoader.getValueAccessorById( + NullableBigIntVector.class, + batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0] + ); + assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r)); + } + batchLoader.clear(); + batch.release(); + + assertEquals(4, recordCount); + } + } + + @Test + public void testWindowFrameWithOneKeyMultipleBatches() throws Throwable { + try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + Drillbit bit = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + + // run query. + bit.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(UserBitShared.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/window/oneKeyCountMultiBatch.json"), Charsets.UTF_8) + .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/window/mediumData.json").toURI().toString())); + + // look at records + RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator()); + int recordCount = 0; + + assertEquals(2, results.size()); + + QueryResultBatch batch = results.get(0); + assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); + batchLoader.load(batch.getHeader().getDef(), batch.getData()); + ValueVector.Accessor output = batchLoader.getValueAccessorById(NullableBigIntVector.class, + batchLoader.getValueVectorId( + new SchemaPath(new PathSegment.NameSegment("output"))).getFieldIds()[0] + ).getValueVector().getAccessor(); + ValueVector.Accessor sum = batchLoader.getValueAccessorById( + BigIntVector.class, + batchLoader.getValueVectorId( + new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0] + ).getValueVector().getAccessor(); + ValueVector.Accessor cnt = batchLoader.getValueAccessorById( + BigIntVector.class, + batchLoader.getValueVectorId( + new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0] + ).getValueVector().getAccessor(); + int lastGroup = -1; + long groupCounter = 0; + long s = 0; + for (int r = 1; r <= batchLoader.getRecordCount(); r++) { + recordCount++; + int group = r / 4; + if(lastGroup != group) { + lastGroup = group; + groupCounter = 1; + s = 0; + } else { + groupCounter++; + } + + s += group * 8 + r % 4; + + assertEquals("Count, Row " + r, groupCounter, cnt.getObject(r - 1)); + assertEquals("Sum, Row " + r, s, sum.getObject(r - 1)); + assertEquals("Output, Row " + r, s, output.getObject(r - 1)); + } + batchLoader.clear(); + batch.release(); + + assertEquals(1000, recordCount); + } + } + + @Test + public void testWindowFrameWithTwoKeys() throws Throwable { + try (RemoteServiceSet serviceSet = RemoteServiceSet.getLocalServiceSet(); + Drillbit bit = new Drillbit(CONFIG, serviceSet); + DrillClient client = new DrillClient(CONFIG, serviceSet.getCoordinator())) { + + // run query. + bit.run(); + client.connect(); + List<QueryResultBatch> results = client.runQuery(UserBitShared.QueryType.PHYSICAL, + Files.toString(FileUtils.getResourceAsFile("/window/twoKeys.json"), Charsets.UTF_8) + .replace("#{DATA_FILE}", FileUtils.getResourceAsFile("/window/twoKeysData.json").toURI().toString()) + ); + + long[] cntArr = {1, 2, 1, 2, 1, 2, 1, 2}; + long[] sumArr = {5, 15, 15, 35, 25, 55, 35, 75}; + + // look at records + RecordBatchLoader batchLoader = new RecordBatchLoader(bit.getContext().getAllocator()); + int recordCount = 0; + + assertEquals(2, results.size()); + + QueryResultBatch batch = results.get(0); + assertTrue(batchLoader.load(batch.getHeader().getDef(), batch.getData())); + batchLoader.load(batch.getHeader().getDef(), batch.getData()); + + for (int r = 0; r < batchLoader.getRecordCount(); r++) { + recordCount++; + VectorWrapper<?> wrapper = batchLoader.getValueAccessorById( + BigIntVector.class, + batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("cnt"))).getFieldIds()[0] + ); + assertEquals(cntArr[r], wrapper.getValueVector().getAccessor().getObject(r)); + wrapper = batchLoader.getValueAccessorById( + NullableBigIntVector.class, + batchLoader.getValueVectorId(new SchemaPath(new PathSegment.NameSegment("sum"))).getFieldIds()[0] + ); + assertEquals(sumArr[r], wrapper.getValueVector().getAccessor().getObject(r)); + } + batchLoader.clear(); + batch.release(); + + assertEquals(8, recordCount); + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java new file mode 100644 index 000000000..780a7ce4c --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestWindowFunctions.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.sql; + +import org.apache.drill.BaseTestQuery; +import org.junit.Test; + +public class TestWindowFunctions extends BaseTestQuery { + @Test + public void testWindowSum() throws Exception { + test("select sum(position_id) over w from cp.`employee.json` window w as ( partition by position_id order by position_id)"); + } +} diff --git a/exec/java-exec/src/test/resources/window/mediumData.json b/exec/java-exec/src/test/resources/window/mediumData.json new file mode 100644 index 000000000..ad866271b --- /dev/null +++ b/exec/java-exec/src/test/resources/window/mediumData.json @@ -0,0 +1,1000 @@ +{"id":814, "a": 1626, "group": 203} +{"id":425, "a": 849, "group": 106} +{"id":900, "a": 1800, "group": 225} +{"id":156, "a": 312, "group": 39} +{"id":348, "a": 696, "group": 87} +{"id":987, "a": 1971, "group": 246} +{"id":255, "a": 507, "group": 63} +{"id":4, "a": 8, "group": 1} +{"id":512, "a": 1024, "group": 128} +{"id":341, "a": 681, "group": 85} +{"id":113, "a": 225, "group": 28} +{"id":311, "a": 619, "group": 77} +{"id":906, "a": 1810, "group": 226} +{"id":889, "a": 1777, "group": 222} +{"id":611, "a": 1219, "group": 152} +{"id":963, "a": 1923, "group": 240} +{"id":522, "a": 1042, "group": 130} +{"id":615, "a": 1227, "group": 153} +{"id":227, "a": 451, "group": 56} +{"id":365, "a": 729, "group": 91} +{"id":73, "a": 145, "group": 18} +{"id":747, "a": 1491, "group": 186} +{"id":580, "a": 1160, "group": 145} +{"id":552, "a": 1104, "group": 138} +{"id":716, "a": 1432, "group": 179} +{"id":982, "a": 1962, "group": 245} +{"id":118, "a": 234, "group": 29} +{"id":639, "a": 1275, "group": 159} +{"id":273, "a": 545, "group": 68} +{"id":679, "a": 1355, "group": 169} +{"id":338, "a": 674, "group": 84} +{"id":402, "a": 802, "group": 100} +{"id":476, "a": 952, "group": 119} +{"id":628, "a": 1256, "group": 157} +{"id":325, "a": 649, "group": 81} +{"id":749, "a": 1497, "group": 187} +{"id":912, "a": 1824, "group": 228} +{"id":995, "a": 1987, "group": 248} +{"id":605, "a": 1209, "group": 151} +{"id":141, "a": 281, "group": 35} +{"id":700, "a": 1400, "group": 175} +{"id":61, "a": 121, "group": 15} +{"id":478, "a": 954, "group": 119} +{"id":556, "a": 1112, "group": 139} +{"id":229, "a": 457, "group": 57} +{"id":487, "a": 971, "group": 121} +{"id":824, "a": 1648, "group": 206} +{"id":431, "a": 859, "group": 107} +{"id":443, "a": 883, "group": 110} +{"id":135, "a": 267, "group": 33} +{"id":417, "a": 833, "group": 104} +{"id":980, "a": 1960, "group": 245} +{"id":785, "a": 1569, "group": 196} +{"id":917, "a": 1833, "group": 229} +{"id":656, "a": 1312, "group": 164} +{"id":210, "a": 418, "group": 52} +{"id":196, "a": 392, "group": 49} +{"id":361, "a": 721, "group": 90} +{"id":281, "a": 561, "group": 70} +{"id":550, "a": 1098, "group": 137} +{"id":558, "a": 1114, "group": 139} +{"id":677, "a": 1353, "group": 169} +{"id":604, "a": 1208, "group": 151} +{"id":8, "a": 16, "group": 2} +{"id":290, "a": 578, "group": 72} +{"id":932, "a": 1864, "group": 233} +{"id":731, "a": 1459, "group": 182} +{"id":477, "a": 953, "group": 119} +{"id":859, "a": 1715, "group": 214} +{"id":291, "a": 579, "group": 72} +{"id":531, "a": 1059, "group": 132} +{"id":499, "a": 995, "group": 124} +{"id":389, "a": 777, "group": 97} +{"id":182, "a": 362, "group": 45} +{"id":959, "a": 1915, "group": 239} +{"id":523, "a": 1043, "group": 130} +{"id":81, "a": 161, "group": 20} +{"id":439, "a": 875, "group": 109} +{"id":228, "a": 456, "group": 57} +{"id":301, "a": 601, "group": 75} +{"id":208, "a": 416, "group": 52} +{"id":370, "a": 738, "group": 92} +{"id":383, "a": 763, "group": 95} +{"id":209, "a": 417, "group": 52} +{"id":462, "a": 922, "group": 115} +{"id":729, "a": 1457, "group": 182} +{"id":602, "a": 1202, "group": 150} +{"id":936, "a": 1872, "group": 234} +{"id":750, "a": 1498, "group": 187} +{"id":871, "a": 1739, "group": 217} +{"id":120, "a": 240, "group": 30} +{"id":843, "a": 1683, "group": 210} +{"id":260, "a": 520, "group": 65} +{"id":240, "a": 480, "group": 60} +{"id":976, "a": 1952, "group": 244} +{"id":344, "a": 688, "group": 86} +{"id":385, "a": 769, "group": 96} +{"id":410, "a": 818, "group": 102} +{"id":931, "a": 1859, "group": 232} +{"id":891, "a": 1779, "group": 222} +{"id":745, "a": 1489, "group": 186} +{"id":813, "a": 1625, "group": 203} +{"id":129, "a": 257, "group": 32} +{"id":596, "a": 1192, "group": 149} +{"id":517, "a": 1033, "group": 129} +{"id":755, "a": 1507, "group": 188} +{"id":663, "a": 1323, "group": 165} +{"id":233, "a": 465, "group": 58} +{"id":401, "a": 801, "group": 100} +{"id":473, "a": 945, "group": 118} +{"id":990, "a": 1978, "group": 247} +{"id":384, "a": 768, "group": 96} +{"id":178, "a": 354, "group": 44} +{"id":446, "a": 890, "group": 111} +{"id":828, "a": 1656, "group": 207} +{"id":356, "a": 712, "group": 89} +{"id":249, "a": 497, "group": 62} +{"id":553, "a": 1105, "group": 138} +{"id":378, "a": 754, "group": 94} +{"id":126, "a": 250, "group": 31} +{"id":806, "a": 1610, "group": 201} +{"id":540, "a": 1080, "group": 135} +{"id":545, "a": 1089, "group": 136} +{"id":398, "a": 794, "group": 99} +{"id":848, "a": 1696, "group": 212} +{"id":493, "a": 985, "group": 123} +{"id":928, "a": 1856, "group": 232} +{"id":408, "a": 816, "group": 102} +{"id":285, "a": 569, "group": 71} +{"id":795, "a": 1587, "group": 198} +{"id":74, "a": 146, "group": 18} +{"id":332, "a": 664, "group": 83} +{"id":712, "a": 1424, "group": 178} +{"id":858, "a": 1714, "group": 214} +{"id":961, "a": 1921, "group": 240} +{"id":212, "a": 424, "group": 53} +{"id":11, "a": 19, "group": 2} +{"id":839, "a": 1675, "group": 209} +{"id":302, "a": 602, "group": 75} +{"id":117, "a": 233, "group": 29} +{"id":852, "a": 1704, "group": 213} +{"id":528, "a": 1056, "group": 132} +{"id":829, "a": 1657, "group": 207} +{"id":563, "a": 1123, "group": 140} +{"id":968, "a": 1936, "group": 242} +{"id":658, "a": 1314, "group": 164} +{"id":49, "a": 97, "group": 12} +{"id":52, "a": 104, "group": 13} +{"id":186, "a": 370, "group": 46} +{"id":407, "a": 811, "group": 101} +{"id":98, "a": 194, "group": 24} +{"id":377, "a": 753, "group": 94} +{"id":195, "a": 387, "group": 48} +{"id":826, "a": 1650, "group": 206} +{"id":783, "a": 1563, "group": 195} +{"id":284, "a": 568, "group": 71} +{"id":34, "a": 66, "group": 8} +{"id":752, "a": 1504, "group": 188} +{"id":472, "a": 944, "group": 118} +{"id":500, "a": 1000, "group": 125} +{"id":812, "a": 1624, "group": 203} +{"id":300, "a": 600, "group": 75} +{"id":691, "a": 1379, "group": 172} +{"id":435, "a": 867, "group": 108} +{"id":693, "a": 1385, "group": 173} +{"id":847, "a": 1691, "group": 211} +{"id":235, "a": 467, "group": 58} +{"id":45, "a": 89, "group": 11} +{"id":947, "a": 1891, "group": 236} +{"id":184, "a": 368, "group": 46} +{"id":996, "a": 1992, "group": 249} +{"id":150, "a": 298, "group": 37} +{"id":413, "a": 825, "group": 103} +{"id":952, "a": 1904, "group": 238} +{"id":594, "a": 1186, "group": 148} +{"id":133, "a": 265, "group": 33} +{"id":587, "a": 1171, "group": 146} +{"id":612, "a": 1224, "group": 153} +{"id":515, "a": 1027, "group": 128} +{"id":718, "a": 1434, "group": 179} +{"id":884, "a": 1768, "group": 221} +{"id":887, "a": 1771, "group": 221} +{"id":585, "a": 1169, "group": 146} +{"id":695, "a": 1387, "group": 173} +{"id":965, "a": 1929, "group": 241} +{"id":591, "a": 1179, "group": 147} +{"id":374, "a": 746, "group": 93} +{"id":780, "a": 1560, "group": 195} +{"id":305, "a": 609, "group": 76} +{"id":71, "a": 139, "group": 17} +{"id":84, "a": 168, "group": 21} +{"id":58, "a": 114, "group": 14} +{"id":12, "a": 24, "group": 3} +{"id":315, "a": 627, "group": 78} +{"id":131, "a": 259, "group": 32} +{"id":362, "a": 722, "group": 90} +{"id":490, "a": 978, "group": 122} +{"id":234, "a": 466, "group": 58} +{"id":349, "a": 697, "group": 87} +{"id":688, "a": 1376, "group": 172} +{"id":379, "a": 755, "group": 94} +{"id":561, "a": 1121, "group": 140} +{"id":363, "a": 723, "group": 90} +{"id":287, "a": 571, "group": 71} +{"id":770, "a": 1538, "group": 192} +{"id":127, "a": 251, "group": 31} +{"id":583, "a": 1163, "group": 145} +{"id":471, "a": 939, "group": 117} +{"id":788, "a": 1576, "group": 197} +{"id":897, "a": 1793, "group": 224} +{"id":916, "a": 1832, "group": 229} +{"id":956, "a": 1912, "group": 239} +{"id":224, "a": 448, "group": 56} +{"id":787, "a": 1571, "group": 196} +{"id":173, "a": 345, "group": 43} +{"id":47, "a": 91, "group": 11} +{"id":180, "a": 360, "group": 45} +{"id":488, "a": 976, "group": 122} +{"id":764, "a": 1528, "group": 191} +{"id":112, "a": 224, "group": 28} +{"id":781, "a": 1561, "group": 195} +{"id":14, "a": 26, "group": 3} +{"id":204, "a": 408, "group": 51} +{"id":317, "a": 633, "group": 79} +{"id":784, "a": 1568, "group": 196} +{"id":796, "a": 1592, "group": 199} +{"id":375, "a": 747, "group": 93} +{"id":618, "a": 1234, "group": 154} +{"id":207, "a": 411, "group": 51} +{"id":179, "a": 355, "group": 44} +{"id":297, "a": 593, "group": 74} +{"id":838, "a": 1674, "group": 209} +{"id":699, "a": 1395, "group": 174} +{"id":320, "a": 640, "group": 80} +{"id":675, "a": 1347, "group": 168} +{"id":925, "a": 1849, "group": 231} +{"id":684, "a": 1368, "group": 171} +{"id":986, "a": 1970, "group": 246} +{"id":930, "a": 1858, "group": 232} +{"id":911, "a": 1819, "group": 227} +{"id":977, "a": 1953, "group": 244} +{"id":48, "a": 96, "group": 12} +{"id":496, "a": 992, "group": 124} +{"id":794, "a": 1586, "group": 198} +{"id":867, "a": 1731, "group": 216} +{"id":520, "a": 1040, "group": 130} +{"id":621, "a": 1241, "group": 155} +{"id":475, "a": 947, "group": 118} +{"id":270, "a": 538, "group": 67} +{"id":648, "a": 1296, "group": 162} +{"id":842, "a": 1682, "group": 210} +{"id":200, "a": 400, "group": 50} +{"id":924, "a": 1848, "group": 231} +{"id":466, "a": 930, "group": 116} +{"id":40, "a": 80, "group": 10} +{"id":600, "a": 1200, "group": 150} +{"id":883, "a": 1763, "group": 220} +{"id":221, "a": 441, "group": 55} +{"id":106, "a": 210, "group": 26} +{"id":313, "a": 625, "group": 78} +{"id":761, "a": 1521, "group": 190} +{"id":800, "a": 1600, "group": 200} +{"id":241, "a": 481, "group": 60} +{"id":640, "a": 1280, "group": 160} +{"id":358, "a": 714, "group": 89} +{"id":960, "a": 1920, "group": 240} +{"id":347, "a": 691, "group": 86} +{"id":646, "a": 1290, "group": 161} +{"id":236, "a": 472, "group": 59} +{"id":920, "a": 1840, "group": 230} +{"id":586, "a": 1170, "group": 146} +{"id":175, "a": 347, "group": 43} +{"id":371, "a": 739, "group": 92} +{"id":741, "a": 1481, "group": 185} +{"id":652, "a": 1304, "group": 163} +{"id":164, "a": 328, "group": 41} +{"id":444, "a": 888, "group": 111} +{"id":949, "a": 1897, "group": 237} +{"id":115, "a": 227, "group": 28} +{"id":893, "a": 1785, "group": 223} +{"id":940, "a": 1880, "group": 235} +{"id":261, "a": 521, "group": 65} +{"id":105, "a": 209, "group": 26} +{"id":449, "a": 897, "group": 112} +{"id":94, "a": 186, "group": 23} +{"id":810, "a": 1618, "group": 202} +{"id":252, "a": 504, "group": 63} +{"id":946, "a": 1890, "group": 236} +{"id":136, "a": 272, "group": 34} +{"id":70, "a": 138, "group": 17} +{"id":203, "a": 403, "group": 50} +{"id":276, "a": 552, "group": 69} +{"id":703, "a": 1403, "group": 175} +{"id":714, "a": 1426, "group": 178} +{"id":144, "a": 288, "group": 36} +{"id":763, "a": 1523, "group": 190} +{"id":142, "a": 282, "group": 35} +{"id":406, "a": 810, "group": 101} +{"id":225, "a": 449, "group": 56} +{"id":93, "a": 185, "group": 23} +{"id":622, "a": 1242, "group": 155} +{"id":461, "a": 921, "group": 115} +{"id":923, "a": 1843, "group": 230} +{"id":971, "a": 1939, "group": 242} +{"id":748, "a": 1496, "group": 187} +{"id":687, "a": 1371, "group": 171} +{"id":340, "a": 680, "group": 85} +{"id":223, "a": 443, "group": 55} +{"id":625, "a": 1249, "group": 156} +{"id":895, "a": 1787, "group": 223} +{"id":738, "a": 1474, "group": 184} +{"id":35, "a": 67, "group": 8} +{"id":159, "a": 315, "group": 39} +{"id":981, "a": 1961, "group": 245} +{"id":521, "a": 1041, "group": 130} +{"id":36, "a": 72, "group": 9} +{"id":360, "a": 720, "group": 90} +{"id":194, "a": 386, "group": 48} +{"id":333, "a": 665, "group": 83} +{"id":816, "a": 1632, "group": 204} +{"id":805, "a": 1609, "group": 201} +{"id":122, "a": 242, "group": 30} +{"id":67, "a": 131, "group": 16} +{"id":866, "a": 1730, "group": 216} +{"id":219, "a": 435, "group": 54} +{"id":274, "a": 546, "group": 68} +{"id":102, "a": 202, "group": 25} +{"id":951, "a": 1899, "group": 237} +{"id":836, "a": 1672, "group": 209} +{"id":191, "a": 379, "group": 47} +{"id":337, "a": 673, "group": 84} +{"id":841, "a": 1681, "group": 210} +{"id":92, "a": 184, "group": 23} +{"id":481, "a": 961, "group": 120} +{"id":970, "a": 1938, "group": 242} +{"id":878, "a": 1754, "group": 219} +{"id":294, "a": 586, "group": 73} +{"id":386, "a": 770, "group": 96} +{"id":484, "a": 968, "group": 121} +{"id":789, "a": 1577, "group": 197} +{"id":492, "a": 984, "group": 123} +{"id":19, "a": 35, "group": 4} +{"id":263, "a": 523, "group": 65} +{"id":514, "a": 1026, "group": 128} +{"id":352, "a": 704, "group": 88} +{"id":503, "a": 1003, "group": 125} +{"id":726, "a": 1450, "group": 181} +{"id":890, "a": 1778, "group": 222} +{"id":926, "a": 1850, "group": 231} +{"id":707, "a": 1411, "group": 176} +{"id":216, "a": 432, "group": 54} +{"id":807, "a": 1611, "group": 201} +{"id":942, "a": 1882, "group": 235} +{"id":678, "a": 1354, "group": 169} +{"id":354, "a": 706, "group": 88} +{"id":77, "a": 153, "group": 19} +{"id":75, "a": 147, "group": 18} +{"id":830, "a": 1658, "group": 207} +{"id":215, "a": 427, "group": 53} +{"id":966, "a": 1930, "group": 241} +{"id":603, "a": 1203, "group": 150} +{"id":137, "a": 273, "group": 34} +{"id":17, "a": 33, "group": 4} +{"id":991, "a": 1979, "group": 247} +{"id":299, "a": 595, "group": 74} +{"id":643, "a": 1283, "group": 160} +{"id":190, "a": 378, "group": 47} +{"id":967, "a": 1931, "group": 241} +{"id":169, "a": 337, "group": 42} +{"id":460, "a": 920, "group": 115} +{"id":330, "a": 658, "group": 82} +{"id":436, "a": 872, "group": 109} +{"id":393, "a": 785, "group": 98} +{"id":329, "a": 657, "group": 82} +{"id":80, "a": 160, "group": 20} +{"id":395, "a": 787, "group": 98} +{"id":623, "a": 1243, "group": 155} +{"id":110, "a": 218, "group": 27} +{"id":213, "a": 425, "group": 53} +{"id":448, "a": 896, "group": 112} +{"id":671, "a": 1339, "group": 167} +{"id":751, "a": 1499, "group": 187} +{"id":606, "a": 1210, "group": 151} +{"id":624, "a": 1248, "group": 156} +{"id":766, "a": 1530, "group": 191} +{"id":31, "a": 59, "group": 7} +{"id":649, "a": 1297, "group": 162} +{"id":863, "a": 1723, "group": 215} +{"id":328, "a": 656, "group": 82} +{"id":686, "a": 1370, "group": 171} +{"id":343, "a": 683, "group": 85} +{"id":418, "a": 834, "group": 104} +{"id":850, "a": 1698, "group": 212} +{"id":892, "a": 1784, "group": 223} +{"id":657, "a": 1313, "group": 164} +{"id":880, "a": 1760, "group": 220} +{"id":988, "a": 1976, "group": 247} +{"id":772, "a": 1544, "group": 193} +{"id":909, "a": 1817, "group": 227} +{"id":394, "a": 786, "group": 98} +{"id":999, "a": 1995, "group": 249} +{"id":161, "a": 321, "group": 40} +{"id":754, "a": 1506, "group": 188} +{"id":56, "a": 112, "group": 14} +{"id":733, "a": 1465, "group": 183} +{"id":870, "a": 1738, "group": 217} +{"id":456, "a": 912, "group": 114} +{"id":114, "a": 226, "group": 28} +{"id":571, "a": 1139, "group": 142} +{"id":567, "a": 1131, "group": 141} +{"id":827, "a": 1651, "group": 206} +{"id":757, "a": 1513, "group": 189} +{"id":720, "a": 1440, "group": 180} +{"id":709, "a": 1417, "group": 177} +{"id":831, "a": 1659, "group": 207} +{"id":773, "a": 1545, "group": 193} +{"id":201, "a": 401, "group": 50} +{"id":23, "a": 43, "group": 5} +{"id":421, "a": 841, "group": 105} +{"id":516, "a": 1032, "group": 129} +{"id":22, "a": 42, "group": 5} +{"id":538, "a": 1074, "group": 134} +{"id":588, "a": 1176, "group": 147} +{"id":326, "a": 650, "group": 81} +{"id":815, "a": 1627, "group": 203} +{"id":319, "a": 635, "group": 79} +{"id":440, "a": 880, "group": 110} +{"id":875, "a": 1747, "group": 218} +{"id":634, "a": 1266, "group": 158} +{"id":172, "a": 344, "group": 43} +{"id":694, "a": 1386, "group": 173} +{"id":767, "a": 1531, "group": 191} +{"id":324, "a": 648, "group": 81} +{"id":33, "a": 65, "group": 8} +{"id":935, "a": 1867, "group": 233} +{"id":667, "a": 1331, "group": 166} +{"id":91, "a": 179, "group": 22} +{"id":719, "a": 1435, "group": 179} +{"id":582, "a": 1162, "group": 145} +{"id":739, "a": 1475, "group": 184} +{"id":635, "a": 1267, "group": 158} +{"id":367, "a": 731, "group": 91} +{"id":636, "a": 1272, "group": 159} +{"id":743, "a": 1483, "group": 185} +{"id":463, "a": 923, "group": 115} +{"id":834, "a": 1666, "group": 208} +{"id":532, "a": 1064, "group": 133} +{"id":704, "a": 1408, "group": 176} +{"id":387, "a": 771, "group": 96} +{"id":57, "a": 113, "group": 14} +{"id":153, "a": 305, "group": 38} +{"id":364, "a": 728, "group": 91} +{"id":905, "a": 1809, "group": 226} +{"id":578, "a": 1154, "group": 144} +{"id":265, "a": 529, "group": 66} +{"id":642, "a": 1282, "group": 160} +{"id":689, "a": 1377, "group": 172} +{"id":574, "a": 1146, "group": 143} +{"id":318, "a": 634, "group": 79} +{"id":519, "a": 1035, "group": 129} +{"id":411, "a": 819, "group": 102} +{"id":465, "a": 929, "group": 116} +{"id":174, "a": 346, "group": 43} +{"id":286, "a": 570, "group": 71} +{"id":162, "a": 322, "group": 40} +{"id":894, "a": 1786, "group": 223} +{"id":445, "a": 889, "group": 111} +{"id":295, "a": 587, "group": 73} +{"id":599, "a": 1195, "group": 149} +{"id":1000, "a": 2000, "group": 250} +{"id":491, "a": 979, "group": 122} +{"id":539, "a": 1075, "group": 134} +{"id":664, "a": 1328, "group": 166} +{"id":771, "a": 1539, "group": 192} +{"id":244, "a": 488, "group": 61} +{"id":123, "a": 243, "group": 30} +{"id":230, "a": 458, "group": 57} +{"id":149, "a": 297, "group": 37} +{"id":467, "a": 931, "group": 116} +{"id":372, "a": 744, "group": 93} +{"id":921, "a": 1841, "group": 230} +{"id":388, "a": 776, "group": 97} +{"id":898, "a": 1794, "group": 224} +{"id":239, "a": 475, "group": 59} +{"id":390, "a": 778, "group": 97} +{"id":903, "a": 1803, "group": 225} +{"id":382, "a": 762, "group": 95} +{"id":715, "a": 1427, "group": 178} +{"id":774, "a": 1546, "group": 193} +{"id":259, "a": 515, "group": 64} +{"id":419, "a": 835, "group": 104} +{"id":51, "a": 99, "group": 12} +{"id":929, "a": 1857, "group": 232} +{"id":455, "a": 907, "group": 113} +{"id":404, "a": 808, "group": 101} +{"id":526, "a": 1050, "group": 131} +{"id":985, "a": 1969, "group": 246} +{"id":518, "a": 1034, "group": 129} +{"id":323, "a": 643, "group": 80} +{"id":821, "a": 1641, "group": 205} +{"id":427, "a": 851, "group": 106} +{"id":833, "a": 1665, "group": 208} +{"id":723, "a": 1443, "group": 180} +{"id":973, "a": 1945, "group": 243} +{"id":555, "a": 1107, "group": 138} +{"id":513, "a": 1025, "group": 128} +{"id":251, "a": 499, "group": 62} +{"id":217, "a": 433, "group": 54} +{"id":581, "a": 1161, "group": 145} +{"id":345, "a": 689, "group": 86} +{"id":498, "a": 994, "group": 124} +{"id":637, "a": 1273, "group": 159} +{"id":955, "a": 1907, "group": 238} +{"id":680, "a": 1360, "group": 170} +{"id":310, "a": 618, "group": 77} +{"id":817, "a": 1633, "group": 204} +{"id":346, "a": 690, "group": 86} +{"id":958, "a": 1914, "group": 239} +{"id":506, "a": 1010, "group": 126} +{"id":403, "a": 803, "group": 100} +{"id":865, "a": 1729, "group": 216} +{"id":666, "a": 1330, "group": 166} +{"id":264, "a": 528, "group": 66} +{"id":258, "a": 514, "group": 64} +{"id":944, "a": 1888, "group": 236} +{"id":422, "a": 842, "group": 105} +{"id":914, "a": 1826, "group": 228} +{"id":862, "a": 1722, "group": 215} +{"id":405, "a": 809, "group": 101} +{"id":250, "a": 498, "group": 62} +{"id":541, "a": 1081, "group": 135} +{"id":644, "a": 1288, "group": 161} +{"id":423, "a": 843, "group": 105} +{"id":480, "a": 960, "group": 120} +{"id":28, "a": 56, "group": 7} +{"id":734, "a": 1466, "group": 183} +{"id":452, "a": 904, "group": 113} +{"id":268, "a": 536, "group": 67} +{"id":708, "a": 1416, "group": 177} +{"id":451, "a": 899, "group": 112} +{"id":535, "a": 1067, "group": 133} +{"id":1, "a": 1, "group": 0} +{"id":943, "a": 1883, "group": 235} +{"id":510, "a": 1018, "group": 127} +{"id":464, "a": 928, "group": 116} +{"id":705, "a": 1409, "group": 176} +{"id":381, "a": 761, "group": 95} +{"id":6, "a": 10, "group": 1} +{"id":257, "a": 513, "group": 64} +{"id":851, "a": 1699, "group": 212} +{"id":938, "a": 1874, "group": 234} +{"id":835, "a": 1667, "group": 208} +{"id":501, "a": 1001, "group": 125} +{"id":296, "a": 592, "group": 74} +{"id":818, "a": 1634, "group": 204} +{"id":577, "a": 1153, "group": 144} +{"id":730, "a": 1458, "group": 182} +{"id":450, "a": 898, "group": 112} +{"id":391, "a": 779, "group": 97} +{"id":256, "a": 512, "group": 64} +{"id":544, "a": 1088, "group": 136} +{"id":629, "a": 1257, "group": 157} +{"id":189, "a": 377, "group": 47} +{"id":304, "a": 608, "group": 76} +{"id":508, "a": 1016, "group": 127} +{"id":681, "a": 1361, "group": 170} +{"id":86, "a": 170, "group": 21} +{"id":901, "a": 1801, "group": 225} +{"id":55, "a": 107, "group": 13} +{"id":647, "a": 1291, "group": 161} +{"id":737, "a": 1473, "group": 184} +{"id":5, "a": 9, "group": 1} +{"id":879, "a": 1755, "group": 219} +{"id":913, "a": 1825, "group": 228} +{"id":557, "a": 1113, "group": 139} +{"id":430, "a": 858, "group": 107} +{"id":30, "a": 58, "group": 7} +{"id":779, "a": 1555, "group": 194} +{"id":237, "a": 473, "group": 59} +{"id":238, "a": 474, "group": 59} +{"id":69, "a": 137, "group": 17} +{"id":165, "a": 329, "group": 41} +{"id":804, "a": 1608, "group": 201} +{"id":672, "a": 1344, "group": 168} +{"id":904, "a": 1808, "group": 226} +{"id":20, "a": 40, "group": 5} +{"id":650, "a": 1298, "group": 162} +{"id":124, "a": 248, "group": 31} +{"id":819, "a": 1635, "group": 204} +{"id":76, "a": 152, "group": 19} +{"id":918, "a": 1834, "group": 229} +{"id":855, "a": 1707, "group": 213} +{"id":922, "a": 1842, "group": 230} +{"id":562, "a": 1122, "group": 140} +{"id":101, "a": 201, "group": 25} +{"id":96, "a": 192, "group": 24} +{"id":357, "a": 713, "group": 89} +{"id":279, "a": 555, "group": 69} +{"id":759, "a": 1515, "group": 189} +{"id":293, "a": 585, "group": 73} +{"id":653, "a": 1305, "group": 163} +{"id":108, "a": 216, "group": 27} +{"id":254, "a": 506, "group": 63} +{"id":655, "a": 1307, "group": 163} +{"id":945, "a": 1889, "group": 236} +{"id":572, "a": 1144, "group": 143} +{"id":756, "a": 1512, "group": 189} +{"id":822, "a": 1642, "group": 205} +{"id":288, "a": 576, "group": 72} +{"id":641, "a": 1281, "group": 160} +{"id":275, "a": 547, "group": 68} +{"id":654, "a": 1306, "group": 163} +{"id":896, "a": 1792, "group": 224} +{"id":192, "a": 384, "group": 48} +{"id":885, "a": 1769, "group": 221} +{"id":660, "a": 1320, "group": 165} +{"id":573, "a": 1145, "group": 143} +{"id":163, "a": 323, "group": 40} +{"id":802, "a": 1602, "group": 200} +{"id":874, "a": 1746, "group": 218} +{"id":791, "a": 1579, "group": 197} +{"id":303, "a": 603, "group": 75} +{"id":267, "a": 531, "group": 66} +{"id":529, "a": 1057, "group": 132} +{"id":811, "a": 1619, "group": 202} +{"id":713, "a": 1425, "group": 178} +{"id":193, "a": 385, "group": 48} +{"id":886, "a": 1770, "group": 221} +{"id":416, "a": 832, "group": 104} +{"id":786, "a": 1570, "group": 196} +{"id":15, "a": 27, "group": 3} +{"id":626, "a": 1250, "group": 156} +{"id":83, "a": 163, "group": 20} +{"id":231, "a": 459, "group": 57} +{"id":777, "a": 1553, "group": 194} +{"id":78, "a": 154, "group": 19} +{"id":877, "a": 1753, "group": 219} +{"id":232, "a": 464, "group": 58} +{"id":607, "a": 1211, "group": 151} +{"id":525, "a": 1049, "group": 131} +{"id":322, "a": 642, "group": 80} +{"id":41, "a": 81, "group": 10} +{"id":882, "a": 1762, "group": 220} +{"id":957, "a": 1913, "group": 239} +{"id":21, "a": 41, "group": 5} +{"id":728, "a": 1456, "group": 182} +{"id":206, "a": 410, "group": 51} +{"id":775, "a": 1547, "group": 193} +{"id":2, "a": 2, "group": 0} +{"id":673, "a": 1345, "group": 168} +{"id":64, "a": 128, "group": 16} +{"id":309, "a": 617, "group": 77} +{"id":415, "a": 827, "group": 103} +{"id":537, "a": 1073, "group": 134} +{"id":597, "a": 1193, "group": 149} +{"id":458, "a": 914, "group": 114} +{"id":872, "a": 1744, "group": 218} +{"id":355, "a": 707, "group": 88} +{"id":638, "a": 1274, "group": 159} +{"id":546, "a": 1090, "group": 136} +{"id":140, "a": 280, "group": 35} +{"id":331, "a": 659, "group": 82} +{"id":697, "a": 1393, "group": 174} +{"id":9, "a": 17, "group": 2} +{"id":60, "a": 120, "group": 15} +{"id":849, "a": 1697, "group": 212} +{"id":119, "a": 235, "group": 29} +{"id":316, "a": 632, "group": 79} +{"id":782, "a": 1562, "group": 195} +{"id":565, "a": 1129, "group": 141} +{"id":494, "a": 986, "group": 123} +{"id":437, "a": 873, "group": 109} +{"id":856, "a": 1712, "group": 214} +{"id":397, "a": 793, "group": 99} +{"id":742, "a": 1482, "group": 185} +{"id":692, "a": 1384, "group": 173} +{"id":854, "a": 1706, "group": 213} +{"id":68, "a": 136, "group": 17} +{"id":869, "a": 1737, "group": 217} +{"id":280, "a": 560, "group": 70} +{"id":242, "a": 482, "group": 60} +{"id":66, "a": 130, "group": 16} +{"id":823, "a": 1643, "group": 205} +{"id":964, "a": 1928, "group": 241} +{"id":158, "a": 314, "group": 39} +{"id":690, "a": 1378, "group": 172} +{"id":185, "a": 369, "group": 46} +{"id":619, "a": 1235, "group": 154} +{"id":400, "a": 800, "group": 100} +{"id":908, "a": 1816, "group": 227} +{"id":109, "a": 217, "group": 27} +{"id":54, "a": 106, "group": 13} +{"id":511, "a": 1019, "group": 127} +{"id":111, "a": 219, "group": 27} +{"id":125, "a": 249, "group": 31} +{"id":85, "a": 169, "group": 21} +{"id":617, "a": 1233, "group": 154} +{"id":798, "a": 1594, "group": 199} +{"id":399, "a": 795, "group": 99} +{"id":470, "a": 938, "group": 117} +{"id":645, "a": 1289, "group": 161} +{"id":187, "a": 371, "group": 46} +{"id":474, "a": 946, "group": 118} +{"id":134, "a": 266, "group": 33} +{"id":335, "a": 667, "group": 83} +{"id":711, "a": 1419, "group": 177} +{"id":145, "a": 289, "group": 36} +{"id":157, "a": 313, "group": 39} +{"id":177, "a": 353, "group": 44} +{"id":808, "a": 1616, "group": 202} +{"id":662, "a": 1322, "group": 165} +{"id":420, "a": 840, "group": 105} +{"id":568, "a": 1136, "group": 142} +{"id":130, "a": 258, "group": 32} +{"id":864, "a": 1728, "group": 216} +{"id":542, "a": 1082, "group": 135} +{"id":89, "a": 177, "group": 22} +{"id":26, "a": 50, "group": 6} +{"id":969, "a": 1937, "group": 242} +{"id":366, "a": 730, "group": 91} +{"id":575, "a": 1147, "group": 143} +{"id":368, "a": 736, "group": 92} +{"id":308, "a": 616, "group": 77} +{"id":941, "a": 1881, "group": 235} +{"id":590, "a": 1178, "group": 147} +{"id":825, "a": 1649, "group": 206} +{"id":732, "a": 1464, "group": 183} +{"id":569, "a": 1137, "group": 142} +{"id":601, "a": 1201, "group": 150} +{"id":746, "a": 1490, "group": 186} +{"id":246, "a": 490, "group": 61} +{"id":116, "a": 232, "group": 29} +{"id":873, "a": 1745, "group": 218} +{"id":181, "a": 361, "group": 45} +{"id":876, "a": 1752, "group": 219} +{"id":632, "a": 1264, "group": 158} +{"id":336, "a": 672, "group": 84} +{"id":128, "a": 256, "group": 32} +{"id":292, "a": 584, "group": 73} +{"id":205, "a": 409, "group": 51} +{"id":429, "a": 857, "group": 107} +{"id":845, "a": 1689, "group": 211} +{"id":143, "a": 283, "group": 35} +{"id":668, "a": 1336, "group": 167} +{"id":744, "a": 1488, "group": 186} +{"id":630, "a": 1258, "group": 157} +{"id":53, "a": 105, "group": 13} +{"id":792, "a": 1584, "group": 198} +{"id":593, "a": 1185, "group": 148} +{"id":803, "a": 1603, "group": 200} +{"id":899, "a": 1795, "group": 224} +{"id":253, "a": 505, "group": 63} +{"id":243, "a": 483, "group": 60} +{"id":245, "a": 489, "group": 61} +{"id":82, "a": 162, "group": 20} +{"id":497, "a": 993, "group": 124} +{"id":95, "a": 187, "group": 23} +{"id":7, "a": 11, "group": 1} +{"id":919, "a": 1835, "group": 229} +{"id":710, "a": 1418, "group": 177} +{"id":351, "a": 699, "group": 87} +{"id":651, "a": 1299, "group": 162} +{"id":954, "a": 1906, "group": 238} +{"id":170, "a": 338, "group": 42} +{"id":32, "a": 64, "group": 8} +{"id":222, "a": 442, "group": 55} +{"id":479, "a": 955, "group": 119} +{"id":706, "a": 1410, "group": 176} +{"id":564, "a": 1128, "group": 141} +{"id":769, "a": 1537, "group": 192} +{"id":524, "a": 1048, "group": 131} +{"id":46, "a": 90, "group": 11} +{"id":793, "a": 1585, "group": 198} +{"id":837, "a": 1673, "group": 209} +{"id":979, "a": 1955, "group": 244} +{"id":962, "a": 1922, "group": 240} +{"id":740, "a": 1480, "group": 185} +{"id":282, "a": 562, "group": 70} +{"id":724, "a": 1448, "group": 181} +{"id":509, "a": 1017, "group": 127} +{"id":266, "a": 530, "group": 66} +{"id":271, "a": 539, "group": 67} +{"id":155, "a": 307, "group": 38} +{"id":18, "a": 34, "group": 4} +{"id":339, "a": 675, "group": 84} +{"id":598, "a": 1194, "group": 149} +{"id":948, "a": 1896, "group": 237} +{"id":910, "a": 1818, "group": 227} +{"id":846, "a": 1690, "group": 211} +{"id":138, "a": 274, "group": 34} +{"id":760, "a": 1520, "group": 190} +{"id":853, "a": 1705, "group": 213} +{"id":860, "a": 1720, "group": 215} +{"id":283, "a": 563, "group": 70} +{"id":188, "a": 376, "group": 47} +{"id":613, "a": 1225, "group": 153} +{"id":321, "a": 641, "group": 80} +{"id":148, "a": 296, "group": 37} +{"id":674, "a": 1346, "group": 168} +{"id":226, "a": 450, "group": 56} +{"id":994, "a": 1986, "group": 248} +{"id":998, "a": 1994, "group": 249} +{"id":97, "a": 193, "group": 24} +{"id":160, "a": 320, "group": 40} +{"id":616, "a": 1232, "group": 154} +{"id":424, "a": 848, "group": 106} +{"id":937, "a": 1873, "group": 234} +{"id":392, "a": 784, "group": 98} +{"id":25, "a": 49, "group": 6} +{"id":88, "a": 176, "group": 22} +{"id":534, "a": 1066, "group": 133} +{"id":536, "a": 1072, "group": 134} +{"id":530, "a": 1058, "group": 132} +{"id":289, "a": 577, "group": 72} +{"id":861, "a": 1721, "group": 215} +{"id":376, "a": 752, "group": 94} +{"id":682, "a": 1362, "group": 170} +{"id":327, "a": 651, "group": 81} +{"id":566, "a": 1130, "group": 141} +{"id":199, "a": 395, "group": 49} +{"id":902, "a": 1802, "group": 225} +{"id":433, "a": 865, "group": 108} +{"id":13, "a": 25, "group": 3} +{"id":103, "a": 203, "group": 25} +{"id":907, "a": 1811, "group": 226} +{"id":989, "a": 1977, "group": 247} +{"id":974, "a": 1946, "group": 243} +{"id":978, "a": 1954, "group": 244} +{"id":107, "a": 211, "group": 26} +{"id":722, "a": 1442, "group": 180} +{"id":38, "a": 74, "group": 9} +{"id":549, "a": 1097, "group": 137} +{"id":485, "a": 969, "group": 121} +{"id":218, "a": 434, "group": 54} +{"id":927, "a": 1851, "group": 231} +{"id":725, "a": 1449, "group": 181} +{"id":702, "a": 1402, "group": 175} +{"id":198, "a": 394, "group": 49} +{"id":214, "a": 426, "group": 53} +{"id":373, "a": 745, "group": 93} +{"id":147, "a": 291, "group": 36} +{"id":63, "a": 123, "group": 15} +{"id":79, "a": 155, "group": 19} +{"id":543, "a": 1083, "group": 135} +{"id":334, "a": 666, "group": 83} +{"id":59, "a": 115, "group": 14} +{"id":459, "a": 915, "group": 114} +{"id":495, "a": 987, "group": 123} +{"id":211, "a": 419, "group": 52} +{"id":554, "a": 1106, "group": 138} +{"id":62, "a": 122, "group": 15} +{"id":758, "a": 1514, "group": 189} +{"id":272, "a": 544, "group": 68} +{"id":727, "a": 1451, "group": 181} +{"id":868, "a": 1736, "group": 217} +{"id":631, "a": 1259, "group": 157} +{"id":152, "a": 304, "group": 38} +{"id":669, "a": 1337, "group": 167} +{"id":840, "a": 1680, "group": 210} +{"id":983, "a": 1963, "group": 245} +{"id":595, "a": 1187, "group": 148} +{"id":685, "a": 1369, "group": 171} +{"id":441, "a": 881, "group": 110} +{"id":527, "a": 1051, "group": 131} +{"id":820, "a": 1640, "group": 205} +{"id":434, "a": 866, "group": 108} +{"id":277, "a": 553, "group": 69} +{"id":90, "a": 178, "group": 22} +{"id":676, "a": 1352, "group": 169} +{"id":627, "a": 1251, "group": 156} +{"id":614, "a": 1226, "group": 153} +{"id":432, "a": 864, "group": 108} +{"id":350, "a": 698, "group": 87} +{"id":247, "a": 491, "group": 61} +{"id":551, "a": 1099, "group": 137} +{"id":29, "a": 57, "group": 7} +{"id":104, "a": 208, "group": 26} +{"id":801, "a": 1601, "group": 200} +{"id":589, "a": 1177, "group": 147} +{"id":409, "a": 817, "group": 102} +{"id":248, "a": 496, "group": 62} +{"id":39, "a": 75, "group": 9} +{"id":953, "a": 1905, "group": 238} +{"id":438, "a": 874, "group": 109} +{"id":278, "a": 554, "group": 69} +{"id":447, "a": 891, "group": 111} +{"id":16, "a": 32, "group": 4} +{"id":426, "a": 850, "group": 106} +{"id":950, "a": 1898, "group": 237} +{"id":533, "a": 1065, "group": 133} +{"id":171, "a": 339, "group": 42} +{"id":482, "a": 962, "group": 120} +{"id":933, "a": 1865, "group": 233} +{"id":701, "a": 1401, "group": 175} +{"id":428, "a": 856, "group": 107} +{"id":915, "a": 1827, "group": 228} +{"id":972, "a": 1944, "group": 243} +{"id":857, "a": 1713, "group": 214} +{"id":844, "a": 1688, "group": 211} +{"id":683, "a": 1363, "group": 170} +{"id":3, "a": 3, "group": 0} +{"id":65, "a": 129, "group": 16} +{"id":121, "a": 241, "group": 30} +{"id":202, "a": 402, "group": 50} +{"id":753, "a": 1505, "group": 188} +{"id":369, "a": 737, "group": 92} +{"id":765, "a": 1529, "group": 191} +{"id":661, "a": 1321, "group": 165} +{"id":342, "a": 682, "group": 85} +{"id":442, "a": 882, "group": 110} +{"id":592, "a": 1184, "group": 148} +{"id":717, "a": 1433, "group": 179} +{"id":608, "a": 1216, "group": 152} +{"id":72, "a": 144, "group": 18} +{"id":698, "a": 1394, "group": 174} +{"id":560, "a": 1120, "group": 140} +{"id":809, "a": 1617, "group": 202} +{"id":721, "a": 1441, "group": 180} +{"id":176, "a": 352, "group": 44} +{"id":87, "a": 171, "group": 21} +{"id":10, "a": 18, "group": 2} +{"id":414, "a": 826, "group": 103} +{"id":548, "a": 1096, "group": 137} +{"id":37, "a": 73, "group": 9} +{"id":183, "a": 363, "group": 45} +{"id":559, "a": 1115, "group": 139} +{"id":736, "a": 1472, "group": 184} +{"id":146, "a": 290, "group": 36} +{"id":776, "a": 1552, "group": 194} +{"id":505, "a": 1009, "group": 126} +{"id":993, "a": 1985, "group": 248} +{"id":992, "a": 1984, "group": 248} +{"id":312, "a": 624, "group": 78} +{"id":166, "a": 330, "group": 41} +{"id":696, "a": 1392, "group": 174} +{"id":27, "a": 51, "group": 6} +{"id":269, "a": 537, "group": 67} +{"id":139, "a": 275, "group": 34} +{"id":504, "a": 1008, "group": 126} +{"id":43, "a": 83, "group": 10} +{"id":469, "a": 937, "group": 117} +{"id":832, "a": 1664, "group": 208} +{"id":380, "a": 760, "group": 95} +{"id":168, "a": 336, "group": 42} +{"id":768, "a": 1536, "group": 192} +{"id":42, "a": 82, "group": 10} +{"id":489, "a": 977, "group": 122} +{"id":396, "a": 792, "group": 99} +{"id":584, "a": 1168, "group": 146} +{"id":975, "a": 1947, "group": 243} +{"id":359, "a": 715, "group": 89} +{"id":220, "a": 440, "group": 55} +{"id":797, "a": 1593, "group": 199} +{"id":298, "a": 594, "group": 74} +{"id":486, "a": 970, "group": 121} +{"id":997, "a": 1993, "group": 249} +{"id":790, "a": 1578, "group": 197} +{"id":453, "a": 905, "group": 113} +{"id":735, "a": 1467, "group": 183} +{"id":24, "a": 48, "group": 6} +{"id":502, "a": 1002, "group": 125} +{"id":939, "a": 1875, "group": 234} +{"id":314, "a": 626, "group": 78} +{"id":457, "a": 913, "group": 114} +{"id":132, "a": 264, "group": 33} +{"id":50, "a": 98, "group": 12} +{"id":454, "a": 906, "group": 113} +{"id":576, "a": 1152, "group": 144} +{"id":881, "a": 1761, "group": 220} +{"id":633, "a": 1265, "group": 158} +{"id":353, "a": 705, "group": 88} +{"id":934, "a": 1866, "group": 233} +{"id":620, "a": 1240, "group": 155} +{"id":167, "a": 331, "group": 41} +{"id":579, "a": 1155, "group": 144} +{"id":99, "a": 195, "group": 24} +{"id":799, "a": 1595, "group": 199} +{"id":762, "a": 1522, "group": 190} +{"id":547, "a": 1091, "group": 136} +{"id":100, "a": 200, "group": 25} +{"id":154, "a": 306, "group": 38} +{"id":778, "a": 1554, "group": 194} +{"id":151, "a": 299, "group": 37} +{"id":570, "a": 1138, "group": 142} +{"id":888, "a": 1776, "group": 222} +{"id":665, "a": 1329, "group": 166} +{"id":44, "a": 88, "group": 11} +{"id":670, "a": 1338, "group": 167} +{"id":412, "a": 824, "group": 103} +{"id":984, "a": 1968, "group": 246} +{"id":659, "a": 1315, "group": 164} +{"id":307, "a": 611, "group": 76} +{"id":197, "a": 393, "group": 49} +{"id":306, "a": 610, "group": 76} +{"id":468, "a": 936, "group": 117} +{"id":262, "a": 522, "group": 65} +{"id":483, "a": 963, "group": 120} +{"id":610, "a": 1218, "group": 152} +{"id":507, "a": 1011, "group": 126} +{"id":609, "a": 1217, "group": 152} diff --git a/exec/java-exec/src/test/resources/window/oneKeyCount.json b/exec/java-exec/src/test/resources/window/oneKeyCount.json new file mode 100644 index 000000000..d8965fbe1 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/oneKeyCount.json @@ -0,0 +1,43 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"fs-scan", + format: {type: "json"}, + storage:{type: "file", connection: "file:///"}, + files:["#{DATA_FILE}"] + }, + { + @id:2, + child: 1, + pop:"sort", + orderings: [ + {expr: "a"} + ] + }, + { + @id:3, + child: 2, + pop:"window", + within: [ + { ref: "a", expr: "a" } + ], + aggregations: [ + { ref: "cnt", expr:"count(1)" }, + { ref: "sum", expr:"sum(b)" } + ] + }, + { + @id: 4, + child: 3, + pop: "screen" + } + ] +}
\ No newline at end of file diff --git a/exec/java-exec/src/test/resources/window/oneKeyCountData.json b/exec/java-exec/src/test/resources/window/oneKeyCountData.json new file mode 100644 index 000000000..3c0115e74 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/oneKeyCountData.json @@ -0,0 +1,4 @@ +{"a": 1, "b": 100} + {"a": 1, "b": 50} + {"a": 2, "b": 25} + {"a": 2, "b": 50}
\ No newline at end of file diff --git a/exec/java-exec/src/test/resources/window/oneKeyCountMultiBatch.json b/exec/java-exec/src/test/resources/window/oneKeyCountMultiBatch.json new file mode 100644 index 000000000..069bc1f4f --- /dev/null +++ b/exec/java-exec/src/test/resources/window/oneKeyCountMultiBatch.json @@ -0,0 +1,72 @@ +{ + "head" : { + "version" : 1, + "generator" : { + "type" : "DefaultSqlHandler", + "info" : "" + }, + "type" : "APACHE_DRILL_PHYSICAL", + "options" : [ ], + "resultMode" : "EXEC" + }, + "graph" : [{ + @id:1, + pop:"fs-scan", + format: {type: "json"}, + storage:{type: "file", connection: "file:///"}, + files:["#{DATA_FILE}"] + }, + { + @id:2, + child: 1, + pop:"sort", + orderings: [ + {expr: "group"}, + {expr: "a"} + ] + }, + { + "pop" : "window", + "@id" : 3, + "child" : 2, + "aggregations" : [ { + "ref" : "`w0$o0`", + "expr" : "count(`a`) " + }, { + "ref" : "`w0$o1`", + "expr" : "$sum0(`a`) " + } ], + "start" : -9223372036854775808, + "end" : -9223372036854775808, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000, + "withins" : [ { + "ref" : "`group`", + "expr" : "`group`" + } ] + }, { + "pop" : "project", + "@id" : 4, + "exprs" : [ { + "ref" : "`output`", + "expr" : " ( if (greater_than(`w0$o0`, 0) ) then (`w0$o1` ) else (NULL) end ) " + }, + { + "ref" : "cnt", + "expr": "w0$o0" + }, + { + "ref" : "sum", + "expr": "w0$o1" + }], + "child" : 3, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000 + }, { + "pop" : "screen", + "@id" : 5, + "child" : 4, + "initialAllocation" : 1000000, + "maxAllocation" : 10000000000 + } ] +}
\ No newline at end of file diff --git a/exec/java-exec/src/test/resources/window/twoKeys.json b/exec/java-exec/src/test/resources/window/twoKeys.json new file mode 100644 index 000000000..6282ad244 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/twoKeys.json @@ -0,0 +1,44 @@ +{ + head:{ + type:"APACHE_DRILL_PHYSICAL", + version:"1", + generator:{ + type:"manual" + } + }, + graph:[ + { + @id:1, + pop:"fs-scan", + format: {type: "json"}, + storage:{type: "file", connection: "file:///"}, + files:["#{DATA_FILE}"] + }, + { + @id:2, + child: 1, + pop:"sort", + orderings: [ + {expr: "a"} + ] + }, + { + @id:3, + child: 2, + pop:"window", + within: [ + { ref: "a", expr: "a" }, + { ref: "b", expr: "b" } + ], + aggregations: [ + { ref: "cnt", expr:"count(1)" }, + { ref: "sum", expr:"sum(c)" } + ] + }, + { + @id: 4, + child: 3, + pop: "screen" + } + ] +}
\ No newline at end of file diff --git a/exec/java-exec/src/test/resources/window/twoKeysData.json b/exec/java-exec/src/test/resources/window/twoKeysData.json new file mode 100644 index 000000000..fd09236f3 --- /dev/null +++ b/exec/java-exec/src/test/resources/window/twoKeysData.json @@ -0,0 +1,8 @@ +{"a": 1, "b": "group1", "c": 5} +{"a": 1, "b": "group1", "c": 10} +{"a": 1, "b": "group2", "c": 15} +{"a": 1, "b": "group2", "c": 20} +{"a": 2, "b": "group3", "c": 25} +{"a": 2, "b": "group3", "c": 30} +{"a": 2, "b": "group4", "c": 35} +{"a": 2, "b": "group4", "c": 40}
\ No newline at end of file |