diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java | 114 |
1 files changed, 106 insertions, 8 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java index a9fb10188..209e03da5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java @@ -48,6 +48,7 @@ import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.FileSystemPartitionDescriptor; import org.apache.drill.exec.planner.PartitionDescriptor; import org.apache.drill.exec.planner.PartitionLocation; +import org.apache.drill.exec.planner.SimplePartitionLocation; import org.apache.drill.exec.planner.logical.DrillOptiq; import org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.planner.logical.DrillScanRel; @@ -60,7 +61,6 @@ import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.store.StoragePluginOptimizerRule; import org.apache.drill.exec.store.dfs.FormatSelection; -import org.apache.drill.exec.store.parquet.ParquetGroupScan; import org.apache.drill.exec.vector.NullableBitVector; import org.apache.calcite.rel.RelNode; import org.apache.calcite.plan.RelOptRule; @@ -68,6 +68,7 @@ import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.plan.RelOptRuleOperand; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rex.RexNode; +import org.apache.commons.lang3.tuple.Pair; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -143,6 +144,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { } protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) { + final String pruningClassName = getClass().getName(); logger.info("Beginning partition pruning, pruning class: {}", pruningClassName); Stopwatch totalPruningTime = Stopwatch.createStarted(); @@ -166,6 +168,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { List<String> fieldNames = scanRel.getRowType().getFieldNames(); BitSet columnBitset = new BitSet(); BitSet partitionColumnBitSet = new BitSet(); + Map<Integer, Integer> partitionMap = Maps.newHashMap(); int relColIndex = 0; for (String field : fieldNames) { @@ -174,6 +177,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { fieldNameMap.put(partitionIndex, field); partitionColumnBitSet.set(partitionIndex); columnBitset.set(relColIndex); + // mapping between the relColIndex and partitionIndex + partitionMap.put(relColIndex, partitionIndex); } relColIndex++; } @@ -193,6 +198,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder()); c.analyze(condition); RexNode pruneCondition = c.getFinalCondition(); + BitSet referencedDirsBitSet = c.getReferencedDirs(); logger.info("Total elapsed time to build and analyze filter tree: {} ms", miscTimer.elapsed(TimeUnit.MILLISECONDS)); @@ -210,6 +216,10 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { int batchIndex = 0; PartitionLocation firstLocation = null; LogicalExpression materializedExpr = null; + boolean checkForSingle = descriptor.supportsSinglePartOptimization(); + boolean isSinglePartition = true; + String[] spInfo = null; + int maxIndex = -1; // Outer loop: iterate over a list of batches of PartitionLocations for (List<PartitionLocation> partitions : descriptor) { @@ -269,13 +279,59 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { int recordCount = 0; int qualifiedCount = 0; - // Inner loop: within each batch iterate over the PartitionLocations - for(PartitionLocation part: partitions){ - if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){ - newPartitions.add(part); - qualifiedCount++; + if (checkForSingle && + partitions.get(0).isCompositePartition() /* apply single partition check only for composite partitions */) { + // Inner loop: within each batch iterate over the PartitionLocations + for (PartitionLocation part : partitions) { + assert part.isCompositePartition(); + if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) { + newPartitions.add(part); + if (isSinglePartition) { // only need to do this if we are already single partition + // compose the array of partition values for the directories that are referenced by filter: + // e.g suppose the dir hierarchy is year/quarter/month and the query is: + // SELECT * FROM T WHERE dir0=2015 AND dir1 = 'Q1', + // then for 2015/Q1/Feb, this will have ['2015', 'Q1', null] + // Note that we are not using the PartitionLocation here but composing a different list because + // we are only interested in the directory columns that are referenced in the filter condition. not + // the SELECT list or other parts of the query. + Pair<String[], Integer> p = composePartition(referencedDirsBitSet, partitionMap, vectors, recordCount); + String[] parts = p.getLeft(); + int tmpIndex = p.getRight(); + if (spInfo == null) { + for (int j = 0; j <= tmpIndex; j++) { + if (parts[j] == null) { // prefixes should be non-null + isSinglePartition = false; + break; + } + } + spInfo = parts; + maxIndex = tmpIndex; + } else if (maxIndex != tmpIndex) { + isSinglePartition = false; + break; + } else { + // we only want to compare until the maxIndex inclusive since subsequent values would be null + for (int j = 0; j <= maxIndex; j++) { + if (!spInfo[j].equals(parts[j])) { + isSinglePartition = false; + break; + } + } + } + } + qualifiedCount++; + } + recordCount++; + } + } else { + // Inner loop: within each batch iterate over the PartitionLocations + for(PartitionLocation part: partitions){ + if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) { + newPartitions.add(part); + qualifiedCount++; + } + recordCount++; } - recordCount++; } logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount); batchIndex++; @@ -299,6 +355,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { // handle the case all partitions are filtered out. boolean canDropFilter = true; + boolean wasAllPartitionsPruned = false; + String cacheFileRoot = null; if (newPartitions.isEmpty()) { assert firstLocation != null; @@ -306,6 +364,16 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { // In such case, we should not drop filter. newPartitions.add(firstLocation.getPartitionLocationRecursive().get(0)); canDropFilter = false; + // NOTE: with DRILL-4530, the PruneScanRule may be called with only a list of + // directories first and the non-composite partition location will still return + // directories, not files. So, additional processing is done depending on this flag + wasAllPartitionsPruned = true; + logger.info("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal); + + // set the cacheFileRoot appropriately + if (firstLocation.isCompositePartition()) { + cacheFileRoot = descriptor.getBaseTableLocation() + firstLocation.getCompositePartitionPath(); + } } logger.info("Pruned {} partitions down to {}", numTotal, newPartitions.size()); @@ -320,7 +388,18 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { condition = condition.accept(reverseVisitor); pruneCondition = pruneCondition.accept(reverseVisitor); - RelNode inputRel = descriptor.createTableScan(newPartitions); + if (checkForSingle && isSinglePartition && !wasAllPartitionsPruned) { + // if metadata cache file could potentially be used, then assign a proper cacheFileRoot + String path = ""; + for (int j = 0; j <= maxIndex; j++) { + path += "/" + spInfo[j]; + } + cacheFileRoot = descriptor.getBaseTableLocation() + path; + } + + RelNode inputRel = descriptor.supportsSinglePartOptimization() ? + descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned) : + descriptor.createTableScan(newPartitions, wasAllPartitionsPruned); if (projectRel != null) { inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel)); @@ -340,6 +419,25 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { } } + private Pair<String[], Integer> composePartition(BitSet referencedDirsBitSet, + Map<Integer, Integer> partitionMap, + ValueVector[] vectors, + int recordCount) { + String[] partition = new String[vectors.length]; + int maxIndex = -1; + for (int referencedDirsIndex : BitSets.toIter(referencedDirsBitSet)) { + int partitionColumnIndex = partitionMap.get(referencedDirsIndex); + ValueVector vv = vectors[partitionColumnIndex]; + if (vv.getAccessor().getValueCount() > 0 && + vv.getAccessor().getObject(recordCount) != null) { + String value = vv.getAccessor().getObject(recordCount).toString(); + partition[partitionColumnIndex] = value; + maxIndex = Math.max(maxIndex, partitionColumnIndex); + } + } + return Pair.of(partition, maxIndex); + } + protected LogicalExpression materializePruneExpr(RexNode pruneCondition, PlannerSettings settings, RelNode scanRel, |