 contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java | 18
 contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java | 2
 contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java | 3
 exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java | 7
 exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java | 7
 exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java | 1
 exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java | 5
 exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java | 138
 exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java | 10
 exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java | 64
 exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java | 26
 exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java | 21
 exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java | 65
 exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java | 39
 exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java | 8
 exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java | 11
 exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java | 29
 exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java | 8
 exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java | 6
 exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java | 5
 exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java | 21
 exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java | 10
 exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java | 10
 exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java | 62
 exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java | 6
 exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java | 8
 exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java | 4
 exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java | 26
 exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java | 6
 exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java | 4
 exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java | 9
 exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java | 27
 exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java | 30
 exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java | 192
 exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java | 113
 sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java | 8
36 files changed, 879 insertions(+), 130 deletions(-)
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
index 311e5797e..21021d396 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseGroupScan.java
@@ -33,10 +33,12 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.StoragePluginRegistry;
+import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HTable;
@@ -107,6 +109,16 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
getRegionInfos();
}
+ private HBaseGroupScan(HBaseGroupScan that) {
+ this.columns = that.columns;
+ this.endpointAffinities = that.endpointAffinities;
+ this.hbaseScanSpec = that.hbaseScanSpec;
+ this.mappings = that.mappings;
+ this.regionsToScan = that.regionsToScan;
+ this.storagePlugin = that.storagePlugin;
+ this.storagePluginConfig = that.storagePluginConfig;
+ }
+
private void getRegionInfos() {
logger.debug("Getting region locations");
try {
@@ -244,4 +256,10 @@ public class HBaseGroupScan extends AbstractGroupScan implements DrillHBaseConst
+ columns + "]";
}
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ HBaseGroupScan newScan = new HBaseGroupScan(this);
+ newScan.columns = columns;
+ return newScan;
+ }
}
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
index 137f6fe3a..50b28135d 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBasePushFilterIntoScan.java
@@ -49,7 +49,7 @@ public class HBasePushFilterIntoScan extends StoragePluginOptimizerRule {
return; //no filter pushdown ==> No transformation.
}
final GroupScan newGroupsScan = new HBaseGroupScan(groupScan.getStoragePlugin(), newScanSpec, groupScan.getColumns());
- final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan);
+ final ScanPrel newScanPrel = ScanPrel.create(scan, filter.getTraitSet(), newGroupsScan, scan.getRowType());
call.transformTo(newScanPrel);
}
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
index 0ecc3796e..c1fb6af08 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseStoragePlugin.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.store.hbase;
import java.io.IOException;
import java.util.Set;
+import java.util.List;
import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.exec.rpc.user.DrillUser;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 343435b86..69fc44776 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -18,6 +18,9 @@
package org.apache.drill.exec.physical.base;
import java.util.Iterator;
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
import com.google.common.collect.Iterators;
@@ -40,4 +43,8 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
return physicalVisitor.visitGroupScan(this, value);
}
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ throw new UnsupportedOperationException(String.format("%s does not implement clone(columns) method!", this.getClass().getCanonicalName()));
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index 3504be79b..32d68b368 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -44,4 +45,10 @@ public interface GroupScan extends Scan, HasAffinity{
*/
@JsonIgnore
public abstract String getDigest();
+
+ /**
+ * Returns a clone of the GroupScan instance, except that the new GroupScan will use the provided list of columns.
+ */
+ @JsonIgnore
+ public GroupScan clone(List<SchemaPath> columns);
}
\ No newline at end of file
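
Note: the new clone(columns) contract pairs a fail-fast default in AbstractGroupScan with a per-plugin copy constructor, as HBaseGroupScan does above and EasyGroupScan, HiveScan, and InfoSchemaGroupScan do below. A minimal self-contained sketch of the same pattern, with hypothetical ToyGroupScan/ToyCsvScan names:

    import java.util.Arrays;
    import java.util.List;

    // Toy model of the clone(columns) contract: the base class fails fast,
    // and each concrete scan copies its own state while swapping in the
    // narrowed column list, mirroring the private copy constructors above.
    public class CloneContractDemo {
        abstract static class ToyGroupScan {
            protected List<String> columns;
            public ToyGroupScan clone(List<String> columns) {
                throw new UnsupportedOperationException(
                    getClass().getCanonicalName() + " does not implement clone(columns)");
            }
        }

        static class ToyCsvScan extends ToyGroupScan {
            private final String path;
            ToyCsvScan(String path, List<String> columns) {
                this.path = path;
                this.columns = columns;
            }
            @Override
            public ToyGroupScan clone(List<String> columns) {
                return new ToyCsvScan(this.path, columns); // shallow copy + new columns
            }
        }

        public static void main(String[] args) {
            ToyGroupScan scan = new ToyCsvScan("/data/t.csv", null); // null == all columns
            ToyGroupScan narrowed = scan.clone(Arrays.asList("a", "b"));
            System.out.println(narrowed.columns); // [a, b]
        }
    }
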
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
index b37035249..88df65847 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillScanRelBase.java
@@ -35,5 +35,4 @@ public abstract class DrillScanRelBase extends TableAccessRelBase implements Dri
this.drillTable = table.unwrap(DrillTable.class);
assert drillTable != null;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
index d19b7a4b3..aa75f5236 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectRel.java
@@ -28,7 +28,10 @@ import org.apache.drill.exec.planner.torel.ConversionContext;
import org.eigenbase.rel.InvalidRelException;
import org.eigenbase.rel.ProjectRelBase;
import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.metadata.RelMetadataQuery;
import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.relopt.RelTraitSet;
import org.eigenbase.reltype.RelDataType;
import org.eigenbase.reltype.RelDataTypeField;
@@ -75,5 +78,5 @@ public class DrillProjectRel extends DrillProjectRelBase implements DrillRel {
}
return new DrillProjectRel(context.getCluster(), context.getLogicalTraits(), input, exps, new RelRecordType(fields));
}
-
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
new file mode 100644
index 000000000..98028b802
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjIntoScan.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.planner.logical;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import net.hydromatic.optiq.rules.java.JavaRules.EnumerableTableAccessRel;
+
+import org.eigenbase.rel.ProjectRel;
+import org.eigenbase.rel.ProjectRelBase;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.rules.PushProjector;
+import org.eigenbase.rel.rules.RemoveTrivialProjectRule;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.reltype.RelDataType;
+import org.eigenbase.reltype.RelDataTypeFactory;
+import org.eigenbase.reltype.RelDataTypeField;
+import org.eigenbase.rex.RexInputRef;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexShuttle;
+
+import com.google.common.collect.Lists;
+
+public class DrillPushProjIntoScan extends RelOptRule {
+ public static final RelOptRule INSTANCE = new DrillPushProjIntoScan();
+
+ private DrillPushProjIntoScan() {
+ super(RelOptHelper.some(ProjectRel.class, RelOptHelper.any(EnumerableTableAccessRel.class)), "DrillPushProjIntoScan");
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ final ProjectRel proj = (ProjectRel) call.rel(0);
+ final EnumerableTableAccessRel scan = (EnumerableTableAccessRel) call.rel(1);
+
+ List<Integer> columnsIds = getRefColumnIds(proj);
+
+ RelDataType newScanRowType = createStructType(scan.getCluster().getTypeFactory(), getProjectedFields(scan.getRowType(),columnsIds));
+
+ final DrillScanRel newScan = new DrillScanRel(scan.getCluster(), scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ scan.getTable(), newScanRowType);
+
+ List<RexNode> convertedExprs = getConvertedProjExp(proj, scan, columnsIds);
+
+ final DrillProjectRel newProj = new DrillProjectRel(proj.getCluster(), proj.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ newScan, convertedExprs, proj.getRowType());
+
+ if (RemoveTrivialProjectRule.isTrivial(newProj)) {
+ call.transformTo(newScan);
+ } else {
+ call.transformTo(newProj);
+ }
+
+ }
+
+ private List<RexNode> getConvertedProjExp(ProjectRel proj, RelNode child, List<Integer> columnsIds) {
+ PushProjector pushProjector =
+ new PushProjector(
+ proj, null, child, PushProjector.ExprCondition.FALSE);
+ ProjectRel topProject = pushProjector.convertProject(null);
+
+ if (topProject != null)
+ return topProject.getProjects();
+ else
+ return proj.getProjects();
+ }
+
+ private RelDataType createStructType(
+ RelDataTypeFactory typeFactory,
+ final List<RelDataTypeField> fields
+ ) {
+ final RelDataTypeFactory.FieldInfoBuilder builder =
+ typeFactory.builder();
+ for (RelDataTypeField field : fields) {
+ builder.add(field.getName(), field.getType());
+ }
+ return builder.build();
+ }
+
+
+ private List<Integer> getRefColumnIds(ProjectRelBase proj) {
+ RefFieldsVisitor v = new RefFieldsVisitor();
+
+ for (RexNode exp : proj.getProjects()) {
+ v.apply(exp);
+ }
+ return new ArrayList<Integer>(v.getReferencedFieldIndex());
+ }
+
+ private List<RelDataTypeField> getProjectedFields(RelDataType rowType, List<Integer> columnIds) {
+ List<RelDataTypeField> oldFields = rowType.getFieldList();
+ List<RelDataTypeField> newFields = Lists.newArrayList();
+
+ for (Integer id : columnIds) {
+ newFields.add(oldFields.get(id));
+ }
+
+ return newFields;
+ }
+
+ /** Visitor that finds the set of inputs that are used. */
+ public static class RefFieldsVisitor extends RexShuttle {
+ public final SortedSet<Integer> inputPosReferenced =
+ new TreeSet<Integer>();
+
+ @Override
+ public RexNode visitInputRef(RexInputRef inputRef) {
+ inputPosReferenced.add(inputRef.getIndex());
+ return inputRef;
+ }
+
+ public Set<Integer> getReferencedFieldIndex() {
+ return this.inputPosReferenced;
+ }
+ }
+
+}
\ No newline at end of file
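
Note: the rule above collects the input ordinals referenced by the project expressions (RefFieldsVisitor), narrows the scan's row type to those fields, and re-expresses the project over the new scan via PushProjector. A simplified, self-contained analogue of RefFieldsVisitor, with the Optiq Rex types replaced by a hypothetical toy expression model:

    import java.util.SortedSet;
    import java.util.TreeSet;

    // Simplified analogue of RefFieldsVisitor: walk each project expression
    // and record every input ordinal it references, in sorted order, so the
    // scan can be narrowed to exactly those fields.
    public class RefFieldsDemo {
        interface Expr { void collectRefs(SortedSet<Integer> refs); }

        static class InputRef implements Expr {
            final int index;
            InputRef(int index) { this.index = index; }
            public void collectRefs(SortedSet<Integer> refs) { refs.add(index); }
        }

        static class Call implements Expr {
            final Expr[] operands;
            Call(Expr... operands) { this.operands = operands; }
            public void collectRefs(SortedSet<Integer> refs) {
                for (Expr e : operands) e.collectRefs(refs);
            }
        }

        public static void main(String[] args) {
            // SELECT $3 + $1, $3  ==> referenced ordinals {1, 3}
            Expr[] projects = { new Call(new InputRef(3), new InputRef(1)), new InputRef(3) };
            SortedSet<Integer> refs = new TreeSet<Integer>();
            for (Expr p : projects) p.collectRefs(refs);
            System.out.println(refs); // prints [1, 3]
        }
    }
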
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 199d36acd..3777c66c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -25,7 +25,6 @@ import org.apache.drill.exec.planner.physical.FilterPrule;
import org.apache.drill.exec.planner.physical.LimitPrule;
import org.apache.drill.exec.planner.physical.MergeJoinPrule;
import org.apache.drill.exec.planner.physical.ProjectPrule;
-import org.apache.drill.exec.planner.physical.PushLimitToTopN;
import org.apache.drill.exec.planner.physical.ScanPrule;
import org.apache.drill.exec.planner.physical.ScreenPrule;
import org.apache.drill.exec.planner.physical.SortConvertPrule;
@@ -36,10 +35,13 @@ import org.eigenbase.rel.rules.MergeProjectRule;
import org.eigenbase.rel.rules.PushFilterPastJoinRule;
import org.eigenbase.rel.rules.PushFilterPastProjectRule;
import org.eigenbase.rel.rules.PushJoinThroughJoinRule;
+import org.eigenbase.rel.rules.PushProjectPastFilterRule;
+import org.eigenbase.rel.rules.PushProjectPastJoinRule;
import org.eigenbase.rel.rules.ReduceAggregatesRule;
import org.eigenbase.rel.rules.RemoveDistinctAggregateRule;
import org.eigenbase.rel.rules.RemoveDistinctRule;
import org.eigenbase.rel.rules.RemoveSortRule;
+import org.eigenbase.rel.rules.RemoveTrivialProjectRule;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.volcano.AbstractConverter.ExpandConversionRule;
@@ -63,7 +65,7 @@ public class DrillRuleSets {
// SwapJoinRule.INSTANCE,
RemoveDistinctRule.INSTANCE,
// UnionToDistinctRule.INSTANCE,
-// RemoveTrivialProjectRule.INSTANCE,
+ RemoveTrivialProjectRule.INSTANCE,
// RemoveTrivialCalcRule.INSTANCE,
RemoveSortRule.INSTANCE,
@@ -72,11 +74,15 @@ public class DrillRuleSets {
new MergeProjectRule(true, RelFactories.DEFAULT_PROJECT_FACTORY),
RemoveDistinctAggregateRule.INSTANCE, //
ReduceAggregatesRule.INSTANCE, //
+ PushProjectPastJoinRule.INSTANCE,
+ PushProjectPastFilterRule.INSTANCE,
// SwapJoinRule.INSTANCE, //
// PushJoinThroughJoinRule.RIGHT, //
// PushJoinThroughJoinRule.LEFT, //
// PushSortPastProjectRule.INSTANCE, //
+ DrillPushProjIntoScan.INSTANCE,
+
////////////////////////////////
DrillScanRule.INSTANCE,
DrillFilterRule.INSTANCE,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index 619e76d0e..2b4f9d7de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -17,28 +17,52 @@
*/
package org.apache.drill.exec.planner.logical;
+import java.io.IOException;
+
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.OperatorCost;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.torel.ConversionContext;
import org.eigenbase.relopt.RelOptCluster;
+import org.eigenbase.relopt.RelOptCost;
+import org.eigenbase.relopt.RelOptPlanner;
import org.eigenbase.relopt.RelOptTable;
import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
/**
* GroupScan of a Drill table.
*/
public class DrillScanRel extends DrillScanRelBase implements DrillRel {
- //private final DrillTable drillTable;
+ final private RelDataType rowType;
+ private GroupScan groupScan;
+
+ /** Creates a DrillScan. */
+ public DrillScanRel(RelOptCluster cluster, RelTraitSet traits,
+ RelOptTable table) {
+ this(cluster, traits, table, table.getRowType());
+ }
/** Creates a DrillScan. */
- public DrillScanRel(RelOptCluster cluster, RelTraitSet traits, RelOptTable table) {
+ public DrillScanRel(RelOptCluster cluster, RelTraitSet traits,
+ RelOptTable table, RelDataType rowType) {
super(DRILL_LOGICAL, cluster, traits, table);
- //this.drillTable = table.unwrap(DrillTable.class);
- //assert drillTable != null;
+ this.rowType = rowType;
+ try {
+ this.groupScan = this.drillTable.getGroupScan().clone(
+ PrelUtil.getColumns(rowType));
+ } catch (IOException e) {
+ this.groupScan = null;
+ e.printStackTrace();
+ }
}
+ @Override
public LogicalOperator implement(DrillImplementor implementor) {
Scan.Builder builder = Scan.builder();
builder.storageEngine(drillTable.getStorageEngineName());
@@ -46,8 +70,34 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
implementor.registerSource(drillTable);
return builder.build();
}
-
- public static DrillScanRel convert(Scan scan, ConversionContext context){
- return new DrillScanRel(context.getCluster(), context.getLogicalTraits(), context.getTable(scan));
+
+ public static DrillScanRel convert(Scan scan, ConversionContext context) {
+ return new DrillScanRel(context.getCluster(), context.getLogicalTraits(),
+ context.getTable(scan));
+ }
+
+ @Override
+ public RelDataType deriveRowType() {
+ return this.rowType;
}
+
+ @Override
+ public RelOptCost computeSelfCost(RelOptPlanner planner) {
+ OperatorCost scanCost = groupScan.getCost();
+ Size scanSize = groupScan.getSize();
+ int columnCount = this.getRowType().getFieldCount();
+
+ // FIXME: Use the new cost model
+ return this
+ .getCluster()
+ .getPlanner()
+ .getCostFactory()
+ .makeCost(scanSize.getRecordCount() * columnCount, scanCost.getCpu(),
+ scanCost.getNetwork() * scanCost.getDisk());
+ }
+
+ public GroupScan getGroupScan() {
+ return groupScan;
+ }
+
}
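
Note: computeSelfCost now multiplies the record count by the projected column count (makeCost(recordCount * columnCount, ...)), so a scan over fewer columns is strictly cheaper and the Volcano planner prefers the pushed-down alternative. Illustrative arithmetic only; the numbers are hypothetical:

    // Fewer projected columns => smaller row-count term => cheaper scan.
    public class ScanCostDemo {
        public static void main(String[] args) {
            long recordCount = 1_000_000L;
            System.out.println(recordCount * 20); // full scan, 20 columns: 20000000
            System.out.println(recordCount * 3);  // projected scan, 3 cols:  3000000
        }
    }
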
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java
new file mode 100644
index 000000000..b026c22a2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/DrillScanPrel.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.apache.drill.exec.physical.base.GroupScan;
+
+public interface DrillScanPrel extends Prel{
+
+ public GroupScan getGroupScan();
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
index c8412ab16..53804c77e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java
@@ -23,6 +23,7 @@ import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.FunctionCall;
import org.apache.drill.common.expression.LogicalExpression;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.Order.Ordering;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.SelectionVectorRemover;
@@ -83,7 +84,21 @@ public class PrelUtil {
}
return new SelectionVectorRemover(child);
}
-
-
-
+
+ public static List<SchemaPath> getColumns(RelDataType rowType) {
+ final List<String> fields = rowType.getFieldNames();
+
+ if (fields.isEmpty()) return null;
+
+ List<SchemaPath> columns = Lists.newArrayList();
+
+ for (String field : fields) {
+ if (field.startsWith("*"))
+ continue;
+
+ columns.add(SchemaPath.getSimplePath(field));
+
+ }
+ return columns;
+ }
}
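
Note: PrelUtil.getColumns turns a row type into the scan's column list, where null means "scan everything" and star fields are skipped so a SELECT * query keeps full-scan semantics. A standalone sketch of the same logic over plain field names (observe that a row type containing only star fields yields an empty list rather than null, matching the code above):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class GetColumnsDemo {
        static List<String> getColumns(List<String> fieldNames) {
            if (fieldNames.isEmpty()) return null; // null == all columns
            List<String> columns = new ArrayList<String>();
            for (String field : fieldNames) {
                if (field.startsWith("*")) continue; // star column: defer to the reader
                columns.add(field);
            }
            return columns;
        }

        public static void main(String[] args) {
            System.out.println(getColumns(Arrays.asList("a", "*", "b"))); // [a, b]
        }
    }
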
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
index b22e862cc..60975a1c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrel.java
@@ -24,65 +24,80 @@ import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
-import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.eigenbase.rel.AbstractRelNode;
import org.eigenbase.rel.RelNode;
import org.eigenbase.rel.RelWriter;
import org.eigenbase.relopt.RelOptCluster;
import org.eigenbase.relopt.RelOptCost;
import org.eigenbase.relopt.RelOptPlanner;
-import org.eigenbase.relopt.RelOptTable;
import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.reltype.RelDataType;
-public class ScanPrel extends DrillScanRelBase implements Prel{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScanPrel.class);
+public class ScanPrel extends AbstractRelNode implements DrillScanPrel {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+ .getLogger(ScanPrel.class);
- protected final GroupScan scan;
+ protected final GroupScan groupScan;
+ private final RelDataType rowType;
- private ScanPrel(RelOptCluster cluster, RelTraitSet traits, RelOptTable tbl, GroupScan scan) {
- super(DRILL_PHYSICAL, cluster, traits, tbl);
- this.scan = scan;
+ public ScanPrel(RelOptCluster cluster, RelTraitSet traits,
+ GroupScan groupScan, RelDataType rowType) {
+ super(cluster, traits);
+ this.groupScan = groupScan;
+ this.rowType = rowType;
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new ScanPrel(this.getCluster(), traitSet, this.getTable(), this.scan);
+ return new ScanPrel(this.getCluster(), traitSet, this.groupScan,
+ this.rowType);
}
-
@Override
protected Object clone() throws CloneNotSupportedException {
- return new ScanPrel(this.getCluster(), this.getTraitSet(), this.getTable(), this.scan);
+ return new ScanPrel(this.getCluster(), this.getTraitSet(), this.groupScan,
+ this.rowType);
}
-
@Override
- public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- return scan;
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator)
+ throws IOException {
+ return groupScan;
}
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- OperatorCost scanCost = this.scan.getCost();
- Size scanSize = this.scan.getSize();
+ OperatorCost scanCost = this.groupScan.getCost();
+ Size scanSize = this.groupScan.getSize();
+ int columnCount = this.getRowType().getFieldCount();
+
// FIXME: Use the new cost model
- return this.getCluster().getPlanner().getCostFactory()
- .makeCost(scanSize.getRecordCount(),
- scanCost.getCpu(),
- scanCost.getNetwork() * scanCost.getDisk());
+ return this
+ .getCluster()
+ .getPlanner()
+ .getCostFactory()
+ .makeCost(scanSize.getRecordCount() * columnCount, scanCost.getCpu(),
+ scanCost.getNetwork() * scanCost.getDisk());
}
+ @Override
public GroupScan getGroupScan() {
- return scan;
+ return groupScan;
}
- public static ScanPrel create(DrillScanRelBase old, RelTraitSet traitSets, GroupScan scan) {
- return new ScanPrel(old.getCluster(), traitSets, old.getTable(), scan);
+ public static ScanPrel create(RelNode old, RelTraitSet traitSets,
+ GroupScan scan, RelDataType rowType) {
+ return new ScanPrel(old.getCluster(), traitSets, scan, rowType);
}
-
@Override
public RelWriter explainTerms(RelWriter pw) {
- return super.explainTerms(pw).item("groupscan", scan.getDigest());
+ return super.explainTerms(pw).item("groupscan", groupScan.getDigest());
+ }
+
+ @Override
+ public RelDataType deriveRowType() {
+ return this.rowType;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
index 99ec1f590..201a358c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ScanPrule.java
@@ -17,12 +17,11 @@
*/
package org.apache.drill.exec.planner.physical;
-import java.io.IOException;
+import java.util.List;
-import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
-import org.apache.drill.exec.planner.common.DrillScanRelBase;
-import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
@@ -31,27 +30,25 @@ import org.eigenbase.relopt.RelTraitSet;
public class ScanPrule extends RelOptRule{
public static final RelOptRule INSTANCE = new ScanPrule();
-
public ScanPrule() {
- super(RelOptHelper.any(DrillScanRelBase.class), "Prel.ScanPrule");
-
+ super(RelOptHelper.any(DrillScanRel.class), "Prel.ScanPrule");
+
}
@Override
public void onMatch(RelOptRuleCall call) {
- try{
- final DrillScanRelBase scan = (DrillScanRelBase) call.rel(0);
- final DrillTable table = scan.getTable().unwrap(DrillTable.class);
-
- final GroupScan groupScan = table.getPlugin().getPhysicalScan(new JSONOptions(table.getSelection()));
- final DrillDistributionTrait partition = groupScan.getMaxParallelizationWidth() > 1 ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON;
- final RelTraitSet traits = scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition);
-
- final DrillScanRelBase newScan = ScanPrel.create(scan, traits, groupScan);
- call.transformTo(newScan);
- }catch(IOException e){
- throw new RuntimeException("Failure getting group scan.", e);
- }
+ final DrillScanRel scan = (DrillScanRel) call.rel(0);
+
+ List<SchemaPath> columns = PrelUtil.getColumns(scan.getRowType());
+
+ GroupScan groupScan = scan.getGroupScan().clone(columns);
+
+ DrillDistributionTrait partition = groupScan.getMaxParallelizationWidth() > 1 ? DrillDistributionTrait.RANDOM_DISTRIBUTED : DrillDistributionTrait.SINGLETON;
+
+ final RelTraitSet traits = scan.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(partition);
+
+ final DrillScanPrel newScan = ScanPrel.create(scan, traits, groupScan, scan.getRowType());
+
+ call.transformTo(newScan);
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
index e46e3f82a..296f40049 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
@@ -44,7 +44,7 @@ public class ExplainHandler extends DefaultSqlHandler{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ExplainHandler.class);
private ResultMode mode;
-
+ private SqlExplainLevel level = SqlExplainLevel.ALL_ATTRIBUTES;
public ExplainHandler(Planner planner, QueryContext context) {
super(planner, context);
}
@@ -73,7 +73,7 @@ public class ExplainHandler extends DefaultSqlHandler{
SqlExplain node = unwrap(sqlNode, SqlExplain.class);
SqlLiteral op = node.operand(2);
SqlExplain.Depth depth = (SqlExplain.Depth) op.getValue();
-
+ if(node.getDetailLevel() != null) level = node.getDetailLevel();
switch(depth){
case LOGICAL:
mode = ResultMode.LOGICAL;
@@ -94,7 +94,7 @@ public class ExplainHandler extends DefaultSqlHandler{
public String json;
public LogicalExplain(RelNode node){
- this.text = RelOptUtil.toString(node, SqlExplainLevel.DIGEST_ATTRIBUTES);
+ this.text = RelOptUtil.toString(node, level);
DrillImplementor implementor = new DrillImplementor(new DrillParseContext(), ResultMode.LOGICAL);
implementor.go( (DrillRel) node);
LogicalPlan plan = implementor.getPlan();
@@ -107,7 +107,7 @@ public class ExplainHandler extends DefaultSqlHandler{
public String json;
public PhysicalExplain(RelNode node, PhysicalPlan plan){
- this.text = RelOptUtil.toString(node, SqlExplainLevel.ALL_ATTRIBUTES);
+ this.text = RelOptUtil.toString(node, level);
this.json = plan.unparse(context.getConfig().getMapper().writer());
}
}
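
Note: with the new `level` field taken from the statement, the detail clauses of Optiq's EXPLAIN syntax flow through to RelOptUtil.toString() instead of being hard-coded per plan type. A hedged JDBC sketch; the connection URL and sample dataset are assumptions, not part of this change:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class ExplainLevelDemo {
        public static void main(String[] args) throws Exception {
            Connection c = DriverManager.getConnection("jdbc:drill:zk=local");
            Statement s = c.createStatement();
            // Optiq's EXPLAIN grammar: the INCLUDING/EXCLUDING ATTRIBUTES clause
            // now selects the SqlExplainLevel used when printing the plan.
            ResultSet rs = s.executeQuery(
                "EXPLAIN PLAN INCLUDING ALL ATTRIBUTES FOR SELECT 1 FROM cp.`employee.json`");
            while (rs.next()) System.out.println(rs.getString(1));
            c.close();
        }
    }
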
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index 2a9ae3905..1b64257f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -18,9 +18,11 @@
package org.apache.drill.exec.store;
import java.io.IOException;
+import java.util.List;
import java.util.Set;
import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import com.google.common.collect.ImmutableSet;
@@ -48,9 +50,12 @@ public abstract class AbstractStoragePlugin implements StoragePlugin{
@Override
public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
- throw new UnsupportedOperationException();
+ return getPhysicalScan(selection, null);
}
-
-
+ @Override
+ public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
}
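
Note: the legacy single-argument getPhysicalScan now delegates to the column-aware overload with null ("all columns"), so existing call sites keep working and a plugin opts into projection pushdown by overriding only the two-argument form. A toy sketch of the delegation; all names are hypothetical:

    import java.util.Arrays;
    import java.util.List;

    public class PluginOverloadDemo {
        static class ToyScan {
            final List<String> columns;
            ToyScan(List<String> columns) { this.columns = columns; }
        }

        abstract static class ToyStoragePlugin {
            public ToyScan getPhysicalScan(String selection) {
                return getPhysicalScan(selection, null); // delegate: null == all columns
            }
            public ToyScan getPhysicalScan(String selection, List<String> columns) {
                throw new UnsupportedOperationException();
            }
        }

        static class ToyCsvPlugin extends ToyStoragePlugin {
            @Override
            public ToyScan getPhysicalScan(String selection, List<String> columns) {
                return new ToyScan(columns); // column-aware scan
            }
        }

        public static void main(String[] args) {
            ToyStoragePlugin plugin = new ToyCsvPlugin();
            System.out.println(plugin.getPhysicalScan("t.csv").columns);                    // null (all)
            System.out.println(plugin.getPhysicalScan("t.csv", Arrays.asList("a")).columns); // [a]
        }
    }
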
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
index 9f528bb6f..653d69db0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePlugin.java
@@ -18,16 +18,15 @@
package org.apache.drill.exec.store;
import java.io.IOException;
+import java.util.List;
import java.util.Set;
-import net.hydromatic.optiq.SchemaPlus;
-
import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
-import org.apache.drill.exec.rpc.user.DrillUser;
-public interface StoragePlugin extends SchemaFactory{
+public interface StoragePlugin extends SchemaFactory {
public boolean supportsRead();
public boolean supportsWrite();
@@ -36,13 +35,27 @@ public interface StoragePlugin extends SchemaFactory{
/**
* Get the physical scan operator for the particular GroupScan (read) node.
- *
- * @param scan
- * The configured scan with a storage engine specific selection.
+ *
+ * @param selection
+ * The configured storage engine specific selection.
+ * @return
+ * @throws IOException
+ */
+ public AbstractGroupScan getPhysicalScan(JSONOptions selection)
+ throws IOException;
+
+ /**
+ * Get the physical scan operator for the particular GroupScan (read) node.
+ *
+ * @param selection
+ * The configured storage engine specific selection.
+ * @param columns
+ * (optional) The list of column names to scan from the data source.
* @return
* @throws IOException
*/
- public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException;
+ public AbstractGroupScan getPhysicalScan(JSONOptions selection,
+ List<SchemaPath> columns) throws IOException;
public StoragePluginConfig getConfig();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 97427f6fd..dd0cef4cb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -27,6 +27,7 @@ import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.QueryContext;
@@ -112,6 +113,11 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
@Override
public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
+ return this.getPhysicalScan(selection, null);
+ }
+
+ @Override
+ public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
FormatSelection formatSelection = selection.getWith(context.getConfig(), FormatSelection.class);
FormatPlugin plugin;
if(formatSelection.getFormat() instanceof NamedFormatPluginConfig){
@@ -120,7 +126,7 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
plugin = formatPluginsByConfig.get(formatSelection.getFormat());
}
if(plugin == null) throw new IOException(String.format("Failure getting requested format plugin named '%s'. It was not one of the format plugins registered.", formatSelection.getFormat()));
- return plugin.getGroupScan(formatSelection.getSelection());
+ return plugin.getGroupScan(formatSelection.getSelection(), columns);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
index 2999d9d7b..788e7ac6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatPlugin.java
@@ -18,8 +18,10 @@
package org.apache.drill.exec.store.dfs;
import java.io.IOException;
+import java.util.List;
import java.util.Set;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
@@ -37,11 +39,13 @@ public interface FormatPlugin {
public boolean supportsWrite();
public FormatMatcher getMatcher();
-
+
public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException;
public Set<StoragePluginOptimizerRule> getOptimizerRules();
+ public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException;
+
public FormatPluginConfig getConfig();
public StoragePluginConfig getStorageConfig();
public DrillFileSystem getFileSystem();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index c5a5294a2..7ae10f814 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -162,6 +162,11 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
}
@Override
+ public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
+ return new EasyGroupScan(selection, this, columns, selection.selectionRoot);
+ }
+
+ @Override
public FormatPluginConfig getConfig() {
return formatConfig;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index a4bd1c64d..9b352041a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -55,7 +56,7 @@ public class EasyGroupScan extends AbstractGroupScan{
private final FileSelection selection;
private final EasyFormatPlugin<?> formatPlugin;
private final int maxWidth;
- private final List<SchemaPath> columns;
+ private List<SchemaPath> columns;
private ListMultimap<Integer, CompleteFileWork> mappings;
private List<CompleteFileWork> chunks;
@@ -109,6 +110,17 @@ public class EasyGroupScan extends AbstractGroupScan{
this.selectionRoot = selectionRoot;
}
+ private EasyGroupScan(EasyGroupScan that) {
+ this.chunks = that.chunks;
+ this.columns = that.columns;
+ this.endpointAffinities = that.endpointAffinities;
+ this.formatPlugin = that.formatPlugin;
+ this.mappings = that.mappings;
+ this.maxWidth = that.maxWidth;
+ this.selection = that.selection;
+ this.selectionRoot = that.selectionRoot;
+ }
+
public String getSelectionRoot() {
return selectionRoot;
}
@@ -206,4 +218,11 @@ public class EasyGroupScan extends AbstractGroupScan{
return toString();
}
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ EasyGroupScan newScan = new EasyGroupScan(this);
+ newScan.columns = columns;
+ return newScan;
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 850f2488d..0dbed8935 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -20,15 +20,19 @@ package org.apache.drill.exec.store.easy.text;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
+
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
+import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.dfs.shim.DrillFileSystem;
import org.apache.drill.exec.store.text.DrillTextRecordReader;
@@ -38,6 +42,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -61,6 +66,11 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
return new DrillTextRecordReader(split, context, ((TextFormatConfig) formatConfig).getDelimiter().charAt(0), columns);
}
+ @Override
+ public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
+ return new EasyGroupScan(selection, this, null, selection.selectionRoot); // TODO: does the text format support projection pushdown?
+ }
+
@JsonTypeName("text")
public static class TextFormatConfig implements FormatPluginConfig {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index cff075c6b..bcac72eba 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
@@ -65,7 +65,7 @@ public class HiveRecordReader implements RecordReader {
protected Partition partition;
protected InputSplit inputSplit;
protected FragmentContext context;
- protected List<FieldReference> columns;
+ protected List<SchemaPath> columns;
protected List<String> columnNames;
protected List<String> partitionNames = Lists.newArrayList();
protected List<String> selectedPartitionNames = Lists.newArrayList();
@@ -84,7 +84,7 @@ public class HiveRecordReader implements RecordReader {
protected static final int TARGET_RECORD_COUNT = 4000;
- public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns, FragmentContext context) throws ExecutionSetupException {
+ public HiveRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> columns, FragmentContext context) throws ExecutionSetupException {
this.table = table;
this.partition = partition;
this.inputSplit = inputSplit;
@@ -138,8 +138,8 @@ public class HiveRecordReader implements RecordReader {
tableColumns = sTypeInfo.getAllStructFieldNames();
List<Integer> columnIds = Lists.newArrayList();
columnNames = Lists.newArrayList();
- for (FieldReference field : columns) {
- String columnName = field.getRootSegment().getPath();
+ for (SchemaPath field : columns) {
+ String columnName = field.getRootSegment().getPath(); //TODO?
if (!tableColumns.contains(columnName)) {
if (partition != null && partitionNames.contains(columnName)) {
selectedPartitionNames.add(columnName);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
index 7c46d155c..29729280a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveScan.java
@@ -17,19 +17,22 @@
*/
package org.apache.drill.exec.store.hive;
-import com.fasterxml.jackson.annotation.*;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.io.ByteArrayDataOutput;
-import com.google.common.io.ByteStreams;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
import org.apache.commons.codec.binary.Base64;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
@@ -45,8 +48,15 @@ import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import java.io.IOException;
-import java.util.*;
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.ByteArrayDataOutput;
+import com.google.common.io.ByteStreams;
@JsonTypeName("hive-scan")
public class HiveScan extends AbstractGroupScan {
@@ -55,7 +65,7 @@ public class HiveScan extends AbstractGroupScan {
@JsonProperty("hive-table")
public HiveReadEntry hiveReadEntry;
@JsonIgnore
- private Table table;
+ private final Table table;
@JsonIgnore
private List<InputSplit> inputSplits = Lists.newArrayList();
@JsonIgnore
@@ -66,10 +76,10 @@ public class HiveScan extends AbstractGroupScan {
@JsonIgnore
public List<Partition> partitions;
@JsonIgnore
- private Collection<DrillbitEndpoint> endpoints;
+ private final Collection<DrillbitEndpoint> endpoints;
@JsonProperty("columns")
- public List<FieldReference> columns;
+ public List<SchemaPath> columns;
@JsonIgnore
List<List<InputSplit>> mappings;
@@ -79,8 +89,8 @@ public class HiveScan extends AbstractGroupScan {
@JsonCreator
public HiveScan(@JsonProperty("hive-table") HiveReadEntry hiveReadEntry, @JsonProperty("storage-plugin") String storagePluginName,
- @JsonProperty("columns") List<FieldReference> columns,
- @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JacksonInject StoragePluginRegistry pluginRegistry) throws ExecutionSetupException {
this.hiveReadEntry = hiveReadEntry;
this.table = hiveReadEntry.getTable();
this.storagePluginName = storagePluginName;
@@ -91,7 +101,7 @@ public class HiveScan extends AbstractGroupScan {
endpoints = storagePlugin.getContext().getBits();
}
- public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, List<FieldReference> columns) throws ExecutionSetupException {
+ public HiveScan(HiveReadEntry hiveReadEntry, HiveStoragePlugin storagePlugin, List<SchemaPath> columns) throws ExecutionSetupException {
this.table = hiveReadEntry.getTable();
this.hiveReadEntry = hiveReadEntry;
this.columns = columns;
@@ -101,7 +111,20 @@ public class HiveScan extends AbstractGroupScan {
this.storagePluginName = storagePlugin.getName();
}
- public List<FieldReference> getColumns() {
+ private HiveScan(HiveScan that) {
+ this.columns = that.columns;
+ this.endpoints = that.endpoints;
+ this.hiveReadEntry = that.hiveReadEntry;
+ this.inputSplits = that.inputSplits;
+ this.mappings = that.mappings;
+ this.partitionMap = that.partitionMap;
+ this.partitions = that.partitions;
+ this.storagePlugin = that.storagePlugin;
+ this.storagePluginName = that.storagePluginName;
+ this.table = that.table;
+ }
+
+ public List<SchemaPath> getColumns() {
return columns;
}
@@ -253,9 +276,16 @@ public class HiveScan extends AbstractGroupScan {
@Override
public String toString() {
- return "HiveScan [table=" + table
+ return "HiveScan [table=" + table
+ ", inputSplits=" + inputSplits
+ ", columns=" + columns + "]";
}
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ HiveScan newScan = new HiveScan(this);
+ newScan.columns = columns;
+ return newScan;
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
index e6df66901..0a70b2082 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveStoragePlugin.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.hive;
import java.io.IOException;
+import java.util.List;
import net.hydromatic.optiq.Schema;
import net.hydromatic.optiq.SchemaPlus;
@@ -25,6 +26,7 @@ import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.rpc.user.DrillUser;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.AbstractStoragePlugin;
import org.apache.drill.exec.store.hive.schema.HiveSchemaFactory;
@@ -62,10 +64,10 @@ public class HiveStoragePlugin extends AbstractStoragePlugin {
}
@Override
- public HiveScan getPhysicalScan(JSONOptions selection) throws IOException {
+ public HiveScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
HiveReadEntry hiveReadEntry = selection.getListWith(new ObjectMapper(), new TypeReference<HiveReadEntry>(){});
try {
- return new HiveScan(hiveReadEntry, this, null);
+ return new HiveScan(hiveReadEntry, this, null);
} catch (ExecutionSetupException e) {
throw new IOException(e);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
index 8ff7c823c..0a020976d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveSubScan.java
@@ -26,7 +26,7 @@ import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteStreams;
import org.apache.commons.codec.binary.Base64;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.*;
import org.apache.hadoop.fs.Path;
@@ -53,14 +53,14 @@ public class HiveSubScan extends AbstractBase implements SubScan {
public List<String> splitClasses;
@JsonProperty("columns")
- public List<FieldReference> columns;
+ public List<SchemaPath> columns;
@JsonCreator
public HiveSubScan(@JsonProperty("hive-table") Table table,
@JsonProperty("partition") List<Partition> partitions,
@JsonProperty("splits") List<String> encodedSplits,
@JsonProperty("splitClasses") List<String> splitClasses,
- @JsonProperty("columns") List<FieldReference> columns) throws IOException, ReflectiveOperationException {
+ @JsonProperty("columns") List<SchemaPath> columns) throws IOException, ReflectiveOperationException {
this.table = table;
this.partitions = partitions;
this.encodedSplits = encodedSplits;
@@ -84,7 +84,7 @@ public class HiveSubScan extends AbstractBase implements SubScan {
return split;
}
- public List<FieldReference> getColumns() {
+ public List<SchemaPath> getColumns() {
return columns;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
index 1e47684cd..13719d627 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/hive/HiveTextRecordReader.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.hive;
import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.vector.*;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
@@ -41,7 +41,7 @@ public class HiveTextRecordReader extends HiveRecordReader {
public final List<Integer> columnIds;
private final int numCols;
- public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<FieldReference> columns, FragmentContext context) throws ExecutionSetupException {
+ public HiveTextRecordReader(Table table, Partition partition, InputSplit inputSplit, List<SchemaPath> columns, FragmentContext context) throws ExecutionSetupException {
super(table, partition, inputSplit, columns, context);
String d = table.getSd().getSerdeInfo().getParameters().get("field.delim");
if (d != null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
index a9261be55..b0d8ca575 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaGroupScan.java
@@ -22,13 +22,16 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.physical.base.SubScan;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.dfs.easy.EasyGroupScan;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -41,11 +44,24 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{
private final SelectedTable table;
+ private List<SchemaPath> columns;
+
@JsonCreator
- public InfoSchemaGroupScan(@JsonProperty("table") SelectedTable table) {
+ public InfoSchemaGroupScan(@JsonProperty("table") SelectedTable table,
+ @JsonProperty("columns") List<SchemaPath> columns) {
this.table = table;
+ this.columns = columns;
+ }
+
+ private InfoSchemaGroupScan(InfoSchemaGroupScan that) {
+ this.table = that.table;
+ this.columns = that.columns;
}
+ public List<SchemaPath> getColumns() {
+ return columns;
+ }
+
@Override
public void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException {
Preconditions.checkArgument(endpoints.size() == 1);
@@ -84,7 +100,13 @@ public class InfoSchemaGroupScan extends AbstractGroupScan{
@Override
public String getDigest() {
- return this.table.toString();
+    return this.table.toString() + ", columns=" + columns;
}
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+    InfoSchemaGroupScan newScan = new InfoSchemaGroupScan(this);
+ newScan.columns = columns;
+ return newScan;
+ }
}
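The clone(List&lt;SchemaPath&gt;) contract introduced here (and on ParquetGroupScan below) is what the project-pushdown rule relies on: it derives a column-pruned scan without reconstructing it from scratch. A sketch of the caller side, assuming 'scan' is a GroupScan and the columns were collected from the Project node:

    List<SchemaPath> projected = Lists.newArrayList(
        SchemaPath.getSimplePath("CATALOG_DESCRIPTION"));  // example column
    GroupScan pruned = scan.clone(projected);
    // The digest now reflects the projection, so the planner distinguishes
    // the pruned scan from the original:
    assert !pruned.getDigest().equals(scan.getDigest());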
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
index 313ea867c..e7e3f377d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ischema/InfoSchemaStoragePlugin.java
@@ -18,12 +18,14 @@
package org.apache.drill.exec.store.ischema;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.rpc.user.DrillUser;
@@ -53,9 +55,9 @@ public class InfoSchemaStoragePlugin extends AbstractStoragePlugin{
}
@Override
- public InfoSchemaGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
+ public InfoSchemaGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
SelectedTable table = selection.getWith(context.getConfig(), SelectedTable.class);
- return new InfoSchemaGroupScan(table);
+ return new InfoSchemaGroupScan(table, columns);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
index 10f6e08d6..4fe1d1be6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockStorageEngine.java
@@ -19,10 +19,12 @@ package org.apache.drill.exec.store.mock;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import net.hydromatic.optiq.SchemaPlus;
import org.apache.drill.common.JSONOptions;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.rpc.user.DrillUser;
@@ -43,7 +45,7 @@ public class MockStorageEngine extends AbstractStoragePlugin {
}
@Override
- public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
+ public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
ArrayList<MockScanEntry> readEntries = selection.getListWith(new ObjectMapper(),
new TypeReference<ArrayList<MockScanEntry>>() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 4fd05871d..ec6456bb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -18,9 +18,11 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
+import java.util.List;
import java.util.Set;
import java.util.regex.Pattern;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
@@ -100,7 +102,12 @@ public class ParquetFormatPlugin implements FormatPlugin{
@Override
public ParquetGroupScan getGroupScan(FileSelection selection) throws IOException {
- return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot);
+ return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, null);
+ }
+
+ @Override
+ public ParquetGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
+ return new ParquetGroupScan(selection.getFileStatusList(fs), this, selection.selectionRoot, columns);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 6c3bd27e6..a8fff8ace 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.OperatorCost;
import org.apache.drill.exec.physical.base.AbstractGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Size;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
@@ -134,10 +135,11 @@ public class ParquetGroupScan extends AbstractGroupScan {
public ParquetGroupScan(List<FileStatus> files, //
ParquetFormatPlugin formatPlugin, //
- String selectionRoot) //
+ String selectionRoot,
+ List<SchemaPath> columns) //
throws IOException {
this.formatPlugin = formatPlugin;
- this.columns = null;
+ this.columns = columns;
this.formatConfig = formatPlugin.getConfig();
this.fs = formatPlugin.getFileSystem().getUnderlying();
@@ -150,6 +152,21 @@ public class ParquetGroupScan extends AbstractGroupScan {
readFooter(files);
}
+
+ /*
+   * Copy constructor, used by clone() to make a shallow copy of this group scan.
+ */
+ private ParquetGroupScan(ParquetGroupScan that){
+ this.columns = that.columns;
+ this.endpointAffinities = that.endpointAffinities;
+ this.entries = that.entries;
+ this.formatConfig = that.formatConfig;
+ this.formatPlugin = that.formatPlugin;
+ this.fs = that.fs;
+ this.mappings = that.mappings;
+ this.rowGroupInfos = that.rowGroupInfos;
+ this.selectionRoot = that.selectionRoot;
+ }
private void readFooterFromEntries() throws IOException {
List<FileStatus> files = Lists.newArrayList();
@@ -348,4 +365,10 @@ public class ParquetGroupScan extends AbstractGroupScan {
+ ", columns=" + columns + "]";
}
+ @Override
+ public GroupScan clone(List<SchemaPath> columns) {
+ ParquetGroupScan newScan = new ParquetGroupScan(this);
+ newScan.columns = columns;
+ return newScan;
+ }
}
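With the extra constructor parameter, callers decide the projection up front; passing null keeps the old select-everything behavior, which is exactly what the single-argument getGroupScan overload above does. A sketch of a pruned construction, with 'selection', 'fs' and 'plugin' assumed in scope as in ParquetFormatPlugin:

    List<SchemaPath> columns = Lists.newArrayList(
        SchemaPath.getSimplePath("l_orderkey"),
        SchemaPath.getSimplePath("l_extendedprice"));
    ParquetGroupScan scan = new ParquetGroupScan(
        selection.getFileStatusList(fs), plugin, selection.selectionRoot, columns);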
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 17d2adb5e..ef05b4c81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -17,8 +17,9 @@
*/
package org.apache.drill.exec.store.text;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
@@ -36,19 +37,24 @@ import org.apache.drill.exec.vector.RepeatedVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.*;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
-import java.io.IOException;
-import java.util.List;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
public class DrillTextRecordReader implements RecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
+ static final String COL_NAME = "columns";
+
private org.apache.hadoop.mapred.RecordReader<LongWritable, Text> reader;
private List<ValueVector> vectors = Lists.newArrayList();
private byte delimiter;
private int targetRecordCount;
- private FieldReference ref = new FieldReference("columns");
+ private FieldReference ref = new FieldReference(COL_NAME);
private FragmentContext context;
private RepeatedVarCharVector vector;
private List<Integer> columnIds = Lists.newArrayList();
@@ -63,9 +69,13 @@ public class DrillTextRecordReader implements RecordReader {
if(columns != null) {
for (SchemaPath path : columns) {
assert path.getRootSegment().isNamed();
- Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),"Selected column must be an array index");
- int index = path.getRootSegment().getChild().getArraySegment().getIndex();
- columnIds.add(index);
+ Preconditions.checkArgument(path.getRootSegment().getPath().equals(COL_NAME), "Selected column must have name 'columns'");
+      // FIXME: needs rework for text column push-down.
+ if (path.getRootSegment().getChild() != null) {
+ Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),"Selected column must be an array index");
+ int index = path.getRootSegment().getChild().getArraySegment().getIndex();
+ columnIds.add(index);
+ }
}
}
targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
@@ -161,7 +171,7 @@ public class DrillTextRecordReader implements RecordReader {
@Override
public void cleanup() {
try {
- reader.close();
+ reader.close();
} catch (IOException e) {
logger.warn("Exception closing reader: {}", e);
}
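A worked example of the projection shapes the guarded loop above accepts (the file path is illustrative):

    // For: select columns[0], columns[2] from dfs.`/data/example.csv`
    // the projected paths reach the reader with root segment "columns" and
    // array children 0 and 2, so the loop above collects:
    //   columnIds == [0, 2]
    // For a bare: select columns from ...
    // the path passes the name check but has no array child, so columnIds
    // stays empty -- the un-pruned case the FIXME refers to.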
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
new file mode 100644
index 000000000..079857726
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.record.RecordBatchLoader;
+import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.rpc.user.QueryResultBatch;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.eigenbase.sql.SqlExplain.Depth;
+import org.eigenbase.sql.SqlExplainLevel;
+
+public class PlanTestBase extends BaseTestQuery {
+
+ protected static final String OPTIQ_FORMAT = "text";
+ protected static final String JSON_FORMAT = "json";
+
+ /**
+   * Takes a SQL statement and gets the PHYSICAL plan in JSON format, then
+   * checks the plan against the list of expected substrings, verifying that
+   * every expected string is contained in the physical plan string.
+ */
+ public void testPhysicalPlan(String sql, String... expectedSubstrs)
+ throws Exception {
+ sql = "EXPLAIN PLAN for " + sql.replace("[WORKING_PATH]", TestTools.getWorkingPath());
+
+ String planStr = getPlanInString(sql, JSON_FORMAT);
+
+    for (String substr : expectedSubstrs) {
+      assertTrue(planStr.contains(substr));
+ }
+ }
+
+ /**
+   * Takes a SQL statement and gets the PHYSICAL plan in Optiq RelNode
+   * format, then checks the plan against the list of expected substrings,
+   * verifying that every expected string is contained in the physical plan
+   * string.
+ */
+ public void testRelPhysicalPlanLevDigest(String sql, String... expectedSubstrs)
+ throws Exception {
+ String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.DIGEST_ATTRIBUTES, Depth.PHYSICAL);
+
+ for (String substr : expectedSubstrs) {
+ assertTrue(planStr.contains(substr));
+ }
+
+ }
+
+ /**
+   * Takes a SQL statement and gets the LOGICAL plan in Optiq RelNode
+   * format, then checks the plan against the list of expected substrings,
+   * verifying that every expected string is contained in the logical plan
+   * string.
+ */
+ public void testRelLogicalPlanLevDigest(String sql, String... expectedSubstrs)
+ throws Exception {
+ String planStr = getDrillRelPlanInString(sql,
+ SqlExplainLevel.DIGEST_ATTRIBUTES, Depth.LOGICAL);
+
+ for (String substr : expectedSubstrs) {
+ assertTrue(planStr.contains(substr));
+ }
+ }
+
+ /**
+   * Takes a SQL statement and gets the PHYSICAL plan in Optiq RelNode
+   * format, then checks the plan against the list of expected substrings,
+   * verifying that every expected string is contained in the physical plan
+   * string.
+ */
+ public void testRelPhysicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception {
+ String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.PHYSICAL);
+
+ for (String substr : expectedSubstrs) {
+ assertTrue(planStr.contains(substr));
+ }
+ }
+
+ /**
+   * Takes a SQL statement and gets the LOGICAL plan in Optiq RelNode
+   * format, then checks the plan against the list of expected substrings,
+   * verifying that every expected string is contained in the logical plan
+   * string.
+ */
+ public void testRelLogicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception {
+ String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.LOGICAL);
+
+ for (String substr : expectedSubstrs) {
+ assertTrue(planStr.contains(substr));
+ }
+ }
+
+
+ /*
+ * This will get the plan (either logical or physical) in Optiq RelNode
+ * format, based on SqlExplainLevel and Depth.
+ */
+ private String getDrillRelPlanInString(String sql, SqlExplainLevel level,
+ Depth depth) throws Exception {
+ String levelStr, depthStr;
+ switch (level) {
+ case NO_ATTRIBUTES:
+ levelStr = "EXCLUDING ATTRIBUTES";
+ break;
+ case EXPPLAN_ATTRIBUTES:
+ levelStr = "INCLUDING ATTRIBUTES";
+ break;
+ case ALL_ATTRIBUTES:
+ levelStr = "INCLUDING ALL ATTRIBUTES";
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ switch (depth) {
+ case TYPE:
+ depthStr = "WITH TYPE";
+ break;
+ case LOGICAL:
+ depthStr = "WITHOUT IMPLEMENTATION";
+ break;
+ case PHYSICAL:
+ depthStr = "WITH IMPLEMENTATION";
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+
+ sql = "EXPLAIN PLAN " + levelStr + " " + depthStr + " for "
+ + sql.replace("[WORKING_PATH]", TestTools.getWorkingPath());
+
+ return getPlanInString(sql, OPTIQ_FORMAT);
+ }
+
+ /*
+ * This will submit an "EXPLAIN" statement, and return the column value which
+ * contains the plan's string.
+ */
+ private String getPlanInString(String sql, String columnName)
+ throws Exception {
+ List<QueryResultBatch> results = testSqlWithResults(sql);
+
+ RecordBatchLoader loader = new RecordBatchLoader(bit.getContext()
+ .getAllocator());
+ StringBuilder builder = new StringBuilder();
+
+ for (QueryResultBatch b : results) {
+ if (!b.hasData())
+ continue;
+
+ loader.load(b.getHeader().getDef(), b.getData());
+
+ VectorWrapper<?> vw = loader.getValueAccessorById(loader
+ .getValueVectorId(SchemaPath.getSimplePath(columnName)).getFieldId(),
+ NullableVarCharVector.class);
+
+ System.out.println(vw.getValueVector().getField().toExpr());
+ ValueVector vv = vw.getValueVector();
+ for (int i = 0; i < vv.getAccessor().getValueCount(); i++) {
+ Object o = vv.getAccessor().getObject(i);
+ builder.append(o);
+ System.out.println(vv.getAccessor().getObject(i));
+ }
+ }
+
+ return builder.toString();
+ }
+
+}
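A usage sketch for these helpers (a hypothetical test, not part of this patch). For SqlExplainLevel.EXPPLAN_ATTRIBUTES and Depth.PHYSICAL the statement built internally is "EXPLAIN PLAN INCLUDING ATTRIBUTES WITH IMPLEMENTATION for <query>":

    @Test
    public void exampleUsage() throws Exception {
      // The JSON physical plan should mention the pruned column list:
      testPhysicalPlan("select employee_id from cp.`employee.json`",
          "\"columns\" : [ \"`employee_id`\" ]");
      // The Optiq physical plan at attribute level should contain a scan node:
      testRelPhysicalPlanLevExplain("select employee_id from cp.`employee.json`",
          "Scan");  // assumed substring; any expected plan fragment works here
    }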
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
new file mode 100644
index 000000000..675cddb98
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill;
+
+import org.junit.Test;
+
+// Tests the optimizer plan with respect to project pushdown.
+// When a query refers to a subset of a table's columns, the optimizer should
+// push the list of referenced columns down to the SCAN operator, so that the
+// scan retrieves only the values of those columns.
+
+public class TestProjectPushDown extends PlanTestBase {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+ .getLogger(TestProjectPushDown.class);
+
+ @Test
+ public void testGroupBy() throws Exception {
+ String expectedColNames = " \"columns\" : [ \"`marital_status`\" ]";
+ testPhysicalPlan(
+ "select marital_status, COUNT(1) as cnt from cp.`employee.json` group by marital_status",
+ expectedColNames);
+ }
+
+ @Test
+ public void testOrderBy() throws Exception {
+ String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
+ testPhysicalPlan("select employee_id , full_name, first_name , last_name "
+ + "from cp.`employee.json` order by first_name, last_name",
+ expectedColNames);
+ }
+
+ @Test
+ public void testExprInSelect() throws Exception {
+ String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
+ testPhysicalPlan(
+ "select employee_id + 100, full_name, first_name , last_name "
+ + "from cp.`employee.json` order by first_name, last_name",
+ expectedColNames);
+ }
+
+ @Test
+ public void testExprInWhere() throws Exception {
+ String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
+ testPhysicalPlan(
+ "select employee_id + 100, full_name, first_name , last_name "
+ + "from cp.`employee.json` where employee_id + 500 < 1000 ",
+ expectedColNames);
+ }
+
+ @Test
+ public void testJoin() throws Exception {
+ String expectedColNames1 = "\"columns\" : [ \"`N_REGIONKEY`\", \"`N_NAME`\" ]";
+ String expectedColNames2 = "\"columns\" : [ \"`R_REGIONKEY`\", \"`R_NAME`\" ]";
+
+ testPhysicalPlan("SELECT\n" + " nations.N_NAME,\n" + " regions.R_NAME\n"
+ + "FROM\n"
+ + " dfs.`[WORKING_PATH]/../../sample-data/nation.parquet` nations\n"
+ + "JOIN\n"
+ + " dfs.`[WORKING_PATH]/../../sample-data/region.parquet` regions\n"
+ + " on nations.N_REGIONKEY = regions.R_REGIONKEY", expectedColNames1,
+ expectedColNames2);
+ }
+
+ @Test
+ public void testFromInfoSchema() throws Exception {
+ String expectedColNames = " \"columns\" : [ \"`CATALOG_DESCRIPTION`\" ]";
+ testPhysicalPlan(
+ "select count(CATALOG_DESCRIPTION) from INFORMATION_SCHEMA.CATALOGS",
+ expectedColNames);
+ }
+
+ @Test
+ public void testTPCH1() throws Exception {
+ String expectedColNames = " \"columns\" : [ \"`l_returnflag`\", \"`l_linestatus`\", \"`l_shipdate`\", \"`l_quantity`\", \"`l_extendedprice`\", \"`l_discount`\", \"`l_tax`\" ]";
+ testPhysicalPlanFromFile("queries/tpch/01.sql", expectedColNames);
+ }
+
+ @Test
+ public void testTPCH3() throws Exception {
+ String expectedColNames1 = "\"columns\" : [ \"`c_mktsegment`\", \"`c_custkey`\" ]";
+ String expectedColNames2 = " \"columns\" : [ \"`o_orderdate`\", \"`o_shippriority`\", \"`o_custkey`\", \"`o_orderkey`\" ";
+ String expectedColNames3 = "\"columns\" : [ \"`l_orderkey`\", \"`l_shipdate`\", \"`l_extendedprice`\", \"`l_discount`\" ]";
+ testPhysicalPlanFromFile("queries/tpch/03.sql", expectedColNames1, expectedColNames2, expectedColNames3);
+ }
+
+ private void testPhysicalPlanFromFile(String fileName, String... expectedSubstrs)
+ throws Exception {
+ String query = getFile(fileName);
+ String[] queries = query.split(";");
+ for (String q : queries) {
+ if (q.trim().isEmpty())
+ continue;
+ testPhysicalPlan(q, expectedSubstrs);
+ }
+ }
+
+}
diff --git a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
index 148c115bb..6952c5636 100644
--- a/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
+++ b/sqlparser/src/test/java/org/apache/drill/jdbc/test/TestJdbcQuery.java
@@ -44,7 +44,7 @@ public class TestJdbcQuery extends JdbcTest{
// Set a timeout unless we're debugging.
- @Rule public TestRule TIMEOUT = TestTools.getTimeoutRule(200000000);
+ @Rule public TestRule TIMEOUT = TestTools.getTimeoutRule(20000);
private static final String WORKING_PATH;
static{
@@ -78,11 +78,11 @@ public class TestJdbcQuery extends JdbcTest{
@Test
public void testInfoSchema() throws Exception{
-// testQuery("select * from INFORMATION_SCHEMA.SCHEMATA");
+ testQuery("select * from INFORMATION_SCHEMA.SCHEMATA");
testQuery("select * from INFORMATION_SCHEMA.CATALOGS");
-// testQuery("select * from INFORMATION_SCHEMA.VIEWS");
+ testQuery("select * from INFORMATION_SCHEMA.VIEWS");
// testQuery("select * from INFORMATION_SCHEMA.TABLES");
-// testQuery("select * from INFORMATION_SCHEMA.COLUMNS");
+ testQuery("select * from INFORMATION_SCHEMA.COLUMNS");
}
@Test