aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java44
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java57
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java6
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java9
4 files changed, 74 insertions, 42 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 1bbf63b89..a3663391b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -85,6 +85,8 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
private List<EndpointAffinity> endpointAffinities;
private ParquetGroupScanStatistics parquetGroupScanStatistics;
+ // whether all row groups of this group scan fully match the filter
+ private boolean matchAllRowGroups = false;
protected AbstractParquetGroupScan(String userName,
List<SchemaPath> columns,
@@ -111,6 +113,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
this.readerConfig = that.readerConfig;
+ this.matchAllRowGroups = that.matchAllRowGroups;
}
@JsonProperty
@@ -136,6 +139,11 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
}
@JsonIgnore
+ public boolean isMatchAllRowGroups() {
+ return matchAllRowGroups;
+ }
+
+ @JsonIgnore
@Override
public Collection<String> getFiles() {
return fileSet;
@@ -229,15 +237,12 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
}
@Override
- public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
- FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
-
- if (rowGroupInfos.size() == 1 ||
- ! (parquetTableMetadata.isRowGroupPrunable()) ||
- rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
- ) {
- // Stop pruning for 3 cases:
- // - 1 single parquet file,
+ public AbstractParquetGroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
+ FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
+
+ if (!parquetTableMetadata.isRowGroupPrunable() ||
+ rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)) {
+ // Stop pruning for 2 cases:
// - metadata does not have proper format to support row group level filter pruning,
// - # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
return null;
@@ -253,6 +258,8 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
return null;
}
+ boolean matchAllRowGroupsLocal = true;
+
for (RowGroupInfo rowGroup : rowGroupInfos) {
final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
List<String> partitionValues = getPartitionValues(rowGroup);
@@ -270,16 +277,27 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
if (match == ParquetFilterPredicate.RowsMatch.NONE) {
continue; // No rows comply with the filter => drop the row group
}
- rowGroup.setRowsMatch(match);
+ // for the case when any of the row groups partially matches the filter,
+ // matchAllRowGroupsLocal should be set to false
+ if (matchAllRowGroupsLocal) {
+ matchAllRowGroupsLocal = match == ParquetFilterPredicate.RowsMatch.ALL;
+ }
qualifiedRGs.add(rowGroup);
}
- if (qualifiedRGs.size() == rowGroupInfos.size() ) {
+ if (qualifiedRGs.size() == rowGroupInfos.size()) {
// There is no reduction of rowGroups. Return the original groupScan.
logger.debug("applyFilter() does not have any pruning!");
+ matchAllRowGroups = matchAllRowGroupsLocal;
return null;
} else if (qualifiedRGs.size() == 0) {
+ if (rowGroupInfos.size() == 1) {
+ // For the case when the group scan has a single row group and it was filtered,
+ // no need to create new group scan with the same row group.
+ return null;
+ }
+ matchAllRowGroupsLocal = false;
logger.debug("All row groups have been filtered out. Add back one to get schema from scanner.");
RowGroupInfo rg = rowGroupInfos.iterator().next();
qualifiedRGs.add(rg);
@@ -289,7 +307,9 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
try {
- return cloneWithRowGroupInfos(qualifiedRGs);
+ AbstractParquetGroupScan cloneGroupScan = cloneWithRowGroupInfos(qualifiedRGs);
+ cloneGroupScan.matchAllRowGroups = matchAllRowGroupsLocal;
+ return cloneGroupScan;
} catch (IOException e) {
logger.warn("Could not apply filter prune due to Exception : {}", e);
return null;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index 95a0534f7..c12ea7312 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -28,9 +28,7 @@ import org.apache.calcite.rex.RexUtil;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
-import org.apache.drill.exec.expr.stat.ParquetFilterPredicate.RowsMatch;
import org.apache.drill.exec.ops.OptimizerRulesContext;
-import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
@@ -174,14 +172,35 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
- final GroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
+ AbstractParquetGroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions());
if (timer != null) {
logger.debug("Took {} ms to apply filter on parquet row groups. ", timer.elapsed(TimeUnit.MILLISECONDS));
timer.stop();
}
- if (newGroupScan == null ) {
+ // For the case when newGroupScan wasn't created, the old one may
+ // fully match the filter for the case when row group pruning did not happen.
+ if (newGroupScan == null) {
+ if (groupScan.isMatchAllRowGroups()) {
+ RelNode child = project == null ? scan : project;
+ // If current row group fully matches filter,
+ // but row group pruning did not happen, remove the filter.
+ if (nonConvertedPredList.size() == 0) {
+ call.transformTo(child);
+ } else if (nonConvertedPredList.size() == predList.size()) {
+ // None of the predicates participated in filter pushdown.
+ return;
+ } else {
+ // If some of the predicates weren't used in the filter, create a new filter with them
+ // on top of the current scan. Excludes the case when none of the predicates were used in the filter.
+ call.transformTo(filter.copy(filter.getTraitSet(), child,
+ RexUtil.composeConjunction(
+ filter.getCluster().getRexBuilder(),
+ nonConvertedPredList,
+ true)));
+ }
+ }
return;
}
@@ -191,27 +210,17 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode));
}
- if (newGroupScan instanceof AbstractParquetGroupScan) {
- RowsMatch matchAll = RowsMatch.ALL;
- List<RowGroupInfo> rowGroupInfos = ((AbstractParquetGroupScan) newGroupScan).rowGroupInfos;
- for (RowGroupInfo rowGroup : rowGroupInfos) {
- if (rowGroup.getRowsMatch() != RowsMatch.ALL) {
- matchAll = RowsMatch.SOME;
- break;
- }
- }
- if (matchAll == ParquetFilterPredicate.RowsMatch.ALL) {
- // creates filter from the expressions which can't be pushed to the scan
- if (nonConvertedPredList.size() > 0) {
- newNode = filter.copy(filter.getTraitSet(), newNode,
- RexUtil.composeConjunction(
- filter.getCluster().getRexBuilder(),
- nonConvertedPredList,
- true));
- }
- call.transformTo(newNode);
- return;
+ if (newGroupScan.isMatchAllRowGroups()) {
+ // creates filter from the expressions which can't be pushed to the scan
+ if (nonConvertedPredList.size() > 0) {
+ newNode = filter.copy(filter.getTraitSet(), newNode,
+ RexUtil.composeConjunction(
+ filter.getCluster().getRexBuilder(),
+ nonConvertedPredList,
+ true));
}
+ call.transformTo(newNode);
+ return;
}
final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newNode));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
index 7d2143c18..1c9ce107c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store.parquet;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.drill.exec.expr.stat.ParquetFilterPredicate.RowsMatch;
import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.schedule.CompleteWork;
@@ -36,7 +35,6 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil
private List<? extends ColumnMetadata> columns;
private long rowCount; // rowCount = -1 indicates to include all rows.
private long numRecordsToRead;
- private RowsMatch rowsMatch = RowsMatch.SOME;
@JsonCreator
public RowGroupInfo(@JsonProperty("path") String path,
@@ -96,8 +94,4 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil
public void setColumns(List<? extends ColumnMetadata> columns) {
this.columns = columns;
}
-
- public RowsMatch getRowsMatch() { return rowsMatch; }
-
- public void setRowsMatch(RowsMatch rowsMatch) { this.rowsMatch = rowsMatch; }
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index ccc1480d9..80b06d916 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -649,6 +649,15 @@ public class TestParquetFilterPushDown extends PlanTestBase {
assertEquals(RowsMatch.ALL, isNotFalse.matches(re));
}
+ @Test
+ public void testParquetSingleRowGroupFilterRemoving() throws Exception {
+ test("create table dfs.tmp.`singleRowGroupTable` as select * from cp.`tpch/nation.parquet`");
+
+ String query = "select * from dfs.tmp.`singleRowGroupTable` where n_nationkey > -1";
+
+ testParquetFilterPruning(query, 25, 1, new String[]{"Filter\\("});
+ }
+
//////////////////////////////////////////////////////////////////////////////////////////////////
// Some test helper functions.
//////////////////////////////////////////////////////////////////////////////////////////////////