aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
diff options
context:
space:
mode:
authorBohdan Kazydub <bohdan.kazydub@gmail.com>2019-02-13 01:14:14 +0200
committerSorabh Hamirwasia <sorabh@apache.org>2019-03-14 15:36:10 -0700
commita99db5fe53084a18a83e589ae529ffe5edbcc0a8 (patch)
treea3c48a0311f248a550cc6c2377936d384e03322e /exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
parente2619f6e09da53730fb3281fe9fad663f564a2c2 (diff)
DRILL-7038: Queries on partitioned columns scan the entire datasets
- Added new optimizer rule which checks if query references directory columns only and has DISTINCT or GROUP BY operation. If the condition holds, instead of scanning full file set the following will be performed: 1) if there is cache metadata file, these directories will be read from it, 2) otherwise directories will be gathered from selection object (PartitionLocation). In the end Scan node will be transformed to DrillValuesRel (containing constant literals) with gathered values so no scan will be performed. closes #1640
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java246
1 files changed, 240 insertions, 6 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 8d3a8cec9..901592f6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -17,19 +17,31 @@
*/
package org.apache.drill.exec.planner.logical.partition;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
@@ -50,15 +62,20 @@ import org.apache.drill.exec.planner.PartitionDescriptor;
import org.apache.drill.exec.planner.PartitionLocation;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
+import org.apache.drill.exec.planner.logical.DrillValuesRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.dfs.MetadataContext.PruneStatus;
@@ -71,9 +88,6 @@ import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rex.RexNode;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.Path;
@@ -144,6 +158,10 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
return new DirPruneScanFilterOnScanRule(optimizerRulesContext);
}
+ public static RelOptRule getConvertAggScanToValuesRule(OptimizerRulesContext optimizerRulesContext) {
+ return new ConvertAggScanToValuesRule(optimizerRulesContext);
+ }
+
protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
final String pruningClassName = getClass().getName();
@@ -171,11 +189,11 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder());
condition = condition.accept(visitor);
- Map<Integer, String> fieldNameMap = Maps.newHashMap();
+ Map<Integer, String> fieldNameMap = new HashMap<>();
List<String> fieldNames = scanRel.getRowType().getFieldNames();
BitSet columnBitset = new BitSet();
BitSet partitionColumnBitSet = new BitSet();
- Map<Integer, Integer> partitionMap = Maps.newHashMap();
+ Map<Integer, Integer> partitionMap = new HashMap<>();
int relColIndex = 0;
for (String field : fieldNames) {
@@ -223,7 +241,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
// set up the partitions
- List<PartitionLocation> newPartitions = Lists.newArrayList();
+ List<PartitionLocation> newPartitions = new ArrayList<>();
long numTotal = 0; // total number of partitions
int batchIndex = 0;
PartitionLocation firstLocation = null;
@@ -553,4 +571,220 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
}
+ /**
+ * A rule which converts {@link Aggregate} on {@link TableScan} with directory columns
+ * (see {@link org.apache.drill.exec.ExecConstants#FILESYSTEM_PARTITION_COLUMN_LABEL}) only
+ * into {@link DrillValuesRel} to avoid scanning at all.
+ *
+ * Resulting {@link DrillValuesRel} will be populated with constant literals obtained from:
+ * <ol>
+ * <li>metadata directory file if it exists</li>
+ * <li>or from file selection</li>
+ * </ol>
+ */
+ private static class ConvertAggScanToValuesRule extends PruneScanRule {
+
+ private final Pattern dirPattern;
+
+ private ConvertAggScanToValuesRule(OptimizerRulesContext optimizerRulesContext) {
+ super(RelOptHelper.some(Aggregate.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(TableScan.class)),
+ "PartitionColumnScanPruningRule:Prune_On_Scan", optimizerRulesContext);
+ String partitionColumnLabel = optimizerRulesContext.getPlannerSettings().getFsPartitionColumnLabel();
+ dirPattern = Pattern.compile(partitionColumnLabel + "\\d+");
+ }
+
+ @Override
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+ return new FileSystemPartitionDescriptor(settings, scanRel);
+ }
+
+ // Checks if query references directory columns only and has DISTINCT or GROUP BY operation
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Aggregate aggregate = call.rel(0);
+ TableScan scan = call.rel(1);
+
+ if (!isQualifiedFilePruning(scan)
+ || scan.getRowType().getFieldCount() != aggregate.getRowType().getFieldCount()) {
+ return false;
+ }
+
+ List<String> fieldNames = scan.getRowType().getFieldNames();
+ // Check if select contains partition columns (dir0, dir1, dir2,..., dirN) only
+ for (String field : fieldNames) {
+ if (!dirPattern.matcher(field).matches()) {
+ return false;
+ }
+ }
+
+ return scan.isDistinct() || aggregate.getGroupCount() > 0;
+ }
+
+ /*
+ Transforms Scan node to DrillValuesRel node to avoid unnecessary scanning of selected files.
+ If cache metadata directory file exists, directory columns will be read from it,
+ otherwise directories will be gathered from selection (PartitionLocations).
+ DrillValuesRel will contain gathered constant literals.
+
+ For example, plan for "select dir0, dir1 from `t` group by 1, 2", where table `t` has directory structure year/quarter
+
+ 00-00 Screen
+ 00-01 Project(dir0=[$0], dir1=[$1])
+ 00-02 HashAgg(group=[{0, 1}])
+ 00-03 Scan(table=[[t]], groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/path/t/1996/Q4/orders_96_q4.parquet],
+ ReadEntryWithPath [path=file:/path/t/1996/Q1/file_96_q1.parquet], ReadEntryWithPath [path=file:/path/t/1996/Q3/file_96_q3.parquet],
+ ReadEntryWithPath [path=file:/path/t/1996/Q2/file_96_q2.parquet], ReadEntryWithPath [path=file:/path/t/1994/Q4/file_94_q4.parquet],
+ ReadEntryWithPath [path=file:/path/t/1994/Q1/file_94_q1.parquet], ReadEntryWithPath [path=file:/path/t/1994/Q3/file_94_q3.parquet],
+ ReadEntryWithPath [path=file:/path/t/1994/Q2/file_94_q2.parquet], ReadEntryWithPath [path=file:/path/t/1995/Q4/file_95_q4.parquet],
+ ReadEntryWithPath [path=file:/path/t/1995/Q1/file_95_q1.parquet], ReadEntryWithPath [path=file:/path/t/1995/Q3/file_95_q3.parquet],
+ ReadEntryWithPath [path=file:/path/t/1995/Q2/file_95_q2.parquet]], selectionRoot=file:/path/t, ..., columns=[`dir0`, `dir1`]]])
+
+ will be changed to
+
+ 00-00 Screen
+ 00-01 Project(dir0=[$0], dir1=[$1])
+ 00-02 HashAgg(group=[{0, 1}])
+ 00-03 Values(tuples=[[{ '1995', 'Q1' }, { '1994', 'Q4' }, { '1996', 'Q3' }, { '1996', 'Q2' }, { '1994', 'Q2' },
+ { '1995', 'Q4' }, { '1996', 'Q1' }, { '1995', 'Q3' }, { '1996', 'Q4' }, { '1994', 'Q3' }, { '1994', 'Q1' }, { '1995', 'Q2' }]])
+ */
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ TableScan scan = call.rel(1);
+
+ String pruningClassName = getClass().getName();
+ logger.debug("Beginning file partition pruning, pruning class: {}", pruningClassName);
+ Stopwatch totalPruningTime = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
+
+ Object selection = getDrillTable(scan).getSelection();
+ MetadataContext metaContext = null;
+ FileSelection fileSelection = null;
+ if (selection instanceof FormatSelection) {
+ fileSelection = ((FormatSelection) selection).getSelection();
+ metaContext = fileSelection.getMetaContext();
+ }
+
+ PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ PartitionDescriptor descriptor = getPartitionDescriptor(settings, scan);
+
+ List<String> fieldNames = scan.getRowType().getFieldNames();
+ List<String> values = Collections.emptyList();
+ List<Integer> indexes = new ArrayList<>(fieldNames.size());
+ for (String field : fieldNames) {
+ int index = descriptor.getPartitionHierarchyIndex(field);
+ indexes.add(index);
+ }
+
+ if (metaContext != null && metaContext.getDirectories() != null) {
+ // Dir metadata cache file exists
+ logger.debug("Using Metadata Directories cache");
+ values = getValues(fileSelection.getSelectionRoot(), metaContext.getDirectories(), indexes);
+ }
+
+ if (values.isEmpty()) {
+ logger.debug("Not using Metadata Directories cache");
+ int batchIndex = 0;
+ // Outer loop: iterate over a list of batches of PartitionLocations
+ values = new ArrayList<>();
+ for (List<PartitionLocation> partitions : descriptor) {
+ logger.debug("Evaluating file partition pruning for batch {}", batchIndex);
+
+ try {
+ values.addAll(getValues(partitions, indexes));
+ } catch (Exception e) {
+ logger.warn("Exception while trying to prune files.", e);
+ if (totalPruningTime != null) {
+ logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ // continue without partition pruning
+ return;
+ }
+ batchIndex++;
+ }
+
+ if (values.isEmpty()) {
+ // No changes are required
+ return;
+ }
+ }
+
+ try {
+ // Transform Scan node to DrillValuesRel node
+ List<RelDataTypeField> typeFields = new ArrayList<>(fieldNames.size());
+ RelDataTypeFactory typeFactory = scan.getCluster().getTypeFactory();
+
+ int i = 0;
+ for (String field : fieldNames) {
+ RelDataType dataType = typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR, Types.MAX_VARCHAR_LENGTH), true);
+ typeFields.add(new RelDataTypeFieldImpl(field, i++, dataType));
+ }
+ RelRecordType t = new RelRecordType(scan.getRowType().getStructKind(), typeFields);
+ RelNode newInput = DrillRelFactories.LOGICAL_BUILDER.create(scan.getCluster(), null)
+ .values(t, values.toArray())
+ .build();
+
+ RelTraitSet traits = newInput.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+ newInput = new DrillValuesRel(
+ newInput.getCluster(),
+ newInput.getRowType(),
+ ((LogicalValues) newInput).getTuples(), traits
+ );
+
+ Aggregate aggregate = call.rel(0);
+ Aggregate newAggregate = aggregate.copy(
+ aggregate.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ newInput,
+ aggregate.indicator,
+ aggregate.getGroupSet(),
+ aggregate.getGroupSets(),
+ aggregate.getAggCallList()
+ );
+ call.transformTo(newAggregate);
+ } catch (Exception e) {
+ logger.warn("Exception while using the pruned partitions.", e);
+ } finally {
+ if (totalPruningTime != null) {
+ logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
+ }
+ }
+
+ private List<String> getValues(Path selectionRoot, List<Path> directories, List<Integer> indexes) {
+ List<String> values = new ArrayList<>();
+ for (Path dir : directories) {
+ List<String> parts = ColumnExplorer.listPartitionValues(dir, selectionRoot, true);
+ for (int index : indexes) {
+ if (index < parts.size()) {
+ values.add(parts.get(index));
+ } else {
+ // No partition value for given index - set null value
+ values.add(null);
+ }
+ }
+ }
+
+ return values;
+ }
+
+ private List<String> getValues(List<PartitionLocation> partitions, List<Integer> indexes) {
+ List<String> values = new ArrayList<>(partitions.size() * indexes.size());
+ partitions.forEach(partition -> indexes.forEach(
+ index -> values.add(partition.getPartitionValue(index)))
+ );
+ return values;
+ }
+
+ private static boolean isQualifiedFilePruning(final TableScan scan) {
+ if (scan instanceof EnumerableTableScan) {
+ Object selection = getDrillTable(scan).getSelection();
+ return selection instanceof FormatSelection;
+ } else if (scan instanceof DrillScanRel) {
+ GroupScan groupScan = ((DrillScanRel) scan).getGroupScan();
+ // this rule is applicable only for dfs based partition pruning in Drill Logical
+ return groupScan instanceof FileGroupScan;
+ }
+ return false;
+ }
+ }
}