diff options
author | Hanifi Gunes <hgunes@maprtech.com> | 2014-08-28 10:21:07 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-08-29 00:00:41 -0700 |
commit | f148694738a84832d75aca4ef69bff47c68b463f (patch) | |
tree | cfdc63aee30e3bc7a79021e528d79a67821b0169 /exec/java-exec/src | |
parent | c1c0eba5bad30a433161ebf4e1b6c36ace2a881e (diff) |
DRILL-1309: Implement ProjectPastFilterPushdown and update DrillScanRel cost model so that exclusive column so that star query is more expensive than exclusive column projection. Various fixes affecting record reaaders to handle `*` column as well as fixes to some test cases.
exclude parquet files from rat check
Diffstat (limited to 'exec/java-exec/src')
28 files changed, 412 insertions, 165 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index 72bec3b33..62d526bd9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -67,12 +67,14 @@ public class JsonConvertFrom { String input = new String(buf, com.google.common.base.Charsets.UTF_8); try { - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, null, false); + org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false); jsonReader.write(new java.io.StringReader(input), writer); } catch (Exception e) { - System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace()); +// System.out.println("Error while converting from JSON. "); +// e.printStackTrace(); + throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); } } } @@ -94,12 +96,14 @@ public class JsonConvertFrom { String input = new String(buf, com.google.common.base.Charsets.UTF_8); try { - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, null, false); + org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false); jsonReader.write(new java.io.StringReader(input), writer); } catch (Exception e) { - System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace()); +// System.out.println("Error while converting from JSON. "); +// e.printStackTrace(); + throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index fa20a9060..9c27c0c37 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base; import java.util.List; +import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.PhysicalOperatorSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -32,6 +33,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; */ public interface GroupScan extends Scan, HasAffinity{ + public static final List<SchemaPath> ALL_COLUMNS = Lists.<SchemaPath>newArrayList(SchemaPath.getSimplePath("*")); + public abstract void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException; public abstract SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java new file mode 100644 index 000000000..dcec68a0d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.eigenbase.rel.FilterRel; +import org.eigenbase.rel.ProjectRel; +import org.eigenbase.rel.RelNode; +import org.eigenbase.rel.rules.PushProjector; +import org.eigenbase.relopt.RelOptRule; +import org.eigenbase.relopt.RelOptRuleCall; +import org.eigenbase.rex.RexCall; +import org.eigenbase.rex.RexNode; +import org.eigenbase.rex.RexOver; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DrillPushProjectPastFilterRule extends RelOptRule { + + private final static Logger logger = LoggerFactory.getLogger(DrillPushProjectPastFilterRule.class); + + public final static RelOptRule INSTANCE = new DrillPushProjectPastFilterRule(new PushProjector.ExprCondition() { + @Override + public boolean test(RexNode expr) { + if (expr instanceof RexCall) { + RexCall call = (RexCall)expr; + return "ITEM".equals(call.getOperator().getName()); + } + return false; + } + }); + + /** + * Expressions that should be preserved in the projection + */ + private final PushProjector.ExprCondition preserveExprCondition; + + private DrillPushProjectPastFilterRule(PushProjector.ExprCondition preserveExprCondition) { + super(RelOptHelper.any(ProjectRel.class, FilterRel.class)); + this.preserveExprCondition = preserveExprCondition; + } + + @Override + public void onMatch(RelOptRuleCall call) { + ProjectRel origProj; + FilterRel filterRel; + + if (call.rels.length == 2) { + origProj = call.rel(0); + filterRel = call.rel(1); + } else { + origProj = null; + filterRel = call.rel(0); + } + RelNode rel = filterRel.getChild(); + RexNode origFilter = filterRel.getCondition(); + + if ((origProj != null) && RexOver.containsOver(origProj.getProjects(), null)) { + // Cannot push project through filter if project contains a windowed + // aggregate -- it will affect row counts. Abort this rule + // invocation; pushdown will be considered after the windowed + // aggregate has been implemented. It's OK if the filter contains a + // windowed aggregate. + return; + } + + PushProjector pushProjector = createPushProjector(origProj, origFilter, rel, preserveExprCondition); + RelNode topProject = pushProjector.convertProject(null); + + if (topProject != null) { + call.transformTo(topProject); + } + } + + protected PushProjector createPushProjector(ProjectRel origProj, RexNode origFilter, RelNode rel, + PushProjector.ExprCondition preserveExprCondition) { + return new PushProjector(origProj, origFilter,rel, preserveExprCondition); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java index 63de69c95..cf92121be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java @@ -89,7 +89,8 @@ public class DrillRuleSets { RemoveDistinctAggregateRule.INSTANCE, // ReduceAggregatesRule.INSTANCE, // PushProjectPastJoinRule.INSTANCE, - PushProjectPastFilterRule.INSTANCE, +// PushProjectPastFilterRule.INSTANCE, + DrillPushProjectPastFilterRule.INSTANCE, // SwapJoinRule.INSTANCE, // // PushJoinThroughJoinRule.RIGHT, // // PushJoinThroughJoinRule.LEFT, // diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java index dcbfb3d22..d6bbcd3e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java @@ -21,12 +21,16 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import org.apache.drill.common.JSONOptions; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.data.LogicalOperator; import org.apache.drill.common.logical.data.Scan; +import org.apache.drill.exec.physical.base.AbstractGroupScan; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; @@ -46,6 +50,8 @@ import org.eigenbase.reltype.RelDataType; * GroupScan of a Drill table. */ public class DrillScanRel extends DrillScanRelBase implements DrillRel { + private final static int STAR_COLUMN_COST = 10000; + final private RelDataType rowType; private GroupScan groupScan; @@ -54,7 +60,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { RelOptTable table) { // By default, scan does not support project pushdown. // Decision whether push projects into scan will be made solely in DrillPushProjIntoScanRule. - this(cluster, traits, table, table.getRowType(), null); + this(cluster, traits, table, table.getRowType(), AbstractGroupScan.ALL_COLUMNS); } /** Creates a DrillScan. */ @@ -62,26 +68,21 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { RelOptTable table, RelDataType rowType, List<SchemaPath> columns) { super(DRILL_LOGICAL, cluster, traits, table); this.rowType = rowType; - + columns = columns == null || columns.size() == 0 ? GroupScan.ALL_COLUMNS : columns; try { - if (columns == null || columns.isEmpty()) { - this.groupScan = (GroupScan) getCopy(this.drillTable.getGroupScan()) ; - } else { - this.groupScan = this.drillTable.getGroupScan().clone(columns); - } + this.groupScan = drillTable.getGroupScan().clone(columns); } catch (IOException e) { throw new DrillRuntimeException("Failure creating scan.", e); } - - } - - private static GroupScan getCopy(GroupScan scan){ - try { - return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) (Object) Collections.emptyList()); - } catch (ExecutionSetupException e) { - throw new DrillRuntimeException("Unexpected failure while coping node.", e); - } } +// +// private static GroupScan getCopy(GroupScan scan){ +// try { +// return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) (Object) Collections.emptyList()); +// } catch (ExecutionSetupException e) { +// throw new DrillRuntimeException("Unexpected failure while coping node.", e); +// } +// } @Override @@ -118,15 +119,29 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { /// by both logical and physical rels. @Override public RelOptCost computeSelfCost(RelOptPlanner planner) { - ScanStats stats = this.groupScan.getScanStats(); - int columnCount = this.getRowType().getFieldCount(); + ScanStats stats = groupScan.getScanStats(); + int columnCount = getRowType().getFieldCount(); + double ioCost = 0; + boolean isStarQuery = Iterables.tryFind(getRowType().getFieldNames(), new Predicate<String>() { + @Override + public boolean apply(String input) { + return Preconditions.checkNotNull(input).equals("*"); + } + }).isPresent(); - if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { - return planner.getCostFactory().makeCost(stats.getRecordCount() * columnCount, stats.getCpuCost(), stats.getDiskCost()); + if (isStarQuery) { + columnCount = STAR_COLUMN_COST; } // double rowCount = RelMetadataQuery.getRowCount(this); double rowCount = stats.getRecordCount(); + if (rowCount < 1) { + rowCount = 1; + } + + if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) { + return planner.getCostFactory().makeCost(rowCount * columnCount, stats.getCpuCost(), stats.getDiskCost()); + } double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count. // Even though scan is reading from disk, in the currently generated plans all plans will @@ -134,7 +149,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel { // In the future we might consider alternative scans that go against projections or // different compression schemes etc that affect the amount of data read. Such alternatives // would affect both cpu and io cost. - double ioCost = 0; + DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory(); return costFactory.makeCost(rowCount, cpuCost, ioCost, 0); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java index 92272f879..2e253aba7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java @@ -32,6 +32,10 @@ public class RelOptHelper { public static RelOptRuleOperand any(Class<? extends RelNode> first){ return RelOptRule.operand(first, RelOptRule.any()); } + + public static RelOptRuleOperand any(Class<? extends RelNode> first, Class<? extends RelNode> second) { + return RelOptRule.operand(first, RelOptRule.operand(second, RelOptRule.any())); + } public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelOptRuleOperand first, RelOptRuleOperand... rest){ return RelOptRule.operand(rel, RelOptRule.some(first, rest)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java index 63db153ba..25fa0cb19 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java @@ -55,8 +55,8 @@ public class ExplainHandler extends DefaultSqlHandler{ SqlNode sqlNode = rewrite(node); SqlNode validated = validateNode(sqlNode); RelNode rel = convertToRel(validated); - DrillRel drel = convertToDrel(rel); log("Optiq Logical", rel); + DrillRel drel = convertToDrel(rel); log("Drill Logical", drel); if(mode == ResultMode.LOGICAL){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java index 48867410c..382456a62 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java @@ -21,6 +21,7 @@ import org.apache.drill.exec.planner.sql.parser.CompoundIdentifierConverter; import org.eigenbase.sql.SqlNode; import org.eigenbase.sql.parser.SqlAbstractParserImpl; import org.eigenbase.sql.parser.SqlParserImplFactory; +import org.eigenbase.sql.util.SqlVisitor; import java.io.Reader; @@ -39,15 +40,19 @@ public class DrillParserWithCompoundIdConverter extends DrillParserImpl { super(stream); } + protected SqlVisitor<SqlNode> createConverter() { + return new CompoundIdentifierConverter(); + } + @Override public SqlNode parseSqlExpressionEof() throws Exception { SqlNode originalSqlNode = super.parseSqlExpressionEof(); - return originalSqlNode.accept(new CompoundIdentifierConverter()); + return originalSqlNode.accept(createConverter()); } @Override public SqlNode parseSqlStmtEof() throws Exception { SqlNode originalSqlNode = super.parseSqlStmtEof(); - return originalSqlNode.accept(new CompoundIdentifierConverter()); + return originalSqlNode.accept(createConverter()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java new file mode 100644 index 000000000..4cc06c81f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store; + +import java.util.Collection; + +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import org.apache.drill.common.expression.SchemaPath; + +public abstract class AbstractRecordReader implements RecordReader { + private static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields."; + private static final String COL_EMPTY_ERROR = "Readers needs at least a column to read."; + public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*"); + + private Collection<SchemaPath> columns = null; + private boolean isStarQuery = false; + + protected final void setColumns(Collection<SchemaPath> projected) { + assert Preconditions.checkNotNull(projected, COL_NULL_ERROR).size() > 0 : COL_EMPTY_ERROR; + isStarQuery = isStarQuery(projected); + columns = transformColumns(projected); + } + + protected Collection<SchemaPath> getColumns() { + return columns; + } + + protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projected) { + return projected; + } + + protected boolean isStarQuery() { + return isStarQuery; + } + + public static boolean isStarQuery(Collection<SchemaPath> projected) { + return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() { + @Override + public boolean apply(SchemaPath path) { + return Preconditions.checkNotNull(path).equals(STAR_COLUMN); + } + }).isPresent(); + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java index b32824529..9cdfe245c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java @@ -50,7 +50,7 @@ public abstract class AbstractStoragePlugin implements StoragePlugin{ @Override public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException { - return getPhysicalScan(selection, null); + return getPhysicalScan(selection, AbstractGroupScan.ALL_COLUMNS); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java index 2f0e85434..ec9a04e38 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java @@ -109,11 +109,6 @@ public class FileSystemPlugin extends AbstractStoragePlugin{ } @Override - public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException { - return this.getPhysicalScan(selection, null); - } - - @Override public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException { FormatSelection formatSelection = selection.getWith(context.getConfig(), FormatSelection.class); FormatPlugin plugin; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index bf8e301c2..d1923a500 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -186,7 +186,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements @Override public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException { - return new EasyGroupScan(selection, this, null, selection.selectionRoot); + return new EasyGroupScan(selection, this, selection.selectionRoot); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index 3de99c5fc..2bdf1a660 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.store.dfs.easy; import java.io.IOException; -import java.util.Collections; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -72,21 +71,15 @@ public class EasyGroupScan extends AbstractGroupScan{ @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("selectionRoot") String selectionRoot ) throws IOException, ExecutionSetupException { + this(new FileSelection(files, true), + (EasyFormatPlugin<?>)engineRegistry.getFormatPlugin(storageConfig, formatConfig), + columns, + selectionRoot); + } - this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig); - Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config."); - this.selection = new FileSelection(files, true); - try{ - BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits()); - this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable()); - this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); - }catch(IOException e){ - logger.warn("Failure determining endpoint affinity.", e); - this.endpointAffinities = Collections.emptyList(); - } - maxWidth = chunks.size(); - this.columns = columns; - this.selectionRoot = selectionRoot; + public EasyGroupScan(FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot) + throws IOException { + this(selection, formatPlugin, ALL_COLUMNS, selectionRoot); } public EasyGroupScan( @@ -95,30 +88,26 @@ public class EasyGroupScan extends AbstractGroupScan{ List<SchemaPath> columns, String selectionRoot ) throws IOException{ - this.selection = selection; - this.formatPlugin = formatPlugin; - this.columns = columns; - try{ - BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits()); - this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable()); - this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); - }catch(IOException e){ - logger.warn("Failure determining endpoint affinity.", e); - this.endpointAffinities = Collections.emptyList(); - } - maxWidth = chunks.size(); + this.selection = Preconditions.checkNotNull(selection); + this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config."); + this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns; this.selectionRoot = selectionRoot; + BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits()); + this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable()); + this.maxWidth = chunks.size(); + this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); } private EasyGroupScan(EasyGroupScan that) { - this.chunks = that.chunks; - this.columns = that.columns; - this.endpointAffinities = that.endpointAffinities; - this.formatPlugin = that.formatPlugin; - this.mappings = that.mappings; - this.maxWidth = that.maxWidth; - this.selection = that.selection; - this.selectionRoot = that.selectionRoot; + Preconditions.checkNotNull(that, "Unable to clone: source is null."); + selection = that.selection; + formatPlugin = that.formatPlugin; + columns = that.columns; + selectionRoot = that.selectionRoot; + chunks = that.chunks; + endpointAffinities = that.endpointAffinities; + maxWidth = that.maxWidth; + mappings = that.mappings; } public String getSelectionRoot() { @@ -168,16 +157,16 @@ public class EasyGroupScan extends AbstractGroupScan{ @Override public List<EndpointAffinity> getOperatorAffinity() { assert chunks != null && chunks.size() > 0; - if (this.endpointAffinities == null) { + if (endpointAffinities == null) { logger.debug("chunks: {}", chunks.size()); - this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); + endpointAffinities = AffinityCreator.getAffinityMap(chunks); } - return this.endpointAffinities; + return endpointAffinities; } @Override public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) { - this.mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks); + mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks); } @Override @@ -232,7 +221,7 @@ public class EasyGroupScan extends AbstractGroupScan{ @JsonIgnore public boolean canPushdownProjects(List<SchemaPath> columns) { - return this.formatPlugin.supportsPushDown(); + return formatPlugin.supportsPushDown(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index 81a23b024..6e1aa0ad0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -72,7 +72,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm @Override public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException { - return new EasyGroupScan(selection, this, null, selection.selectionRoot); //TODO : textformat supports project? + return new EasyGroupScan(selection, this, columns, selection.selectionRoot); //TODO : textformat supports project? } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java index d1a086c32..fd40f417a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java @@ -26,6 +26,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.logical.FormatPluginConfig; import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.exec.physical.base.AbstractBase; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; import org.apache.drill.exec.physical.base.SubScan; @@ -60,16 +61,9 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan { @JsonProperty("columns") List<SchemaPath> columns, // @JsonProperty("selectionRoot") String selectionRoot // ) throws ExecutionSetupException { - - if(formatConfig == null) formatConfig = new ParquetFormatConfig(); - Preconditions.checkNotNull(storageConfig); - Preconditions.checkNotNull(formatConfig); - this.formatPlugin = (ParquetFormatPlugin) registry.getFormatPlugin(storageConfig, formatConfig); - Preconditions.checkNotNull(formatPlugin); - this.rowGroupReadEntries = rowGroupReadEntries; - this.formatConfig = formatPlugin.getConfig(); - this.columns = columns; - this.selectionRoot = selectionRoot; + this((ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), + formatConfig == null ? new ParquetFormatConfig() : formatConfig), + rowGroupReadEntries, columns, selectionRoot); } public ParquetRowGroupScan( // @@ -77,10 +71,10 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan { List<RowGroupReadEntry> rowGroupReadEntries, // List<SchemaPath> columns, String selectionRoot) { - this.formatPlugin = formatPlugin; + this.formatPlugin = Preconditions.checkNotNull(formatPlugin); this.formatConfig = formatPlugin.getConfig(); this.rowGroupReadEntries = rowGroupReadEntries; - this.columns = columns; + this.columns = columns == null || columns.size() == 0 ? GroupScan.ALL_COLUMNS : columns; this.selectionRoot = selectionRoot; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index a453e66bf..f9b6d91b5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import com.google.common.base.Preconditions; @@ -65,11 +66,9 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan List<String[]> partitionColumns = Lists.newArrayList(); List<Integer> selectedPartitionColumns = Lists.newArrayList(); - boolean selectAllColumns = false; + boolean selectAllColumns = AbstractRecordReader.isStarQuery(columns); - if (columns == null || columns.size() == 0) { - selectAllColumns = true; - } else { + if (!selectAllColumns) { List<SchemaPath> newColums = Lists.newArrayList(); Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator)); for (SchemaPath column : columns) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 34e7aea36..6c2d44c4b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -20,8 +20,13 @@ package org.apache.drill.exec.store.parquet.columnreaders; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; @@ -33,8 +38,10 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.NullableBitVector; import org.apache.drill.exec.vector.ValueVector; @@ -53,7 +60,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData; import parquet.hadoop.metadata.ParquetMetadata; import parquet.schema.PrimitiveType; -public class ParquetRecordReader implements RecordReader { +public class ParquetRecordReader extends AbstractRecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class); // this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors @@ -75,8 +82,8 @@ public class ParquetRecordReader implements RecordReader { private int bitWidthAllFixedFields; private boolean allFieldsFixedLength; private int recordsPerBatch; - private long totalRecords; - private long rowGroupOffset; +// private long totalRecords; +// private long rowGroupOffset; private List<ColumnReader> columnStatuses; private FileSystem fileSystem; @@ -84,8 +91,6 @@ public class ParquetRecordReader implements RecordReader { Path hadoopPath; private VarLenBinaryReader varLengthReader; private ParquetMetadata footer; - private List<SchemaPath> columns; - private FragmentContext fragmentContext; private OperatorContext operatorContext; // This is a parallel list to the columns list above, it is used to determine the subset of the project // pushdown columns that do not appear in this file @@ -117,17 +122,13 @@ public class ParquetRecordReader implements RecordReader { String path, int rowGroupIndex, FileSystem fs, CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer, List<SchemaPath> columns) throws ExecutionSetupException { - hadoopPath = new Path(path); - fileSystem = fs; + this.hadoopPath = new Path(path); + this.fileSystem = fs; this.codecFactoryExposer = codecFactoryExposer; this.rowGroupIndex = rowGroupIndex; this.batchSize = batchSize; this.footer = footer; - this.columns = columns; - if (this.columns != null) { - columnsFound = new boolean[this.columns.size()]; - nullFilledVectors = new ArrayList(); - } + setColumns(columns); } public CodecFactoryExposer getCodecFactoryExposer() { @@ -184,25 +185,29 @@ public class ParquetRecordReader implements RecordReader { // TODO - not sure if this is how we want to represent this // for now it makes the existing tests pass, simply selecting // all available data if no columns are provided - if (this.columns != null){ - int i = 0; - for (SchemaPath expr : this.columns){ - if ( field.matches(expr)){ - columnsFound[i] = true; - return true; - } - i++; + if (isStarQuery()) { + return true; + } + + int i = 0; + for (SchemaPath expr : getColumns()){ + if ( field.matches(expr)){ + columnsFound[i] = true; + return true; } - return false; + i++; } - return true; + return false; } @Override public void setup(OutputMutator output) throws ExecutionSetupException { - + if (!isStarQuery()) { + columnsFound = new boolean[getColumns().size()]; + nullFilledVectors = new ArrayList(); + } columnStatuses = new ArrayList<>(); - totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount(); +// totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount(); List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns(); allFieldsFixedLength = true; ColumnDescriptor column; @@ -211,7 +216,7 @@ public class ParquetRecordReader implements RecordReader { mockRecordsRead = 0; MaterializedField field; - ParquetMetadataConverter metaConverter = new ParquetMetadataConverter(); +// ParquetMetadataConverter metaConverter = new ParquetMetadataConverter(); FileMetaData fileMetaData; // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below @@ -249,7 +254,7 @@ public class ParquetRecordReader implements RecordReader { allFieldsFixedLength = false; } } - rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset(); +// rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset(); if (columnsToScan != 0 && allFieldsFixedLength) { recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields, @@ -294,10 +299,15 @@ public class ParquetRecordReader implements RecordReader { } varLengthReader = new VarLenBinaryReader(this, varLengthColumns); - if (this.columns != null) { + if (!isStarQuery()) { + List<SchemaPath> projectedColumns = Lists.newArrayList(getColumns()); + SchemaPath col; for (int i = 0; i < columnsFound.length; i++) { - if ( ! columnsFound[i]) { - nullFilledVectors.add((NullableBitVector)output.addField(MaterializedField.create(this.columns.get(i), Types.optional(TypeProtos.MinorType.BIT)), + col = projectedColumns.get(i); + assert col!=null; + if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) { + nullFilledVectors.add((NullableBitVector)output.addField(MaterializedField.create(col, + Types.optional(TypeProtos.MinorType.BIT)), (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.BIT, DataMode.OPTIONAL))); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 16f520cd8..7a864f026 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -18,12 +18,15 @@ package org.apache.drill.exec.store.parquet2; import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.parquet.RowGroupReadEntry; import org.apache.drill.exec.vector.BaseValueVector; @@ -48,11 +51,12 @@ import parquet.schema.Type; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; -public class DrillParquetReader implements RecordReader { +public class DrillParquetReader extends AbstractRecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class); @@ -60,7 +64,6 @@ public class DrillParquetReader implements RecordReader { private MessageType schema; private Configuration conf; private RowGroupReadEntry entry; - private List<SchemaPath> columns; private VectorContainerWriter writer; private ColumnChunkIncReadStore pageReadStore; private parquet.io.RecordReader<Void> recordReader; @@ -73,11 +76,11 @@ public class DrillParquetReader implements RecordReader { public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) { this.footer = footer; this.conf = conf; - this.columns = columns; this.entry = entry; + setColumns(columns); } - public static MessageType getProjection(MessageType schema, List<SchemaPath> columns) { + public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) { MessageType projection = null; for (SchemaPath path : columns) { List<String> segments = Lists.newArrayList(); @@ -117,10 +120,10 @@ public class DrillParquetReader implements RecordReader { schema = footer.getFileMetaData().getSchema(); MessageType projection = null; - if (columns == null || columns.size() == 0) { + if (isStarQuery()) { projection = schema; } else { - projection = getProjection(schema, columns); + projection = getProjection(schema, getColumns()); if (projection == null) { projection = schema; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java index f27e8e659..3c5d9a4a2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java @@ -64,17 +64,25 @@ public class BlockMapBuilder { return codecFactory.getCodec(fileStatus.getPath()) != null; } - public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException{ + public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException { List<CompleteFileWork> work = Lists.newArrayList(); - for(FileStatus f : files){ - ImmutableRangeMap<Long,BlockLocation> rangeMap = getBlockMap(f); - if(!blockify || compressed(f)){ - work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), 0, f.getLen(), f.getPath().toString())); - continue; + boolean error = false; + for(FileStatus f : files) { + error = false; + if (blockify && !compressed(f)) { + try { + ImmutableRangeMap<Long, BlockLocation> rangeMap = getBlockMap(f); + for (Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()) { + work.add(new CompleteFileWork(getEndpointByteMap(new FileStatusWork(f)), l.getValue().getOffset(), l.getValue().getLength(), f.getPath().toString())); + } + } catch (IOException e) { + logger.warn("failure while generating file work.", e); + error = true; + } } - - for(Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()){ - work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), l.getValue().getOffset(), l.getValue().getLength(), f.getPath().toString())); + + if (!blockify || error || compressed(f)) { + work.add(new CompleteFileWork(getEndpointByteMap(new FileStatusWork(f)), 0, f.getLen(), f.getPath().toString())); } } return work; @@ -135,7 +143,7 @@ public class BlockMapBuilder { private ImmutableRangeMap<Long,BlockLocation> getBlockMap(Path path) throws IOException{ ImmutableRangeMap<Long,BlockLocation> blockMap = blockMapMap.get(path); - if(blockMap == null){ + if(blockMap == null) { blockMap = buildBlockMap(path); } return blockMap; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java index 68921a2d0..2031aeee7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java @@ -21,6 +21,8 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.FieldReference; @@ -34,6 +36,7 @@ import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.RepeatedVarCharVector; @@ -48,7 +51,7 @@ import org.apache.hadoop.mapred.TextInputFormat; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -public class DrillTextRecordReader implements RecordReader { +public class DrillTextRecordReader extends AbstractRecordReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class); static final String COL_NAME = "columns"; @@ -66,36 +69,31 @@ public class DrillTextRecordReader implements RecordReader { private Text value; private int numCols = 0; private boolean redoRecord = false; - private boolean first = true; public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) { this.fragmentContext = context; this.delimiter = (byte) delimiter; - boolean getEntireRow = false; + setColumns(columns); - if(columns != null) { + if (!isStarQuery()) { + String pathStr; for (SchemaPath path : columns) { assert path.getRootSegment().isNamed(); - Preconditions.checkArgument(path.getRootSegment().getPath().equals(COL_NAME), "Selected column must have name 'columns'"); - // FIXME: need re-work for text column push-down. + pathStr = path.getRootSegment().getPath(); + Preconditions.checkArgument(pathStr.equals(COL_NAME) || (pathStr.equals("*") && path.getRootSegment().getChild() == null), + "Selected column(s) must have name 'columns' or must be plain '*'"); + if (path.getRootSegment().getChild() != null) { - Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),"Selected column must be an array index"); + Preconditions.checkArgument(path.getRootSegment().getChild().isArray(), "Selected column must be an array index"); int index = path.getRootSegment().getChild().getArraySegment().getIndex(); columnIds.add(index); - } else { - getEntireRow = true; } } Collections.sort(columnIds); + numCols = columnIds.size(); } targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE); - /* If one of the columns requested is the entire row ('columns') then ignore the rest of the columns - * we are going copy all the values in the repeated varchar vector - */ - if (!getEntireRow) { - numCols = columnIds.size(); - } TextInputFormat inputFormat = new TextInputFormat(); JobConf job = new JobConf(); job.setInt("io.file.buffer.size", context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java index 79c94c82e..fa26b5416 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -18,6 +18,10 @@ package org.apache.drill.exec.vector.complex.fn; import io.netty.buffer.DrillBuf; +import com.google.common.base.Preconditions; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.UnpooledByteBufAllocator; import java.io.IOException; import java.io.Reader; @@ -31,7 +35,9 @@ import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.BitHolder; import org.apache.drill.exec.expr.holders.Float8Holder; import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.store.AbstractRecordReader; import org.apache.drill.exec.vector.complex.writer.BaseWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter; @@ -66,22 +72,22 @@ public class JsonReader { private boolean allTextMode; public JsonReader() throws IOException { - this(null, null, false); + this(null, false); + } + + public JsonReader(DrillBuf managedBuf, boolean allTextMode) throws IOException { + this(managedBuf, GroupScan.ALL_COLUMNS, allTextMode); } public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean allTextMode) throws JsonParseException, IOException { factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true); factory.configure(Feature.ALLOW_COMMENTS, true); - this.workBuf = managedBuf; + assert Preconditions.checkNotNull(columns).size() > 0 : "json record reader requires at least a column"; this.columns = columns; - // TODO - remove this check once the optimizer is updated to push down * instead of a null list - if (this.columns == null) { - this.columns = new ArrayList(); - this.columns.add(new SchemaPath(new PathSegment.NameSegment("*"))); + this.starRequested = containsStar(); + this.workBuf = managedBuf; this.allTextMode = allTextMode; - } this.columnsFound = new boolean[this.columns.size()]; - this.starRequested = containsStar(); } private boolean containsStar() { @@ -109,7 +115,7 @@ public class JsonReader { public List<SchemaPath> getNullColumns() { ArrayList<SchemaPath> nullColumns = new ArrayList<SchemaPath>(); for (int i = 0; i < columnsFound.length; i++ ) { - if ( ! columnsFound[i] && ! columns.get(i).getRootSegment().getPath().equals("*") ) { + if ( ! columnsFound[i] && !columns.get(i).equals(AbstractRecordReader.STAR_COLUMN)) { nullColumns.add(columns.get(i)); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java index c2dcc951b..0636db6c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java @@ -25,6 +25,7 @@ import java.io.Reader; import java.util.List; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import com.fasterxml.jackson.core.JsonParseException; @@ -46,7 +47,7 @@ public class JsonReaderWithState { } public JsonReaderWithState(JsonRecordSplitter splitter) throws IOException{ - this(splitter, null, null, false); + this(splitter, null, GroupScan.ALL_COLUMNS, false); } public List<SchemaPath> getNullColumns() { diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json index eadb1bced..31df303d5 100644 --- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json +++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json @@ -43,6 +43,11 @@ type: "file", connection: "classpath:///", formats: { + "csv" : { + type: "text", + extensions: [ "csv" ], + delimiter: "," + }, "json" : { type: "json" }, diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java index ec8e92e1c..8520b9bf9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java @@ -18,6 +18,8 @@ package org.apache.drill; +import java.util.List; + import org.junit.Ignore; import org.junit.Test; @@ -105,6 +107,32 @@ public class TestProjectPushDown extends PlanTestBase { testPhysicalPlanFromFile("queries/tpch/03.sql", expectedColNames1, expectedColNames2, expectedColNames3); } + + private static final String pushDownSql = "select %s from cp.`%s` t"; + private static final String pushDownSqlWithFilter = pushDownSql + " where %s"; + private final String[] inputTypes = new String[] { + "project/pushdown/empty.json", + "project/pushdown/empty.csv", + "tpch/lineitem.parquet" + }; + + @Test + public void testProjectPushDown() throws Exception { + final String projection = "t.trans_id, t.user_info.cust_id, t.marketing_info.keywords[0]"; + final String expected = "\"columns\" : [ \"`trans_id`\", \"`user_info`.`cust_id`\", \"`marketing_info`.`keywords`[0]\" ],"; + final String filter = "t.another_field = 10 and t.columns[0] = 100 and t.columns[1] = t.other.columns[2]"; + final String expectedWithFilter = "\"columns\" : [ \"`another_field`\", \"`trans_id`\", \"`user_info`.`cust_id`\", \"`marketing_info`.`keywords`[0]\", \"`columns`[0]\", \"`columns`[1]\", \"`other`.`columns`[2]\" ],"; + + for (String inputType:inputTypes) { + testPushDown(new PushDownTestInstance(pushDownSql, expected, projection, inputType)); + testPushDown(new PushDownTestInstance(pushDownSqlWithFilter, expectedWithFilter, projection, inputType, filter)); + } + } + + protected void testPushDown(PushDownTestInstance test) throws Exception { + testPhysicalPlan(test.getSql(), test.getExpected()); + } + private void testPhysicalPlanFromFile(String fileName, String... expectedSubstrs) throws Exception { String query = getFile(fileName); @@ -116,4 +144,24 @@ public class TestProjectPushDown extends PlanTestBase { } } + protected static class PushDownTestInstance { + private final String sqlPattern; + private final String expected; + private final Object[] params; + + public PushDownTestInstance(String sqlPattern, String expected, Object... params) { + this.sqlPattern = sqlPattern; + this.expected = expected; + this.params = params; + } + + public String getExpected() { + return expected; + } + + public String getSql() { + return String.format(sqlPattern, params); + } + } + } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java index bfc0d20e5..10b177542 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java @@ -34,6 +34,7 @@ import org.apache.drill.common.util.FileUtils; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.proto.UserBitShared; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; @@ -64,7 +65,7 @@ public class TestJsonReader extends BaseTestQuery { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReader.class); private static BufferAllocator allocator; - private static final boolean VERBOSE_DEBUG = false; + private static final boolean VERBOSE_DEBUG = true; @BeforeClass public static void setupAllocator(){ @@ -168,6 +169,7 @@ public class TestJsonReader extends BaseTestQuery { String[] queries = {Files.toString(FileUtils.getResourceAsFile("/store/json/project_pushdown_json_physical_plan.json"), Charsets.UTF_8)}; long[] rowCounts = {3}; String filename = "/store/json/schema_change_int_to_string.json"; + test("alter system set `store.json.all_text_mode` = false"); runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts); List<QueryResultBatch> results = testPhysicalWithResults(queries[0]); @@ -271,7 +273,8 @@ public class TestJsonReader extends BaseTestQuery { writer.allocate(); DrillBuf buffer = allocator.buffer(255); - JsonReaderWithState jsonReader = new JsonReaderWithState(new ReaderJSONRecordSplitter(compound), buffer, null, false); + JsonReaderWithState jsonReader = new JsonReaderWithState(new ReaderJSONRecordSplitter(compound), buffer, + GroupScan.ALL_COLUMNS, false); int i =0; List<Integer> batchSizes = Lists.newArrayList(); diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.csv b/exec/java-exec/src/test/resources/project/pushdown/empty.csv new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/exec/java-exec/src/test/resources/project/pushdown/empty.csv diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.json b/exec/java-exec/src/test/resources/project/pushdown/empty.json new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/exec/java-exec/src/test/resources/project/pushdown/empty.json diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.parquet b/exec/java-exec/src/test/resources/project/pushdown/empty.parquet new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/exec/java-exec/src/test/resources/project/pushdown/empty.parquet |