diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java | 246 |
1 files changed, 240 insertions, 6 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java index 8d3a8cec9..901592f6a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java @@ -17,19 +17,31 @@ */ package org.apache.drill.exec.planner.logical.partition; +import java.util.ArrayList; import java.util.BitSet; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.calcite.adapter.enumerable.EnumerableTableScan; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.core.Aggregate; import org.apache.calcite.rel.core.Filter; import org.apache.calcite.rel.core.Project; import org.apache.calcite.rel.core.TableScan; +import org.apache.calcite.rel.logical.LogicalValues; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeFieldImpl; +import org.apache.calcite.rel.type.RelRecordType; import org.apache.calcite.rex.RexUtil; +import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.util.BitSets; import org.apache.drill.common.expression.ErrorCollectorImpl; import org.apache.drill.common.expression.LogicalExpression; @@ -50,15 +62,20 @@ import org.apache.drill.exec.planner.PartitionDescriptor; import org.apache.drill.exec.planner.PartitionLocation; import org.apache.drill.exec.planner.logical.DrillOptiq; import org.apache.drill.exec.planner.logical.DrillParseContext; 
+import org.apache.drill.exec.planner.logical.DrillRel; +import org.apache.drill.exec.planner.logical.DrillRelFactories; import org.apache.drill.exec.planner.logical.DrillScanRel; import org.apache.drill.exec.planner.logical.DrillTable; import org.apache.drill.exec.planner.logical.DrillTranslatableTable; +import org.apache.drill.exec.planner.logical.DrillValuesRel; import org.apache.drill.exec.planner.logical.RelOptHelper; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.physical.PrelUtil; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorContainer; +import org.apache.drill.exec.store.ColumnExplorer; import org.apache.drill.exec.store.StoragePluginOptimizerRule; +import org.apache.drill.exec.store.dfs.FileSelection; import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.dfs.MetadataContext; import org.apache.drill.exec.store.dfs.MetadataContext.PruneStatus; @@ -71,9 +88,6 @@ import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rex.RexNode; import org.apache.commons.lang3.tuple.Pair; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -import org.apache.drill.shaded.guava.com.google.common.collect.Maps; - import org.apache.drill.exec.vector.ValueVector; import org.apache.hadoop.fs.Path; @@ -144,6 +158,10 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { return new DirPruneScanFilterOnScanRule(optimizerRulesContext); } + public static RelOptRule getConvertAggScanToValuesRule(OptimizerRulesContext optimizerRulesContext) { + return new ConvertAggScanToValuesRule(optimizerRulesContext); + } + protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) { final String pruningClassName = getClass().getName(); @@ -171,11 +189,11 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { RewriteAsBinaryOperators visitor = 
new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder()); condition = condition.accept(visitor); - Map<Integer, String> fieldNameMap = Maps.newHashMap(); + Map<Integer, String> fieldNameMap = new HashMap<>(); List<String> fieldNames = scanRel.getRowType().getFieldNames(); BitSet columnBitset = new BitSet(); BitSet partitionColumnBitSet = new BitSet(); - Map<Integer, Integer> partitionMap = Maps.newHashMap(); + Map<Integer, Integer> partitionMap = new HashMap<>(); int relColIndex = 0; for (String field : fieldNames) { @@ -223,7 +241,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { } // set up the partitions - List<PartitionLocation> newPartitions = Lists.newArrayList(); + List<PartitionLocation> newPartitions = new ArrayList<>(); long numTotal = 0; // total number of partitions int batchIndex = 0; PartitionLocation firstLocation = null; @@ -553,4 +571,220 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { } }

  /**
   * A rule which converts an {@link Aggregate} on top of a {@link TableScan} that selects directory columns only
   * (see {@link org.apache.drill.exec.ExecConstants#FILESYSTEM_PARTITION_COLUMN_LABEL})
   * into the same {@link Aggregate} on top of a {@link DrillValuesRel}, to avoid scanning at all.
   *
   * <p>The resulting {@link DrillValuesRel} is populated with constant literals obtained from:
   * <ol>
   *   <li>the metadata directories cache file, if it exists</li>
   *   <li>or from the file selection</li>
   * </ol>
   */
  private static class ConvertAggScanToValuesRule extends PruneScanRule {

    /** Matches a directory column name: the partition column label followed by one or more digits. */
    private final Pattern dirPattern;

    /**
     * Creates the rule matching an {@link Aggregate} in the Drill logical convention whose input is
     * any {@link TableScan}.
     *
     * @param optimizerRulesContext context providing the planner settings (partition column label)
     */
    private ConvertAggScanToValuesRule(OptimizerRulesContext optimizerRulesContext) {
      super(RelOptHelper.some(Aggregate.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(TableScan.class)),
          "PartitionColumnScanPruningRule:Prune_On_Scan", optimizerRulesContext);
      String partitionColumnLabel = optimizerRulesContext.getPlannerSettings().getFsPartitionColumnLabel();
      // The label comes from planner settings: quote it so that any regex metacharacters
      // it contains are matched literally instead of being interpreted as a pattern.
      dirPattern = Pattern.compile(Pattern.quote(partitionColumnLabel) + "\\d+");
    }

    @Override
    public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
      return new FileSystemPartitionDescriptor(settings, scanRel);
    }

    /**
     * Checks if the query references directory columns only and has a DISTINCT or GROUP BY operation.
     *
     * @param call rule call whose operands are the aggregate (0) and the scan (1)
     * @return {@code true} if the scan can be replaced by constant values
     */
    @Override
    public boolean matches(RelOptRuleCall call) {
      Aggregate aggregate = call.rel(0);
      TableScan scan = call.rel(1);

      if (!isQualifiedFilePruning(scan)
          || scan.getRowType().getFieldCount() != aggregate.getRowType().getFieldCount()) {
        return false;
      }

      // Check if select contains partition columns (dir0, dir1, dir2,..., dirN) only
      for (String field : scan.getRowType().getFieldNames()) {
        if (!dirPattern.matcher(field).matches()) {
          return false;
        }
      }

      return scan.isDistinct() || aggregate.getGroupCount() > 0;
    }

    /**
     * Transforms the Scan node to a {@link DrillValuesRel} node to avoid unnecessary scanning of selected files.
     * If the cache metadata directories file exists, directory columns are read from it,
     * otherwise directories are gathered from the selection (PartitionLocations).
     * The {@link DrillValuesRel} contains the gathered constant literals.
     *
     * <p>For example, plan for "select dir0, dir1 from `t` group by 1, 2",
     * where table `t` has directory structure year/quarter
     *
     * <pre>
     * 00-00    Screen
     * 00-01      Project(dir0=[$0], dir1=[$1])
     * 00-02        HashAgg(group=[{0, 1}])
     * 00-03          Scan(table=[[t]], groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/path/t/1996/Q4/orders_96_q4.parquet],
     *   ReadEntryWithPath [path=file:/path/t/1996/Q1/file_96_q1.parquet], ReadEntryWithPath [path=file:/path/t/1996/Q3/file_96_q3.parquet],
     *   ReadEntryWithPath [path=file:/path/t/1996/Q2/file_96_q2.parquet], ReadEntryWithPath [path=file:/path/t/1994/Q4/file_94_q4.parquet],
     *   ReadEntryWithPath [path=file:/path/t/1994/Q1/file_94_q1.parquet], ReadEntryWithPath [path=file:/path/t/1994/Q3/file_94_q3.parquet],
     *   ReadEntryWithPath [path=file:/path/t/1994/Q2/file_94_q2.parquet], ReadEntryWithPath [path=file:/path/t/1995/Q4/file_95_q4.parquet],
     *   ReadEntryWithPath [path=file:/path/t/1995/Q1/file_95_q1.parquet], ReadEntryWithPath [path=file:/path/t/1995/Q3/file_95_q3.parquet],
     *   ReadEntryWithPath [path=file:/path/t/1995/Q2/file_95_q2.parquet]], selectionRoot=file:/path/t, ..., columns=[`dir0`, `dir1`]]])
     * </pre>
     *
     * will be changed to
     *
     * <pre>
     * 00-00    Screen
     * 00-01      Project(dir0=[$0], dir1=[$1])
     * 00-02        HashAgg(group=[{0, 1}])
     * 00-03          Values(tuples=[[{ '1995', 'Q1' }, { '1994', 'Q4' }, { '1996', 'Q3' }, { '1996', 'Q2' }, { '1994', 'Q2' },
     *   { '1995', 'Q4' }, { '1996', 'Q1' }, { '1995', 'Q3' }, { '1996', 'Q4' }, { '1994', 'Q3' }, { '1994', 'Q1' }, { '1995', 'Q2' }]])
     * </pre>
     *
     * <p>If no values can be gathered, or gathering / transformation fails, the plan is left unchanged.
     *
     * @param call rule call whose operands are the aggregate (0) and the scan (1)
     */
    @Override
    public void onMatch(RelOptRuleCall call) {
      TableScan scan = call.rel(1);

      logger.debug("Beginning file partition pruning, pruning class: {}", getClass().getName());
      Stopwatch totalPruningTime = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;

      try {
        PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
        PartitionDescriptor descriptor = getPartitionDescriptor(settings, scan);

        // Translate every selected directory column into its level in the partition hierarchy
        List<String> fieldNames = scan.getRowType().getFieldNames();
        List<Integer> indexes = new ArrayList<>(fieldNames.size());
        for (String field : fieldNames) {
          indexes.add(descriptor.getPartitionHierarchyIndex(field));
        }

        List<String> values = collectValues(scan, descriptor, indexes);
        if (values.isEmpty()) {
          // No changes are required (or gathering failed): continue without partition pruning
          return;
        }

        try {
          // Transform Scan node to DrillValuesRel node and re-create the aggregate on top of it
          RelNode newInput = buildValuesRel(scan, fieldNames, values);

          Aggregate aggregate = call.rel(0);
          Aggregate newAggregate = aggregate.copy(
              aggregate.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
              newInput,
              aggregate.indicator,
              aggregate.getGroupSet(),
              aggregate.getGroupSets(),
              aggregate.getAggCallList()
          );
          call.transformTo(newAggregate);
        } catch (Exception e) {
          // Best effort: the original plan stays valid, so only report the failure
          logger.warn("Exception while using the pruned partitions.", e);
        }
      } finally {
        // Single exit point for timing, so the elapsed time is reported on every path
        if (totalPruningTime != null) {
          logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
        }
      }
    }

    /**
     * Gathers directory column values, preferring the metadata directories cache and falling back
     * to the partition locations of the selection.
     *
     * @param scan scan being replaced
     * @param descriptor partition descriptor used to iterate over batches of partition locations
     * @param indexes partition hierarchy indexes of the selected columns, in column order
     * @return flattened row-major list of values, or an empty list if nothing was gathered or gathering failed
     */
    private List<String> collectValues(TableScan scan, PartitionDescriptor descriptor, List<Integer> indexes) {
      Object selection = getDrillTable(scan).getSelection();
      if (selection instanceof FormatSelection) {
        FileSelection fileSelection = ((FormatSelection) selection).getSelection();
        MetadataContext metaContext = fileSelection == null ? null : fileSelection.getMetaContext();
        if (metaContext != null && metaContext.getDirectories() != null) {
          // Dir metadata cache file exists
          logger.debug("Using Metadata Directories cache");
          List<String> cachedValues =
              getValues(fileSelection.getSelectionRoot(), metaContext.getDirectories(), indexes);
          if (!cachedValues.isEmpty()) {
            return cachedValues;
          }
          logger.debug("Metadata Directories cache gave no values, falling back to partition locations");
        } else {
          logger.debug("Not using Metadata Directories cache");
        }
      } else {
        logger.debug("Not using Metadata Directories cache");
      }

      List<String> values = new ArrayList<>();
      int batchIndex = 0;
      // Iterate over a list of batches of PartitionLocations
      for (List<PartitionLocation> partitions : descriptor) {
        logger.debug("Evaluating file partition pruning for batch {}", batchIndex);
        try {
          values.addAll(getValues(partitions, indexes));
        } catch (Exception e) {
          logger.warn("Exception while trying to prune files.", e);
          // continue without partition pruning
          return Collections.emptyList();
        }
        batchIndex++;
      }
      return values;
    }

    /**
     * Builds a {@link DrillValuesRel} whose row type has one nullable VARCHAR column per scan field
     * and whose tuples are the given values.
     *
     * @param scan scan being replaced; supplies the cluster and the struct kind of the row type
     * @param fieldNames names of the scan fields, in column order
     * @param values flattened row-major list of literals, may contain {@code null} elements
     * @return values node in the Drill logical convention
     */
    private RelNode buildValuesRel(TableScan scan, List<String> fieldNames, List<String> values) {
      RelDataTypeFactory typeFactory = scan.getCluster().getTypeFactory();
      // Every directory column has the same type, so create it once
      RelDataType dataType = typeFactory.createTypeWithNullability(
          typeFactory.createSqlType(SqlTypeName.VARCHAR, Types.MAX_VARCHAR_LENGTH), true);

      List<RelDataTypeField> typeFields = new ArrayList<>(fieldNames.size());
      for (String field : fieldNames) {
        typeFields.add(new RelDataTypeFieldImpl(field, typeFields.size(), dataType));
      }
      RelRecordType rowType = new RelRecordType(scan.getRowType().getStructKind(), typeFields);

      RelNode logicalValues = DrillRelFactories.LOGICAL_BUILDER.create(scan.getCluster(), null)
          .values(rowType, values.toArray())
          .build();

      RelTraitSet traits = logicalValues.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
      return new DrillValuesRel(
          logicalValues.getCluster(),
          logicalValues.getRowType(),
          ((LogicalValues) logicalValues).getTuples(),
          traits
      );
    }

    /**
     * Extracts directory column values from the directories listed in the metadata cache.
     *
     * @param selectionRoot root of the selection, passed to {@link ColumnExplorer#listPartitionValues}
     * @param directories directories read from the metadata directories cache
     * @param indexes partition hierarchy indexes of the selected columns, in column order
     * @return flattened row-major list of values; {@code null} where a directory has no value for an index
     */
    private List<String> getValues(Path selectionRoot, List<Path> directories, List<Integer> indexes) {
      List<String> values = new ArrayList<>(directories.size() * indexes.size());
      for (Path dir : directories) {
        List<String> parts = ColumnExplorer.listPartitionValues(dir, selectionRoot, true);
        for (int index : indexes) {
          // No partition value for given index - set null value
          values.add(index >= 0 && index < parts.size() ? parts.get(index) : null);
        }
      }
      return values;
    }

    /**
     * Extracts directory column values from a batch of partition locations.
     *
     * @param partitions batch of partition locations
     * @param indexes partition hierarchy indexes of the selected columns, in column order
     * @return flattened row-major list of values
     */
    private List<String> getValues(List<PartitionLocation> partitions, List<Integer> indexes) {
      List<String> values = new ArrayList<>(partitions.size() * indexes.size());
      for (PartitionLocation partition : partitions) {
        for (int index : indexes) {
          values.add(partition.getPartitionValue(index));
        }
      }
      return values;
    }

    /**
     * Checks whether the scan is eligible for file system based pruning.
     *
     * @param scan scan to check
     * @return {@code true} for an {@link EnumerableTableScan} whose selection is a {@link FormatSelection},
     *         or for a {@link DrillScanRel} whose group scan is a {@link FileGroupScan}
     */
    private static boolean isQualifiedFilePruning(final TableScan scan) {
      if (scan instanceof EnumerableTableScan) {
        Object selection = getDrillTable(scan).getSelection();
        return selection instanceof FormatSelection;
      } else if (scan instanceof DrillScanRel) {
        GroupScan groupScan = ((DrillScanRel) scan).getGroupScan();
        // this rule is applicable only for dfs based partition pruning in Drill Logical
        return groupScan instanceof FileGroupScan;
      }
      return false;
    }
  }
}