| author | Cliff Buchanan <cbuchanan@maprtech.com> | 2018-10-16 15:17:43 -0700 |
|---|---|---|
| committer | Gautam Parai <gparai@maprtech.com> | 2019-02-28 12:01:11 -0800 |
| commit | 3233d8aaff57ac71bd3b726efcd5fdaa92aef861 (patch) | |
| tree | 1902365b3632531738acb6229400e2220cba8f72 | |
| parent | 4627973bde9847a4eb2672c44941136c167326a1 (diff) | |
DRILL-1328: Support table statistics
53 files changed, 2356 insertions, 53 deletions
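The headline user-visible change is a new ANALYZE TABLE statement; its grammar is added to Parser.tdd and parserImpls.ftl below. As a hedged sketch, here is how the statement could be issued through Drill's stock JDBC driver. The connection URL, workspace, table, and column names are illustrative only; the accepted syntax is whatever the parser production in this patch defines:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class AnalyzeTableSketch {
  public static void main(String[] args) throws Exception {
    // Illustrative drillbit URL; any reachable Drill endpoint works the same way.
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement()) {
      // Full scan over every column of a hypothetical table:
      stmt.execute("ANALYZE TABLE dfs.tmp.`lineitem` COMPUTE STATISTICS FOR ALL COLUMNS");
      // Column subset with sampling; the production below rejects SAMPLE
      // percentages outside (0, 100]:
      stmt.execute("ANALYZE TABLE dfs.tmp.`lineitem` ESTIMATE STATISTICS "
          + "FOR COLUMNS (l_orderkey, l_suppkey) SAMPLE 50 PERCENT");
    }
  }
}
```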
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java index aeb117a8a..1c30264fd 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/TableFormatPlugin.java @@ -94,7 +94,7 @@ public abstract class TableFormatPlugin implements FormatPlugin { @Override public AbstractWriter getWriter(PhysicalOperator child, String location, - List<String> partitionColumns) throws IOException { + boolean append, List<String> partitionColumns) throws IOException { throw new UnsupportedOperationException(); } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java index 206954bf3..76466ab65 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java @@ -64,7 +64,7 @@ public class StreamsFormatPlugin extends TableFormatPlugin { @Override public AbstractWriter getWriter(PhysicalOperator child, String location, - List<String> partitionColumns) throws IOException { + boolean append, List<String> partitionColumns) throws IOException { throw new UnsupportedOperationException(); } diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index df150e562..d7f7393f2 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -352,6 +352,11 @@ <artifactId>commons-compiler</artifactId> </dependency> <dependency> + <groupId>com.clearspring.analytics</groupId> + <artifactId>stream</artifactId> + <version>2.7.0</version> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <exclusions> diff --git a/exec/java-exec/src/main/codegen/data/Parser.tdd b/exec/java-exec/src/main/codegen/data/Parser.tdd index 820ecb5b5..ec56af455 100644 --- a/exec/java-exec/src/main/codegen/data/Parser.tdd +++ b/exec/java-exec/src/main/codegen/data/Parser.tdd @@ -38,6 +38,30 @@ "IF", "JAR", "PROPERTIES" + "ANALYZE", + "COMPUTE", + "ESTIMATE", + "STATISTICS", + "COLUMNS", + "SAMPLE" + ] + + # List of keywords from "keywords" section that are not reserved by SQL:2003 standard. + # Example: "DATABASES", "TABLES" are keywords but are not reserved by SQL:2003 standard. + # First keyword that starts the statement should be a reserved keyword, otherwise the current parser + # ends up considering it as a expression and fails. + nonReservedKeywords: [ + "DATABASES", + "REPLACE", + "SCHEMAS", + "TABLES", + "FILES", + "METADATA", + "COMPUTE", + "ESTIMATE", + "STATISTICS", + "COLUMNS", + "SAMPLE" ] # List of methods for parsing custom SQL statements. @@ -52,7 +76,9 @@ "SqlShowFiles()", "SqlRefreshMetadata()", "SqlCreateFunction()", - "SqlDropFunction()" + "SqlDropFunction()", + "SqlRefreshMetadata()", + "SqlAnalyzeTable()" ] # List of methods for parsing custom literals. 
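The pom.xml hunk above pulls in ClearSpring's stream-lib 2.7.0; the hll, ndv, and hll_decode functions added later in this patch are thin wrappers around its HyperLogLog class. A hedged, self-contained sketch of exactly the library calls the patch relies on (log2m = 10 mirrors the `new HyperLogLog(10)` in the patch; the data is illustrative):

```java
import com.clearspring.analytics.stream.cardinality.HyperLogLog;

public class HyperLogLogSketch {
  public static void main(String[] args) throws Exception {
    // log2m = 10 gives 2^10 registers, as in the new hll/ndv aggregates;
    // the standard error is about 1.04 / sqrt(2^10), roughly 3%.
    HyperLogLog hll = new HyperLogLog(10);
    for (int i = 0; i < 1_000_000; i++) {
      hll.offer("value-" + (i % 50_000)); // 50,000 distinct values, illustrative
    }
    System.out.println("ndv estimate: " + hll.cardinality());

    // The hll aggregate ships the sketch as VARBINARY via getBytes();
    // hll_decode rebuilds it and reads the cardinality back:
    byte[] serialized = hll.getBytes();
    long decoded = HyperLogLog.Builder.build(serialized).cardinality();
    System.out.println("decoded ndv:  " + decoded);
  }
}
```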
diff --git a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
index 8afc8f855..2606006d7 100644
--- a/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
+++ b/exec/java-exec/src/main/codegen/includes/parserImpls.ftl
@@ -557,4 +557,48 @@ Pair<SqlNodeList, SqlNodeList> ParenthesizedCompoundIdentifierList() :
         return Pair.of(new SqlNodeList(list, getPos()), null);
     }
 }
-</#if>
\ No newline at end of file
+</#if>
+/**
+ * Parses an ANALYZE statement.
+ * ANALYZE TABLE tblname {COMPUTE | ESTIMATE} STATISTICS FOR
+ *     {ALL COLUMNS | COLUMNS (field1, field2, ...)} [ SAMPLE numeric PERCENT ]
+ */
+SqlNode SqlAnalyzeTable() :
+{
+    SqlParserPos pos;
+    SqlIdentifier tblName;
+    SqlLiteral estimate = null;
+    SqlNodeList fieldList = null;
+    SqlNumericLiteral percent = null;
+}
+{
+    <ANALYZE> { pos = getPos(); }
+    <TABLE>
+    tblName = CompoundIdentifier()
+    (
+        <COMPUTE> { estimate = SqlLiteral.createBoolean(false, pos); }
+        |
+        <ESTIMATE> { estimate = SqlLiteral.createBoolean(true, pos); }
+    )
+    <STATISTICS> <FOR>
+    (
+        ( <ALL> <COLUMNS> )
+        |
+        ( <COLUMNS> fieldList = ParseRequiredFieldList("Table") )
+    )
+    [
+        <SAMPLE> percent = UnsignedNumericLiteral() <PERCENT>
+        {
+            BigDecimal rate = percent.bigDecimalValue();
+            if (rate.compareTo(BigDecimal.ZERO) <= 0 ||
+                rate.compareTo(BigDecimal.valueOf(100L)) > 0)
+            {
+                throw new ParseException("Invalid percentage for ANALYZE TABLE");
+            }
+        }
+    ]
+    {
+        if (percent == null) { percent = SqlLiteral.createExactNumeric("100.0", pos); }
+        return new SqlAnalyzeTable(pos, tblName, estimate, fieldList, percent);
+    }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillType.java b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillType.java
index 673e1c793..589e98295 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillType.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillType.java
@@ -21,10 +21,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 public enum DotDrillType {
-  VIEW;
+  VIEW,
   // ,FORMAT
-  // ,STATS
-
+  STATS;
 
   private final String ending;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
index b6571dfe6..32759d2a8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/dotdrill/DotDrillUtil.java
@@ -17,19 +17,18 @@
  */
 package org.apache.drill.exec.dotdrill;
 
-import java.io.IOException;
 import java.io.FileNotFoundException;
-import java.util.List;
-import java.util.Arrays;
+import java.io.IOException;
 import java.util.ArrayList;
-
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.GlobPattern;
 import org.apache.hadoop.fs.Path;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
-
 public class DotDrillUtil {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DotDrillUtil.class);
@@ -42,6 +41,9 @@ public class DotDrillUtil {
    * @return List of matched DotDrillFile objects
    */
   private static List<DotDrillFile> getDrillFiles(DrillFileSystem fs, List<FileStatus> statuses, DotDrillType...
types){ + if (statuses == null) { + return Collections.emptyList(); + } List<DotDrillFile> files = Lists.newArrayList(); for(FileStatus s : statuses){ DotDrillFile f = DotDrillFile.create(fs, s); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StatisticsAggrFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StatisticsAggrFunctions.java new file mode 100644 index 000000000..c6430dd9c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StatisticsAggrFunctions.java @@ -0,0 +1,285 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + ******************************************************************************/ + +/* + * This class is automatically generated from AggrTypeFunctions2.tdd using FreeMarker. + */ + +package org.apache.drill.exec.expr.fn.impl; + +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.expr.DrillAggFunc; +import org.apache.drill.exec.expr.DrillSimpleFunc; +import org.apache.drill.exec.expr.annotations.FunctionTemplate; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; +import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; +import org.apache.drill.exec.expr.annotations.Workspace; +import org.apache.drill.exec.expr.holders.BigIntHolder; +import org.apache.drill.exec.expr.holders.NullableBigIntHolder; +import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder; +import org.apache.drill.exec.expr.holders.ObjectHolder; +import org.apache.drill.exec.vector.complex.reader.FieldReader; + +import javax.inject.Inject; + +@SuppressWarnings("unused") +public class StatisticsAggrFunctions { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatisticsAggrFunctions.class); + + @FunctionTemplate(name = "statcount", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE) + public static class StatCount implements DrillAggFunc { + @Param FieldReader in; + @Workspace BigIntHolder count; + @Output NullableBigIntHolder out; + + @Override + public void setup() { + count = new BigIntHolder(); + } + + @Override + public void add() { + count.value++; + } + + @Override + public void output() { + out.isSet = 1; + out.value = count.value; + } + + @Override + public void reset() { + count.value = 0; + } + } + + @FunctionTemplate(name = "nonnullstatcount", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE) + public static class NonNullStatCount implements DrillAggFunc { + @Param FieldReader in; + @Workspace BigIntHolder count; + @Output NullableBigIntHolder out; + + 
@Override + public void setup() { + count = new BigIntHolder(); + } + + @Override + public void add() { + if (in.isSet()) { + count.value++; + } + } + + @Override + public void output() { + out.isSet = 1; + out.value = count.value; + } + + @Override + public void reset() { + count.value = 0; + } + } + + @FunctionTemplate(name = "hll", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE) + public static class HllFieldReader implements DrillAggFunc { + @Param FieldReader in; + @Workspace ObjectHolder work; + @Output NullableVarBinaryHolder out; + @Inject DrillBuf buffer; + + @Override + public void setup() { + work = new ObjectHolder(); + work.obj = new com.clearspring.analytics.stream.cardinality.HyperLogLog(10); + } + + @Override + public void add() { + if (work.obj != null) { + com.clearspring.analytics.stream.cardinality.HyperLogLog hll = + (com.clearspring.analytics.stream.cardinality.HyperLogLog) work.obj; + int mode = in.getType().getMode().getNumber(); + int type = in.getType().getMinorType().getNumber(); + + switch (mode) { + case org.apache.drill.common.types.TypeProtos.DataMode.OPTIONAL_VALUE: + if (!in.isSet()) { + hll.offer(null); + break; + } + // fall through // + case org.apache.drill.common.types.TypeProtos.DataMode.REQUIRED_VALUE: + switch (type) { + case org.apache.drill.common.types.TypeProtos.MinorType.VARCHAR_VALUE: + hll.offer(in.readText().toString()); + break; + default: + work.obj = null; + } + break; + default: + work.obj = null; + } + } + } + + @Override + public void output() { + if (work.obj != null) { + com.clearspring.analytics.stream.cardinality.HyperLogLog hll = + (com.clearspring.analytics.stream.cardinality.HyperLogLog) work.obj; + + try { + byte[] ba = hll.getBytes(); + out.buffer = buffer.reallocIfNeeded(ba.length); + out.start = 0; + out.end = ba.length; + out.buffer.setBytes(0, ba); + out.isSet = 1; + } catch (java.io.IOException e) { + throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failed to get HyperLogLog output", e); + } + } else { + out.isSet = 0; + } + } + + @Override + public void reset() { + work.obj = new com.clearspring.analytics.stream.cardinality.HyperLogLog(10); + } + } + + + @FunctionTemplate(name = "ndv", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE) + public static class NdvVarBinary implements DrillAggFunc { + @Param + FieldReader in; + @Workspace + ObjectHolder work; + @Output + NullableBigIntHolder out; + + @Override + public void setup() { + work = new ObjectHolder(); + work.obj = new com.clearspring.analytics.stream.cardinality.HyperLogLog(10); + } + + @Override + public void add() { + if (work.obj != null) { + com.clearspring.analytics.stream.cardinality.HyperLogLog hll = + (com.clearspring.analytics.stream.cardinality.HyperLogLog) work.obj; + int mode = in.getType().getMode().getNumber(); + int type = in.getType().getMinorType().getNumber(); + + switch (mode) { + case org.apache.drill.common.types.TypeProtos.DataMode.OPTIONAL_VALUE: + if (!in.isSet()) { + hll.offer(null); + break; + } + // fall through // + case org.apache.drill.common.types.TypeProtos.DataMode.REQUIRED_VALUE: + switch (type) { + case org.apache.drill.common.types.TypeProtos.MinorType.VARCHAR_VALUE: + hll.offer(in.readText().toString()); + break; + case org.apache.drill.common.types.TypeProtos.MinorType.FLOAT8_VALUE: + hll.offer(in.readDouble()); + break; + case org.apache.drill.common.types.TypeProtos.MinorType.INT_VALUE: + hll.offer(in.readInteger()); + break; + case 
org.apache.drill.common.types.TypeProtos.MinorType.BIGINT_VALUE: + hll.offer(in.readLong()); + break; + case org.apache.drill.common.types.TypeProtos.MinorType.DATE_VALUE: + case org.apache.drill.common.types.TypeProtos.MinorType.TIMESTAMP_VALUE: + case org.apache.drill.common.types.TypeProtos.MinorType.TIME_VALUE: + case org.apache.drill.common.types.TypeProtos.MinorType.TIMETZ_VALUE: + hll.offer(in.readLocalDateTime()); + break; + case org.apache.drill.common.types.TypeProtos.MinorType.VARBINARY_VALUE: + hll.offer(in.readByteArray()); + break; + default: + work.obj = null; + } + break; + default: + work.obj = null; + } + } + } + + @Override + public void output() { + if (work.obj != null) { + com.clearspring.analytics.stream.cardinality.HyperLogLog hll = + (com.clearspring.analytics.stream.cardinality.HyperLogLog) work.obj; + + out.isSet = 1; + out.value = hll.cardinality(); + } else { + out.isSet = 0; + } + } + + @Override + public void reset() { + work.obj = new com.clearspring.analytics.stream.cardinality.HyperLogLog(10); + } + } + + + @FunctionTemplate(name = "hll_decode", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) + public static class HllDecode implements DrillSimpleFunc { + + @Param + NullableVarBinaryHolder in; + @Output + BigIntHolder out; + + @Override + public void setup() { + } + + public void eval() { + out.value = -1; + + if (in.isSet != 0) { + byte[] din = new byte[in.end - in.start]; + in.buffer.getBytes(in.start, din); + try { + out.value = com.clearspring.analytics.stream.cardinality.HyperLogLog.Builder.build(din).cardinality(); + } catch (java.io.IOException e) { + throw new org.apache.drill.common.exceptions.DrillRuntimeException("Failure evaluation hll_decode", e); + } + } + } + } + +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index c3e6bdaac..d7706366a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -288,6 +288,10 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem return new PartitionExplorerImpl(getRootSchema()); } + public DrillbitContext getDrillbitContext() { + return drillbitContext; + } + @Override public ValueHolder getConstantValueHolder(String value, MinorType type, Function<DrillBuf, ValueHolder> holderInitializer) { if (!constantValueHolderCache.containsKey(value)) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java index 262e7e5ba..85d2a2ba0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractPhysicalVisitor.java @@ -35,11 +35,13 @@ import org.apache.drill.exec.physical.config.RowKeyJoinPOP; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.physical.config.Sort; +import org.apache.drill.exec.physical.config.StatisticsAggregate; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnnestPOP; import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.physical.config.UnpivotMaps; import org.apache.drill.exec.physical.config.Values; import org.apache.drill.exec.physical.config.WindowPOP; @@ -97,6 +99,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitStatisticsAggregate(StatisticsAggregate agg, X value) throws E { + return visitOp(agg, value); + } + + @Override public T visitHashAggregate(HashAggregate agg, X value) throws E { return visitOp(agg, value); } @@ -215,6 +222,11 @@ public abstract class AbstractPhysicalVisitor<T, X, E extends Throwable> impleme } @Override + public T visitUnpivot(UnpivotMaps op, X value) throws E { + return visitOp(op, value); + } + + @Override public T visitOp(PhysicalOperator op, X value) throws E{ throw new UnsupportedOperationException(String.format( "The PhysicalVisitor of type %s does not currently support visiting the PhysicalOperator type %s.", this diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java index 77a87c278..a21f578d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalVisitor.java @@ -35,11 +35,13 @@ import org.apache.drill.exec.physical.config.RowKeyJoinPOP; import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.SingleSender; import org.apache.drill.exec.physical.config.Sort; +import org.apache.drill.exec.physical.config.StatisticsAggregate; import 
org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.config.Trace; import org.apache.drill.exec.physical.config.UnionAll; import org.apache.drill.exec.physical.config.UnnestPOP; import org.apache.drill.exec.physical.config.UnorderedReceiver; +import org.apache.drill.exec.physical.config.UnpivotMaps; import org.apache.drill.exec.physical.config.Values; import org.apache.drill.exec.physical.config.WindowPOP; @@ -68,8 +70,10 @@ public interface PhysicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> { public RETURN visitRowKeyJoin(RowKeyJoinPOP join, EXTRA value) throws EXCEP; public RETURN visitReceiver(Receiver receiver, EXTRA value) throws EXCEP; public RETURN visitStreamingAggregate(StreamingAggregate agg, EXTRA value) throws EXCEP; + public RETURN visitStatisticsAggregate(StatisticsAggregate agg, EXTRA value) throws EXCEP; public RETURN visitHashAggregate(HashAggregate agg, EXTRA value) throws EXCEP; public RETURN visitWriter(Writer op, EXTRA value) throws EXCEP; + public RETURN visitUnpivot(UnpivotMaps op, EXTRA value) throws EXCEP; public RETURN visitValues(Values op, EXTRA value) throws EXCEP; public RETURN visitOp(PhysicalOperator op, EXTRA value) throws EXCEP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StatisticsAggregate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StatisticsAggregate.java new file mode 100644 index 000000000..95ee6bff7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/StatisticsAggregate.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.config; + +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; + +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.List; + +@JsonTypeName("statistics-aggregate") +public class StatisticsAggregate extends StreamingAggregate { + // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StatisticsAggregate.class); + + private final List<String> functions; + + @JsonCreator + public StatisticsAggregate( + @JsonProperty("child") PhysicalOperator child, + @JsonProperty("functions") List<String> functions) { + super(child, null, null, 0.f); + this.functions = ImmutableList.copyOf(functions); + } + + public List<String> getFunctions() { + return functions; + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) + throws E { + return physicalVisitor.visitStatisticsAggregate(this, value); + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new StatisticsAggregate(child, functions); + } + + @Override + public int getOperatorType() { + return CoreOperatorType.STATISTICS_AGGREGATE_VALUE; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnpivotMaps.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnpivotMaps.java new file mode 100644 index 000000000..ac71b11ce --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnpivotMaps.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.config; + +import java.util.List; + +import org.apache.drill.exec.physical.base.AbstractSingle; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.base.PhysicalVisitor; +import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +@JsonTypeName("unpivot-maps") +public class UnpivotMaps extends AbstractSingle { + private final List<String> mapFieldsNames; + + @JsonCreator + public UnpivotMaps(@JsonProperty("child") PhysicalOperator child, List<String> mapFieldsNames) { + super(child); + this.mapFieldsNames = mapFieldsNames; + } + + public List<String> getMapFieldNames() { + return mapFieldsNames; + } + + @Override + public <T, X, E extends Throwable> T accept(PhysicalVisitor<T, X, E> physicalVisitor, X value) throws E { + return physicalVisitor.visitUnpivot(this, value); + } + + @Override + protected PhysicalOperator getNewWithChild(PhysicalOperator child) { + return new UnpivotMaps(child, mapFieldsNames); + } + + @Override + public int getOperatorType() { + return CoreOperatorType.UNPIVOT_MAPS_VALUE; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java index 1d1d3cb16..396fd3692 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/InternalBatch.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.util.Iterator; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatch; @@ -31,6 +32,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(InternalBatch.class); private final VectorContainer container; + private final RecordBatch incoming; private final BatchSchema schema; private final SelectionVector2 sv2; private final SelectionVector4 sv4; @@ -54,6 +56,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ this.sv2 = null; } this.schema = incoming.getSchema(); + this.incoming = incoming; this.container = VectorContainer.getTransferClone(incoming, ignoreWrappers, oContext); } @@ -88,4 +91,7 @@ public class InternalBatch implements Iterable<VectorWrapper<?>>{ return container.getValueAccessorById(clazz, fieldIds); } + public FragmentContext getContext() { + return incoming.getContext(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatch.java new file mode 100644 index 000000000..ea5a7b3e4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatch.java @@ -0,0 +1,223 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.aggregate; + +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import com.sun.codemodel.JExpr; +import org.apache.drill.common.expression.ErrorCollector; +import org.apache.drill.common.expression.ErrorCollectorImpl; +import org.apache.drill.common.expression.FunctionCallFactory; +import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.expression.ValueExpressions; +import org.apache.drill.exec.exception.ClassTransformationException; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.ClassGenerator; +import org.apache.drill.exec.expr.CodeGenerator; +import org.apache.drill.exec.expr.ExpressionTreeMaterializer; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.expr.ValueVectorWriteExpression; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.StatisticsAggregate; +import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TypedFieldId; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.FieldIdUtil; +import org.apache.drill.exec.vector.complex.MapVector; + +import java.io.IOException; +import java.util.List; + +/** + * TODO: This needs cleanup. Currently the key values are constants and we compare the constants for + * every record. Seems unnecessary. + * + * Example input and output: + * Schema of incoming batch: region_id (VARCHAR), sales_city (VARCHAR), cnt (BIGINT) + * Schema of output: + * "schema" : BIGINT - Schema number. For each schema change this number is incremented. + * "computed" : BIGINT - What time is it computed? + * "columns" : MAP - Column names + * "region_id" : VARCHAR + * "sales_city" : VARCHAR + * "cnt" : VARCHAR + * "statscount" : MAP + * "region_id" : BIGINT - statscount(region_id) - aggregation over all values of region_id in incoming batch + * "sales_city" : BIGINT - statscount(sales_city) + * "cnt" : BIGINT - statscount(cnt) + * "nonnullstatcount" : MAP + * "region_id" : BIGINT - nonnullstatcount(region_id) + * "sales_city" : BIGINT - nonnullstatcount(sales_city) + * "cnt" : BIGINT - nonnullstatcount(cnt) + * .... another map for next stats function .... 
+ */ +public class StatisticsAggBatch extends StreamingAggBatch { + private List<String> functions; + private int schema = 0; + + public StatisticsAggBatch(StatisticsAggregate popConfig, RecordBatch incoming, FragmentContext context) + throws OutOfMemoryException { + super(popConfig, incoming, context); + this.functions = popConfig.getFunctions(); + } + + private void createKeyColumn(String name, LogicalExpression expr, List<LogicalExpression> keyExprs, List<TypedFieldId> keyOutputIds) + throws SchemaChangeException { + ErrorCollector collector = new ErrorCollectorImpl(); + + LogicalExpression mle = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry()); + + MaterializedField outputField = MaterializedField.create(name, mle.getMajorType()); + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + + keyExprs.add(mle); + keyOutputIds.add(container.add(vector)); + + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + } + + private void createNestedKeyColumn(MapVector parent, String name, LogicalExpression expr, List<LogicalExpression> keyExprs, List<TypedFieldId> keyOutputIds) + throws SchemaChangeException { + ErrorCollector collector = new ErrorCollectorImpl(); + + LogicalExpression mle = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry()); + + Class<? extends ValueVector> vvc = + TypeHelper.getValueVectorClass(mle.getMajorType().getMinorType(), mle.getMajorType().getMode()); + + ValueVector vv = parent.addOrGet(name, mle.getMajorType(), vvc); + + TypedFieldId pfid = container.getValueVectorId(SchemaPath.getSimplePath(parent.getField().getName())); + assert pfid.getFieldIds().length == 1; + TypedFieldId.Builder builder = TypedFieldId.newBuilder(); + builder.addId(pfid.getFieldIds()[0]); + TypedFieldId id = + FieldIdUtil.getFieldIdIfMatches(parent, builder, true, + SchemaPath.getSimplePath(vv.getField().getName()).getRootSegment()); + + keyExprs.add(mle); + keyOutputIds.add(id); + + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. " + collector.toErrorString()); + } + } + + private void addMapVector(String name, MapVector parent, LogicalExpression expr, List<LogicalExpression> valueExprs) + throws SchemaChangeException { + ErrorCollector collector = new ErrorCollectorImpl(); + + LogicalExpression mle = ExpressionTreeMaterializer.materialize(expr, incoming, collector, context.getFunctionRegistry()); + + Class<? extends ValueVector> vvc = + TypeHelper.getValueVectorClass(mle.getMajorType().getMinorType(), mle.getMajorType().getMode()); + ValueVector vv = parent.addOrGet(name, mle.getMajorType(), vvc); + + TypedFieldId pfid = container.getValueVectorId(SchemaPath.getSimplePath(parent.getField().getName())); + assert pfid.getFieldIds().length == 1; + TypedFieldId.Builder builder = TypedFieldId.newBuilder(); + builder.addId(pfid.getFieldIds()[0]); + TypedFieldId id = FieldIdUtil.getFieldIdIfMatches(parent, builder, true, + SchemaPath.getSimplePath(vv.getField().getName()).getRootSegment()); + + valueExprs.add(new ValueVectorWriteExpression(id, mle, true)); + + if (collector.hasErrors()) { + throw new SchemaChangeException("Failure while materializing expression. 
" + collector.toErrorString()); + } + } + + private StreamingAggregator codegenAggregator(List<LogicalExpression> keyExprs, List<LogicalExpression> valueExprs, List<TypedFieldId> keyOutputIds) + throws SchemaChangeException, ClassTransformationException, IOException { + ClassGenerator<StreamingAggregator> cg = CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, context.getOptions()); + + LogicalExpression[] keyExprsArray = new LogicalExpression[keyExprs.size()]; + LogicalExpression[] valueExprsArray = new LogicalExpression[valueExprs.size()]; + TypedFieldId[] keyOutputIdsArray = new TypedFieldId[keyOutputIds.size()]; + + keyExprs.toArray(keyExprsArray); + valueExprs.toArray(valueExprsArray); + keyOutputIds.toArray(keyOutputIdsArray); + + setupIsSame(cg, keyExprsArray); + setupIsSameApart(cg, keyExprsArray); + addRecordValues(cg, valueExprsArray); + outputRecordKeys(cg, keyOutputIdsArray, keyExprsArray); + outputRecordKeysPrev(cg, keyOutputIdsArray, keyExprsArray); + + cg.getBlock("resetValues")._return(JExpr.TRUE); + getIndex(cg); + + container.buildSchema(SelectionVectorMode.NONE); + StreamingAggregator agg = context.getImplementationClass(cg); + agg.setup(oContext, incoming, this, ValueVector.MAX_ROW_COUNT); + return agg; + } + + protected StreamingAggregator createAggregatorInternal() + throws SchemaChangeException, ClassTransformationException, IOException { + container.clear(); + + List<LogicalExpression> keyExprs = Lists.newArrayList(); + List<LogicalExpression> valueExprs = Lists.newArrayList(); + List<TypedFieldId> keyOutputIds = Lists.newArrayList(); + + createKeyColumn("schema", + ValueExpressions.getBigInt(schema++), + keyExprs, + keyOutputIds + ); + createKeyColumn("computed", + ValueExpressions.getBigInt(System.currentTimeMillis()), + keyExprs, + keyOutputIds + ); + + MapVector cparent = new MapVector("column", oContext.getAllocator(), null); + container.add(cparent); + for (MaterializedField mf : incoming.getSchema()) { + createNestedKeyColumn( + cparent, + mf.getName(), + ValueExpressions.getChar(mf.getName(), 0), + keyExprs, + keyOutputIds + ); + } + + for (String func : functions) { + MapVector parent = new MapVector(func, oContext.getAllocator(), null); + container.add(parent); + + for (MaterializedField mf : incoming.getSchema()) { + List<LogicalExpression> args = Lists.newArrayList(); + args.add(SchemaPath.getSimplePath(mf.getName())); + LogicalExpression call = FunctionCallFactory.createExpression(func, args); + + addMapVector(mf.getName(), parent, call, valueExprs); + } + } + + return codegenAggregator(keyExprs, valueExprs, keyOutputIds); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatchCreator.java new file mode 100644 index 000000000..aba325cb3 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StatisticsAggBatchCreator.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.aggregate; + +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import java.util.List; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.ExecutorFragmentContext; +import org.apache.drill.exec.physical.config.StatisticsAggregate; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.RecordBatch; + +@SuppressWarnings("unused") +public class StatisticsAggBatchCreator implements BatchCreator<StatisticsAggregate>{ + + @Override + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, StatisticsAggregate config, List<RecordBatch> children) + throws ExecutionSetupException { + Preconditions.checkArgument(children.size() == 1); + return new StatisticsAggBatch(config, children.iterator().next(), context); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java index ffcfa78a7..e1e43bd10 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java @@ -77,8 +77,8 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.STOP; public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamingAggBatch.class); - private StreamingAggregator aggregator; - private final RecordBatch incoming; + protected StreamingAggregator aggregator; + protected final RecordBatch incoming; private List<BaseWriter.ComplexWriter> complexWriters; // // Streaming agg can be in (a) a normal pipeline or (b) it may be in a pipeline that is part of a subquery involving @@ -533,7 +533,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private final MappingSet IS_SAME_I1 = new MappingSet("index1", null, IS_SAME, IS_SAME); private final MappingSet IS_SAME_I2 = new MappingSet("index2", null, IS_SAME, IS_SAME); - private void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) { + protected void setupIsSame(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) { cg.setMappingSet(IS_SAME_I1); for (final LogicalExpression expr : keyExprs) { // first, we rewrite the evaluation stack for each side of the comparison. 
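The surrounding StreamingAggBatch hunks widen the codegen helpers from private to protected so that the StatisticsAggBatch subclass added earlier in this patch can drive the same generated-aggregator pipeline with statistics expressions instead of GROUP BY expressions. A compact sketch of the pattern being enabled; the class bodies here are hypothetical stand-ins, though the hook names mirror the real ones:

```java
// Hypothetical stand-ins for StreamingAggBatch and its codegen hooks.
abstract class AggBatchSketch {
  // Formerly private; protected lets a subclass orchestrate the same steps.
  protected void setupIsSame()      { System.out.println("gen key comparison"); }
  protected void addRecordValues()  { System.out.println("gen value aggregation"); }
  protected void outputRecordKeys() { System.out.println("gen key output"); }

  protected abstract void createAggregatorInternal();
}

class StatsAggBatchSketch extends AggBatchSketch {
  @Override
  protected void createAggregatorInternal() {
    // Same pipeline, different expressions: statcount/nonnullstatcount/hll/ndv
    // per incoming column rather than user-specified GROUP BY keys.
    setupIsSame();
    addRecordValues();
    outputRecordKeys();
  }
}
```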
@@ -556,7 +556,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private final MappingSet ISA_B1 = new MappingSet("b1Index", null, "b1", null, IS_SAME_PREV_INTERNAL_BATCH_READ, IS_SAME_PREV_INTERNAL_BATCH_READ); private final MappingSet ISA_B2 = new MappingSet("b2Index", null, "incoming", null, IS_SAME_PREV, IS_SAME_PREV); - private void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) { + protected void setupIsSameApart(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] keyExprs) { cg.setMappingSet(ISA_B1); for (final LogicalExpression expr : keyExprs) { // first, we rewrite the evaluation stack for each side of the comparison. @@ -578,7 +578,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private final GeneratorMapping EVAL_OUTSIDE = GeneratorMapping.create("setupInterior", "outputRecordValues", "resetValues", "cleanup"); private final MappingSet EVAL = new MappingSet("index", "outIndex", "incoming", "outgoing", EVAL_INSIDE, EVAL_OUTSIDE, EVAL_INSIDE); - private void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) { + protected void addRecordValues(ClassGenerator<StreamingAggregator> cg, LogicalExpression[] valueExprs) { cg.setMappingSet(EVAL); for (final LogicalExpression ex : valueExprs) { cg.addExpr(ex); @@ -587,7 +587,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private final MappingSet RECORD_KEYS = new MappingSet(GeneratorMapping.create("setupInterior", "outputRecordKeys", null, null)); - private void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) { + protected void outputRecordKeys(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) { cg.setMappingSet(RECORD_KEYS); for (int i = 0; i < keyExprs.length; i++) { cg.addExpr(new ValueVectorWriteExpression(keyOutputIds[i], keyExprs[i], true)); @@ -600,7 +600,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { private final GeneratorMapping PREVIOUS_KEYS = GeneratorMapping.create("outputRecordKeysPrev", "outputRecordKeysPrev", null, null); private final MappingSet RECORD_KEYS_PREV = new MappingSet("previousIndex", "outIndex", "previous", null, PREVIOUS_KEYS, PREVIOUS_KEYS); - private void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) { + protected void outputRecordKeysPrev(ClassGenerator<StreamingAggregator> cg, TypedFieldId[] keyOutputIds, LogicalExpression[] keyExprs) { cg.setMappingSet(RECORD_KEYS_PREV); for (int i = 0; i < keyExprs.length; i++) { @@ -614,7 +614,7 @@ public class StreamingAggBatch extends AbstractRecordBatch<StreamingAggregate> { } } - private void getIndex(ClassGenerator<StreamingAggregator> g) { + protected void getIndex(ClassGenerator<StreamingAggregator> g) { switch (incoming.getSchema().getSelectionVectorMode()) { case FOUR_BYTE: { JVar var = g.declareClassField("sv4_", g.getModel()._ref(SelectionVector4.class)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsBatchCreator.java new file mode 100644 index 000000000..733524f21 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsBatchCreator.java 
@@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.unpivot; + +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import java.util.List; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.ExecutorFragmentContext; +import org.apache.drill.exec.physical.config.UnpivotMaps; +import org.apache.drill.exec.physical.impl.BatchCreator; +import org.apache.drill.exec.record.CloseableRecordBatch; +import org.apache.drill.exec.record.RecordBatch; + +@SuppressWarnings("unused") +public class UnpivotMapsBatchCreator implements BatchCreator<UnpivotMaps>{ + + @Override + public CloseableRecordBatch getBatch(ExecutorFragmentContext context, UnpivotMaps config, List<RecordBatch> children) + throws ExecutionSetupException { + Preconditions.checkArgument(children.size() == 1); + return new UnpivotMapsRecordBatch(config, children.iterator().next(), context); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java new file mode 100644 index 000000000..e98d70e20 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unpivot/UnpivotMapsRecordBatch.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.physical.impl.unpivot; + +import java.util.List; +import java.util.Map; +import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.TypeHelper; +import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.config.UnpivotMaps; +import org.apache.drill.exec.record.AbstractSingleRecordBatch; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.record.TransferPair; +import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.vector.ValueVector; +import org.apache.drill.exec.vector.complex.MapVector; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; + +/** + * TODO: This needs cleanup, especially in state transitions. + * + * Unpivot maps. Assumptions are: + * 1) all child vectors in a map are of same type. + * 2) Each map contains the same number of fields and field names are also same (types could be different). + * + * Example input and output: + * Schema of input: + * "schema" : BIGINT - Schema number. For each schema change this number is incremented. + * "computed" : BIGINT - What time is it computed? + * "columns" : MAP - Column names + * "region_id" : VARCHAR + * "sales_city" : VARCHAR + * "cnt" : VARCHAR + * "statscount" : MAP + * "region_id" : BIGINT - statscount(region_id) - aggregation over all values of region_id in incoming batch + * "sales_city" : BIGINT - statscount(sales_city) + * "cnt" : BIGINT - statscount(cnt) + * "nonnullstatcount" : MAP + * "region_id" : BIGINT - nonnullstatcount(region_id) + * "sales_city" : BIGINT - nonnullstatcount(sales_city) + * "cnt" : BIGINT - nonnullstatcount(cnt) + * .... another map for next stats function .... + * + * Schema of output: + * "schema" : BIGINT - Schema number. For each schema change this number is incremented. + * "computed" : BIGINT - What time is this computed? + * "column" : column name + * "statscount" : BIGINT + * "nonnullstatcount" : BIGINT + * .... one column for each map type ... 
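The javadoc above describes the reshaping this operator performs: one wide stats row holding a map per aggregate function becomes one output row per analyzed column. A toy illustration with plain collections and made-up numbers (the real operator does this with vector transfer pairs, one pass per key):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class UnpivotSketch {
  public static void main(String[] args) {
    // One incoming "row": a map per stats function, keyed by column name.
    Map<String, Long> statscount = new LinkedHashMap<>();
    statscount.put("region_id", 100L);
    statscount.put("sales_city", 100L);
    Map<String, Long> nonnullstatcount = new LinkedHashMap<>();
    nonnullstatcount.put("region_id", 98L);
    nonnullstatcount.put("sales_city", 95L);

    // Unpivot: one output row per column, one output field per map.
    for (String col : statscount.keySet()) {
      System.out.printf("column=%s statscount=%d nonnullstatcount=%d%n",
          col, statscount.get(col), nonnullstatcount.get(col));
    }
  }
}
```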
+ */
+public class UnpivotMapsRecordBatch extends AbstractSingleRecordBatch<UnpivotMaps> {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnpivotMapsRecordBatch.class);
+
+  private final List<String> mapFieldsNames;
+
+  private int keyIndex = 0;
+  private List<String> keyList = null;
+
+  private Map<MaterializedField, Map<String, ValueVector>> dataSrcVecMap = null;
+
+  // Map of non-map fields to VV in the incoming schema
+  private Map<MaterializedField, ValueVector> copySrcVecMap = null;
+
+  private List<TransferPair> transferList;
+  private int recordCount = 0;
+
+  public UnpivotMapsRecordBatch(UnpivotMaps pop, RecordBatch incoming, FragmentContext context)
+      throws OutOfMemoryException {
+    super(pop, context, incoming);
+    this.mapFieldsNames = pop.getMapFieldNames();
+  }
+
+  @Override
+  public int getRecordCount() {
+    return recordCount;
+  }
+
+  @Override
+  public IterOutcome innerNext() {
+    if (keyIndex != 0) {
+      doWork();
+      return IterOutcome.OK;
+    } else {
+      return super.innerNext();
+    }
+  }
+
+  public VectorContainer getOutgoingContainer() {
+    return this.container;
+  }
+
+  private void doTransfer() {
+    final int inputCount = incoming.getRecordCount();
+
+    for (TransferPair tp : transferList) {
+      tp.splitAndTransfer(0, inputCount);
+    }
+  }
+
+  @Override
+  protected IterOutcome doWork() {
+    int outRecordCount = incoming.getRecordCount();
+
+    prepareTransfers();
+    doTransfer();
+
+    keyIndex = (keyIndex + 1) % keyList.size();
+    recordCount = outRecordCount;
+
+    if (keyIndex == 0) {
+      for (VectorWrapper<?> w : incoming) {
+        w.clear();
+      }
+    }
+    return IterOutcome.OK;
+  }
+
+  /**
+   * Identify the list of fields within a map which are unpivoted as columns in the output.
+   */
+  private void buildKeyList() {
+    List<String> lastMapKeyList = null;
+    for (VectorWrapper<?> vw : incoming) {
+      if (vw.getField().getType().getMinorType() != MinorType.MAP) {
+        continue;
+      }
+
+      keyList = Lists.newArrayList();
+
+      for (ValueVector vv : vw.getValueVector()) {
+        keyList.add(vv.getField().getName());
+      }
+
+      if (lastMapKeyList == null) {
+        lastMapKeyList = keyList;
+      } else {
+        if (keyList.size() != lastMapKeyList.size() || !lastMapKeyList.containsAll(keyList)) {
+          throw new UnsupportedOperationException("Maps have different fields");
+        }
+      }
+    }
+  }
+
+  private void buildOutputContainer() {
+    dataSrcVecMap = Maps.newHashMap();
+    copySrcVecMap = Maps.newHashMap();
+    for (VectorWrapper<?> vw : incoming) {
+      MaterializedField ds = vw.getField();
+      String col = vw.getField().getName();
+
+      if (!mapFieldsNames.contains(col)) {
+        MajorType mt = vw.getValueVector().getField().getType();
+        MaterializedField mf = MaterializedField.create(col, mt);
+        container.add(TypeHelper.getNewVector(mf, oContext.getAllocator()));
+        copySrcVecMap.put(mf, vw.getValueVector());
+        continue;
+      }
+
+      MapVector mapVector = (MapVector) vw.getValueVector();
+      assert mapVector.getPrimitiveVectors().size() > 0;
+
+      MajorType mt = mapVector.iterator().next().getField().getType();
+      MaterializedField mf = MaterializedField.create(col, mt);
+      assert !dataSrcVecMap.containsKey(mf);
+      container.add(TypeHelper.getNewVector(mf, oContext.getAllocator()));
+
+      Map<String, ValueVector> m = Maps.newHashMap();
+      dataSrcVecMap.put(mf, m);
+
+      for (ValueVector vv : mapVector) {
+        String fieldName = vv.getField().getName();
+
+        if (!keyList.contains(fieldName)) {
+          throw new UnsupportedOperationException("Unpivot data vector " +
+              ds + " contains key " + fieldName + " not contained in key source!");
+        }
+
+        if (vv.getField().getType().getMinorType() == MinorType.MAP) {
+          throw new UnsupportedOperationException("Unpivot of nested map is not supported!");
+        }
+
+        m.put(fieldName, vv);
+      }
+    }
+
+    container.buildSchema(incoming.getSchema().getSelectionVectorMode());
+  }
+
+  private void prepareTransfers() {
+    transferList = Lists.newArrayList();
+    for (VectorWrapper<?> vw : container) {
+      MaterializedField mf = vw.getField();
+
+      ValueVector vv;
+      TransferPair tp;
+      if (dataSrcVecMap.containsKey(mf)) {
+        String k = keyList.get(keyIndex);
+        vv = dataSrcVecMap.get(mf).get(k);
+        tp = vv.makeTransferPair(vw.getValueVector());
+      } else {
+        vv = copySrcVecMap.get(mf);
+        tp = vv.makeTransferPair(vw.getValueVector());
+      }
+
+      transferList.add(tp);
+    }
+  }
+
+  @Override
+  protected boolean setupNewSchema() throws SchemaChangeException {
+    container.clear();
+
+    buildKeyList();
+    buildOutputContainer();
+    return true;
+  }
+
+  @Override
+  public void dump() {
+    logger.error("UnpivotMapsRecordBatch[recordCount={}, container={}]", recordCount, container);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 6cdb18d2e..b6b0f58a0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -60,6 +60,7 @@ import org.apache.drill.exec.planner.logical.DrillValuesRule;
 import org.apache.drill.exec.planner.logical.DrillWindowRule;
 import org.apache.drill.exec.planner.logical.partition.ParquetPruneScanRule;
 import org.apache.drill.exec.planner.logical.partition.PruneScanRule;
+import org.apache.drill.exec.planner.physical.AnalyzePrule;
 import org.apache.drill.exec.planner.physical.ConvertCountToDirectScan;
 import org.apache.drill.exec.planner.physical.LateralJoinPrule;
 import org.apache.drill.exec.planner.physical.DirectScanPrule;
@@ -516,6 +517,7 @@ public enum PlannerPhase {
     ruleList.add(ValuesPrule.INSTANCE);
     ruleList.add(DirectScanPrule.INSTANCE);
     ruleList.add(RowKeyJoinPrule.INSTANCE);
+    ruleList.add(AnalyzePrule.INSTANCE);
     ruleList.add(UnnestPrule.INSTANCE);
     ruleList.add(LateralJoinPrule.INSTANCE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
index cde49e4b9..42fbedb4a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
@@ -20,27 +20,28 @@ package org.apache.drill.exec.planner.common;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.logical.LogicalJoin;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.expr.holders.IntHolder;
-import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
 import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
 import org.apache.drill.exec.planner.logical.DrillJoin;
 import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.calcite.rel.core.Join;
-import org.apache.calcite.rel.core.JoinRelType;
-import org.apache.calcite.rel.RelNode;
-import org.apache.calcite.plan.RelOptCluster;
-import org.apache.calcite.plan.RelOptCost;
-import org.apache.calcite.plan.RelOptPlanner;
-import org.apache.calcite.plan.RelTraitSet;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.calcite.rex.RexNode;
-
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 /**
@@ -100,9 +101,31 @@ public abstract class DrillJoinRelBase extends Join implements DrillJoin {
   public double estimateRowCount(RelMetadataQuery mq) {
     if (this.condition.isAlwaysTrue()) {
       return joinRowFactor * this.getLeft().estimateRowCount(mq) * this.getRight().estimateRowCount(mq);
-    } else {
-      return joinRowFactor * Math.max(this.getLeft().estimateRowCount(mq), this.getRight().estimateRowCount(mq));
     }
+
+    int[] joinFields = new int[2];
+
+    LogicalJoin jr = LogicalJoin.create(this.getLeft(), this.getRight(), this.getCondition(),
+        this.getVariablesSet(), this.getJoinType());
+
+    if (RelOptUtil.analyzeSimpleEquiJoin(jr, joinFields)) {
+      ImmutableBitSet leq = ImmutableBitSet.of(joinFields[0]);
+      ImmutableBitSet req = ImmutableBitSet.of(joinFields[1]);
+
+      Double ldrc = mq.getDistinctRowCount(this.getLeft(), leq, null);
+      Double rdrc = mq.getDistinctRowCount(this.getRight(), req, null);
+
+      Double lrc = mq.getRowCount(this.getLeft());
+      Double rrc = mq.getRowCount(this.getRight());
+
+      // Classic equi-join cardinality estimate: |L join R| = |L| * |R| / max(ndv(L.key), ndv(R.key)).
+      if (ldrc != null && rdrc != null && lrc != null && rrc != null) {
+        return (lrc * rrc) / Math.max(ldrc, rdrc);
+      }
+    }
+
+    return joinRowFactor * Math.max(
+        mq.getRowCount(left),
+        mq.getRowCount(right));
   }
 
   /**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
new file mode 100644
index 000000000..a22552b3e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillStatsTable.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.common;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelVisitor;
+import org.apache.calcite.rel.core.TableScan;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Wraps the stats table info including schema and tableName. Also materializes stats from storage and keeps them in
+ * memory.
+ */
+public class DrillStatsTable {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillStatsTable.class);
+
+  /**
+   * List of columns in stats table.
+   */
+  public static final String COL_COLUMN = "column";
+  public static final String COL_COMPUTED = "computed";
+  public static final String COL_STATCOUNT = "statcount";
+  public static final String COL_NDV = "ndv";
+
+  private final String schemaName;
+  private final String tableName;
+
+  private final Map<String, Long> ndv = Maps.newHashMap();
+  private double rowCount = -1;
+
+  private boolean materialized = false;
+
+  public DrillStatsTable(String schemaName, String tableName) {
+    this.schemaName = schemaName;
+    this.tableName = tableName;
+  }
+
+  /**
+   * Get the number of distinct values of the given column. If stats are not present for the given column, null is
+   * returned.
+   *
+   * Note: returned data may not be accurate. Accuracy depends on whether the table data has changed after the
+   * stats are computed.
+   *
+   * @param col the column name
+   * @return the estimated number of distinct values, or null if no stats are available
+   */
+  public Double getNdv(String col) {
+    Preconditions.checkState(materialized, "Stats are not yet materialized.");
+
+    final String upperCol = col.toUpperCase();
+    final Long ndvCol = ndv.get(upperCol);
+    if (ndvCol != null) {
+      return Math.min(ndvCol, rowCount);
+    }
+
+    return null;
+  }
+
+  /**
+   * Get the row count of the table. Returns null if stats are not present.
+   *
+   * Note: returned data may not be accurate. Accuracy depends on whether the table data has changed after the
+   * stats are computed.
+   *
+   * @return the estimated row count, or null if no stats are available
+   */
+  public Double getRowCount() {
+    Preconditions.checkState(materialized, "Stats are not yet materialized.");
+    return rowCount > 0 ? rowCount : null;
+  }
+
+  /**
+   * Read the stats from storage and keep them in memory.
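+   *
+   * As a sketch (the schema and table names here are hypothetical), for a stats table
+   * `dfs.tmp`.`sales.stats` the query string assembled below expands to:
+   *
+   *   SELECT a.* FROM `dfs.tmp`.`sales.stats` AS a INNER JOIN
+   *     (SELECT `column`, max(`computed`) AS `computed`
+   *      FROM `dfs.tmp`.`sales.stats` GROUP BY `column`) AS b
+   *   ON a.`column` = b.`column` and a.`computed` = b.`computed`
+   *
+   * i.e. for each column only the most recently computed statistics row is kept.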
+   * @param context the current query context
+   * @throws Exception if reading the stats from storage fails
+   */
+  public void materialize(final QueryContext context) throws Exception {
+    if (materialized) {
+      return;
+    }
+
+    final String fullTableName = "`" + schemaName + "`.`" + tableName + "`";
+    final String sql = "SELECT a.* FROM " + fullTableName + " AS a INNER JOIN " +
+        "(SELECT `" + COL_COLUMN + "`, max(`" + COL_COMPUTED + "`) AS `" + COL_COMPUTED + "` " +
+        "FROM " + fullTableName + " GROUP BY `" + COL_COLUMN + "`) AS b " +
+        "ON a.`" + COL_COLUMN + "` = b.`" + COL_COLUMN + "` and a.`" + COL_COMPUTED + "` = b.`" + COL_COMPUTED + "`";
+
+    final DrillbitContext dc = context.getDrillbitContext();
+    try (final DrillClient client = new DrillClient(dc.getConfig(), dc.getClusterCoordinator(), dc.getAllocator())) {
+      /*final Listener listener = new Listener(dc.getAllocator());
+
+      client.connect();
+      client.runQuery(UserBitShared.QueryType.SQL, sql, listener);
+
+      listener.waitForCompletion();
+
+      for (Map<String, String> r : listener.results) {
+        ndv.put(r.get(COL_COLUMN).toUpperCase(), Long.valueOf(r.get(COL_NDV)));
+        rowCount = Math.max(rowCount, Long.valueOf(r.get(COL_STATCOUNT)));
+      }*/
+    }
+
+    materialized = true;
+  }
+
+  /**
+   * Materializes stats on every scan node that has an attached stats table.
+   */
+  public static class StatsMaterializationVisitor extends RelVisitor {
+    private QueryContext context;
+
+    public static void materialize(final RelNode relNode, final QueryContext context) {
+      new StatsMaterializationVisitor(context).go(relNode);
+    }
+
+    private StatsMaterializationVisitor(QueryContext context) {
+      this.context = context;
+    }
+
+    @Override
+    public void visit(RelNode node, int ordinal, RelNode parent) {
+      if (node instanceof TableScan) {
+        try {
+          final DrillTable drillTable = node.getTable().unwrap(DrillTable.class);
+          final DrillStatsTable statsTable = drillTable.getStatsTable();
+          if (statsTable != null) {
+            statsTable.materialize(context);
+          }
+        } catch (Exception e) {
+          // Log a warning and proceed. We don't want to fail a query.
+          logger.warn("Failed to materialize the stats. Continuing without stats.", e);
+        }
+      }
+      super.visit(node, ordinal, parent);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
index 373683f70..13f16008c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdDistinctRowCount.java
@@ -17,17 +17,22 @@
  */
 package org.apache.drill.exec.planner.cost;
 
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.core.Join;
 import org.apache.calcite.rel.metadata.ReflectiveRelMetadataProvider;
+import org.apache.calcite.rel.metadata.RelColumnOrigin;
 import org.apache.calcite.rel.metadata.RelMdDistinctRowCount;
 import org.apache.calcite.rel.metadata.RelMetadataProvider;
 import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.rex.RexNode;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.planner.logical.DrillScanRel;
 
+import java.util.List;
+
 public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount {
   private static final DrillRelMdDistinctRowCount INSTANCE = new DrillRelMdDistinctRowCount();
 
@@ -61,4 +66,51 @@ public class DrillRelMdDistinctRowCount extends RelMdDistinctRowCount {
     // Consistent with the estimation of Aggregate row count in RelMdRowCount : distinctRowCount = rowCount * 10%.
     return scan.estimateRowCount(mq) * 0.1;
   }
+
+  public Double getDistinctRowCount(RelNode rel, RelMetadataQuery mq, ImmutableBitSet groupKey, RexNode predicate) {
+    if (rel instanceof DrillScanRel) {
+      return getDistinctRowCount((DrillScanRel) rel, mq, groupKey);
+    } else {
+      return super.getDistinctRowCount(rel, mq, groupKey, predicate);
+    }
+  }
+
+  /**
+   * Estimates the number of rows which would be produced by a GROUP BY on the
+   * set of columns indicated by groupKey.
+   */
+  private Double getDistinctRowCount(DrillScanRel scan, RelMetadataQuery mq, ImmutableBitSet groupKey) {
+    if (scan.getDrillTable() == null || scan.getDrillTable().getStatsTable() == null) {
+      // If there is no table or metadata (stats) table associated with scan, estimate the distinct row count.
+      // Consistent with the estimation of Aggregate row count in RelMdRowCount : distinctRowCount = rowCount * 10%.
+      return scan.getRows() * 0.1;
+    }
+
+    // TODO: maybe we should get the column origin of each group-by key before looking it up in the metadata table?
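+    // Sketch of the estimate computed below, under an independence assumption and with
+    // hypothetical stats: for GROUP BY (a, b) on a scan with row count rc = 1000,
+    // ndv(a) = 10 and ndv(b) = 50, the combined estimate is
+    //   (1 - (1 - 10/1000) * (1 - 50/1000)) * 1000 = (1 - 0.99 * 0.95) * 1000 = 59.5
+    // Each column multiplies s by (1 - d/rc), so the result never exceeds the row count.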
+    List<RelColumnOrigin> cols = Lists.newArrayList();
+
+    if (groupKey.length() == 0) {
+      return 0.0;
+    }
+
+    DrillStatsTable md = scan.getDrillTable().getStatsTable();
+
+    final double rc = mq.getRowCount(scan);
+    double s = 1.0;
+    for (int i = 0; i < groupKey.length(); i++) {
+      final String colName = scan.getRowType().getFieldNames().get(i);
+      if (!groupKey.get(i) && colName.equals("*")) {
+        continue;
+      }
+
+      Double d = md.getNdv(colName);
+      if (d == null) {
+        continue;
+      }
+
+      s *= 1 - d / rc;
+    }
+    return (1 - s) * rc;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
index 7f15fb39b..343affb91 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelMdRowCount.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.planner.cost;
 
+import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.SingleRel;
 import org.apache.calcite.rel.core.Aggregate;
 import org.apache.calcite.rel.core.Filter;
@@ -31,6 +32,8 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery;
 import org.apache.calcite.util.BuiltInMethod;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.drill.exec.planner.common.DrillLimitRelBase;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
 
 public class DrillRelMdRowCount extends RelMdRowCount {
   private static final DrillRelMdRowCount INSTANCE = new DrillRelMdRowCount();
@@ -81,4 +84,20 @@ public class DrillRelMdRowCount extends RelMdRowCount {
   public Double getRowCount(Join rel, RelMetadataQuery mq) {
     return rel.estimateRowCount(mq);
   }
+
+  public Double getRowCount(RelNode rel, RelMetadataQuery mq) {
+    if (rel instanceof DrillScanRel) {
+      return getRowCount((DrillScanRel) rel, mq);
+    }
+    return super.getRowCount(rel, mq);
+  }
+
+  private Double getRowCount(DrillScanRel scanRel, RelMetadataQuery mq) {
+    final DrillStatsTable md = scanRel.getDrillTable().getStatsTable();
+    if (md != null) {
+      return md.getRowCount();
+    }
+
+    return super.getRowCount(scanRel, mq);
+  }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAnalyzeRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAnalyzeRel.java
new file mode 100644
index 000000000..5d570f3f2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillAnalyzeRel.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptCost;
+import org.apache.calcite.plan.RelOptPlanner;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.InvalidRelException;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.drill.common.logical.data.Analyze;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.torel.ConversionContext;
+
+import java.util.List;
+
+/**
+ * Drill logical node for "Analyze".
+ */
+public class DrillAnalyzeRel extends SingleRel implements DrillRel {
+
+  public DrillAnalyzeRel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+    super(cluster, traits, child);
+  }
+
+  @Override
+  public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
+    final double dRows = mq.getRowCount(getInput());
+    final double dCpu = dRows * DrillCostBase.COMPARE_CPU_COST;
+    final double dIo = 0;
+    return planner.getCostFactory().makeCost(dRows, dCpu, dIo);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new DrillAnalyzeRel(getCluster(), traitSet, sole(inputs));
+  }
+
+  @Override
+  public LogicalOperator implement(DrillImplementor implementor) {
+    final LogicalOperator inputOp = implementor.visitChild(this, 0, getInput());
+    final Analyze rel = new Analyze();
+    rel.setInput(inputOp);
+
+    return rel;
+  }
+
+  public static DrillAnalyzeRel convert(Analyze analyze, ConversionContext context) throws InvalidRelException {
+    RelNode input = context.toRel(analyze.getInput());
+    return new DrillAnalyzeRel(context.getCluster(), context.getLogicalTraits(), input);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
index ed9b32fe2..afddbfc43 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillTable.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.drill.common.JSONOptions;
 import org.apache.drill.common.logical.StoragePluginConfig;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.physical.base.SchemalessScan;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.server.options.SessionOptionManager;
@@ -45,6 +46,7 @@ public abstract class DrillTable implements Table {
   private final Object selection;
   private final StoragePlugin plugin;
   private final String userName;
+  private DrillStatsTable statsTable;
   private GroupScan scan;
   private SessionOptionManager options;
 
@@ -131,6 +133,14 @@ public abstract class DrillTable implements Table {
     return Statistics.UNKNOWN;
   }
 
+  public DrillStatsTable getStatsTable() {
+    return statsTable;
+  }
+
+  public void setStatsTable(DrillStatsTable statsTable) {
+    this.statsTable = statsTable;
+  }
+
   public RelNode toRel(RelOptTable.ToRelContext context, RelOptTable table) {
     return new DrillScanRel(context.getCluster(),
         context.getCluster().traitSetOf(DrillRel.DRILL_LOGICAL),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
index 23ea23fd5..6869616c8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/FileSystemCreateTableEntry.java
@@ -46,6 +46,7 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
   private FileSystemConfig storageConfig;
   private FormatPlugin formatPlugin;
   private String location;
+  private boolean append;
   private final List<String> partitionColumns;
   private final StorageStrategy storageStrategy;
 
@@ -53,6 +54,7 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
   public FileSystemCreateTableEntry(@JsonProperty("storageConfig") FileSystemConfig storageConfig,
                                     @JsonProperty("formatConfig") FormatPluginConfig formatConfig,
                                     @JsonProperty("location") String location,
+                                    @JsonProperty("append") boolean append,
                                     @JsonProperty("partitionColumn") List<String> partitionColumns,
                                     @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
                                     @JacksonInject StoragePluginRegistry engineRegistry)
@@ -67,11 +69,13 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
   public FileSystemCreateTableEntry(FileSystemConfig storageConfig,
                                     FormatPlugin formatPlugin,
                                     String location,
+                                    boolean append,
                                     List<String> partitionColumns,
                                     StorageStrategy storageStrategy) {
     this.storageConfig = storageConfig;
     this.formatPlugin = formatPlugin;
     this.location = location;
+    this.append = append;
     this.partitionColumns = partitionColumns;
     this.storageStrategy = storageStrategy;
   }
@@ -94,7 +98,7 @@ public class FileSystemCreateTableEntry implements CreateTableEntry {
           formatPlugin.getName())).build(logger);
     }
 
-    AbstractWriter writer = formatPlugin.getWriter(child, location, partitionColumns);
+    AbstractWriter writer = formatPlugin.getWriter(child, location, append, partitionColumns);
     writer.setStorageStrategy(storageStrategy);
     return writer;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AnalyzePrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AnalyzePrule.java
new file mode 100644
index 000000000..4cac5d946
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AnalyzePrule.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.planner.logical.DrillAnalyzeRel;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.RelOptHelper;
+
+import java.util.List;
+
+public class AnalyzePrule extends Prule {
+  public static final RelOptRule INSTANCE = new AnalyzePrule();
+
+  private static final List<String> FUNCTIONS = ImmutableList.of(
+      "statcount",        // total number of entries in the table
+      "nonnullstatcount", // total number of non-null entries in the table
+      "ndv",              // total number of distinct values in the table
+      "hll"               // HyperLogLog
+  );
+
+  public AnalyzePrule() {
+    super(RelOptHelper.some(DrillAnalyzeRel.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(RelNode.class)), "Prel.AnalyzePrule");
+  }
+
+  @Override
+  public void onMatch(RelOptRuleCall call) {
+    final DrillAnalyzeRel analyze = (DrillAnalyzeRel) call.rel(0);
+    final RelNode input = call.rel(1);
+
+    final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(DrillDistributionTrait.SINGLETON);
+    final RelNode convertedInput = convert(input, traits);
+
+    final StatsAggPrel statsAggPrel = new StatsAggPrel(convertedInput, analyze.getCluster(), FUNCTIONS);
+
+    final List<String> mapFields = Lists.newArrayList(FUNCTIONS);
+    mapFields.add(DrillStatsTable.COL_COLUMN);
+    final SingleRel newAnalyze = new UnpivotMapsPrel(statsAggPrel, analyze.getCluster(), mapFields);
+
+    call.transformTo(newAnalyze);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StatsAggPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StatsAggPrel.java
new file mode 100644
index 000000000..124246baf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StatsAggPrel.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.StatisticsAggregate;
+import org.apache.drill.exec.planner.common.DrillRelNode;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public class StatsAggPrel extends SingleRel implements DrillRelNode, Prel {
+
+  private List<String> functions;
+
+  public StatsAggPrel(RelNode child, RelOptCluster cluster, List<String> functions) {
+    super(cluster, child.getTraitSet(), child);
+    this.functions = ImmutableList.copyOf(functions);
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new StatsAggPrel(sole(inputs), getCluster(), ImmutableList.copyOf(functions));
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
+      throws IOException {
+    Prel child = (Prel) this.getInput();
+
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+    StatisticsAggregate g = new StatisticsAggregate(childPOP, functions);
+
+    return creator.addMetadata(this, g);
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getInput());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value)
+      throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.ALL;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return true;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnpivotMapsPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnpivotMapsPrel.java
new file mode 100644
index 000000000..4fc7aaec0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/UnpivotMapsPrel.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.SingleRel;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.UnpivotMaps;
+import org.apache.drill.exec.planner.common.DrillRelNode;
+import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
+import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
+
+public class UnpivotMapsPrel extends SingleRel implements Prel, DrillRelNode {
+
+  private List<String> mapFieldsNames;
+
+  public UnpivotMapsPrel(RelNode child, RelOptCluster cluster, List<String> mapFieldsNames) {
+    super(cluster, child.getTraitSet(), child);
+    this.mapFieldsNames = mapFieldsNames;
+  }
+
+  @Override
+  public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
+    return new UnpivotMapsPrel(sole(inputs), getCluster(), mapFieldsNames);
+  }
+
+  @Override
+  public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
+      throws IOException {
+    Prel child = (Prel) this.getInput();
+
+    PhysicalOperator childPOP = child.getPhysicalOperator(creator);
+
+    UnpivotMaps um = new UnpivotMaps(childPOP, mapFieldsNames);
+    return creator.addMetadata(this, um);
+  }
+
+  @Override
+  public Iterator<Prel> iterator() {
+    return PrelUtil.iter(getInput());
+  }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> logicalVisitor, X value)
+      throws E {
+    return logicalVisitor.visitPrel(this, value);
+  }
+
+  @Override
+  public SelectionVectorMode[] getSupportedEncodings() {
+    return SelectionVectorMode.DEFAULT;
+  }
+
+  @Override
+  public SelectionVectorMode getEncoding() {
+    return SelectionVectorMode.NONE;
+  }
+
+  @Override
+  public boolean needsFinalColumnReordering() {
+    return false;
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
new file mode 100644
index 000000000..cdfe31bbc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.DrillAnalyzeRel;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillScreenRel;
+import org.apache.drill.exec.planner.logical.DrillStoreRel;
+import org.apache.drill.exec.planner.logical.DrillWriterRel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.sql.SchemaUtilites;
+import org.apache.drill.exec.planner.sql.parser.SqlAnalyzeTable;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.util.Pointer;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
+
+import java.io.IOException;
+import java.util.List;
+
+public class AnalyzeTableHandler extends DefaultSqlHandler {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AnalyzeTableHandler.class);
+
+  public AnalyzeTableHandler(SqlHandlerConfig config, Pointer<String> textPlan) {
+    super(config, textPlan);
+  }
+
+  @Override
+  public PhysicalPlan getPlan(SqlNode sqlNode)
+      throws ValidationException, RelConversionException, IOException, ForemanSetupException {
+    final SqlAnalyzeTable sqlAnalyzeTable = unwrap(sqlNode, SqlAnalyzeTable.class);
+
+    verifyNoUnsupportedFunctions(sqlAnalyzeTable);
+
+    SqlIdentifier tableIdentifier = sqlAnalyzeTable.getTableIdentifier();
+    SqlSelect scanSql = new SqlSelect(
+        SqlParserPos.ZERO,              /* position */
+        SqlNodeList.EMPTY,              /* keyword list */
+        getColumnList(sqlAnalyzeTable), /* select list */
+        tableIdentifier,                /* from */
+        null,                           /* where */
+        null,                           /* group by */
+        null,                           /* having */
+        null,                           /* windowDecls */
+        null,                           /* orderBy */
+        null,                           /* offset */
+        null                            /* fetch */
+    );
+
+    final ConvertedRelNode convertedRelNode = validateAndConvert(rewrite(scanSql));
+    final RelDataType validatedRowType = convertedRelNode.getValidatedRowType();
+
+    final RelNode relScan = convertedRelNode.getConvertedNode();
+
+    final String tableName = sqlAnalyzeTable.getName();
+    final AbstractSchema drillSchema = SchemaUtilites.resolveToMutableDrillSchema(
+        config.getConverter().getDefaultSchema(), sqlAnalyzeTable.getSchemaPath());
+
+    if (SqlHandlerUtil.getTableFromSchema(drillSchema, tableName) == null) {
+      throw UserException.validationError()
+          .message("No table with given name [%s] exists in schema [%s]", tableName, drillSchema.getFullSchemaName())
+          .build(logger);
+    }
+
+    // Convert the query to a Drill logical plan and insert a writer operator on top.
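+    // As a sketch with hypothetical names: for "ANALYZE TABLE dfs.tmp.`sales` COMPUTE
+    // STATISTICS FOR ALL COLUMNS SAMPLE 100 PERCENT", convertToDrel() below wraps the scan of
+    // `sales` as Screen <- Writer(stats table) <- Analyze <- Scan, so the aggregated statistics
+    // are written back through schema.appendToStatsTable(tableName).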
+    DrillRel drel = convertToDrel(relScan, drillSchema, tableName);
+    Prel prel = convertToPrel(drel, validatedRowType);
+    logAndSetTextPlan("Drill Physical", prel, logger);
+    PhysicalOperator pop = convertToPop(prel);
+    PhysicalPlan plan = convertToPlan(pop);
+    log("Drill Plan", plan, logger);
+
+    return plan;
+  }
+
+  private SqlNodeList getColumnList(final SqlAnalyzeTable sqlAnalyzeTable) {
+    final SqlNodeList columnList = new SqlNodeList(SqlParserPos.ZERO);
+
+    final List<String> fields = sqlAnalyzeTable.getFieldNames();
+    if (fields == null || fields.isEmpty()) {
+      columnList.add(new SqlIdentifier("*", SqlParserPos.ZERO));
+    } else {
+      for (String field : fields) {
+        columnList.add(new SqlIdentifier(field, SqlParserPos.ZERO));
+      }
+    }
+
+    return columnList;
+  }
+
+  protected DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String analyzeTableName)
+      throws RelConversionException, SqlUnsupportedException {
+    final DrillRel convertedRelNode = convertToDrel(relNode);
+
+    if (convertedRelNode instanceof DrillStoreRel) {
+      throw new UnsupportedOperationException();
+    }
+
+    final RelNode analyzeRel = new DrillAnalyzeRel(
+        convertedRelNode.getCluster(),
+        convertedRelNode.getTraitSet(),
+        convertedRelNode
+    );
+
+    final RelNode writerRel = new DrillWriterRel(
+        analyzeRel.getCluster(),
+        analyzeRel.getTraitSet(),
+        analyzeRel,
+        schema.appendToStatsTable(analyzeTableName)
+    );
+
+    return new DrillScreenRel(writerRel.getCluster(), writerRel.getTraitSet(), writerRel);
+  }
+
+  // Make sure no unsupported features in the ANALYZE statement are used.
+  private static void verifyNoUnsupportedFunctions(final SqlAnalyzeTable analyzeTable) {
+    // Throw an unsupported error for features that are not yet implemented.
+    if (analyzeTable.getEstimate()) {
+      throw UserException.unsupportedError()
+          .message("Statistics estimation is not yet supported.")
+          .build(logger);
+    }
+
+    if (analyzeTable.getPercent() != 100) {
+      throw UserException.unsupportedError()
+          .message("Statistics from sampling are not yet supported.")
+          .build(logger);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index 0881dc1c7..52ae7b1a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -73,6 +73,7 @@ import org.apache.drill.exec.physical.impl.join.JoinUtils;
 import org.apache.drill.exec.planner.PlannerPhase;
 import org.apache.drill.exec.planner.PlannerType;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+import org.apache.drill.exec.planner.common.DrillStatsTable.StatsMaterializationVisitor;
 import org.apache.drill.exec.planner.cost.DrillDefaultRelMetadataProvider;
 import org.apache.drill.exec.planner.logical.DrillProjectRel;
 import org.apache.drill.exec.planner.logical.DrillRel;
@@ -230,6 +231,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
     }
 
     try {
+      StatsMaterializationVisitor.materialize(relNode, context);
       // HEP for rules, which are failed at the LOGICAL_PLANNING stage for Volcano planner
       final RelNode setOpTransposeNode = transform(PlannerType.HEP, PlannerPhase.PRE_LOGICAL_PLANNING, relNode);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
index 57a7e1753..210f43b89 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/CompoundIdentifierConverter.java
@@ -64,6 +64,7 @@ public class CompoundIdentifierConverter extends SqlShuttle {
     // Every element of the array corresponds to the item in the list
     // returned by getOperandList() method for concrete SqlCall implementation.
     REWRITE_RULES = ImmutableMap.<Class<? extends SqlCall>, RewriteType[]>builder()
+        .put(SqlAnalyzeTable.class, arrayOf(D, D, E, D))
         .put(SqlSelect.class, arrayOf(D, E, D, E, E, E, E, E, D, D))
         .put(SqlCreateTable.class, arrayOf(D, D, D, E, D, D))
         .put(SqlCreateView.class, arrayOf(D, E, E, D))
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlAnalyzeTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlAnalyzeTable.java
new file mode 100644
index 000000000..91f83bf72
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlAnalyzeTable.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import java.util.List;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlNumericLiteral;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.Util;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.AnalyzeTableHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+import org.apache.drill.exec.util.Pointer;
+
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+/**
+ * SQL tree for the ANALYZE statement.
+ */
+public class SqlAnalyzeTable extends DrillSqlCall {
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("ANALYZE_TABLE", SqlKind.OTHER) {
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      Preconditions.checkArgument(operands.length == 4, "SqlAnalyzeTable.createCall() requires exactly 4 operands!");
+      return new SqlAnalyzeTable(pos, (SqlIdentifier) operands[0], (SqlLiteral) operands[1],
+          (SqlNodeList) operands[2], (SqlNumericLiteral) operands[3]);
+    }
+  };
+
+  private final SqlIdentifier tblName;
+  private final SqlLiteral estimate;
+  private final SqlNodeList fieldList;
+  private final SqlNumericLiteral percent;
+
+  public SqlAnalyzeTable(SqlParserPos pos, SqlIdentifier tblName, SqlLiteral estimate,
+      SqlNodeList fieldList, SqlNumericLiteral percent) {
+    super(pos);
+    this.tblName = tblName;
+    this.estimate = estimate;
+    this.fieldList = fieldList;
+    this.percent = percent;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    final List<SqlNode> operands = Lists.newArrayListWithCapacity(4);
+    operands.add(tblName);
+    operands.add(estimate);
+    operands.add(fieldList);
+    operands.add(percent);
+    return operands;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword("ANALYZE");
+    writer.keyword("TABLE");
+    tblName.unparse(writer, leftPrec, rightPrec);
+    writer.keyword(estimate.booleanValue() ? "ESTIMATE" : "COMPUTE");
+    writer.keyword("STATISTICS");
+    writer.keyword("FOR");
+
+    if (fieldList != null && fieldList.size() > 0) {
+      writer.keyword("COLUMNS");
+      writer.keyword("(");
+      fieldList.get(0).unparse(writer, leftPrec, rightPrec);
+      for (int i = 1; i < fieldList.size(); i++) {
+        writer.keyword(",");
+        fieldList.get(i).unparse(writer, leftPrec, rightPrec);
+      }
+      writer.keyword(")");
+    } else {
+      writer.keyword("ALL");
+      writer.keyword("COLUMNS");
+    }
+    writer.keyword("SAMPLE");
+    percent.unparse(writer, leftPrec, rightPrec);
+    writer.keyword("PERCENT");
+  }
+
+  @Override
+  public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config, Pointer<String> textPlan) {
+    return new AnalyzeTableHandler(config, textPlan);
+  }
+
+  @Override
+  public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+    return getSqlHandler(config, null);
+  }
+
+  public List<String> getSchemaPath() {
+    if (tblName.isSimple()) {
+      return ImmutableList.of();
+    }
+
+    return tblName.names.subList(0, tblName.names.size() - 1);
+  }
+
+  public SqlIdentifier getTableIdentifier() {
+    return tblName;
+  }
+
+  public String getName() {
+    return Util.last(tblName.names);
+  }
+
+  public List<String> getFieldNames() {
+    if (fieldList == null) {
+      return ImmutableList.of();
+    }
+
+    List<String> columnNames = Lists.newArrayList();
+    for (SqlNode node : fieldList.getList()) {
+      columnNames.add(node.toString());
+    }
+    return columnNames;
+  }
+
+  public boolean getEstimate() {
+    return estimate.booleanValue();
+  }
+
+  public int getPercent() {
+    return percent.intValue(true);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
index 8f4f067aa..c510525c5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java
@@ -149,6 +149,40 @@ public abstract class AbstractSchema implements Schema, SchemaPartitionExplorer,
   }
 
   /**
+   * Create a stats table entry for the given <i>tableName</i>.
+   * @param tableName the table for which the stats table entry is created
+   * @return the create table entry
+   */
+  public CreateTableEntry createStatsTable(String tableName) {
+    throw UserException.unsupportedError()
+        .message("Statistics tables are not supported in schema [%s]", getSchemaPath())
+        .build(logger);
+  }
+
+  /**
+   * Create an append statistics table entry for the given <i>tableName</i>. If no statistics
+   * table exists yet, a new one is created.
+   * @param tableName the table whose stats table is appended to
+   * @return the create table entry
+   */
+  public CreateTableEntry appendToStatsTable(String tableName) {
+    throw UserException.unsupportedError()
+        .message("Statistics tables are not supported in schema [%s]", getSchemaPath())
+        .build(logger);
+  }
+
+  /**
+   * Get the statistics table for the given <i>tableName</i>.
+   * @param tableName the table whose stats table is returned
+   * @return the stats table
+   */
+  public Table getStatsTable(String tableName) {
+    throw UserException.unsupportedError()
+        .message("Statistics tables are not supported in schema [%s]", getSchemaPath())
+        .build(logger);
+  }
+
+  /**
    * Reports whether to show items from this schema in INFORMATION_SCHEMA
    * tables.
    * (Controls ... TODO: Doc.: Mention what this typically controls or
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
index 3a747a67a..2539c640f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java
@@ -68,6 +68,21 @@ public class SubSchemaWrapper extends AbstractSchema {
   }
 
   @Override
+  public CreateTableEntry createStatsTable(String tableName) {
+    return innerSchema.createStatsTable(tableName);
+  }
+
+  @Override
+  public CreateTableEntry appendToStatsTable(String tableName) {
+    return innerSchema.appendToStatsTable(tableName);
+  }
+
+  @Override
+  public Table getStatsTable(String tableName) {
+    return innerSchema.getStatsTable(tableName);
+  }
+
+  @Override
   public Collection<Function> getFunctions(String name) {
     return innerSchema.getFunctions(name);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
index 795cbd21d..45dea13af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemSchemaFactory.java
@@ -164,6 +164,21 @@ public class FileSystemSchemaFactory extends AbstractSchemaFactory {
   }
 
   @Override
+  public CreateTableEntry createStatsTable(String tableName) {
+    return defaultSchema.createStatsTable(tableName);
+  }
+
+  @Override
+  public CreateTableEntry appendToStatsTable(String tableName) {
+    return defaultSchema.appendToStatsTable(tableName);
+  }
+
+  @Override
+  public Table getStatsTable(String tableName) {
+    return defaultSchema.getStatsTable(tableName);
+  }
+
+  @Override
   public AbstractSchema getDefaultSchema() {
     return defaultSchema;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 27a72e3e1..bf258c2d9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -49,7 +49,7 @@ public interface FormatPlugin {
 
   FormatMatcher getMatcher();
 
-  AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException;
+  AbstractWriter getWriter(PhysicalOperator child, String location, boolean append, List<String> partitionColumns) throws IOException;
 
   Set<StoragePluginOptimizerRule> getOptimizerRules();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
index 493278c27..fc512212f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/WorkspaceSchemaFactory.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.dfs;
 
 import static java.util.Collections.unmodifiableList;
+import static org.apache.drill.exec.dotdrill.DotDrillType.STATS;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -44,6 +45,7 @@ import org.apache.calcite.schema.TranslatableTable;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.drill.common.config.LogicalPlanPersistence;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.logical.FormatPluginConfig;
@@ -53,7 +55,7 @@ import org.apache.drill.exec.dotdrill.DotDrillFile;
 import org.apache.drill.exec.dotdrill.DotDrillType;
 import org.apache.drill.exec.dotdrill.DotDrillUtil;
 import org.apache.drill.exec.dotdrill.View;
-import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
 import org.apache.drill.exec.planner.logical.CreateTableEntry;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
@@ -65,6 +67,8 @@ import org.apache.drill.exec.store.AbstractSchema;
 import org.apache.drill.exec.store.PartitionNotFoundException;
 import org.apache.drill.exec.store.SchemaConfig;
 import org.apache.drill.exec.util.DrillFileSystemUtil;
+import org.apache.drill.exec.store.StorageStrategy;
+import org.apache.drill.exec.store.easy.json.JSONFormatPlugin;
 import org.apache.drill.exec.util.ImpersonationUtil;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -202,6 +206,17 @@ public class WorkspaceSchemaFactory {
     return plugin;
   }
 
+  // Ensure the given tableName is not itself a stats table.
+  private static void ensureNotStatsTable(final String tableName) {
+    if (tableName.toLowerCase().endsWith(STATS.getEnding())) {
+      throw UserException
+          .validationError()
+          .message("Given table [%s] is already a stats table. " +
+              "Cannot perform stats operations on a stats table.", tableName)
+          .build(logger);
+    }
+  }
+
   /**
    * Implementation of a table macro that generates a table based on parameters
    */
@@ -551,7 +566,47 @@ public class WorkspaceSchemaFactory {
       } catch (UnsupportedOperationException e) {
         logger.debug("The filesystem for this workspace does not support this operation.", e);
       }
-      return tables.get(tableKey);
+      final DrillTable table = tables.get(tableKey);
+      setMetadataTable(table, tableName);
+      return table;
+    }
+
+    private void setMetadataTable(final DrillTable table, final String tableName) {
+      if (table == null) {
+        return;
+      }
+
+      // If this itself is the stats table, then skip it.
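+      // Stats live beside the data under the DotDrillType.STATS suffix. Illustrative names
+      // only: assuming STATS.getEnding() yields ".stats.drill", a file table "sales.json"
+      // pairs with "sales.json.stats.drill" and a directory table "sales" with
+      // "sales/.stats.drill" (see getStatsTableName() below).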
+ if (tableName.toLowerCase().endsWith(STATS.getEnding())) { + return; + } + + try { + if (table.getStatsTable() == null) { + Table statsTable = getStatsTable(tableName); + if (statsTable != null) { + table.setStatsTable(new DrillStatsTable(getFullSchemaName(), getStatsTableName(tableName))); + } + } + } catch (final Exception e) { + logger.warn("Failed to find the stats table for table [{}] in schema [{}]", tableName, getFullSchemaName()); + } + } + + // Get stats table name for a given table name. + private String getStatsTableName(final String tableName) { + final Path tablePath = new Path(config.getLocation(), tableName); + try { + if (fs.isDirectory(tablePath)) { + return tableName + Path.SEPARATOR + STATS.getEnding(); + } else { + return tableName + STATS.getEnding(); + } + } catch (final Exception e) { + throw new DrillRuntimeException( + String.format("Failed to find the location of the stats for table [%s] in schema [%s]", + tableName, getFullSchemaName())); + } } @Override @@ -571,6 +626,34 @@ public class WorkspaceSchemaFactory { public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) { String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val; FormatPlugin formatPlugin = plugin.getFormatPlugin(storage); + return createOrAppendToTable(tableName, false, formatPlugin, partitionColumns, storageStrategy); + } + + @Override + public CreateTableEntry createStatsTable(String tableName) { + ensureNotStatsTable(tableName); + final String statsTableName = getStatsTableName(tableName); + FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.DEFAULT_NAME); + return createOrAppendToTable(statsTableName, false, formatPlugin, ImmutableList.<String>of(), + StorageStrategy.DEFAULT); + } + + @Override + public CreateTableEntry appendToStatsTable(String tableName) { + ensureNotStatsTable(tableName); + final String statsTableName = getStatsTableName(tableName); + FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.DEFAULT_NAME); + return createOrAppendToTable(statsTableName, true, formatPlugin, ImmutableList.<String>of(), + StorageStrategy.DEFAULT); + } + + @Override + public Table getStatsTable(String tableName) { + return getTable(getStatsTableName(tableName)); + } + + private CreateTableEntry createOrAppendToTable(String tableName, boolean append, FormatPlugin formatPlugin, + List<String> partitonColumns, StorageStrategy storageStrategy) { if (formatPlugin == null) { throw new UnsupportedOperationException( String.format("Unsupported format '%s' in workspace '%s'", config.getDefaultInputFormat(), @@ -581,7 +664,8 @@ public class WorkspaceSchemaFactory { (FileSystemConfig) plugin.getConfig(), formatPlugin, config.getLocation() + Path.SEPARATOR + tableName, - partitionColumns, + append, + partitonColumns, storageStrategy); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index 4c550c3ae..ed1651e0f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -193,8 +193,8 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements } @Override - public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException { - return 
new EasyWriter(child, location, partitionColumns, this); + public AbstractWriter getWriter(PhysicalOperator child, String location, boolean append, List<String> partitionColumns) throws IOException { + return new EasyWriter(child, location, append, partitionColumns, this); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java index 379e2c93e..9f4120689 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyWriter.java @@ -40,6 +40,7 @@ public class EasyWriter extends AbstractWriter { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EasyWriter.class); private final String location; + private final boolean append; private final List<String> partitionColumns; private final EasyFormatPlugin<?> formatPlugin; @@ -47,6 +48,7 @@ public class EasyWriter extends AbstractWriter { public EasyWriter( @JsonProperty("child") PhysicalOperator child, @JsonProperty("location") String location, + @JsonProperty("append") boolean append, @JsonProperty("partitionColumns") List<String> partitionColumns, @JsonProperty("storageStrategy") StorageStrategy storageStrategy, @JsonProperty("storage") StoragePluginConfig storageConfig, @@ -57,18 +59,21 @@ public class EasyWriter extends AbstractWriter { this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig); Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config."); this.location = location; + this.append = append; this.partitionColumns = partitionColumns; setStorageStrategy(storageStrategy); } public EasyWriter(PhysicalOperator child, String location, + boolean append, List<String> partitionColumns, EasyFormatPlugin<?> formatPlugin) { super(child); this.formatPlugin = formatPlugin; this.location = location; + this.append = append; this.partitionColumns = partitionColumns; } @@ -77,6 +82,11 @@ public class EasyWriter extends AbstractWriter { return location; } + @JsonProperty("append") + public boolean getAppend() { + return append; + } + @JsonProperty("storage") public StoragePluginConfig getStorageConfig(){ return formatPlugin.getStorageConfig(); @@ -94,7 +104,7 @@ public class EasyWriter extends AbstractWriter { @Override protected PhysicalOperator getNewWithChild(PhysicalOperator child) { - EasyWriter writer = new EasyWriter(child, location, partitionColumns, formatPlugin); + EasyWriter writer = new EasyWriter(child, location, append, partitionColumns, formatPlugin); writer.setStorageStrategy(getStorageStrategy()); return writer; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java index 11dc20421..ab90cda5c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java @@ -70,6 +70,8 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> { Map<String, String> options = new HashMap<>(); options.put("location", writer.getLocation()); + options.put("append", Boolean.toString(writer.getAppend())); + FragmentHandle handle = context.getHandle(); String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
index 11dc20421..ab90cda5c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONFormatPlugin.java
@@ -70,6 +70,8 @@ public class JSONFormatPlugin extends EasyFormatPlugin<JSONFormatConfig> {
 
     Map<String, String> options = new HashMap<>();
     options.put("location", writer.getLocation());
+    options.put("append", Boolean.toString(writer.getAppend()));
+
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(),
        handle.getMinorFragmentId());
     options.put("prefix", fragmentId);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
index 2e80b3ffb..d533c0a19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JsonRecordWriter.java
@@ -48,6 +48,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
 
   private Path cleanUpLocation;
   private String location;
+  private boolean append;
   private String prefix;
 
   private String fieldDelimiter;
@@ -74,6 +75,7 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
   @Override
   public void init(Map<String, String> writerOptions) throws IOException {
     this.location = writerOptions.get("location");
+    this.append = Boolean.parseBoolean(writerOptions.get("append"));
     this.prefix = writerOptions.get("prefix");
     this.fieldDelimiter = writerOptions.get("separator");
     this.extension = writerOptions.get("extension");
@@ -83,7 +85,11 @@ public class JsonRecordWriter extends JSONOutputRecordWriter implements RecordWr
 
     this.fs = FileSystem.get(fsConf);
 
-    Path fileName = new Path(location, prefix + "_" + index + "." + extension);
+    Path fileName;
+    do {
+      fileName = new Path(location, prefix + "_" + (index++) + "." + extension);
+    } while (append && fs.exists(fileName));
+
     try {
       // json writer does not support partitions, so only one file can be created
       // and thus only one location should be deleted in case of abort
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index f46cc1cf2..876cd5b72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -126,14 +126,15 @@ public class ParquetFormatPlugin implements FormatPlugin {
   }
 
   @Override
-  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) {
-    return new ParquetWriter(child, location, partitionColumns, this);
+  public AbstractWriter getWriter(PhysicalOperator child, String location, boolean append, List<String> partitionColumns) throws IOException {
+    return new ParquetWriter(child, location, append, partitionColumns, this);
   }
 
   public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter writer) throws IOException, OutOfMemoryException {
     Map<String, String> options = new HashMap<>();
 
     options.put("location", writer.getLocation());
+    options.put("append", Boolean.toString(writer.getAppend()));
 
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
@@ -261,6 +262,9 @@ public class ParquetFormatPlugin implements FormatPlugin {
             new FormatSelection(plugin.getConfig(), selection));
       }
     }
+    if (!super.supportDirectoryReads() && selection.containsDirectories(fs)) {
+      return null;
+    }
     return super.isReadable(fs, selection, fsPlugin, storageEngineName, schemaConfig);
   }
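In append mode the JSON record writer no longer assumes that prefix_index.ext is free; it probes forward until it finds an unused name, so a re-run of ANALYZE adds a new statistics file rather than overwriting the previous one. A standalone analogue of that do/while, using java.nio.file instead of the Hadoop FileSystem (the prefix and extension values below are illustrative):

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    public class AppendNameProbe {
      // Increment the file index until a name that does not already exist is found.
      static Path nextFreeName(Path dir, String prefix, String extension, boolean append) {
        int index = 0;
        Path fileName;
        do {
          fileName = dir.resolve(prefix + "_" + (index++) + "." + extension);
        } while (append && Files.exists(fileName));
        return fileName;
      }

      public static void main(String[] args) {
        System.out.println(nextFreeName(Paths.get("/tmp/.stats.drill"), "0_0", "json", true));
      }
    }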
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
index aea321859..6298c1a8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetWriter.java
@@ -53,6 +53,7 @@ public class ParquetWriter extends AbstractWriter {
   public static final int WRITER_VERSION = 2;
 
   private final String location;
+  private final boolean append;
   private final List<String> partitionColumns;
   private final ParquetFormatPlugin formatPlugin;
 
@@ -60,6 +61,7 @@ public class ParquetWriter extends AbstractWriter {
   public ParquetWriter(
           @JsonProperty("child") PhysicalOperator child,
           @JsonProperty("location") String location,
+          @JsonProperty("append") boolean append,
           @JsonProperty("partitionColumns") List<String> partitionColumns,
           @JsonProperty("storageStrategy") StorageStrategy storageStrategy,
           @JsonProperty("storage") StoragePluginConfig storageConfig,
@@ -69,18 +71,21 @@ public class ParquetWriter extends AbstractWriter {
     this.formatPlugin = (ParquetFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, new ParquetFormatConfig());
     Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
     this.location = location;
+    this.append = append;
     this.partitionColumns = partitionColumns;
     setStorageStrategy(storageStrategy);
   }
 
   public ParquetWriter(PhysicalOperator child,
                        String location,
+                       boolean append,
                        List<String> partitionColumns,
                        ParquetFormatPlugin formatPlugin) {
 
     super(child);
     this.formatPlugin = formatPlugin;
     this.location = location;
+    this.append = append;
     this.partitionColumns = partitionColumns;
   }
 
@@ -89,6 +94,11 @@ public class ParquetWriter extends AbstractWriter {
     return location;
   }
 
+  @JsonProperty("append")
+  public boolean getAppend() {
+    return append;
+  }
+
   @JsonProperty("storage")
   public StoragePluginConfig getStorageConfig(){
     return formatPlugin.getStorageConfig();
@@ -111,7 +121,7 @@ public class ParquetWriter extends AbstractWriter {
 
   @Override
   protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
-    ParquetWriter writer = new ParquetWriter(child, location, partitionColumns, formatPlugin);
+    ParquetWriter writer = new ParquetWriter(child, location, append, partitionColumns, formatPlugin);
     writer.setStorageStrategy(getStorageStrategy());
     return writer;
   }
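ParquetWriter repeats the same wiring, and both writers must also copy the flag in getNewWithChild(): physical operators are immutable, so a plan rewrite rebuilds the node around a new child, and any field not carried over is silently lost, which is why the patch updates getNewWithChild() alongside the constructors. A toy illustration of that copy-on-rewrite pattern (all types here are stand-ins, not Drill classes):

    public final class WriterOp {
      final Object child;   // stand-in for PhysicalOperator
      final String location;
      final boolean append;

      WriterOp(Object child, String location, boolean append) {
        this.child = child;
        this.location = location;
        this.append = append;
      }

      // Every field must be carried over; forgetting one would drop
      // configuration whenever the planner swaps in a new child.
      WriterOp withChild(Object newChild) {
        return new WriterOp(newChild, location, append);
      }

      public static void main(String[] args) {
        WriterOp original = new WriterOp("scan-A", "/tmp/t/.stats.drill", true);
        WriterOp rewritten = original.withChild("scan-B");
        System.out.println(rewritten.append); // true: the flag survived the rewrite
      }
    }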
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
new file mode 100644
index 000000000..0f15fb300
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestAnalyze.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.sql;
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.ExecConstants;
+import org.junit.Test;
+
+public class TestAnalyze extends PlanTestBase {
+
+  // Analyze for all columns
+  @Test
+  public void basic1() throws Exception {
+    try {
+      test("ALTER SESSION SET `planner.slice_target` = 1");
+      test("CREATE TABLE dfs_test.tmp.region_basic1 AS SELECT * FROM cp.`region.json`");
+      test("ANALYZE TABLE dfs_test.tmp.region_basic1 COMPUTE STATISTICS FOR ALL COLUMNS");
+      test("SELECT * FROM dfs_test.tmp.`region_basic1/.stats.drill`");
+
+      testBuilder()
+          .sqlQuery("SELECT `column`, statcount, nonnullstatcount, ndv FROM dfs_test.tmp.`region_basic1/.stats.drill`")
+          .unOrdered()
+          .baselineColumns("column", "statcount", "nonnullstatcount", "ndv")
+          .baselineValues("region_id", 110L, 110L, 107L)
+          .baselineValues("sales_city", 110L, 110L, 111L)
+          .baselineValues("sales_state_province", 110L, 110L, 13L)
+          .baselineValues("sales_district", 110L, 110L, 22L)
+          .baselineValues("sales_region", 110L, 110L, 8L)
+          .baselineValues("sales_country", 110L, 110L, 4L)
+          .baselineValues("sales_district_id", 110L, 110L, 23L)
+          .go();
+
+      // ndv is an estimate rather than an exact count, so only statcount is
+      // verified against an exact baseline query
+      testBuilder()
+          .sqlQuery("SELECT statcount FROM dfs_test.tmp.`region_basic1/.stats.drill` WHERE `column` = 'region_id'")
+          .unOrdered()
+          .sqlBaselineQuery("SELECT count(region_id) AS statcount FROM dfs_test.tmp.region_basic1")
+          .go();
+
+    } finally {
+      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+    }
+  }
+
+  // Analyze for only a subset of the columns in table
+  @Test
+  public void basic2() throws Exception {
+    try {
+      test("ALTER SESSION SET `planner.slice_target` = 1");
+      test("CREATE TABLE dfs_test.tmp.employee_basic2 AS SELECT * FROM cp.`employee.json`");
+      test("ANALYZE TABLE dfs_test.tmp.employee_basic2 COMPUTE STATISTICS FOR COLUMNS (employee_id, birth_date)");
+      test("SELECT * FROM dfs_test.tmp.`employee_basic2/.stats.drill`");
+
+      testBuilder()
+          .sqlQuery("SELECT `column`, statcount, nonnullstatcount, ndv FROM dfs_test.tmp.`employee_basic2/.stats.drill`")
+          .unOrdered()
+          .baselineColumns("column", "statcount", "nonnullstatcount", "ndv")
+          .baselineValues("employee_id", 1155L, 1155L, 1144L)
+          .baselineValues("birth_date", 1155L, 1155L, 53L)
+          .go();
+
+      // ndv is an estimate rather than an exact count, so only statcount is
+      // verified against an exact baseline query
+      testBuilder()
+          .sqlQuery("SELECT statcount FROM dfs_test.tmp.`employee_basic2/.stats.drill` WHERE `column` = 'birth_date'")
+          .unOrdered()
+          .sqlBaselineQuery("SELECT count(birth_date) AS statcount FROM dfs_test.tmp.employee_basic2")
+          .go();
+
+    } finally {
+      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+    }
+  }
+
+  @Test
+  public void join() throws Exception {
+    try {
+      test("ALTER SESSION SET `planner.slice_target` = 1");
+      test("CREATE TABLE dfs_test.tmp.lineitem AS SELECT * FROM cp.`tpch/lineitem.parquet`");
+      test("CREATE TABLE dfs_test.tmp.orders AS SELECT * FROM cp.`tpch/orders.parquet`");
+      test("ANALYZE TABLE dfs_test.tmp.lineitem COMPUTE STATISTICS FOR ALL COLUMNS");
+      test("ANALYZE TABLE dfs_test.tmp.orders COMPUTE STATISTICS FOR ALL COLUMNS");
+      test("SELECT * FROM dfs_test.tmp.`lineitem/.stats.drill`");
+      test("SELECT * FROM dfs_test.tmp.`orders/.stats.drill`");
+
+      test("SELECT * FROM dfs_test.tmp.`lineitem` l JOIN dfs_test.tmp.`orders` o ON l.l_orderkey = o.o_orderkey");
+    } finally {
+      test("ALTER SESSION SET `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
+    }
+  }
+}
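The tests above exercise the two syntax forms the new parser rules accept, COMPUTE STATISTICS FOR ALL COLUMNS and FOR COLUMNS (a, b), and show that the result lands in a queryable `<table>/.stats.drill` file. Outside the test harness the same flow can be driven over plain JDBC; a rough sketch, assuming the drill-jdbc driver on the classpath, an embedded connect string, and a writable dfs.tmp workspace containing a `region` table (all three are assumptions, not part of this patch):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class AnalyzeExample {
      public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:drill:zk=local");
             Statement stmt = conn.createStatement()) {
          // Collect statistics, then read back the generated stats file.
          stmt.execute("ANALYZE TABLE dfs.tmp.region COMPUTE STATISTICS FOR ALL COLUMNS");
          try (ResultSet rs = stmt.executeQuery(
              "SELECT `column`, statcount, nonnullstatcount, ndv FROM dfs.tmp.`region/.stats.drill`")) {
            while (rs.next()) {
              System.out.printf("%s: rows=%d, non-null=%d, ndv~%d%n",
                  rs.getString(1), rs.getLong(2), rs.getLong(3), rs.getLong(4));
            }
          }
        }
      }
    }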
diff --git a/exec/jdbc-all/pom.xml b/exec/jdbc-all/pom.xml
index 67a3bb8af..32c2a908e 100644
--- a/exec/jdbc-all/pom.xml
+++ b/exec/jdbc-all/pom.xml
@@ -172,6 +172,10 @@
           <groupId>sqlline</groupId>
           <artifactId>sqlline</artifactId>
         </exclusion>
+        <exclusion>
+          <artifactId>stream</artifactId>
+          <groupId>com.clearspring.analytics</groupId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/Analyze.java b/logical/src/main/java/org/apache/drill/common/logical/data/Analyze.java
new file mode 100644
index 000000000..711050d70
--- /dev/null
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/Analyze.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.logical.data;
+
+import org.apache.drill.common.logical.data.visitors.LogicalVisitor;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+@JsonTypeName("analyze")
+public class Analyze extends SingleInputOperator {
+
+  @JsonCreator
+  public Analyze() { }
+
+  @Override
+  public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
+    return logicalVisitor.visitAnalyze(this, value);
+  }
+}
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
index 482146fdb..c46dc4333 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/AbstractLogicalVisitor.java
@@ -19,6 +19,7 @@ package org.apache.drill.common.logical.data.visitors;
 
 import org.apache.drill.common.logical.data.LateralJoin;
 import org.apache.drill.common.logical.data.Unnest;
+import org.apache.drill.common.logical.data.Analyze;
 import org.apache.drill.common.logical.data.Values;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.Flatten;
@@ -46,6 +47,11 @@ public abstract class AbstractLogicalVisitor<T, X, E extends Throwable> implemen
   }
 
   @Override
+  public T visitAnalyze(Analyze analyze, X value) throws E {
+    return visitOp(analyze, value);
+  }
+
+  @Override
   public T visitScan(Scan scan, X value) throws E {
     return visitOp(scan, value);
   }
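Consumers of the logical plan pick the new operator up through the visitor pair above: LogicalVisitor gains visitAnalyze(), and AbstractLogicalVisitor routes it to the visitOp() fallback so existing subclasses keep compiling. An illustrative subclass (not part of the patch) that does nothing but distinguish the Analyze node, assuming visitOp() is the overridable fallback the diff routes to:

    import org.apache.drill.common.logical.data.Analyze;
    import org.apache.drill.common.logical.data.LogicalOperator;
    import org.apache.drill.common.logical.data.visitors.AbstractLogicalVisitor;

    public class IsAnalyzeVisitor extends AbstractLogicalVisitor<Boolean, Void, RuntimeException> {

      @Override
      public Boolean visitAnalyze(Analyze analyze, Void value) {
        return true; // the node driving ANALYZE TABLE ... COMPUTE STATISTICS
      }

      @Override
      public Boolean visitOp(LogicalOperator op, Void value) {
        return false; // every other operator
      }
    }

A plan node would then be probed with node.accept(new IsAnalyzeVisitor(), null).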
diff --git a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
index 9d9013e79..ee9036c6e 100644
--- a/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
+++ b/logical/src/main/java/org/apache/drill/common/logical/data/visitors/LogicalVisitor.java
@@ -20,6 +20,7 @@ package org.apache.drill.common.logical.data.visitors;
 
 import org.apache.drill.common.logical.data.LateralJoin;
 import org.apache.drill.common.logical.data.Unnest;
+import org.apache.drill.common.logical.data.Analyze;
 import org.apache.drill.common.logical.data.Values;
 import org.apache.drill.common.logical.data.Filter;
 import org.apache.drill.common.logical.data.Flatten;
@@ -51,6 +52,7 @@ public interface LogicalVisitor<RETURN, EXTRA, EXCEP extends Throwable> {
     public RETURN visitGroupingAggregate(GroupingAggregate groupBy, EXTRA value) throws EXCEP;
     public RETURN visitFilter(Filter filter, EXTRA value) throws EXCEP;
     public RETURN visitFlatten(Flatten flatten, EXTRA value) throws EXCEP;
+    public RETURN visitAnalyze(Analyze analyze, EXTRA value) throws EXCEP;
 
     public RETURN visitProject(Project project, EXTRA value) throws EXCEP;
     public RETURN visitValues(Values constant, EXTRA value) throws EXCEP;
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
index c540c8f2a..635f972d9 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/UserBitShared.java
@@ -601,6 +601,14 @@ public final class UserBitShared {
      * <code>SYSLOG_SUB_SCAN = 58;</code>
      */
     SYSLOG_SUB_SCAN(58, 58),
+    /**
+     * <code>STATISTICS_AGGREGATE = 59;</code>
+     */
+    STATISTICS_AGGREGATE(59, 59),
+    /**
+     * <code>UNPIVOT_MAPS = 60;</code>
+     */
+    UNPIVOT_MAPS(60, 60),
     ;
 
     /**
@@ -839,6 +847,14 @@ public final class UserBitShared {
      * <code>SYSLOG_SUB_SCAN = 58;</code>
      */
     public static final int SYSLOG_SUB_SCAN_VALUE = 58;
+    /**
+     * <code>STATISTICS_AGGREGATE = 59;</code>
+     */
+    public static final int STATISTICS_AGGREGATE_VALUE = 59;
+    /**
+     * <code>UNPIVOT_MAPS = 60;</code>
+     */
+    public static final int UNPIVOT_MAPS_VALUE = 60;
 
     public final int getNumber() { return value; }
 
@@ -904,6 +920,8 @@ public final class UserBitShared {
         case 56: return RUNTIME_FILTER;
         case 57: return ROWKEY_JOIN;
         case 58: return SYSLOG_SUB_SCAN;
+        case 59: return STATISTICS_AGGREGATE;
+        case 60: return UNPIVOT_MAPS;
         default: return null;
       }
     }
@@ -24644,7 +24662,7 @@ public final class UserBitShared {
       "entState\022\013\n\007SENDING\020\000\022\027\n\023AWAITING_ALLOCA" +
      "TION\020\001\022\013\n\007RUNNING\020\002\022\014\n\010FINISHED\020\003\022\r\n\tCAN" +
      "CELLED\020\004\022\n\n\006FAILED\020\005\022\032\n\026CANCELLATION_REQ" +
-      "UESTED\020\006*\247\t\n\020CoreOperatorType\022\021\n\rSINGLE_" +
+      "UESTED\020\006*\323\t\n\020CoreOperatorType\022\021\n\rSINGLE_" +
      "SENDER\020\000\022\024\n\020BROADCAST_SENDER\020\001\022\n\n\006FILTER" +
      "\020\002\022\022\n\016HASH_AGGREGATE\020\003\022\r\n\tHASH_JOIN\020\004\022\016\n" +
      "\nMERGE_JOIN\020\005\022\031\n\025HASH_PARTITION_SENDER\020\006" +
@@ -24674,10 +24692,11 @@
      "\025\n\021SEQUENCE_SUB_SCAN\0205\022\023\n\017PARTITION_LIMI" +
      "T\0206\022\023\n\017PCAPNG_SUB_SCAN\0207\022\022\n\016RUNTIME_FILT" +
      "ER\0208\022\017\n\013ROWKEY_JOIN\0209\022\023\n\017SYSLOG_SUB_SCAN" +
-      "\020:*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016\n\nSA" +
-      "SL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014SASL" +
-      "_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.apach" +
-      "e.drill.exec.protoB\rUserBitSharedH\001"
+      "\020:\022\030\n\024STATISTICS_AGGREGATE\020;\022\020\n\014UNPIVOT_" +
+      "MAPS\020<*g\n\nSaslStatus\022\020\n\014SASL_UNKNOWN\020\000\022\016" +
+      "\n\nSASL_START\020\001\022\024\n\020SASL_IN_PROGRESS\020\002\022\020\n\014" +
+      "SASL_SUCCESS\020\003\022\017\n\013SASL_FAILED\020\004B.\n\033org.a" +
+      "pache.drill.exec.protoB\rUserBitSharedH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
         new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
diff --git a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
index 7d5041c5b..051f82f2e 100644
--- a/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
+++ b/protocol/src/main/java/org/apache/drill/exec/proto/beans/CoreOperatorType.java
@@ -80,7 +80,9 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
     PCAPNG_SUB_SCAN(55),
     RUNTIME_FILTER(56),
     ROWKEY_JOIN(57),
-    SYSLOG_SUB_SCAN(58);
+    SYSLOG_SUB_SCAN(58),
+    STATISTICS_AGGREGATE(59),
+    UNPIVOT_MAPS(60);
 
     public final int number;
 
@@ -157,6 +159,8 @@ public enum CoreOperatorType implements com.dyuproject.protostuff.EnumLite<CoreO
         case 56: return RUNTIME_FILTER;
         case 57: return ROWKEY_JOIN;
         case 58: return SYSLOG_SUB_SCAN;
+        case 59: return STATISTICS_AGGREGATE;
+        case 60: return UNPIVOT_MAPS;
         default: return null;
     }
 }
diff --git a/protocol/src/main/protobuf/UserBitShared.proto b/protocol/src/main/protobuf/UserBitShared.proto
index a0438b7bf..ca4e27385 100644
--- a/protocol/src/main/protobuf/UserBitShared.proto
+++ b/protocol/src/main/protobuf/UserBitShared.proto
@@ -351,6 +351,8 @@ enum CoreOperatorType {
   RUNTIME_FILTER = 56;
   ROWKEY_JOIN = 57;
   SYSLOG_SUB_SCAN = 58;
+  STATISTICS_AGGREGATE = 59;
+  UNPIVOT_MAPS = 60;
 }
 
 /* Registry that contains list of jars, each jar contains its name and list of function signatures.