diff options
4 files changed, 74 insertions, 42 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java index 1bbf63b89..a3663391b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java @@ -85,6 +85,8 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { private List<EndpointAffinity> endpointAffinities; private ParquetGroupScanStatistics parquetGroupScanStatistics; + // whether all row groups of this group scan fully match the filter + private boolean matchAllRowGroups = false; protected AbstractParquetGroupScan(String userName, List<SchemaPath> columns, @@ -111,6 +113,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet); this.entries = that.entries == null ? null : new ArrayList<>(that.entries); this.readerConfig = that.readerConfig; + this.matchAllRowGroups = that.matchAllRowGroups; } @JsonProperty @@ -136,6 +139,11 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { } @JsonIgnore + public boolean isMatchAllRowGroups() { + return matchAllRowGroups; + } + + @JsonIgnore @Override public Collection<String> getFiles() { return fileSet; @@ -229,15 +237,12 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { } @Override - public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities, - FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) { - - if (rowGroupInfos.size() == 1 || - ! (parquetTableMetadata.isRowGroupPrunable()) || - rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD) - ) { - // Stop pruning for 3 cases: - // - 1 single parquet file, + public AbstractParquetGroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities, + FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) { + + if (!parquetTableMetadata.isRowGroupPrunable() || + rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)) { + // Stop pruning for 2 cases: // - metadata does not have proper format to support row group level filter pruning, // - # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD. return null; @@ -253,6 +258,8 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { return null; } + boolean matchAllRowGroupsLocal = true; + for (RowGroupInfo rowGroup : rowGroupInfos) { final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns); List<String> partitionValues = getPartitionValues(rowGroup); @@ -270,16 +277,27 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { if (match == ParquetFilterPredicate.RowsMatch.NONE) { continue; // No row comply to the filter => drop the row group } - rowGroup.setRowsMatch(match); + // for the case when any of row groups partially matches the filter, + // matchAllRowGroupsLocal should be set to false + if (matchAllRowGroupsLocal) { + matchAllRowGroupsLocal = match == ParquetFilterPredicate.RowsMatch.ALL; + } qualifiedRGs.add(rowGroup); } - if (qualifiedRGs.size() == rowGroupInfos.size() ) { + if (qualifiedRGs.size() == rowGroupInfos.size()) { // There is no reduction of rowGroups. Return the original groupScan. logger.debug("applyFilter() does not have any pruning!"); + matchAllRowGroups = matchAllRowGroupsLocal; return null; } else if (qualifiedRGs.size() == 0) { + if (rowGroupInfos.size() == 1) { + // For the case when group scan has single row group and it was filtered, + // no need to create new group scan with the same row group. + return null; + } + matchAllRowGroupsLocal = false; logger.debug("All row groups have been filtered out. Add back one to get schema from scanner."); RowGroupInfo rg = rowGroupInfos.iterator().next(); qualifiedRGs.add(rg); @@ -289,7 +307,9 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size()); try { - return cloneWithRowGroupInfos(qualifiedRGs); + AbstractParquetGroupScan cloneGroupScan = cloneWithRowGroupInfos(qualifiedRGs); + cloneGroupScan.matchAllRowGroups = matchAllRowGroupsLocal; + return cloneGroupScan; } catch (IOException e) { logger.warn("Could not apply filter prune due to Exception : {}", e); return null; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java index 95a0534f7..c12ea7312 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java @@ -28,9 +28,7 @@ import org.apache.calcite.rex.RexUtil; import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.exec.expr.stat.ParquetFilterPredicate; -import org.apache.drill.exec.expr.stat.ParquetFilterPredicate.RowsMatch; import org.apache.drill.exec.ops.OptimizerRulesContext; -import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.logical.DrillOptiq; import org.apache.drill.exec.planner.logical.DrillParseContext; @@ -174,14 +172,35 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule { Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; - final GroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext, + AbstractParquetGroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext, optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions()); if (timer != null) { logger.debug("Took {} ms to apply filter on parquet row groups. ", timer.elapsed(TimeUnit.MILLISECONDS)); timer.stop(); } - if (newGroupScan == null ) { + // For the case when newGroupScan wasn't created, the old one may + // fully match the filter for the case when row group pruning did not happen. + if (newGroupScan == null) { + if (groupScan.isMatchAllRowGroups()) { + RelNode child = project == null ? scan : project; + // If current row group fully matches filter, + // but row group pruning did not happen, remove the filter. + if (nonConvertedPredList.size() == 0) { + call.transformTo(child); + } else if (nonConvertedPredList.size() == predList.size()) { + // None of the predicates participated in filter pushdown. + return; + } else { + // If some of the predicates weren't used in the filter, creates new filter with them + // on top of current scan. Excludes the case when all predicates weren't used in the filter. + call.transformTo(filter.copy(filter.getTraitSet(), child, + RexUtil.composeConjunction( + filter.getCluster().getRexBuilder(), + nonConvertedPredList, + true))); + } + } return; } @@ -191,27 +210,17 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule { newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode)); } - if (newGroupScan instanceof AbstractParquetGroupScan) { - RowsMatch matchAll = RowsMatch.ALL; - List<RowGroupInfo> rowGroupInfos = ((AbstractParquetGroupScan) newGroupScan).rowGroupInfos; - for (RowGroupInfo rowGroup : rowGroupInfos) { - if (rowGroup.getRowsMatch() != RowsMatch.ALL) { - matchAll = RowsMatch.SOME; - break; - } - } - if (matchAll == ParquetFilterPredicate.RowsMatch.ALL) { - // creates filter from the expressions which can't be pushed to the scan - if (nonConvertedPredList.size() > 0) { - newNode = filter.copy(filter.getTraitSet(), newNode, - RexUtil.composeConjunction( - filter.getCluster().getRexBuilder(), - nonConvertedPredList, - true)); - } - call.transformTo(newNode); - return; + if (newGroupScan.isMatchAllRowGroups()) { + // creates filter from the expressions which can't be pushed to the scan + if (nonConvertedPredList.size() > 0) { + newNode = filter.copy(filter.getTraitSet(), newNode, + RexUtil.composeConjunction( + filter.getCluster().getRexBuilder(), + nonConvertedPredList, + true)); } + call.transformTo(newNode); + return; } final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newNode)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java index 7d2143c18..1c9ce107c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.store.parquet; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.drill.exec.expr.stat.ParquetFilterPredicate.RowsMatch; import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.store.schedule.CompleteWork; @@ -36,7 +35,6 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil private List<? extends ColumnMetadata> columns; private long rowCount; // rowCount = -1 indicates to include all rows. private long numRecordsToRead; - private RowsMatch rowsMatch = RowsMatch.SOME; @JsonCreator public RowGroupInfo(@JsonProperty("path") String path, @@ -96,8 +94,4 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil public void setColumns(List<? extends ColumnMetadata> columns) { this.columns = columns; } - - public RowsMatch getRowsMatch() { return rowsMatch; } - - public void setRowsMatch(RowsMatch rowsMatch) { this.rowsMatch = rowsMatch; } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java index ccc1480d9..80b06d916 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java @@ -649,6 +649,15 @@ public class TestParquetFilterPushDown extends PlanTestBase { assertEquals(RowsMatch.ALL, isNotFalse.matches(re)); } + @Test + public void testParquetSingleRowGroupFilterRemoving() throws Exception { + test("create table dfs.tmp.`singleRowGroupTable` as select * from cp.`tpch/nation.parquet`"); + + String query = "select * from dfs.tmp.`singleRowGroupTable` where n_nationkey > -1"; + + testParquetFilterPruning(query, 25, 1, new String[]{"Filter\\("}); + } + ////////////////////////////////////////////////////////////////////////////////////////////////// // Some test helper functions. ////////////////////////////////////////////////////////////////////////////////////////////////// |