diff options
36 files changed, 879 insertions, 130 deletions
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java index 311e5797e..21021d396 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java @@ -33,10 +33,12 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.store.StoragePluginRegistry; +import org.apache.drill.exec.store.dfs.easy.EasyGroupScan; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.HTable; @@ -107,6 +109,16 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst getRegionInfos(); } + private HBaseGroupScan(HBaseGroupScan that) { + this.columns = that.columns; + this.endpointAffinities = that.endpointAffinities; + this.hbaseScanSpec = that.hbaseScanSpec; + this.mappings = that.mappings; + this.regionsToScan = that.regionsToScan; + this.storagePlugin = that.storagePlugin; + this.storagePluginConfig = that.storagePluginConfig; + } + private void getRegionInfos() { logger.debug("Getting region locations"); try { @@ -244,4 +256,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst + columns + "]"; } + @Override + public GroupScan clone(List<SchemaPath> columns) { + HBaseGroupScan newScan = new HBaseGroupScan(this); + newScan.columns = columns; + return newScan; + } 
} diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java index 137f6fe3a..50b28135d 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java @@ -49,7 +49,7 @@ public class HBasePushFilterIntoScan extends StoragePluginOptimizerRule { return; //no filter pushdown ==> No transformation. } final GroupScan newGroupsScan = new HBaseGroupScan(groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns()); - final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan); + final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType()); call.transformTo(newScanPrel); } diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java index 0ecc3796e..c1fb6af08 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java @@ -19,11 +19,14 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; import java.util.Set; +import java.util.List; import net.hydromatic.optiq.SchemaPlus; import org.apache.drill.common.JSONOptions; import org.apache.drill.exec.rpc.user.DrillUser; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.StoragePluginOptimizerRule; diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index 343435b86..69fc44776 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -18,6 +18,9 @@ package org.apache.drill.exec.physical.base; import java.util.Iterator; +import java.util.List; + +import org.apache.drill.common.expression.SchemaPath; import com.google.common.collect.Iterators; @@ -40,4 +43,8 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca return physicalVisitor.visitGroupScan(this, value); } + @Override + public GroupScan clone(List<SchemaPath> columns) { + throw new UnsupportedOperationException(String.format("%s does not implmemnt clone(columns) method!", this.getClass().getCanonicalName())); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index 3504be79b..32d68b368 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -21,6 +21,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -44,4 +45,10 @@ public interface GroupScan extends Scan, HasAffinity{ */ @JsonIgnore public abstract String getDigest(); + + /** + * Returns a clone of Groupscan instance, except that the new GroupScan will use the provided list of columns + */ + @JsonIgnore + public 
GroupScan clone(List<SchemaPath> columns); }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java index b37035249..88df65847 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java @@ -35,5 +35,4 @@ public abstract class DrillScanRelBase extends TableAccessRelBase implements Dri this.drillTable = table.unwrap(DrillTable.class); assert drillTable != null; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java index d19b7a4b3..aa75f5236 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java @@ -28,7 +28,10 @@ import org.apache.drill.exec.planner.torel.ConversionContext; import org.eigenbase.rel.InvalidRelException; import org.eigenbase.rel.ProjectRelBase; import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.metadata.RelMetadataQuery; import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptCost; +import org.eigenbase.relopt.RelOptPlanner; import org.eigenbase.relopt.RelTraitSet; import org.eigenbase.reltype.RelDataType; import org.eigenbase.reltype.RelDataTypeField; @@ -75,5 +78,5 @@ public class DrillProjectRel extends DrillProjectRelBase implements DrillRel { } return new DrillProjectRel(context.getCluster(), context.getLogicalTraits(), input, exps, new RelRecordType(fields)); } - + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java new file mode 100644 
index 000000000..98028b802 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.planner.logical; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; + +import net.hydromatic.optiq.rules.java.JavaRules.EnumerableTableAccessRel; + +import org.eigenbase.rel.ProjectRel; +import org.eigenbase.rel.ProjectRelBase; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.rules.PushProjector; +import org.eigenbase.rel.rules.RemoveTrivialProjectRule; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.reltype.RelDataType; +import org.eigenbase.reltype.RelDataTypeFactory; +import org.eigenbase.reltype.RelDataTypeField; +import org.eigenbase.rex.RexInputRef; +import org.eigenbase.rex.RexNode; +import org.eigenbase.rex.RexShuttle; + +import com.google.hive12.common.collect.Lists; + +public class DrillPushProjIntoScan extends RelOptRule { + public static final RelOptRule INSTANCE = new DrillPushProjIntoScan(); + + private 
DrillPushProjIntoScan() { + super(RelOptHelper.some(ProjectRel.class, RelOptHelper.any(EnumerableTableAccessRel.class)), "DrillPushProjIntoScan"); + } + + @Override + public void onMatch(RelOptRuleCall call) { + final ProjectRel proj = (ProjectRel) call.rel(0); + final EnumerableTableAccessRel scan = (EnumerableTableAccessRel) call.rel(1); + + List<Integer> columnsIds = getRefColumnIds(proj); + + RelDataType newScanRowType = createStructType(scan.getCluster().getTypeFactory(), getProjectedFields(scan.getRowType(),columnsIds)); + + final DrillScanRel newScan = new DrillScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + scan.getTable(), newScanRowType); + + List<RexNode> convertedExprs = getConvertedProjExp(proj, scan, columnsIds); + + final DrillProjectRel newProj = new DrillProjectRel(proj.getCluster(), proj.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + newScan, convertedExprs, proj.getRowType()); + + if (RemoveTrivialProjectRule.isTrivial(newProj)) { + call.transformTo(newScan); + } else { + call.transformTo(newProj); + } + + } + + private List<RexNode> getConvertedProjExp(ProjectRel proj, RelNode child, List<Integer> columnsIds) { + PushProjector pushProjector = + new PushProjector( + proj, null, child, PushProjector.ExprCondition.FALSE); + ProjectRel topProject = pushProjector.convertProject(null); + + if (topProject !=null) + return topProject.getProjects(); + else + return proj.getProjects(); + } + + private RelDataType createStructType( + RelDataTypeFactory typeFactory, + final List<RelDataTypeField> fields + ) { + final RelDataTypeFactory.FieldInfoBuilder builder = + typeFactory.builder(); + for (RelDataTypeField field : fields) { + builder.add(field.getName(), field.getType()); + } + return builder.build(); + } + + + private List<Integer> getRefColumnIds(ProjectRelBase proj) { + RefFieldsVisitor v = new RefFieldsVisitor(); + + for (RexNode exp : proj.getProjects()) { + v.apply(exp); + } + return new 
ArrayList<Integer>(v.getReferencedFieldIndex()); + } + + private List<RelDataTypeField> getProjectedFields(RelDataType rowType, List<Integer> columnIds) { + List<RelDataTypeField> oldFields = rowType.getFieldList(); + List<RelDataTypeField> newFields = Lists.newArrayList(); + + for (Integer id : columnIds) { + newFields.add(oldFields.get(id)); + } + + return newFields; + } + + /** Visitor that finds the set of inputs that are used. */ + public static class RefFieldsVisitor extends RexShuttle { + public final SortedSet<Integer> inputPosReferenced = + new TreeSet<Integer>(); + + @Override + public RexNode visitInputRef(RexInputRef inputRef) { + inputPosReferenced.add(inputRef.getIndex()); + return inputRef; + } + + public Set<Integer> getReferencedFieldIndex() { + return this.inputPosReferenced; + } + } + +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index 199d36acd..3777c66c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -25,7 +25,6 @@ import org.apache.drill.exec.planner.physical.FilterPrule; import org.apache.drill.exec.planner.physical.LimitPrule; import org.apache.drill.exec.planner.physical.MergeJoinPrule; import org.apache.drill.exec.planner.physical.ProjectPrule; -import org.apache.drill.exec.planner.physical.PushLimitToTopN; import org.apache.drill.exec.planner.physical.ScanPrule; import org.apache.drill.exec.planner.physical.ScreenPrule; import org.apache.drill.exec.planner.physical.SortConvertPrule; @@ -36,10 +35,13 @@ import org.eigenbase.rel.rules.MergeProjectRule; import org.eigenbase.rel.rules.PushFilterPastJoinRule; import org.eigenbase.rel.rules.PushFilterPastProjectRule; import org.eigenbase.rel.rules.PushJoinThroughJoinRule; +import org.eigenbase.rel.rules.PushProjectPastFilterRule; +import org.eigenbase.rel.rules.PushProjectPastJoinRule; import org.eigenbase.rel.rules.ReduceAggregatesRule; import org.eigenbase.rel.rules.RemoveDistinctAggregateRule; import org.eigenbase.rel.rules.RemoveDistinctRule; import org.eigenbase.rel.rules.RemoveSortRule; +import org.eigenbase.rel.rules.RemoveTrivialProjectRule; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.volcano.AbstractConverter.ExpandConversionRule; @@ -63,7 +65,7 @@ public class DrillRuleSets { // SwapJoinRule.INSTANCE, RemoveDistinctRule.INSTANCE, // UnionToDistinctRule.INSTANCE, -// RemoveTrivialProjectRule.INSTANCE, + RemoveTrivialProjectRule.INSTANCE, // RemoveTrivialCalcRule.INSTANCE, RemoveSortRule.INSTANCE, @@ -72,11 +74,15 @@ public class DrillRuleSets { new 
MergeProjectRule(true, RelFactories.DEFAULT_PROJECT_FACTORY), RemoveDistinctAggregateRule.INSTANCE, // ReduceAggregatesRule.INSTANCE, // + PushProjectPastJoinRule.INSTANCE, + PushProjectPastFilterRule.INSTANCE, // SwapJoinRule.INSTANCE, // // PushJoinThroughJoinRule.RIGHT, // // PushJoinThroughJoinRule.LEFT, // // PushSortPastProjectRule.INSTANCE, // + DrillPushProjIntoScan.INSTANCE, + //////////////////////////////// DrillScanRule.INSTANCE, DrillFilterRule.INSTANCE, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java index 619e76d0e..2b4f9d7de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java @@ -17,28 +17,52 @@ */ package org.apache.drill.exec.planner.logical; +import java.io.IOException; + import org.apache.drill.common.JSONOptions; import org.apache.drill.common.logical.data.LogicalOperator; import org.apache.drill.common.logical.data.Scan; +import org.apache.drill.exec.physical.OperatorCost; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.planner.common.DrillScanRelBase; +import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.planner.torel.ConversionContext; import org.eigenbase.relopt.RelOptCluster; +import org.eigenbase.relopt.RelOptCost; +import org.eigenbase.relopt.RelOptPlanner; import org.eigenbase.relopt.RelOptTable; import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; /** * GroupScan of a Drill table. */ public class DrillScanRel extends DrillScanRelBase implements DrillRel { - //private final DrillTable drillTable; + final private RelDataType rowType; + private GroupScan groupScan; + + /** Creates a DrillScan. 
*/ + public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, + RelOptTable table) { + this(cluster, traits, table, table.getRowType()); + } /** Creates a DrillScan. */ - public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table) { + public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, + RelOptTable table, RelDataType rowType) { super(DRILL_LOGICAL, cluster, traits, table); - //this.drillTable = table.unwrap(DrillTable.class); - //assert drillTable != null; + this.rowType = rowType; + try { + this.groupScan = this.drillTable.getGroupScan().clone( + PrelUtil.getColumns(rowType)); + } catch (IOException e) { + this.groupScan = null; + e.printStackTrace(); + } } + @Override public LogicalOperator implement(DrillImplementor implementor) { Scan.Builder builder = Scan.builder(); builder.storageEngine(drillTable.getStorageEngineName()); @@ -46,8 +70,34 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { implementor.registerSource(drillTable); return builder.build(); } - - public static DrillScanRel convert(Scan scan, ConversionContext context){ - return new DrillScanRel(context.getCluster(), context.getLogicalTraits(), context.getTable(scan)); + + public static DrillScanRel convert(Scan scan, ConversionContext context) { + return new DrillScanRel(context.getCluster(), context.getLogicalTraits(), + context.getTable(scan)); + } + + @Override + public RelDataType deriveRowType() { + return this.rowType; } + + @Override + public RelOptCost computeSelfCost(RelOptPlanner planner) { + OperatorCost scanCost = groupScan.getCost(); + Size scanSize = groupScan.getSize(); + int columnCount = this.getRowType().getFieldCount(); + + // FIXME: Use the new cost model + return this + .getCluster() + .getPlanner() + .getCostFactory() + .makeCost(scanSize.getRecordCount() * columnCount, scanCost.getCpu(), + scanCost.getNetwork() * scanCost.getDisk()); + } + + public GroupScan getGroupScan() { + return groupScan; + } + } diff 
--git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java new file mode 100644 index 000000000..b026c22a2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.physical; + +import org.apache.drill.exec.physical.base.GroupScan; + +public interface DrillScanPrel extends Prel{ + + public GroupScan getGroupScan(); + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java index c8412ab16..53804c77e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java @@ -23,6 +23,7 @@ import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.FunctionCall; import org.apache.drill.common.expression.LogicalExpression; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.Order.Ordering; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.SelectionVectorRemover; @@ -83,7 +84,21 @@ public class PrelUtil { } return new SelectionVectorRemover(child); } - - - + + public static List<SchemaPath> getColumns(RelDataType rowType) { + final List<String> fields = rowType.getFieldNames(); + + if (fields.isEmpty()) return null; + + List<SchemaPath> columns = Lists.newArrayList(); + + for (String field : fields) { + if (field.startsWith("*")) + continue; + + columns.add(SchemaPath.getSimplePath(field)); + + } + return columns; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java index b22e862cc..60975a1c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java @@ -24,65 +24,80 @@ import 
org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; -import org.apache.drill.exec.planner.common.DrillScanRelBase; +import org.eigenbase.rel.AbstractRelNode; import org.eigenbase.rel.RelNode; import org.eigenbase.rel.RelWriter; import org.eigenbase.relopt.RelOptCluster; import org.eigenbase.relopt.RelOptCost; import org.eigenbase.relopt.RelOptPlanner; -import org.eigenbase.relopt.RelOptTable; import org.eigenbase.relopt.RelTraitSet; +import org.eigenbase.reltype.RelDataType; -public class ScanPrel extends DrillScanRelBase implements Prel{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanPrel.class); +public class ScanPrel extends AbstractRelNode implements DrillScanPrel { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory + .getLogger(ScanPrel.class); - protected final GroupScan scan; + protected final GroupScan groupScan; + private final RelDataType rowType; - private ScanPrel(RelOptCluster cluster, RelTraitSet traits, RelOptTable tbl, GroupScan scan) { - super(DRILL_PHYSICAL, cluster, traits, tbl); - this.scan = scan; + public ScanPrel(RelOptCluster cluster, RelTraitSet traits, + GroupScan groupScan, RelDataType rowType) { + super(cluster, traits); + this.groupScan = groupScan; + this.rowType = rowType; } @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return new ScanPrel(this.getCluster(), traitSet, this.getTable(), this.scan); + return new ScanPrel(this.getCluster(), traitSet, this.groupScan, + this.rowType); } - @Override protected Object clone() throws CloneNotSupportedException { - return new ScanPrel(this.getCluster(), this.getTraitSet(), this.getTable(), this.scan); + return new ScanPrel(this.getCluster(), this.getTraitSet(), this.groupScan, + this.rowType); } - @Override - public PhysicalOperator 
getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - return scan; + public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) + throws IOException { + return groupScan; } @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - OperatorCost scanCost = this.scan.getCost(); - Size scanSize = this.scan.getSize(); + OperatorCost scanCost = this.groupScan.getCost(); + Size scanSize = this.groupScan.getSize(); + int columnCount = this.getRowType().getFieldCount(); + // FIXME: Use the new cost model - return this.getCluster().getPlanner().getCostFactory() - .makeCost(scanSize.getRecordCount(), - scanCost.getCpu(), - scanCost.getNetwork() * scanCost.getDisk()); + return this + .getCluster() + .getPlanner() + .getCostFactory() + .makeCost(scanSize.getRecordCount() * columnCount, scanCost.getCpu(), + scanCost.getNetwork() * scanCost.getDisk()); } + @Override public GroupScan getGroupScan() { - return scan; + return groupScan; } - public static ScanPrel create(DrillScanRelBase old, RelTraitSet traitSets, GroupScan scan) { - return new ScanPrel(old.getCluster(), traitSets, old.getTable(), scan); + public static ScanPrel create(RelNode old, RelTraitSet traitSets, + GroupScan scan, RelDataType rowType) { + return new ScanPrel(old.getCluster(), traitSets, scan, rowType); } - @Override public RelWriter explainTerms(RelWriter pw) { - return super.explainTerms(pw).item("groupscan", scan.getDigest()); + return super.explainTerms(pw).item("groupscan", groupScan.getDigest()); + } + + @Override + public RelDataType deriveRowType() { + return this.rowType; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java index 99ec1f590..201a358c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java @@ -17,12 +17,11 @@ */ package org.apache.drill.exec.planner.physical; -import java.io.IOException; +import java.util.List; -import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.GroupScan; -import org.apache.drill.exec.planner.common.DrillScanRelBase; -import org.apache.drill.exec.planner.logical.DrillTable; +import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.eigenbase.relopt.RelOptRule; import org.eigenbase.relopt.RelOptRuleCall; @@ -31,27 +30,25 @@ import org.eigenbase.relopt.RelTraitSet; public class ScanPrule extends RelOptRule{ public static final RelOptRule INSTANCE = new ScanPrule(); - public ScanPrule() { - super(RelOptHelper.any(DrillScanRelBase.class), "Prel.ScanPrule"); - + super(RelOptHelper.any(DrillScanRel.class), "Prel.ScanPrule"); + } @Override public void onMatch(RelOptRuleCall call) { - try{ - final DrillScanRelBase scan = (DrillScanRelBase) call.rel(0); - final DrillTable table = scan.getTable().unwrap(DrillTable.class); - - final GroupScan groupScan = table.getPlugin().getPhysicalScan(new JSONOptions(table.getSelection())); - final DrillDistributionTrait partition = groupScan.getMaxParallelizationWidth() > 1 ? 
DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON; - final RelTraitSet traits = scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition); - - final DrillScanRelBase newScan = ScanPrel.create(scan, traits, groupScan); - call.transformTo(newScan); - }catch(IOException e){ - throw new RuntimeException("Failure getting group scan.", e); - } + final DrillScanRel scan = (DrillScanRel) call.rel(0); + + List<SchemaPath> columns = PrelUtil.getColumns(scan.getRowType()); + + GroupScan groupScan = scan.getGroupScan().clone(columns); + + DrillDistributionTrait partition = groupScan.getMaxParallelizationWidth() > 1 ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON; + + final RelTraitSet traits = scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition); + + final DrillScanPrel newScan = ScanPrel.create(scan, traits, groupScan, scan.getRowType()); + + call.transformTo(newScan); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java index e46e3f82a..296f40049 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java @@ -44,7 +44,7 @@ public class ExplainHandler extends DefaultSqlHandler{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplainHandler.class); private ResultMode mode; - + private SqlExplainLevel level = SqlExplainLevel.ALL_ATTRIBUTES; public ExplainHandler(Planner planner, QueryContext context) { super(planner, context); } @@ -73,7 +73,7 @@ public class ExplainHandler extends DefaultSqlHandler{ SqlExplain node = unwrap(sqlNode, SqlExplain.class); SqlLiteral op = node.operand(2); SqlExplain.Depth depth = (SqlExplain.Depth) op.getValue(); - + if(node.getDetailLevel() != null) level = 
node.getDetailLevel(); switch(depth){ case LOGICAL: mode = ResultMode.LOGICAL; @@ -94,7 +94,7 @@ public class ExplainHandler extends DefaultSqlHandler{ public String json; public LogicalExplain(RelNode node){ - this.text = RelOptUtil.toString(node, SqlExplainLevel.DIGEST_ATTRIBUTES); + this.text = RelOptUtil.toString(node, level); DrillImplementor implementor = new DrillImplementor(new DrillParseContext(), ResultMode.LOGICAL); implementor.go( (DrillRel) node); LogicalPlan plan = implementor.getPlan(); @@ -107,7 +107,7 @@ public class ExplainHandler extends DefaultSqlHandler{ public String json; public PhysicalExplain(RelNode node, PhysicalPlan plan){ - this.text = RelOptUtil.toString(node, SqlExplainLevel.ALL_ATTRIBUTES); + this.text = RelOptUtil.toString(node, level); this.json = plan.unparse(context.getConfig().getMapper().writer()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java index 2a9ae3905..1b64257f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java @@ -18,9 +18,11 @@ package org.apache.drill.exec.store; import java.io.IOException; +import java.util.List; import java.util.Set; import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.AbstractGroupScan; import com.google.common.collect.ImmutableSet; @@ -48,9 +50,12 @@ public abstract class AbstractStoragePlugin implements StoragePlugin{ @Override public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException { - throw new UnsupportedOperationException(); + return getPhysicalScan(selection, null); } - - + @Override + public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException { + throw 
new UnsupportedOperationException(); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java index 9f528bb6f..653d69db0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java @@ -18,16 +18,15 @@ package org.apache.drill.exec.store; import java.io.IOException; +import java.util.List; import java.util.Set; -import net.hydromatic.optiq.SchemaPlus; - import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractGroupScan; -import org.apache.drill.exec.rpc.user.DrillUser; -public interface StoragePlugin extends SchemaFactory{ +public interface StoragePlugin extends SchemaFactory { public boolean supportsRead(); public boolean supportsWrite(); @@ -36,13 +35,27 @@ public interface StoragePlugin extends SchemaFactory{ /** * Get the physical scan operator for the particular GroupScan (read) node. - * - * @param scan - * The configured scan with a storage engine specific selection. + * + * @param selection + * The configured storage engine specific selection. + * @return + * @throws IOException + */ + public AbstractGroupScan getPhysicalScan(JSONOptions selection) + throws IOException; + + /** + * Get the physical scan operator for the particular GroupScan (read) node. + * + * @param selection + * The configured storage engine specific selection. + * @param columns + * (optional) The list of column names to scan from the data source. 
* @return * @throws IOException */ - public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException; + public AbstractGroupScan getPhysicalScan(JSONOptions selection, + List<SchemaPath> columns) throws IOException; public StoragePluginConfig getConfig(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java index 97427f6fd..dd0cef4cb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java @@ -27,6 +27,7 @@ import net.hydromatic.optiq.SchemaPlus; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.ops.QueryContext; @@ -112,6 +113,11 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ @Override public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException { + return this.getPhysicalScan(selection, null); + } + + @Override + public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException { FormatSelection formatSelection = selection.getWith(context.getConfig(), FormatSelection.class); FormatPlugin plugin; if(formatSelection.getFormat() instanceof NamedFormatPluginConfig){ @@ -120,7 +126,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ plugin = formatPluginsByConfig.get(formatSelection.getFormat()); } if(plugin == null) throw new IOException(String.format("Failure getting requested format plugin named '%s'. 
It was not one of the format plugins registered.", formatSelection.getFormat())); - return plugin.getGroupScan(formatSelection.getSelection()); + return plugin.getGroupScan(formatSelection.getSelection(), columns); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java index 2999d9d7b..788e7ac6e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java @@ -18,8 +18,10 @@ package org.apache.drill.exec.store.dfs; import java.io.IOException; +import java.util.List; import java.util.Set; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractGroupScan; @@ -37,11 +39,13 @@ public interface FormatPlugin { public boolean supportsWrite(); public FormatMatcher getMatcher(); - + public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException; public Set<StoragePluginOptimizerRule> getOptimizerRules(); + public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException; + public FormatPluginConfig getConfig(); public StoragePluginConfig getStorageConfig(); public DrillFileSystem getFileSystem(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index c5a5294a2..7ae10f814 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -162,6 +162,11 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements } 
@Override + public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException { + return new EasyGroupScan(selection, this, columns, selection.selectionRoot); + } + + @Override public FormatPluginConfig getConfig() { return formatConfig; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index a4bd1c64d..9b352041a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -28,6 +28,7 @@ import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -55,7 +56,7 @@ public class EasyGroupScan extends AbstractGroupScan{ private final FileSelection selection; private final EasyFormatPlugin<?> formatPlugin; private final int maxWidth; - private final List<SchemaPath> columns; + private List<SchemaPath> columns; private ListMultimap<Integer, CompleteFileWork> mappings; private List<CompleteFileWork> chunks; @@ -109,6 +110,17 @@ public class EasyGroupScan extends AbstractGroupScan{ this.selectionRoot = selectionRoot; } + private EasyGroupScan(EasyGroupScan that) { + this.chunks = that.chunks; + this.columns = that.columns; + this.endpointAffinities = that.endpointAffinities; + this.formatPlugin = that.formatPlugin; + this.mappings = that.mappings; + this.maxWidth = that.maxWidth; + this.selection = that.selection; + this.selectionRoot = that.selectionRoot; + } + public 
String getSelectionRoot() { return selectionRoot; } @@ -206,4 +218,11 @@ public class EasyGroupScan extends AbstractGroupScan{ return toString(); } + @Override + public GroupScan clone(List<SchemaPath> columns) { + EasyGroupScan newScan = new EasyGroupScan(this); + newScan.columns = columns; + return newScan; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index 850f2488d..0dbed8935 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -20,15 +20,19 @@ package org.apache.drill.exec.store.easy.text; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.RecordReader; +import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin; +import org.apache.drill.exec.store.dfs.easy.EasyGroupScan; import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.store.dfs.shim.DrillFileSystem; import org.apache.drill.exec.store.text.DrillTextRecordReader; @@ -38,6 +42,7 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import 
org.apache.hadoop.mapred.FileSplit; +import java.io.IOException; import java.util.ArrayList; import java.util.List; @@ -61,6 +66,11 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm return new DrillTextRecordReader(split, context, ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns); } + @Override + public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException { + return new EasyGroupScan(selection, this, null, selection.selectionRoot); //TODO : textformat supports project? + } + @JsonTypeName("text") public static class TextFormatConfig implements FormatPluginConfig { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java index cff075c6b..bcac72eba 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -21,7 +21,7 @@ import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.ExpressionPosition; -import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; @@ -65,7 +65,7 @@ public class HiveRecordReader implements RecordReader { protected Partition partition; protected InputSplit inputSplit; protected FragmentContext context; - protected List<FieldReference> columns; + protected List<SchemaPath> columns; protected List<String> columnNames; protected List<String> partitionNames = Lists.newArrayList(); protected List<String> selectedPartitionNames = 
Lists.newArrayList(); @@ -84,7 +84,7 @@ public class HiveRecordReader implements RecordReader { protected static final int TARGET_RECORD_COUNT = 4000; - public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns, FragmentContext context) throws ExecutionSetupException { + public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> columns, FragmentContext context) throws ExecutionSetupException { this.table = table; this.partition = partition; this.inputSplit = inputSplit; @@ -138,8 +138,8 @@ public class HiveRecordReader implements RecordReader { tableColumns = sTypeInfo.getAllStructFieldNames(); List<Integer> columnIds = Lists.newArrayList(); columnNames = Lists.newArrayList(); - for (FieldReference field : columns) { - String columnName = field.getRootSegment().getPath(); + for (SchemaPath field : columns) { + String columnName = field.getRootSegment().getPath(); //TODO? if (!tableColumns.contains(columnName)) { if (partition != null && partitionNames.contains(columnName)) { selectedPartitionNames.add(columnName); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java index 7c46d155c..29729280a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java @@ -17,19 +17,22 @@ */ package org.apache.drill.exec.store.hive; -import com.fasterxml.jackson.annotation.*; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.io.ByteArrayDataOutput; -import com.google.common.io.ByteStreams; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; import 
org.apache.commons.codec.binary.Base64; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.physical.base.SubScan; @@ -45,8 +48,15 @@ import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; -import java.io.IOException; -import java.util.*; +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; @JsonTypeName("hive-scan") public class HiveScan extends AbstractGroupScan { @@ -55,7 +65,7 @@ public class HiveScan extends AbstractGroupScan { @JsonProperty("hive-table") public HiveReadEntry hiveReadEntry; @JsonIgnore - private Table table; + private final Table table; @JsonIgnore private List<InputSplit> inputSplits = Lists.newArrayList(); @JsonIgnore @@ -66,10 +76,10 @@ public class HiveScan extends AbstractGroupScan { @JsonIgnore public List<Partition> partitions; @JsonIgnore - private Collection<DrillbitEndpoint> endpoints; + private final Collection<DrillbitEndpoint> endpoints; @JsonProperty("columns") - public List<FieldReference> columns; + public 
List<SchemaPath> columns; @JsonIgnore List<List<InputSplit>> mappings; @@ -79,8 +89,8 @@ public class HiveScan extends AbstractGroupScan { @JsonCreator public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry, @JsonProperty("storage-plugin") String storagePluginName, - @JsonProperty("columns") List<FieldReference> columns, - @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { + @JsonProperty("columns") List<SchemaPath> columns, + @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException { this.hiveReadEntry = hiveReadEntry; this.table = hiveReadEntry.getTable(); this.storagePluginName = storagePluginName; @@ -91,7 +101,7 @@ public class HiveScan extends AbstractGroupScan { endpoints = storagePlugin.getContext().getBits(); } - public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, List<FieldReference> columns) throws ExecutionSetupException { + public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, List<SchemaPath> columns) throws ExecutionSetupException { this.table = hiveReadEntry.getTable(); this.hiveReadEntry = hiveReadEntry; this.columns = columns; @@ -101,7 +111,20 @@ public class HiveScan extends AbstractGroupScan { this.storagePluginName = storagePlugin.getName(); } - public List<FieldReference> getColumns() { + private HiveScan(HiveScan that) { + this.columns = that.columns; + this.endpoints = that.endpoints; + this.hiveReadEntry = that.hiveReadEntry; + this.inputSplits = that.inputSplits; + this.mappings = that.mappings; + this.partitionMap = that.partitionMap; + this.partitions = that.partitions; + this.storagePlugin = that.storagePlugin; + this.storagePluginName = that.storagePluginName; + this.table = that.table; + } + + public List<SchemaPath> getColumns() { return columns; } @@ -253,9 +276,16 @@ public class HiveScan extends AbstractGroupScan { @Override public String toString() { - return "HiveScan [table=" + table + return 
"HiveScan [table=" + table + ", inputSplits=" + inputSplits + ", columns=" + columns + "]"; } + @Override + public GroupScan clone(List<SchemaPath> columns) { + HiveScan newScan = new HiveScan(this); + newScan.columns = columns; + return newScan; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java index e6df66901..0a70b2082 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.hive; import java.io.IOException; +import java.util.List; import net.hydromatic.optiq.Schema; import net.hydromatic.optiq.SchemaPlus; @@ -25,6 +26,7 @@ import net.hydromatic.optiq.SchemaPlus; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.rpc.user.DrillUser; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.AbstractStoragePlugin; import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory; @@ -62,10 +64,10 @@ public class HiveStoragePlugin extends AbstractStoragePlugin { } @Override - public HiveScan getPhysicalScan(JSONOptions selection) throws IOException { + public HiveScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException { HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){}); try { - return new HiveScan(hiveReadEntry, this, null); + return new HiveScan(hiveReadEntry, this, null); } catch (ExecutionSetupException e) { throw new IOException(e); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java index 8ff7c823c..0a020976d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java @@ -26,7 +26,7 @@ import com.google.common.io.ByteArrayDataInput; import com.google.common.io.ByteStreams; import org.apache.commons.codec.binary.Base64; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.*; import org.apache.hadoop.fs.Path; @@ -53,14 +53,14 @@ public class HiveSubScan extends AbstractBase implements SubScan { public List<String> splitClasses; @JsonProperty("columns") - public List<FieldReference> columns; + public List<SchemaPath> columns; @JsonCreator public HiveSubScan(@JsonProperty("hive-table") Table table, @JsonProperty("partition") List<Partition> partitions, @JsonProperty("splits") List<String> encodedSplits, @JsonProperty("splitClasses") List<String> splitClasses, - @JsonProperty("columns") List<FieldReference> columns) throws IOException, ReflectiveOperationException { + @JsonProperty("columns") List<SchemaPath> columns) throws IOException, ReflectiveOperationException { this.table = table; this.partitions = partitions; this.encodedSplits = encodedSplits; @@ -84,7 +84,7 @@ public class HiveSubScan extends AbstractBase implements SubScan { return split; } - public List<FieldReference> getColumns() { + public List<SchemaPath> getColumns() { return columns; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java index 1e47684cd..13719d627 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.store.hive; import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.vector.*; import org.apache.drill.exec.vector.allocator.VectorAllocator; @@ -41,7 +41,7 @@ public class HiveTextRecordReader extends HiveRecordReader { public final List<Integer> columnIds; private final int numCols; - public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns, FragmentContext context) throws ExecutionSetupException { + public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> columns, FragmentContext context) throws ExecutionSetupException { super(table, partition, inputSplit, columns, context); String d = table.getSd().getSerdeInfo().getParameters().get("field.delim"); if (d != null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java index a9261be55..b0d8ca575 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java @@ -22,13 +22,16 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; +import org.apache.drill.common.expression.SchemaPath; import 
org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.physical.base.SubScan; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.store.dfs.easy.EasyGroupScan; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; @@ -41,11 +44,24 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{ private final SelectedTable table; + private List<SchemaPath> columns; + @JsonCreator - public InfoSchemaGroupScan(@JsonProperty("table") SelectedTable table) { + public InfoSchemaGroupScan(@JsonProperty("table") SelectedTable table, + @JsonProperty("columns") List<SchemaPath> columns) { this.table = table; + this.columns = columns; + } + + private InfoSchemaGroupScan(InfoSchemaGroupScan that) { + this.table = that.table; + this.columns = that.columns; } + public List<SchemaPath> getColumns() { + return columns; + } + @Override public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException { Preconditions.checkArgument(endpoints.size() == 1); @@ -84,7 +100,13 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{ @Override public String getDigest() { - return this.table.toString(); + return this.table.toString() + "columns=" + columns; } + @Override + public GroupScan clone(List<SchemaPath> columns) { + InfoSchemaGroupScan newScan = new InfoSchemaGroupScan (this); + newScan.columns = columns; + return newScan; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java index 
313ea867c..e7e3f377d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java @@ -18,12 +18,14 @@ package org.apache.drill.exec.store.ischema; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Set; import net.hydromatic.optiq.SchemaPlus; import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.rpc.user.DrillUser; @@ -53,9 +55,9 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin{ } @Override - public InfoSchemaGroupScan getPhysicalScan(JSONOptions selection) throws IOException { + public InfoSchemaGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException { SelectedTable table = selection.getWith(context.getConfig(), SelectedTable.class); - return new InfoSchemaGroupScan(table); + return new InfoSchemaGroupScan(table, columns); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java index 10f6e08d6..4fe1d1be6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java @@ -19,10 +19,12 @@ package org.apache.drill.exec.store.mock; import java.io.IOException; import java.util.ArrayList; +import java.util.List; import net.hydromatic.optiq.SchemaPlus; import org.apache.drill.common.JSONOptions; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractGroupScan; import 
org.apache.drill.exec.rpc.user.DrillUser; @@ -43,7 +45,7 @@ public class MockStorageEngine extends AbstractStoragePlugin { } @Override - public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException { + public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException { ArrayList<MockScanEntry> readEntries = selection.getListWith(new ObjectMapper(), new TypeReference<ArrayList<MockScanEntry>>() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java index 4fd05871d..ec6456bb0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java @@ -18,9 +18,11 @@ package org.apache.drill.exec.store.parquet; import java.io.IOException; +import java.util.List; import java.util.Set; import java.util.regex.Pattern; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.store.StoragePluginOptimizerRule; @@ -100,7 +102,12 @@ public class ParquetFormatPlugin implements FormatPlugin{ @Override public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException { - return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot); + return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null); + } + + @Override + public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException { + return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index 6c3bd27e6..a8fff8ace 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -33,6 +33,7 @@ import org.apache.drill.exec.metrics.DrillMetrics; import org.apache.drill.exec.physical.EndpointAffinity; import org.apache.drill.exec.physical.OperatorCost; import org.apache.drill.exec.physical.base.AbstractGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Size; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; @@ -134,10 +135,11 @@ public class ParquetGroupScan extends AbstractGroupScan { public ParquetGroupScan(List<FileStatus> files, // ParquetFormatPlugin formatPlugin, // - String selectionRoot) // + String selectionRoot, + List<SchemaPath> columns) // throws IOException { this.formatPlugin = formatPlugin; - this.columns = null; + this.columns = columns; this.formatConfig = formatPlugin.getConfig(); this.fs = formatPlugin.getFileSystem().getUnderlying(); @@ -150,6 +152,21 @@ public class ParquetGroupScan extends AbstractGroupScan { readFooter(files); } + + /* + * This is used to clone another copy of the group scan. 
+ */ + private ParquetGroupScan(ParquetGroupScan that){ + this.columns = that.columns; + this.endpointAffinities = that.endpointAffinities; + this.entries = that.entries; + this.formatConfig = that.formatConfig; + this.formatPlugin = that.formatPlugin; + this.fs = that.fs; + this.mappings = that.mappings; + this.rowGroupInfos = that.rowGroupInfos; + this.selectionRoot = that.selectionRoot; + } private void readFooterFromEntries() throws IOException { List<FileStatus> files = Lists.newArrayList(); @@ -348,4 +365,10 @@ public class ParquetGroupScan extends AbstractGroupScan { + ", columns=" + columns + "]"; } + @Override + public GroupScan clone(List<SchemaPath> columns) { + ParquetGroupScan newScan = new ParquetGroupScan(this); + newScan.columns = columns; + return newScan; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java index 17d2adb5e..ef05b4c81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java @@ -17,8 +17,9 @@ */ package org.apache.drill.exec.store.text; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.List; + import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.FieldReference; @@ -36,19 +37,24 @@ import org.apache.drill.exec.vector.RepeatedVarCharVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.*; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import 
org.apache.hadoop.mapred.TextInputFormat; -import java.io.IOException; -import java.util.List; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; public class DrillTextRecordReader implements RecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class); + static final String COL_NAME = "columns"; + private org.apache.hadoop.mapred.RecordReader<LongWritable, Text> reader; private List<ValueVector> vectors = Lists.newArrayList(); private byte delimiter; private int targetRecordCount; - private FieldReference ref = new FieldReference("columns"); + private FieldReference ref = new FieldReference(COL_NAME); private FragmentContext context; private RepeatedVarCharVector vector; private List<Integer> columnIds = Lists.newArrayList(); @@ -63,9 +69,13 @@ public class DrillTextRecordReader implements RecordReader { if(columns != null) { for (SchemaPath path : columns) { assert path.getRootSegment().isNamed(); - Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),"Selected column must be an array index"); - int index = path.getRootSegment().getChild().getArraySegment().getIndex(); - columnIds.add(index); + Preconditions.checkArgument(path.getRootSegment().getPath().equals(COL_NAME), "Selected column must have name 'columns'"); + // FIXME: need re-work for text column push-down. 
+ if (path.getRootSegment().getChild() != null) { + Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),"Selected column must be an array index"); + int index = path.getRootSegment().getChild().getArraySegment().getIndex(); + columnIds.add(index); + } } } targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE); @@ -161,7 +171,7 @@ public class DrillTextRecordReader implements RecordReader { @Override public void cleanup() { try { - reader.close(); + reader.close(); } catch (IOException e) { logger.warn("Exception closing reader: {}", e); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java new file mode 100644 index 000000000..079857726 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.drill; + +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.util.TestTools; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.ValueVector; +import org.eigenbase.sql.SqlExplain.Depth; +import org.eigenbase.sql.SqlExplainLevel; + +public class PlanTestBase extends BaseTestQuery { + + protected static final String OPTIQ_FORMAT = "text"; + protected static final String JSON_FORMAT = "json"; + + /** + * This method will take a SQL string statement, get the PHYSICAL plan in json + * format. Then check the physical plan against the list expected substrs. + * Verify all the expected strings are contained in the physical plan string. + */ + public void testPhysicalPlan(String sql, String... expectedSubstrs) + throws Exception { + sql = "EXPLAIN PLAN for " + sql.replace("[WORKING_PATH]", TestTools.getWorkingPath()); + + String planStr = getPlanInString(sql, JSON_FORMAT); + + for (String colNames : expectedSubstrs) { + assertTrue(planStr.contains(colNames)); + } + } + + /** + * This method will take a SQL string statement, get the PHYSICAL plan in + * Optiq RelNode format. Then check the physical plan against the list + * expected substrs. Verify all the expected strings are contained in the + * physical plan string. + */ + public void testRelPhysicalPlanLevDigest(String sql, String... expectedSubstrs) + throws Exception { + String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.DIGEST_ATTRIBUTES, Depth.PHYSICAL); + + for (String substr : expectedSubstrs) { + assertTrue(planStr.contains(substr)); + } + + } + + /** + * This method will take a SQL string statement, get the LOGICAL plan in Optiq + * RelNode format. 
Then check the logical plan against the list expected + * substrs. Verify all the expected strings are contained in the logical plan + * string. + */ + public void testRelLogicalPlanLevDigest(String sql, String... expectedSubstrs) + throws Exception { + String planStr = getDrillRelPlanInString(sql, + SqlExplainLevel.DIGEST_ATTRIBUTES, Depth.LOGICAL); + + for (String substr : expectedSubstrs) { + assertTrue(planStr.contains(substr)); + } + } + + /** + * This method will take a SQL string statement, get the PHYSICAL plan in + * Optiq RelNode format. Then check the physical plan against the list + * expected substrs. Verify all the expected strings are contained in the + * physical plan string. + */ + public void testRelPhysicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception { + String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.PHYSICAL); + + for (String substr : expectedSubstrs) { + assertTrue(planStr.contains(substr)); + } + } + + /** + * This method will take a SQL string statement, get the LOGICAL plan in Optiq + * RelNode format. Then check the logical plan against the list expected + * substrs. Verify all the expected strings are contained in the logical plan + * string. + */ + public void testRelLogicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception { + String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.LOGICAL); + + for (String substr : expectedSubstrs) { + assertTrue(planStr.contains(substr)); + } + } + + + /* + * This will get the plan (either logical or physical) in Optiq RelNode + * format, based on SqlExplainLevel and Depth. 
+ */ + private String getDrillRelPlanInString(String sql, SqlExplainLevel level, + Depth depth) throws Exception { + String levelStr, depthStr; + switch (level) { + case NO_ATTRIBUTES: + levelStr = "EXCLUDING ATTRIBUTES"; + break; + case EXPPLAN_ATTRIBUTES: + levelStr = "INCLUDING ATTRIBUTES"; + break; + case ALL_ATTRIBUTES: + levelStr = "INCLUDING ALL ATTRIBUTES"; + break; + default: + throw new UnsupportedOperationException(); + } + + switch (depth) { + case TYPE: + depthStr = "WITH TYPE"; + break; + case LOGICAL: + depthStr = "WITHOUT IMPLEMENTATION"; + break; + case PHYSICAL: + depthStr = "WITH IMPLEMENTATION"; + break; + default: + throw new UnsupportedOperationException(); + } + + sql = "EXPLAIN PLAN " + levelStr + " " + depthStr + " for " + + sql.replace("[WORKING_PATH]", TestTools.getWorkingPath()); + + return getPlanInString(sql, OPTIQ_FORMAT); + } + + /* + * This will submit an "EXPLAIN" statement, and return the column value which + * contains the plan's string. + */ + private String getPlanInString(String sql, String columnName) + throws Exception { + List<QueryResultBatch> results = testSqlWithResults(sql); + + RecordBatchLoader loader = new RecordBatchLoader(bit.getContext() + .getAllocator()); + StringBuilder builder = new StringBuilder(); + + for (QueryResultBatch b : results) { + if (!b.hasData()) + continue; + + loader.load(b.getHeader().getDef(), b.getData()); + + VectorWrapper<?> vw = loader.getValueAccessorById(loader + .getValueVectorId(SchemaPath.getSimplePath(columnName)).getFieldId(), + NullableVarCharVector.class); + + System.out.println(vw.getValueVector().getField().toExpr()); + ValueVector vv = vw.getValueVector(); + for (int i = 0; i < vv.getAccessor().getValueCount(); i++) { + Object o = vv.getAccessor().getObject(i); + builder.append(o); + System.out.println(vv.getAccessor().getObject(i)); + } + } + + return builder.toString(); + } + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java 
b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java new file mode 100644 index 000000000..675cddb98 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill; + +import org.junit.Test; + +// Test the optimizer plan in terms of project pushdown. +// When a query refers to a subset of columns in a table, the optimizer should push the list +// of referenced columns to the SCAN operator, so that the SCAN operator retrieves only +// the values of the columns in that subset. 
+ +public class TestProjectPushDown extends PlanTestBase { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory + .getLogger(TestProjectPushDown.class); + + @Test + public void testGroupBy() throws Exception { + String expectedColNames = " \"columns\" : [ \"`marital_status`\" ]"; + testPhysicalPlan( + "select marital_status, COUNT(1) as cnt from cp.`employee.json` group by marital_status", + expectedColNames); + } + + @Test + public void testOrderBy() throws Exception { + String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]"; + testPhysicalPlan("select employee_id , full_name, first_name , last_name " + + "from cp.`employee.json` order by first_name, last_name", + expectedColNames); + } + + @Test + public void testExprInSelect() throws Exception { + String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]"; + testPhysicalPlan( + "select employee_id + 100, full_name, first_name , last_name " + + "from cp.`employee.json` order by first_name, last_name", + expectedColNames); + } + + @Test + public void testExprInWhere() throws Exception { + String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]"; + testPhysicalPlan( + "select employee_id + 100, full_name, first_name , last_name " + + "from cp.`employee.json` where employee_id + 500 < 1000 ", + expectedColNames); + } + + @Test + public void testJoin() throws Exception { + String expectedColNames1 = "\"columns\" : [ \"`N_REGIONKEY`\", \"`N_NAME`\" ]"; + String expectedColNames2 = "\"columns\" : [ \"`R_REGIONKEY`\", \"`R_NAME`\" ]"; + + testPhysicalPlan("SELECT\n" + " nations.N_NAME,\n" + " regions.R_NAME\n" + + "FROM\n" + + " dfs.`[WORKING_PATH]/../../sample-data/nation.parquet` nations\n" + + "JOIN\n" + + " dfs.`[WORKING_PATH]/../../sample-data/region.parquet` regions\n" + + " on nations.N_REGIONKEY = regions.R_REGIONKEY", 
expectedColNames1, + expectedColNames2); + } + + @Test + public void testFromInfoSchema() throws Exception { + String expectedColNames = " \"columns\" : [ \"`CATALOG_DESCRIPTION`\" ]"; + testPhysicalPlan( + "select count(CATALOG_DESCRIPTION) from INFORMATION_SCHEMA.CATALOGS", + expectedColNames); + } + + @Test + public void testTPCH1() throws Exception { + String expectedColNames = " \"columns\" : [ \"`l_returnflag`\", \"`l_linestatus`\", \"`l_shipdate`\", \"`l_quantity`\", \"`l_extendedprice`\", \"`l_discount`\", \"`l_tax`\" ]"; + testPhysicalPlanFromFile("queries/tpch/01.sql", expectedColNames); + } + + @Test + public void testTPCH3() throws Exception { + String expectedColNames1 = "\"columns\" : [ \"`c_mktsegment`\", \"`c_custkey`\" ]"; + String expectedColNames2 = " \"columns\" : [ \"`o_orderdate`\", \"`o_shippriority`\", \"`o_custkey`\", \"`o_orderkey`\" "; + String expectedColNames3 = "\"columns\" : [ \"`l_orderkey`\", \"`l_shipdate`\", \"`l_extendedprice`\", \"`l_discount`\" ]"; + testPhysicalPlanFromFile("queries/tpch/03.sql", expectedColNames1, expectedColNames2, expectedColNames3); + } + + private void testPhysicalPlanFromFile(String fileName, String... expectedSubstrs) + throws Exception { + String query = getFile(fileName); + String[] queries = query.split(";"); + for (String q : queries) { + if (q.trim().isEmpty()) + continue; + testPhysicalPlan(q, expectedSubstrs); + } + } + +} diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java index 148c115bb..6952c5636 100644 --- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java +++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java @@ -44,7 +44,7 @@ public class TestJdbcQuery extends JdbcTest{ // Set a timeout unless we're debugging. 
- @Rule public TestRule TIMEOUT = TestTools.getTimeoutRule(200000000); + @Rule public TestRule TIMEOUT = TestTools.getTimeoutRule(20000); private static final String WORKING_PATH; static{ @@ -78,11 +78,11 @@ public class TestJdbcQuery extends JdbcTest{ @Test public void testInfoSchema() throws Exception{ -// testQuery("select * from INFORMATION_SCHEMA.SCHEMATA"); + testQuery("select * from INFORMATION_SCHEMA.SCHEMATA"); testQuery("select * from INFORMATION_SCHEMA.CATALOGS"); -// testQuery("select * from INFORMATION_SCHEMA.VIEWS"); + testQuery("select * from INFORMATION_SCHEMA.VIEWS"); // testQuery("select * from INFORMATION_SCHEMA.TABLES"); -// testQuery("select * from INFORMATION_SCHEMA.COLUMNS"); + testQuery("select * from INFORMATION_SCHEMA.COLUMNS"); } @Test |