author     Hanifi Gunes <hgunes@maprtech.com>    2014-08-28 10:21:07 -0700
committer  Jacques Nadeau <jacques@apache.org>   2014-08-29 00:00:41 -0700
commit     f148694738a84832d75aca4ef69bff47c68b463f (patch)
tree       cfdc63aee30e3bc7a79021e528d79a67821b0169 /exec
parent     c1c0eba5bad30a433161ebf4e1b6c36ace2a881e (diff)
DRILL-1309: Implement ProjectPastFilterPushdown and update the DrillScanRel cost model so that a star query is more expensive than an exclusive column projection. Various fixes affecting record readers to handle the `*` column, as well as fixes to some test cases.
exclude parquet files from rat check
Diffstat (limited to 'exec')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java | 12
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java | 94
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java | 57
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java | 62
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java | 5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java | 69
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java | 18
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java | 7
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java | 68
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java | 15
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java | 28
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java | 28
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java | 24
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java | 3
-rw-r--r--  exec/java-exec/src/main/resources/bootstrap-storage-plugins.json | 5
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java | 48
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java | 7
-rw-r--r--  exec/java-exec/src/test/resources/project/pushdown/empty.csv | 0
-rw-r--r--  exec/java-exec/src/test/resources/project/pushdown/empty.json | 0
-rw-r--r--  exec/java-exec/src/test/resources/project/pushdown/empty.parquet | 0
28 files changed, 412 insertions, 165 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
index 72bec3b33..62d526bd9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java
@@ -67,12 +67,14 @@ public class JsonConvertFrom {
String input = new String(buf, com.google.common.base.Charsets.UTF_8);
try {
- org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, null, false);
+ org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false);
jsonReader.write(new java.io.StringReader(input), writer);
} catch (Exception e) {
- System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace());
+// System.out.println("Error while converting from JSON. ");
+// e.printStackTrace();
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
}
}
}
@@ -94,12 +96,14 @@ public class JsonConvertFrom {
String input = new String(buf, com.google.common.base.Charsets.UTF_8);
try {
- org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, null, false);
+ org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer, false);
jsonReader.write(new java.io.StringReader(input), writer);
} catch (Exception e) {
- System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace());
+// System.out.println("Error while converting from JSON. ");
+// e.printStackTrace();
+ throw new org.apache.drill.common.exceptions.DrillRuntimeException("Error while converting from JSON. ", e);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index fa20a9060..9c27c0c37 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.base;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.PhysicalOperatorSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -32,6 +33,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
*/
public interface GroupScan extends Scan, HasAffinity{
+ public static final List<SchemaPath> ALL_COLUMNS = Lists.<SchemaPath>newArrayList(SchemaPath.getSimplePath("*"));
+
public abstract void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException;
public abstract SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException;
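Note: the new GroupScan.ALL_COLUMNS constant introduced above is just a one-element list holding the star column, and throughout this patch it replaces null/empty projection lists. A minimal sketch of what a caller sees (variable names here are illustrative, not from the patch):

    import java.util.List;
    import org.apache.drill.common.expression.SchemaPath;
    import org.apache.drill.exec.physical.base.GroupScan;

    // ALL_COLUMNS stands in for "no explicit projection": a single `*` path.
    List<SchemaPath> projected = GroupScan.ALL_COLUMNS;                      // [ `*` ]
    boolean isStar = projected.get(0).equals(SchemaPath.getSimplePath("*")); // true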
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java
new file mode 100644
index 000000000..dcec68a0d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectPastFilterRule.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.eigenbase.rel.FilterRel;
+import org.eigenbase.rel.ProjectRel;
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.rel.rules.PushProjector;
+import org.eigenbase.relopt.RelOptRule;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.rex.RexCall;
+import org.eigenbase.rex.RexNode;
+import org.eigenbase.rex.RexOver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DrillPushProjectPastFilterRule extends RelOptRule {
+
+ private final static Logger logger = LoggerFactory.getLogger(DrillPushProjectPastFilterRule.class);
+
+ public final static RelOptRule INSTANCE = new DrillPushProjectPastFilterRule(new PushProjector.ExprCondition() {
+ @Override
+ public boolean test(RexNode expr) {
+ if (expr instanceof RexCall) {
+ RexCall call = (RexCall)expr;
+ return "ITEM".equals(call.getOperator().getName());
+ }
+ return false;
+ }
+ });
+
+ /**
+ * Expressions that should be preserved in the projection
+ */
+ private final PushProjector.ExprCondition preserveExprCondition;
+
+ private DrillPushProjectPastFilterRule(PushProjector.ExprCondition preserveExprCondition) {
+ super(RelOptHelper.any(ProjectRel.class, FilterRel.class));
+ this.preserveExprCondition = preserveExprCondition;
+ }
+
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ ProjectRel origProj;
+ FilterRel filterRel;
+
+ if (call.rels.length == 2) {
+ origProj = call.rel(0);
+ filterRel = call.rel(1);
+ } else {
+ origProj = null;
+ filterRel = call.rel(0);
+ }
+ RelNode rel = filterRel.getChild();
+ RexNode origFilter = filterRel.getCondition();
+
+ if ((origProj != null) && RexOver.containsOver(origProj.getProjects(), null)) {
+ // Cannot push project through filter if project contains a windowed
+ // aggregate -- it will affect row counts. Abort this rule
+ // invocation; pushdown will be considered after the windowed
+ // aggregate has been implemented. It's OK if the filter contains a
+ // windowed aggregate.
+ return;
+ }
+
+ PushProjector pushProjector = createPushProjector(origProj, origFilter, rel, preserveExprCondition);
+ RelNode topProject = pushProjector.convertProject(null);
+
+ if (topProject != null) {
+ call.transformTo(topProject);
+ }
+ }
+
+ protected PushProjector createPushProjector(ProjectRel origProj, RexNode origFilter, RelNode rel,
+ PushProjector.ExprCondition preserveExprCondition) {
+ return new PushProjector(origProj, origFilter,rel, preserveExprCondition);
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
index 63de69c95..cf92121be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRuleSets.java
@@ -89,7 +89,8 @@ public class DrillRuleSets {
RemoveDistinctAggregateRule.INSTANCE, //
ReduceAggregatesRule.INSTANCE, //
PushProjectPastJoinRule.INSTANCE,
- PushProjectPastFilterRule.INSTANCE,
+// PushProjectPastFilterRule.INSTANCE,
+ DrillPushProjectPastFilterRule.INSTANCE,
// SwapJoinRule.INSTANCE, //
// PushJoinThroughJoinRule.RIGHT, //
// PushJoinThroughJoinRule.LEFT, //
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
index dcbfb3d22..d6bbcd3e8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillScanRel.java
@@ -21,12 +21,16 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import org.apache.drill.common.JSONOptions;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.common.logical.data.Scan;
+import org.apache.drill.exec.physical.base.AbstractGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
@@ -46,6 +50,8 @@ import org.eigenbase.reltype.RelDataType;
* GroupScan of a Drill table.
*/
public class DrillScanRel extends DrillScanRelBase implements DrillRel {
+ private final static int STAR_COLUMN_COST = 10000;
+
final private RelDataType rowType;
private GroupScan groupScan;
@@ -54,7 +60,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
RelOptTable table) {
// By default, scan does not support project pushdown.
// Decision whether push projects into scan will be made solely in DrillPushProjIntoScanRule.
- this(cluster, traits, table, table.getRowType(), null);
+ this(cluster, traits, table, table.getRowType(), AbstractGroupScan.ALL_COLUMNS);
}
/** Creates a DrillScan. */
@@ -62,26 +68,21 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
RelOptTable table, RelDataType rowType, List<SchemaPath> columns) {
super(DRILL_LOGICAL, cluster, traits, table);
this.rowType = rowType;
-
+ columns = columns == null || columns.size() == 0 ? GroupScan.ALL_COLUMNS : columns;
try {
- if (columns == null || columns.isEmpty()) {
- this.groupScan = (GroupScan) getCopy(this.drillTable.getGroupScan()) ;
- } else {
- this.groupScan = this.drillTable.getGroupScan().clone(columns);
- }
+ this.groupScan = drillTable.getGroupScan().clone(columns);
} catch (IOException e) {
throw new DrillRuntimeException("Failure creating scan.", e);
}
-
- }
-
- private static GroupScan getCopy(GroupScan scan){
- try {
- return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) (Object) Collections.emptyList());
- } catch (ExecutionSetupException e) {
- throw new DrillRuntimeException("Unexpected failure while coping node.", e);
- }
}
+//
+// private static GroupScan getCopy(GroupScan scan){
+// try {
+// return (GroupScan) scan.getNewWithChildren((List<PhysicalOperator>) (Object) Collections.emptyList());
+// } catch (ExecutionSetupException e) {
+// throw new DrillRuntimeException("Unexpected failure while coping node.", e);
+// }
+// }
@Override
@@ -118,15 +119,29 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
/// by both logical and physical rels.
@Override
public RelOptCost computeSelfCost(RelOptPlanner planner) {
- ScanStats stats = this.groupScan.getScanStats();
- int columnCount = this.getRowType().getFieldCount();
+ ScanStats stats = groupScan.getScanStats();
+ int columnCount = getRowType().getFieldCount();
+ double ioCost = 0;
+ boolean isStarQuery = Iterables.tryFind(getRowType().getFieldNames(), new Predicate<String>() {
+ @Override
+ public boolean apply(String input) {
+ return Preconditions.checkNotNull(input).equals("*");
+ }
+ }).isPresent();
- if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
- return planner.getCostFactory().makeCost(stats.getRecordCount() * columnCount, stats.getCpuCost(), stats.getDiskCost());
+ if (isStarQuery) {
+ columnCount = STAR_COLUMN_COST;
}
// double rowCount = RelMetadataQuery.getRowCount(this);
double rowCount = stats.getRecordCount();
+ if (rowCount < 1) {
+ rowCount = 1;
+ }
+
+ if(PrelUtil.getSettings(getCluster()).useDefaultCosting()) {
+ return planner.getCostFactory().makeCost(rowCount * columnCount, stats.getCpuCost(), stats.getDiskCost());
+ }
double cpuCost = rowCount * columnCount; // for now, assume cpu cost is proportional to row count.
// Even though scan is reading from disk, in the currently generated plans all plans will
@@ -134,7 +149,7 @@ public class DrillScanRel extends DrillScanRelBase implements DrillRel {
// In the future we might consider alternative scans that go against projections or
// different compression schemes etc that affect the amount of data read. Such alternatives
// would affect both cpu and io cost.
- double ioCost = 0;
+
DrillCostFactory costFactory = (DrillCostFactory)planner.getCostFactory();
return costFactory.makeCost(rowCount, cpuCost, ioCost, 0);
}
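To make the intent of the DrillScanRel cost change concrete, here is a rough worked example (a sketch with illustrative numbers, not code from the patch): when the scan's row type contains `*`, the column count used for costing is replaced by STAR_COLUMN_COST (10000), so a scan that carries only the projected columns is always cheaper and the planner favors pushing projections into the scan.

    // Illustrative arithmetic only; mirrors cpuCost = rowCount * columnCount above.
    long rowCount = 1000;
    int starColumnCost = 10000;      // STAR_COLUMN_COST in DrillScanRel
    int projectedColumnCount = 3;    // e.g. a scan after pushdown of three columns

    double starCpuCost = rowCount * starColumnCost;            // 10,000,000
    double projectedCpuCost = rowCount * projectedColumnCount; //      3,000
    // => the plan with the projection pushed into the scan wins on cost.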
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
index 92272f879..2e253aba7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/RelOptHelper.java
@@ -32,6 +32,10 @@ public class RelOptHelper {
public static RelOptRuleOperand any(Class<? extends RelNode> first){
return RelOptRule.operand(first, RelOptRule.any());
}
+
+ public static RelOptRuleOperand any(Class<? extends RelNode> first, Class<? extends RelNode> second) {
+ return RelOptRule.operand(first, RelOptRule.operand(second, RelOptRule.any()));
+ }
public static RelOptRuleOperand some(Class<? extends RelNode> rel, RelOptRuleOperand first, RelOptRuleOperand... rest){
return RelOptRule.operand(rel, RelOptRule.some(first, rest));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
index 63db153ba..25fa0cb19 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/ExplainHandler.java
@@ -55,8 +55,8 @@ public class ExplainHandler extends DefaultSqlHandler{
SqlNode sqlNode = rewrite(node);
SqlNode validated = validateNode(sqlNode);
RelNode rel = convertToRel(validated);
- DrillRel drel = convertToDrel(rel);
log("Optiq Logical", rel);
+ DrillRel drel = convertToDrel(rel);
log("Drill Logical", drel);
if(mode == ResultMode.LOGICAL){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java
index 48867410c..382456a62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/impl/DrillParserWithCompoundIdConverter.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.planner.sql.parser.CompoundIdentifierConverter;
import org.eigenbase.sql.SqlNode;
import org.eigenbase.sql.parser.SqlAbstractParserImpl;
import org.eigenbase.sql.parser.SqlParserImplFactory;
+import org.eigenbase.sql.util.SqlVisitor;
import java.io.Reader;
@@ -39,15 +40,19 @@ public class DrillParserWithCompoundIdConverter extends DrillParserImpl {
super(stream);
}
+ protected SqlVisitor<SqlNode> createConverter() {
+ return new CompoundIdentifierConverter();
+ }
+
@Override
public SqlNode parseSqlExpressionEof() throws Exception {
SqlNode originalSqlNode = super.parseSqlExpressionEof();
- return originalSqlNode.accept(new CompoundIdentifierConverter());
+ return originalSqlNode.accept(createConverter());
}
@Override
public SqlNode parseSqlStmtEof() throws Exception {
SqlNode originalSqlNode = super.parseSqlStmtEof();
- return originalSqlNode.accept(new CompoundIdentifierConverter());
+ return originalSqlNode.accept(createConverter());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
new file mode 100644
index 000000000..4cc06c81f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractRecordReader.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store;
+
+import java.util.Collection;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.drill.common.expression.SchemaPath;
+
+public abstract class AbstractRecordReader implements RecordReader {
+ private static final String COL_NULL_ERROR = "Columns cannot be null. Use star column to select all fields.";
+ private static final String COL_EMPTY_ERROR = "Readers needs at least a column to read.";
+ public static final SchemaPath STAR_COLUMN = SchemaPath.getSimplePath("*");
+
+ private Collection<SchemaPath> columns = null;
+ private boolean isStarQuery = false;
+
+ protected final void setColumns(Collection<SchemaPath> projected) {
+ assert Preconditions.checkNotNull(projected, COL_NULL_ERROR).size() > 0 : COL_EMPTY_ERROR;
+ isStarQuery = isStarQuery(projected);
+ columns = transformColumns(projected);
+ }
+
+ protected Collection<SchemaPath> getColumns() {
+ return columns;
+ }
+
+ protected Collection<SchemaPath> transformColumns(Collection<SchemaPath> projected) {
+ return projected;
+ }
+
+ protected boolean isStarQuery() {
+ return isStarQuery;
+ }
+
+ public static boolean isStarQuery(Collection<SchemaPath> projected) {
+ return Iterables.tryFind(Preconditions.checkNotNull(projected, COL_NULL_ERROR), new Predicate<SchemaPath>() {
+ @Override
+ public boolean apply(SchemaPath path) {
+ return Preconditions.checkNotNull(path).equals(STAR_COLUMN);
+ }
+ }).isPresent();
+ }
+
+}
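A minimal sketch of the pattern a reader subclass follows with this new base class, based on the ParquetRecordReader and DrillParquetReader changes later in this patch (the class and variable names are hypothetical, and the remaining RecordReader methods are omitted):

    // Hypothetical subclass; only the column-handling contract of AbstractRecordReader is shown.
    public class ExampleRecordReader extends AbstractRecordReader {
      public ExampleRecordReader(List<SchemaPath> projected) {
        setColumns(projected);   // rejects null/empty lists and records whether `*` was projected
      }

      @Override
      public void setup(OutputMutator output) throws ExecutionSetupException {
        if (isStarQuery()) {
          // read every column present in the underlying file
        } else {
          for (SchemaPath column : getColumns()) {
            // materialize only the projected columns
          }
        }
      }
    }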
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
index b32824529..9cdfe245c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractStoragePlugin.java
@@ -50,7 +50,7 @@ public abstract class AbstractStoragePlugin implements StoragePlugin{
@Override
public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
- return getPhysicalScan(selection, null);
+ return getPhysicalScan(selection, AbstractGroupScan.ALL_COLUMNS);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
index 2f0e85434..ec9a04e38 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSystemPlugin.java
@@ -109,11 +109,6 @@ public class FileSystemPlugin extends AbstractStoragePlugin{
}
@Override
- public AbstractGroupScan getPhysicalScan(JSONOptions selection) throws IOException {
- return this.getPhysicalScan(selection, null);
- }
-
- @Override
public AbstractGroupScan getPhysicalScan(JSONOptions selection, List<SchemaPath> columns) throws IOException {
FormatSelection formatSelection = selection.getWith(context.getConfig(), FormatSelection.class);
FormatPlugin plugin;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index bf8e301c2..d1923a500 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -186,7 +186,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
@Override
public AbstractGroupScan getGroupScan(FileSelection selection) throws IOException {
- return new EasyGroupScan(selection, this, null, selection.selectionRoot);
+ return new EasyGroupScan(selection, this, selection.selectionRoot);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 3de99c5fc..2bdf1a660 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.store.dfs.easy;
import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -72,21 +71,15 @@ public class EasyGroupScan extends AbstractGroupScan{
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("selectionRoot") String selectionRoot
) throws IOException, ExecutionSetupException {
+ this(new FileSelection(files, true),
+ (EasyFormatPlugin<?>)engineRegistry.getFormatPlugin(storageConfig, formatConfig),
+ columns,
+ selectionRoot);
+ }
- this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
- Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
- this.selection = new FileSelection(files, true);
- try{
- BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
- this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
- this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
- }catch(IOException e){
- logger.warn("Failure determining endpoint affinity.", e);
- this.endpointAffinities = Collections.emptyList();
- }
- maxWidth = chunks.size();
- this.columns = columns;
- this.selectionRoot = selectionRoot;
+ public EasyGroupScan(FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
+ throws IOException {
+ this(selection, formatPlugin, ALL_COLUMNS, selectionRoot);
}
public EasyGroupScan(
@@ -95,30 +88,26 @@ public class EasyGroupScan extends AbstractGroupScan{
List<SchemaPath> columns,
String selectionRoot
) throws IOException{
- this.selection = selection;
- this.formatPlugin = formatPlugin;
- this.columns = columns;
- try{
- BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
- this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
- this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
- }catch(IOException e){
- logger.warn("Failure determining endpoint affinity.", e);
- this.endpointAffinities = Collections.emptyList();
- }
- maxWidth = chunks.size();
+ this.selection = Preconditions.checkNotNull(selection);
+ this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Unable to load format plugin for provided format config.");
+ this.columns = columns == null || columns.size() == 0? ALL_COLUMNS : columns;
this.selectionRoot = selectionRoot;
+ BlockMapBuilder b = new BlockMapBuilder(formatPlugin.getFileSystem().getUnderlying(), formatPlugin.getContext().getBits());
+ this.chunks = b.generateFileWork(selection.getFileStatusList(formatPlugin.getFileSystem()), formatPlugin.isBlockSplittable());
+ this.maxWidth = chunks.size();
+ this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
private EasyGroupScan(EasyGroupScan that) {
- this.chunks = that.chunks;
- this.columns = that.columns;
- this.endpointAffinities = that.endpointAffinities;
- this.formatPlugin = that.formatPlugin;
- this.mappings = that.mappings;
- this.maxWidth = that.maxWidth;
- this.selection = that.selection;
- this.selectionRoot = that.selectionRoot;
+ Preconditions.checkNotNull(that, "Unable to clone: source is null.");
+ selection = that.selection;
+ formatPlugin = that.formatPlugin;
+ columns = that.columns;
+ selectionRoot = that.selectionRoot;
+ chunks = that.chunks;
+ endpointAffinities = that.endpointAffinities;
+ maxWidth = that.maxWidth;
+ mappings = that.mappings;
}
public String getSelectionRoot() {
@@ -168,16 +157,16 @@ public class EasyGroupScan extends AbstractGroupScan{
@Override
public List<EndpointAffinity> getOperatorAffinity() {
assert chunks != null && chunks.size() > 0;
- if (this.endpointAffinities == null) {
+ if (endpointAffinities == null) {
logger.debug("chunks: {}", chunks.size());
- this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
+ endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
- return this.endpointAffinities;
+ return endpointAffinities;
}
@Override
public void applyAssignments(List<DrillbitEndpoint> incomingEndpoints) {
- this.mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
+ mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
}
@Override
@@ -232,7 +221,7 @@ public class EasyGroupScan extends AbstractGroupScan{
@JsonIgnore
public boolean canPushdownProjects(List<SchemaPath> columns) {
- return this.formatPlugin.supportsPushDown();
+ return formatPlugin.supportsPushDown();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 81a23b024..6e1aa0ad0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -72,7 +72,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
@Override
public AbstractGroupScan getGroupScan(FileSelection selection, List<SchemaPath> columns) throws IOException {
- return new EasyGroupScan(selection, this, null, selection.selectionRoot); //TODO : textformat supports project?
+ return new EasyGroupScan(selection, this, columns, selection.selectionRoot); //TODO : textformat supports project?
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index d1a086c32..fd40f417a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -26,6 +26,7 @@ import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.physical.base.AbstractBase;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
import org.apache.drill.exec.physical.base.SubScan;
@@ -60,16 +61,9 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
@JsonProperty("columns") List<SchemaPath> columns, //
@JsonProperty("selectionRoot") String selectionRoot //
) throws ExecutionSetupException {
-
- if(formatConfig == null) formatConfig = new ParquetFormatConfig();
- Preconditions.checkNotNull(storageConfig);
- Preconditions.checkNotNull(formatConfig);
- this.formatPlugin = (ParquetFormatPlugin) registry.getFormatPlugin(storageConfig, formatConfig);
- Preconditions.checkNotNull(formatPlugin);
- this.rowGroupReadEntries = rowGroupReadEntries;
- this.formatConfig = formatPlugin.getConfig();
- this.columns = columns;
- this.selectionRoot = selectionRoot;
+ this((ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig),
+ formatConfig == null ? new ParquetFormatConfig() : formatConfig),
+ rowGroupReadEntries, columns, selectionRoot);
}
public ParquetRowGroupScan( //
@@ -77,10 +71,10 @@ public class ParquetRowGroupScan extends AbstractBase implements SubScan {
List<RowGroupReadEntry> rowGroupReadEntries, //
List<SchemaPath> columns,
String selectionRoot) {
- this.formatPlugin = formatPlugin;
+ this.formatPlugin = Preconditions.checkNotNull(formatPlugin);
this.formatConfig = formatPlugin.getConfig();
this.rowGroupReadEntries = rowGroupReadEntries;
- this.columns = columns;
+ this.columns = columns == null || columns.size() == 0 ? GroupScan.ALL_COLUMNS : columns;
this.selectionRoot = selectionRoot;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index a453e66bf..f9b6d91b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import com.google.common.base.Preconditions;
@@ -65,11 +66,9 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
List<String[]> partitionColumns = Lists.newArrayList();
List<Integer> selectedPartitionColumns = Lists.newArrayList();
- boolean selectAllColumns = false;
+ boolean selectAllColumns = AbstractRecordReader.isStarQuery(columns);
- if (columns == null || columns.size() == 0) {
- selectAllColumns = true;
- } else {
+ if (!selectAllColumns) {
List<SchemaPath> newColums = Lists.newArrayList();
Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
for (SchemaPath column : columns) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 34e7aea36..6c2d44c4b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -20,8 +20,13 @@ package org.apache.drill.exec.store.parquet.columnreaders;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
@@ -33,8 +38,10 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.drill.exec.vector.ValueVector;
@@ -53,7 +60,7 @@ import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.PrimitiveType;
-public class ParquetRecordReader implements RecordReader {
+public class ParquetRecordReader extends AbstractRecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordReader.class);
// this value has been inflated to read in multiple value vectors at once, and then break them up into smaller vectors
@@ -75,8 +82,8 @@ public class ParquetRecordReader implements RecordReader {
private int bitWidthAllFixedFields;
private boolean allFieldsFixedLength;
private int recordsPerBatch;
- private long totalRecords;
- private long rowGroupOffset;
+// private long totalRecords;
+// private long rowGroupOffset;
private List<ColumnReader> columnStatuses;
private FileSystem fileSystem;
@@ -84,8 +91,6 @@ public class ParquetRecordReader implements RecordReader {
Path hadoopPath;
private VarLenBinaryReader varLengthReader;
private ParquetMetadata footer;
- private List<SchemaPath> columns;
- private FragmentContext fragmentContext;
private OperatorContext operatorContext;
// This is a parallel list to the columns list above, it is used to determine the subset of the project
// pushdown columns that do not appear in this file
@@ -117,17 +122,13 @@ public class ParquetRecordReader implements RecordReader {
String path, int rowGroupIndex, FileSystem fs,
CodecFactoryExposer codecFactoryExposer, ParquetMetadata footer,
List<SchemaPath> columns) throws ExecutionSetupException {
- hadoopPath = new Path(path);
- fileSystem = fs;
+ this.hadoopPath = new Path(path);
+ this.fileSystem = fs;
this.codecFactoryExposer = codecFactoryExposer;
this.rowGroupIndex = rowGroupIndex;
this.batchSize = batchSize;
this.footer = footer;
- this.columns = columns;
- if (this.columns != null) {
- columnsFound = new boolean[this.columns.size()];
- nullFilledVectors = new ArrayList();
- }
+ setColumns(columns);
}
public CodecFactoryExposer getCodecFactoryExposer() {
@@ -184,25 +185,29 @@ public class ParquetRecordReader implements RecordReader {
// TODO - not sure if this is how we want to represent this
// for now it makes the existing tests pass, simply selecting
// all available data if no columns are provided
- if (this.columns != null){
- int i = 0;
- for (SchemaPath expr : this.columns){
- if ( field.matches(expr)){
- columnsFound[i] = true;
- return true;
- }
- i++;
+ if (isStarQuery()) {
+ return true;
+ }
+
+ int i = 0;
+ for (SchemaPath expr : getColumns()){
+ if ( field.matches(expr)){
+ columnsFound[i] = true;
+ return true;
}
- return false;
+ i++;
}
- return true;
+ return false;
}
@Override
public void setup(OutputMutator output) throws ExecutionSetupException {
-
+ if (!isStarQuery()) {
+ columnsFound = new boolean[getColumns().size()];
+ nullFilledVectors = new ArrayList();
+ }
columnStatuses = new ArrayList<>();
- totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
+// totalRecords = footer.getBlocks().get(rowGroupIndex).getRowCount();
List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
allFieldsFixedLength = true;
ColumnDescriptor column;
@@ -211,7 +216,7 @@ public class ParquetRecordReader implements RecordReader {
mockRecordsRead = 0;
MaterializedField field;
- ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
+// ParquetMetadataConverter metaConverter = new ParquetMetadataConverter();
FileMetaData fileMetaData;
// TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
@@ -249,7 +254,7 @@ public class ParquetRecordReader implements RecordReader {
allFieldsFixedLength = false;
}
}
- rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
+// rowGroupOffset = footer.getBlocks().get(rowGroupIndex).getColumns().get(0).getFirstDataPageOffset();
if (columnsToScan != 0 && allFieldsFixedLength) {
recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
@@ -294,10 +299,15 @@ public class ParquetRecordReader implements RecordReader {
}
varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
- if (this.columns != null) {
+ if (!isStarQuery()) {
+ List<SchemaPath> projectedColumns = Lists.newArrayList(getColumns());
+ SchemaPath col;
for (int i = 0; i < columnsFound.length; i++) {
- if ( ! columnsFound[i]) {
- nullFilledVectors.add((NullableBitVector)output.addField(MaterializedField.create(this.columns.get(i), Types.optional(TypeProtos.MinorType.BIT)),
+ col = projectedColumns.get(i);
+ assert col!=null;
+ if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) {
+ nullFilledVectors.add((NullableBitVector)output.addField(MaterializedField.create(col,
+ Types.optional(TypeProtos.MinorType.BIT)),
(Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.BIT, DataMode.OPTIONAL)));
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 16f520cd8..7a864f026 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -18,12 +18,15 @@
package org.apache.drill.exec.store.parquet2;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
import org.apache.drill.exec.vector.BaseValueVector;
@@ -48,11 +51,12 @@ import parquet.schema.Type;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-public class DrillParquetReader implements RecordReader {
+public class DrillParquetReader extends AbstractRecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
@@ -60,7 +64,6 @@ public class DrillParquetReader implements RecordReader {
private MessageType schema;
private Configuration conf;
private RowGroupReadEntry entry;
- private List<SchemaPath> columns;
private VectorContainerWriter writer;
private ColumnChunkIncReadStore pageReadStore;
private parquet.io.RecordReader<Void> recordReader;
@@ -73,11 +76,11 @@ public class DrillParquetReader implements RecordReader {
public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
this.footer = footer;
this.conf = conf;
- this.columns = columns;
this.entry = entry;
+ setColumns(columns);
}
- public static MessageType getProjection(MessageType schema, List<SchemaPath> columns) {
+ public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) {
MessageType projection = null;
for (SchemaPath path : columns) {
List<String> segments = Lists.newArrayList();
@@ -117,10 +120,10 @@ public class DrillParquetReader implements RecordReader {
schema = footer.getFileMetaData().getSchema();
MessageType projection = null;
- if (columns == null || columns.size() == 0) {
+ if (isStarQuery()) {
projection = schema;
} else {
- projection = getProjection(schema, columns);
+ projection = getProjection(schema, getColumns());
if (projection == null) {
projection = schema;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
index f27e8e659..3c5d9a4a2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -64,17 +64,25 @@ public class BlockMapBuilder {
return codecFactory.getCodec(fileStatus.getPath()) != null;
}
- public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException{
+ public List<CompleteFileWork> generateFileWork(List<FileStatus> files, boolean blockify) throws IOException {
List<CompleteFileWork> work = Lists.newArrayList();
- for(FileStatus f : files){
- ImmutableRangeMap<Long,BlockLocation> rangeMap = getBlockMap(f);
- if(!blockify || compressed(f)){
- work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), 0, f.getLen(), f.getPath().toString()));
- continue;
+ boolean error = false;
+ for(FileStatus f : files) {
+ error = false;
+ if (blockify && !compressed(f)) {
+ try {
+ ImmutableRangeMap<Long, BlockLocation> rangeMap = getBlockMap(f);
+ for (Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()) {
+ work.add(new CompleteFileWork(getEndpointByteMap(new FileStatusWork(f)), l.getValue().getOffset(), l.getValue().getLength(), f.getPath().toString()));
+ }
+ } catch (IOException e) {
+ logger.warn("failure while generating file work.", e);
+ error = true;
+ }
}
-
- for(Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()){
- work.add(new CompleteFileWork(this.getEndpointByteMap(new FileStatusWork(f)), l.getValue().getOffset(), l.getValue().getLength(), f.getPath().toString()));
+
+ if (!blockify || error || compressed(f)) {
+ work.add(new CompleteFileWork(getEndpointByteMap(new FileStatusWork(f)), 0, f.getLen(), f.getPath().toString()));
}
}
return work;
@@ -135,7 +143,7 @@ public class BlockMapBuilder {
private ImmutableRangeMap<Long,BlockLocation> getBlockMap(Path path) throws IOException{
ImmutableRangeMap<Long,BlockLocation> blockMap = blockMapMap.get(path);
- if(blockMap == null){
+ if(blockMap == null) {
blockMap = buildBlockMap(path);
}
return blockMap;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 68921a2d0..2031aeee7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.FieldReference;
@@ -34,6 +36,7 @@ import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.RepeatedVarCharVector;
@@ -48,7 +51,7 @@ import org.apache.hadoop.mapred.TextInputFormat;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-public class DrillTextRecordReader implements RecordReader {
+public class DrillTextRecordReader extends AbstractRecordReader {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillTextRecordReader.class);
static final String COL_NAME = "columns";
@@ -66,36 +69,31 @@ public class DrillTextRecordReader implements RecordReader {
private Text value;
private int numCols = 0;
private boolean redoRecord = false;
- private boolean first = true;
public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) {
this.fragmentContext = context;
this.delimiter = (byte) delimiter;
- boolean getEntireRow = false;
+ setColumns(columns);
- if(columns != null) {
+ if (!isStarQuery()) {
+ String pathStr;
for (SchemaPath path : columns) {
assert path.getRootSegment().isNamed();
- Preconditions.checkArgument(path.getRootSegment().getPath().equals(COL_NAME), "Selected column must have name 'columns'");
- // FIXME: need re-work for text column push-down.
+ pathStr = path.getRootSegment().getPath();
+ Preconditions.checkArgument(pathStr.equals(COL_NAME) || (pathStr.equals("*") && path.getRootSegment().getChild() == null),
+ "Selected column(s) must have name 'columns' or must be plain '*'");
+
if (path.getRootSegment().getChild() != null) {
- Preconditions.checkArgument(path.getRootSegment().getChild().isArray(),"Selected column must be an array index");
+ Preconditions.checkArgument(path.getRootSegment().getChild().isArray(), "Selected column must be an array index");
int index = path.getRootSegment().getChild().getArraySegment().getIndex();
columnIds.add(index);
- } else {
- getEntireRow = true;
}
}
Collections.sort(columnIds);
+ numCols = columnIds.size();
}
targetRecordCount = context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BATCH_SIZE);
- /* If one of the columns requested is the entire row ('columns') then ignore the rest of the columns
- * we are going copy all the values in the repeated varchar vector
- */
- if (!getEntireRow) {
- numCols = columnIds.size();
- }
TextInputFormat inputFormat = new TextInputFormat();
JobConf job = new JobConf();
job.setInt("io.file.buffer.size", context.getConfig().getInt(ExecConstants.TEXT_LINE_READER_BUFFER_SIZE));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
index 79c94c82e..fa26b5416 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java
@@ -18,6 +18,10 @@
package org.apache.drill.exec.vector.complex.fn;
import io.netty.buffer.DrillBuf;
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.IOException;
import java.io.Reader;
@@ -31,7 +35,9 @@ import org.apache.drill.exec.expr.holders.BigIntHolder;
import org.apache.drill.exec.expr.holders.BitHolder;
import org.apache.drill.exec.expr.holders.Float8Holder;
import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ListWriter;
@@ -66,22 +72,22 @@ public class JsonReader {
private boolean allTextMode;
public JsonReader() throws IOException {
- this(null, null, false);
+ this(null, false);
+ }
+
+ public JsonReader(DrillBuf managedBuf, boolean allTextMode) throws IOException {
+ this(managedBuf, GroupScan.ALL_COLUMNS, allTextMode);
}
public JsonReader(DrillBuf managedBuf, List<SchemaPath> columns, boolean allTextMode) throws JsonParseException, IOException {
factory.configure(Feature.ALLOW_UNQUOTED_FIELD_NAMES, true);
factory.configure(Feature.ALLOW_COMMENTS, true);
- this.workBuf = managedBuf;
+ assert Preconditions.checkNotNull(columns).size() > 0 : "json record reader requires at least a column";
this.columns = columns;
- // TODO - remove this check once the optimizer is updated to push down * instead of a null list
- if (this.columns == null) {
- this.columns = new ArrayList();
- this.columns.add(new SchemaPath(new PathSegment.NameSegment("*")));
+ this.starRequested = containsStar();
+ this.workBuf = managedBuf;
this.allTextMode = allTextMode;
- }
this.columnsFound = new boolean[this.columns.size()];
- this.starRequested = containsStar();
}
private boolean containsStar() {
@@ -109,7 +115,7 @@ public class JsonReader {
public List<SchemaPath> getNullColumns() {
ArrayList<SchemaPath> nullColumns = new ArrayList<SchemaPath>();
for (int i = 0; i < columnsFound.length; i++ ) {
- if ( ! columnsFound[i] && ! columns.get(i).getRootSegment().getPath().equals("*") ) {
+ if ( ! columnsFound[i] && !columns.get(i).equals(AbstractRecordReader.STAR_COLUMN)) {
nullColumns.add(columns.get(i));
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
index c2dcc951b..0636db6c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReaderWithState.java
@@ -25,6 +25,7 @@ import java.io.Reader;
import java.util.List;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import com.fasterxml.jackson.core.JsonParseException;
@@ -46,7 +47,7 @@ public class JsonReaderWithState {
}
public JsonReaderWithState(JsonRecordSplitter splitter) throws IOException{
- this(splitter, null, null, false);
+ this(splitter, null, GroupScan.ALL_COLUMNS, false);
}
public List<SchemaPath> getNullColumns() {
diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
index eadb1bced..31df303d5 100644
--- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
+++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json
@@ -43,6 +43,11 @@
type: "file",
connection: "classpath:///",
formats: {
+ "csv" : {
+ type: "text",
+ extensions: [ "csv" ],
+ delimiter: ","
+ },
"json" : {
type: "json"
},
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
index ec8e92e1c..8520b9bf9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
@@ -18,6 +18,8 @@
package org.apache.drill;
+import java.util.List;
+
import org.junit.Ignore;
import org.junit.Test;
@@ -105,6 +107,32 @@ public class TestProjectPushDown extends PlanTestBase {
testPhysicalPlanFromFile("queries/tpch/03.sql", expectedColNames1, expectedColNames2, expectedColNames3);
}
+
+ private static final String pushDownSql = "select %s from cp.`%s` t";
+ private static final String pushDownSqlWithFilter = pushDownSql + " where %s";
+ private final String[] inputTypes = new String[] {
+ "project/pushdown/empty.json",
+ "project/pushdown/empty.csv",
+ "tpch/lineitem.parquet"
+ };
+
+ @Test
+ public void testProjectPushDown() throws Exception {
+ final String projection = "t.trans_id, t.user_info.cust_id, t.marketing_info.keywords[0]";
+ final String expected = "\"columns\" : [ \"`trans_id`\", \"`user_info`.`cust_id`\", \"`marketing_info`.`keywords`[0]\" ],";
+ final String filter = "t.another_field = 10 and t.columns[0] = 100 and t.columns[1] = t.other.columns[2]";
+ final String expectedWithFilter = "\"columns\" : [ \"`another_field`\", \"`trans_id`\", \"`user_info`.`cust_id`\", \"`marketing_info`.`keywords`[0]\", \"`columns`[0]\", \"`columns`[1]\", \"`other`.`columns`[2]\" ],";
+
+ for (String inputType:inputTypes) {
+ testPushDown(new PushDownTestInstance(pushDownSql, expected, projection, inputType));
+ testPushDown(new PushDownTestInstance(pushDownSqlWithFilter, expectedWithFilter, projection, inputType, filter));
+ }
+ }
+
+ protected void testPushDown(PushDownTestInstance test) throws Exception {
+ testPhysicalPlan(test.getSql(), test.getExpected());
+ }
+
private void testPhysicalPlanFromFile(String fileName, String... expectedSubstrs)
throws Exception {
String query = getFile(fileName);
@@ -116,4 +144,24 @@ public class TestProjectPushDown extends PlanTestBase {
}
}
+ protected static class PushDownTestInstance {
+ private final String sqlPattern;
+ private final String expected;
+ private final Object[] params;
+
+ public PushDownTestInstance(String sqlPattern, String expected, Object... params) {
+ this.sqlPattern = sqlPattern;
+ this.expected = expected;
+ this.params = params;
+ }
+
+ public String getExpected() {
+ return expected;
+ }
+
+ public String getSql() {
+ return String.format(sqlPattern, params);
+ }
+ }
+
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
index bfc0d20e5..10b177542 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestJsonReader.java
@@ -34,6 +34,7 @@ import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
@@ -64,7 +65,7 @@ public class TestJsonReader extends BaseTestQuery {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestJsonReader.class);
private static BufferAllocator allocator;
- private static final boolean VERBOSE_DEBUG = false;
+ private static final boolean VERBOSE_DEBUG = true;
@BeforeClass
public static void setupAllocator(){
@@ -168,6 +169,7 @@ public class TestJsonReader extends BaseTestQuery {
String[] queries = {Files.toString(FileUtils.getResourceAsFile("/store/json/project_pushdown_json_physical_plan.json"), Charsets.UTF_8)};
long[] rowCounts = {3};
String filename = "/store/json/schema_change_int_to_string.json";
+ test("alter system set `store.json.all_text_mode` = false");
runTestsOnFile(filename, UserBitShared.QueryType.PHYSICAL, queries, rowCounts);
List<QueryResultBatch> results = testPhysicalWithResults(queries[0]);
@@ -271,7 +273,8 @@ public class TestJsonReader extends BaseTestQuery {
writer.allocate();
DrillBuf buffer = allocator.buffer(255);
- JsonReaderWithState jsonReader = new JsonReaderWithState(new ReaderJSONRecordSplitter(compound), buffer, null, false);
+ JsonReaderWithState jsonReader = new JsonReaderWithState(new ReaderJSONRecordSplitter(compound), buffer,
+ GroupScan.ALL_COLUMNS, false);
int i =0;
List<Integer> batchSizes = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.csv b/exec/java-exec/src/test/resources/project/pushdown/empty.csv
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/exec/java-exec/src/test/resources/project/pushdown/empty.csv
diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.json b/exec/java-exec/src/test/resources/project/pushdown/empty.json
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/exec/java-exec/src/test/resources/project/pushdown/empty.json
diff --git a/exec/java-exec/src/test/resources/project/pushdown/empty.parquet b/exec/java-exec/src/test/resources/project/pushdown/empty.parquet
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/exec/java-exec/src/test/resources/project/pushdown/empty.parquet