Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java | 246
1 file changed, 240 insertions(+), 6 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 8d3a8cec9..901592f6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -17,19 +17,31 @@
*/
package org.apache.drill.exec.planner.logical.partition;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
@@ -50,15 +62,20 @@ import org.apache.drill.exec.planner.PartitionDescriptor;
import org.apache.drill.exec.planner.PartitionLocation;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
+import org.apache.drill.exec.planner.logical.DrillValuesRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.dfs.MetadataContext.PruneStatus;
@@ -71,9 +88,6 @@ import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rex.RexNode;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.Path;
@@ -144,6 +158,10 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
return new DirPruneScanFilterOnScanRule(optimizerRulesContext);
}
+ public static RelOptRule getConvertAggScanToValuesRule(OptimizerRulesContext optimizerRulesContext) {
+ return new ConvertAggScanToValuesRule(optimizerRulesContext);
+ }
+
protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
final String pruningClassName = getClass().getName();
@@ -171,11 +189,11 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder());
condition = condition.accept(visitor);
- Map<Integer, String> fieldNameMap = Maps.newHashMap();
+ Map<Integer, String> fieldNameMap = new HashMap<>();
List<String> fieldNames = scanRel.getRowType().getFieldNames();
BitSet columnBitset = new BitSet();
BitSet partitionColumnBitSet = new BitSet();
- Map<Integer, Integer> partitionMap = Maps.newHashMap();
+ Map<Integer, Integer> partitionMap = new HashMap<>();
int relColIndex = 0;
for (String field : fieldNames) {
@@ -223,7 +241,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
// set up the partitions
- List<PartitionLocation> newPartitions = Lists.newArrayList();
+ List<PartitionLocation> newPartitions = new ArrayList<>();
long numTotal = 0; // total number of partitions
int batchIndex = 0;
PartitionLocation firstLocation = null;
@@ -553,4 +571,220 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
}
+ /**
+ * A rule which converts an {@link Aggregate} on a {@link TableScan} that references directory columns
+ * (see {@link org.apache.drill.exec.ExecConstants#FILESYSTEM_PARTITION_COLUMN_LABEL}) only
+ * into a {@link DrillValuesRel}, so that no files are scanned at all.
+ *
+ * <p>The resulting {@link DrillValuesRel} is populated with constant literals obtained from:
+ * <ol>
+ * <li>the metadata directory file, if it exists,</li>
+ * <li>or otherwise from the file selection</li>
+ * </ol>
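+ *
+ * <p>For example (illustrative), a query like {@code SELECT DISTINCT dir0, dir1 FROM t}
+ * can be answered from the directory names alone, without reading any data files.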
+ */
+ private static class ConvertAggScanToValuesRule extends PruneScanRule {
+
+ private final Pattern dirPattern;
+
+ private ConvertAggScanToValuesRule(OptimizerRulesContext optimizerRulesContext) {
+ super(RelOptHelper.some(Aggregate.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(TableScan.class)),
+ "PartitionColumnScanPruningRule:Prune_On_Scan", optimizerRulesContext);
+ String partitionColumnLabel = optimizerRulesContext.getPlannerSettings().getFsPartitionColumnLabel();
+ dirPattern = Pattern.compile(partitionColumnLabel + "\\d+");
+ }
+
+ @Override
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+ return new FileSystemPartitionDescriptor(settings, scanRel);
+ }
+
+ // Checks if query references directory columns only and has DISTINCT or GROUP BY operation
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Aggregate aggregate = call.rel(0);
+ TableScan scan = call.rel(1);
+
+ if (!isQualifiedFilePruning(scan)
+ || scan.getRowType().getFieldCount() != aggregate.getRowType().getFieldCount()) {
+ return false;
+ }
+
+ List<String> fieldNames = scan.getRowType().getFieldNames();
+ // Check if select contains partition columns (dir0, dir1, dir2,..., dirN) only
+ for (String field : fieldNames) {
+ if (!dirPattern.matcher(field).matches()) {
+ return false;
+ }
+ }
+
+ return aggregate.getGroupCount() > 0 && aggregate.getAggCallList().isEmpty();
+ }
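+
+ /*
+ Illustrative only: with the default partition column label "dir", dirPattern above
+ is compiled as "dir\\d+", so:
+
+ dirPattern.matcher("dir0").matches(); // true
+ dirPattern.matcher("dir12").matches(); // true
+ dirPattern.matcher("dir").matches(); // false - at least one digit is required
+ dirPattern.matcher("o_custkey").matches(); // false - regular column
+ */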
+
+ /*
+ Transforms the Scan node into a DrillValuesRel node to avoid unnecessary scanning of the selected files.
+ If the metadata directory cache file exists, the directory columns are read from it;
+ otherwise the directories are gathered from the file selection (PartitionLocations).
+ The resulting DrillValuesRel contains the gathered constant literals.
+
+ For example, the plan for "select dir0, dir1 from `t` group by 1, 2", where table `t` has the directory structure year/quarter:
+
+ 00-00 Screen
+ 00-01 Project(dir0=[$0], dir1=[$1])
+ 00-02 HashAgg(group=[{0, 1}])
+ 00-03 Scan(table=[[t]], groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/path/t/1996/Q4/orders_96_q4.parquet],
+ ReadEntryWithPath [path=file:/path/t/1996/Q1/file_96_q1.parquet], ReadEntryWithPath [path=file:/path/t/1996/Q3/file_96_q3.parquet],
+ ReadEntryWithPath [path=file:/path/t/1996/Q2/file_96_q2.parquet], ReadEntryWithPath [path=file:/path/t/1994/Q4/file_94_q4.parquet],
+ ReadEntryWithPath [path=file:/path/t/1994/Q1/file_94_q1.parquet], ReadEntryWithPath [path=file:/path/t/1994/Q3/file_94_q3.parquet],
+ ReadEntryWithPath [path=file:/path/t/1994/Q2/file_94_q2.parquet], ReadEntryWithPath [path=file:/path/t/1995/Q4/file_95_q4.parquet],
+ ReadEntryWithPath [path=file:/path/t/1995/Q1/file_95_q1.parquet], ReadEntryWithPath [path=file:/path/t/1995/Q3/file_95_q3.parquet],
+ ReadEntryWithPath [path=file:/path/t/1995/Q2/file_95_q2.parquet]], selectionRoot=file:/path/t, ..., columns=[`dir0`, `dir1`]]])
+
+ will be changed to
+
+ 00-00 Screen
+ 00-01 Project(dir0=[$0], dir1=[$1])
+ 00-02 HashAgg(group=[{0, 1}])
+ 00-03 Values(tuples=[[{ '1995', 'Q1' }, { '1994', 'Q4' }, { '1996', 'Q3' }, { '1996', 'Q2' }, { '1994', 'Q2' },
+ { '1995', 'Q4' }, { '1996', 'Q1' }, { '1995', 'Q3' }, { '1996', 'Q4' }, { '1994', 'Q3' }, { '1994', 'Q1' }, { '1995', 'Q2' }]])
+ */
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ TableScan scan = call.rel(1);
+
+ String pruningClassName = getClass().getName();
+ logger.debug("Beginning file partition pruning, pruning class: {}", pruningClassName);
+ Stopwatch totalPruningTime = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
+
+ Object selection = getDrillTable(scan).getSelection();
+ MetadataContext metaContext = null;
+ FileSelection fileSelection = null;
+ if (selection instanceof FormatSelection) {
+ fileSelection = ((FormatSelection) selection).getSelection();
+ metaContext = fileSelection.getMetaContext();
+ }
+
+ PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ PartitionDescriptor descriptor = getPartitionDescriptor(settings, scan);
+
+ List<String> fieldNames = scan.getRowType().getFieldNames();
+ List<String> values = Collections.emptyList();
+ List<Integer> indexes = new ArrayList<>(fieldNames.size());
+ for (String field : fieldNames) {
+ indexes.add(descriptor.getPartitionHierarchyIndex(field));
+ }
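+ // e.g. for a scan over [dir0, dir1] this yields indexes = [0, 1]: with the default label,
+ // the numeric suffix of dirN is its position in the directory hierarchy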
+
+ if (metaContext != null && metaContext.getDirectories() != null) {
+ // Dir metadata cache file exists
+ logger.debug("Using Metadata Directories cache");
+ values = getValues(fileSelection.getSelectionRoot(), metaContext.getDirectories(), indexes);
+ }
+
+ if (values.isEmpty()) {
+ logger.debug("Not using Metadata Directories cache");
+ int batchIndex = 0;
+ // Outer loop: iterate over a list of batches of PartitionLocations
+ values = new ArrayList<>();
+ for (List<PartitionLocation> partitions : descriptor) {
+ logger.debug("Evaluating file partition pruning for batch {}", batchIndex);
+
+ try {
+ values.addAll(getValues(partitions, indexes));
+ } catch (Exception e) {
+ logger.warn("Exception while trying to prune files.", e);
+ if (totalPruningTime != null) {
+ logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ // continue without partition pruning
+ return;
+ }
+ batchIndex++;
+ }
+
+ if (values.isEmpty()) {
+ // No changes are required
+ return;
+ }
+ }
+
+ try {
+ // Transform Scan node to DrillValuesRel node
+ List<RelDataTypeField> typeFields = new ArrayList<>(fieldNames.size());
+ RelDataTypeFactory typeFactory = scan.getCluster().getTypeFactory();
+
+ RelDataType dataType = typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR, Types.MAX_VARCHAR_LENGTH), true);
+ int i = 0;
+ for (String field : fieldNames) {
+ typeFields.add(new RelDataTypeFieldImpl(field, i++, dataType));
+ }
+ RelRecordType rowType = new RelRecordType(scan.getRowType().getStructKind(), typeFields);
+ RelNode newInput = DrillRelFactories.LOGICAL_BUILDER.create(scan.getCluster(), null)
+ .values(rowType, values.toArray())
+ .build();
+
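+ // the builder above produces a LogicalValues; re-create it as a DrillValuesRel
+ // carrying the DRILL_LOGICAL trait so it can replace the scan in the Drill logical plan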
+ RelTraitSet traits = newInput.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+ newInput = new DrillValuesRel(
+ newInput.getCluster(),
+ newInput.getRowType(),
+ ((LogicalValues) newInput).getTuples(), traits
+ );
+
+ Aggregate aggregate = call.rel(0);
+ Aggregate newAggregate = aggregate.copy(
+ aggregate.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ newInput,
+ aggregate.indicator,
+ aggregate.getGroupSet(),
+ aggregate.getGroupSets(),
+ aggregate.getAggCallList()
+ );
+ call.transformTo(newAggregate);
+ } catch (Exception e) {
+ logger.warn("Exception while using the pruned partitions.", e);
+ } finally {
+ if (totalPruningTime != null) {
+ logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
+ }
+ }
+
+ private List<String> getValues(Path selectionRoot, List<Path> directories, List<Integer> indexes) {
+ List<String> values = new ArrayList<>();
+ for (Path dir : directories) {
+ List<String> parts = ColumnExplorer.listPartitionValues(dir, selectionRoot, true);
+ for (int index : indexes) {
+ if (index < parts.size()) {
+ values.add(parts.get(index));
+ } else {
+ // no partition value exists for the given index - add a null value
+ values.add(null);
+ }
+ }
+ }
+
+ return values;
+ }
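+
+ /*
+ Illustrative trace, reusing the year/quarter layout from the plan example above:
+ selectionRoot = /path/t, directories = [/path/t/1994/Q1, /path/t/1995/Q2], indexes = [0, 1]
+ listPartitionValues(/path/t/1994/Q1, /path/t, true) -> ["1994", "Q1"]
+ listPartitionValues(/path/t/1995/Q2, /path/t, true) -> ["1995", "Q2"]
+ so the returned flat list is ["1994", "Q1", "1995", "Q2"] - indexes.size() entries per directory
+ */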
+
+ private List<String> getValues(List<PartitionLocation> partitions, List<Integer> indexes) {
+ List<String> values = new ArrayList<>(partitions.size() * indexes.size());
+ partitions.forEach(partition -> indexes.forEach(
+ index -> values.add(partition.getPartitionValue(index)))
+ );
+ return values;
+ }
+
+ private static boolean isQualifiedFilePruning(final TableScan scan) {
+ if (scan instanceof EnumerableTableScan) {
+ Object selection = getDrillTable(scan).getSelection();
+ return selection instanceof FormatSelection;
+ } else if (scan instanceof DrillScanRel) {
+ GroupScan groupScan = ((DrillScanRel) scan).getGroupScan();
+ // this rule is applicable only for dfs-based partition pruning in the Drill logical planning phase
+ return groupScan instanceof FileGroupScan;
+ }
+ return false;
+ }
+ }
}