Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java | 114
1 file changed, 106 insertions(+), 8 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index a9fb10188..209e03da5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
import org.apache.drill.exec.planner.PartitionDescriptor;
import org.apache.drill.exec.planner.PartitionLocation;
+import org.apache.drill.exec.planner.SimplePartitionLocation;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillScanRel;
@@ -60,7 +61,6 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
@@ -68,6 +68,7 @@ import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rex.RexNode;
+import org.apache.commons.lang3.tuple.Pair;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -143,6 +144,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
+
final String pruningClassName = getClass().getName();
logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
Stopwatch totalPruningTime = Stopwatch.createStarted();
@@ -166,6 +168,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
List<String> fieldNames = scanRel.getRowType().getFieldNames();
BitSet columnBitset = new BitSet();
BitSet partitionColumnBitSet = new BitSet();
+ Map<Integer, Integer> partitionMap = Maps.newHashMap();
int relColIndex = 0;
for (String field : fieldNames) {
@@ -174,6 +177,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
fieldNameMap.put(partitionIndex, field);
partitionColumnBitSet.set(partitionIndex);
columnBitset.set(relColIndex);
+ // mapping between the relColIndex and partitionIndex
+ partitionMap.put(relColIndex, partitionIndex);
}
relColIndex++;
}
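The partitionMap added in this hunk records, for each partition column, the mapping from its position in the scan's row type (relColIndex) to its partition index; later code uses it to find the right value vector for each filter-referenced directory level. A minimal standalone sketch of that bookkeeping, assuming a hypothetical dirN naming convention in place of descriptor.getIdIfValid():

    import java.util.BitSet;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class PartitionColumnMapping {
      // Builds relColIndex -> partitionIndex (e.g. "dir1" at row-type position 3
      // yields the entry 3 -> 1), marking both bitsets along the way.
      static Map<Integer, Integer> build(List<String> fieldNames,
                                         BitSet columnBitset,
                                         BitSet partitionColumnBitSet) {
        Map<Integer, Integer> partitionMap = new HashMap<>();
        int relColIndex = 0;
        for (String field : fieldNames) {
          Integer partitionIndex = idIfValid(field);
          if (partitionIndex != null) {
            partitionColumnBitSet.set(partitionIndex);
            columnBitset.set(relColIndex);
            partitionMap.put(relColIndex, partitionIndex);
          }
          relColIndex++;
        }
        return partitionMap;
      }

      // Hypothetical stand-in for PartitionDescriptor.getIdIfValid(): maps
      // "dir0", "dir1", ... to 0, 1, ...; anything else is not a partition column.
      static Integer idIfValid(String field) {
        return field.matches("dir\\d+") ? Integer.valueOf(field.substring(3)) : null;
      }
    }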
@@ -193,6 +198,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
c.analyze(condition);
RexNode pruneCondition = c.getFinalCondition();
+ BitSet referencedDirsBitSet = c.getReferencedDirs();
logger.info("Total elapsed time to build and analyze filter tree: {} ms",
miscTimer.elapsed(TimeUnit.MILLISECONDS));
@@ -210,6 +216,10 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
int batchIndex = 0;
PartitionLocation firstLocation = null;
LogicalExpression materializedExpr = null;
+ boolean checkForSingle = descriptor.supportsSinglePartOptimization();
+ boolean isSinglePartition = true;
+ String[] spInfo = null;
+ int maxIndex = -1;
// Outer loop: iterate over a list of batches of PartitionLocations
for (List<PartitionLocation> partitions : descriptor) {
@@ -269,13 +279,59 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
int recordCount = 0;
int qualifiedCount = 0;
- // Inner loop: within each batch iterate over the PartitionLocations
- for(PartitionLocation part: partitions){
- if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
- newPartitions.add(part);
- qualifiedCount++;
+ if (checkForSingle &&
+ partitions.get(0).isCompositePartition() /* apply single partition check only for composite partitions */) {
+ // Inner loop: within each batch iterate over the PartitionLocations
+ for (PartitionLocation part : partitions) {
+ assert part.isCompositePartition();
+ if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
+ newPartitions.add(part);
+ if (isSinglePartition) { // only need to do this if we are already single partition
+ // compose the array of partition values for the directories that are referenced by filter:
+ // e.g. suppose the dir hierarchy is year/quarter/month and the query is:
+ // SELECT * FROM T WHERE dir0=2015 AND dir1 = 'Q1',
+ // then for 2015/Q1/Feb, this will have ['2015', 'Q1', null]
+ // Note that we are not using the PartitionLocation here but composing a different list because
+ // we are only interested in the directory columns that are referenced in the filter condition, not
+ // the SELECT list or other parts of the query.
+ Pair<String[], Integer> p = composePartition(referencedDirsBitSet, partitionMap, vectors, recordCount);
+ String[] parts = p.getLeft();
+ int tmpIndex = p.getRight();
+ if (spInfo == null) {
+ for (int j = 0; j <= tmpIndex; j++) {
+ if (parts[j] == null) { // prefixes should be non-null
+ isSinglePartition = false;
+ break;
+ }
+ }
+ spInfo = parts;
+ maxIndex = tmpIndex;
+ } else if (maxIndex != tmpIndex) {
+ isSinglePartition = false;
+ break;
+ } else {
+ // we only want to compare up to maxIndex inclusive since subsequent values would be null
+ for (int j = 0; j <= maxIndex; j++) {
+ if (!spInfo[j].equals(parts[j])) {
+ isSinglePartition = false;
+ break;
+ }
+ }
+ }
+ }
+ qualifiedCount++;
+ }
+ recordCount++;
+ }
+ } else {
+ // Inner loop: within each batch iterate over the PartitionLocations
+ for(PartitionLocation part: partitions){
+ if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
+ newPartitions.add(part);
+ qualifiedCount++;
+ }
+ recordCount++;
}
- recordCount++;
}
logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
batchIndex++;
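The single-partition branch above boils down to one invariant: the first qualifying record fixes a non-null prefix of partition values, and every subsequent qualifying record must produce the same prefix at the same depth. A vector-free sketch of just that comparison, where each Pair mirrors what composePartition returns (names are illustrative, not Drill's):

    import java.util.List;
    import org.apache.commons.lang3.tuple.Pair;

    class SinglePartitionCheck {
      // Returns true if every qualifying record shares the same non-null prefix
      // of filter-referenced partition values (e.g. all rows fall under 2015/Q1).
      static boolean isSinglePartition(List<Pair<String[], Integer>> qualifiedRecords) {
        String[] spInfo = null;
        int maxIndex = -1;
        for (Pair<String[], Integer> p : qualifiedRecords) {
          String[] parts = p.getLeft();
          int tmpIndex = p.getRight();
          if (spInfo == null) {               // first record fixes the prefix
            for (int j = 0; j <= tmpIndex; j++) {
              if (parts[j] == null) {
                return false;                 // prefixes must be non-null
              }
            }
            spInfo = parts;
            maxIndex = tmpIndex;
          } else if (maxIndex != tmpIndex) {
            return false;                     // different depth => multiple partitions
          } else {
            for (int j = 0; j <= maxIndex; j++) {
              if (!spInfo[j].equals(parts[j])) {
                return false;                 // value mismatch => multiple partitions
              }
            }
          }
        }
        return spInfo != null;                // no qualifying records => no single partition
      }
    }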
@@ -299,6 +355,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
// handle the case all partitions are filtered out.
boolean canDropFilter = true;
+ boolean wasAllPartitionsPruned = false;
+ String cacheFileRoot = null;
if (newPartitions.isEmpty()) {
assert firstLocation != null;
@@ -306,6 +364,16 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
// In such case, we should not drop filter.
newPartitions.add(firstLocation.getPartitionLocationRecursive().get(0));
canDropFilter = false;
+ // NOTE: with DRILL-4530, the PruneScanRule may be called with only a list of
+ // directories first, and the non-composite partition location will still return
+ // directories, not files, so additional processing is done depending on this flag
+ wasAllPartitionsPruned = true;
+ logger.info("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal);
+
+ // set the cacheFileRoot appropriately
+ if (firstLocation.isCompositePartition()) {
+ cacheFileRoot = descriptor.getBaseTableLocation() + firstLocation.getCompositePartitionPath();
+ }
}
logger.info("Pruned {} partitions down to {}", numTotal, newPartitions.size());
@@ -320,7 +388,18 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
condition = condition.accept(reverseVisitor);
pruneCondition = pruneCondition.accept(reverseVisitor);
- RelNode inputRel = descriptor.createTableScan(newPartitions);
+ if (checkForSingle && isSinglePartition && !wasAllPartitionsPruned) {
+ // if metadata cache file could potentially be used, then assign a proper cacheFileRoot
+ String path = "";
+ for (int j = 0; j <= maxIndex; j++) {
+ path += "/" + spInfo[j];
+ }
+ cacheFileRoot = descriptor.getBaseTableLocation() + path;
+ }
+
+ RelNode inputRel = descriptor.supportsSinglePartOptimization() ?
+ descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned) :
+ descriptor.createTableScan(newPartitions, wasAllPartitionsPruned);
if (projectRel != null) {
inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
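The path built in this hunk is just the base table location with the shared partition-value prefix appended, so the planner can point the metadata cache at the single matching subdirectory. A small runnable illustration (composeCacheFileRoot is a hypothetical helper, not a Drill method):

    class CacheFileRootExample {
      // Joins the common partition prefix onto the base table location,
      // e.g. ("/tmp/t", {"2015", "Q1", null}, 1) -> "/tmp/t/2015/Q1".
      static String composeCacheFileRoot(String baseTableLocation, String[] spInfo, int maxIndex) {
        StringBuilder path = new StringBuilder(baseTableLocation);
        for (int j = 0; j <= maxIndex; j++) {
          path.append('/').append(spInfo[j]);
        }
        return path.toString();
      }

      public static void main(String[] args) {
        System.out.println(composeCacheFileRoot("/tmp/t", new String[] {"2015", "Q1", null}, 1));
        // prints: /tmp/t/2015/Q1
      }
    }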
@@ -340,6 +419,25 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
}
+ private Pair<String[], Integer> composePartition(BitSet referencedDirsBitSet,
+ Map<Integer, Integer> partitionMap,
+ ValueVector[] vectors,
+ int recordCount) {
+ String[] partition = new String[vectors.length];
+ int maxIndex = -1;
+ for (int referencedDirsIndex : BitSets.toIter(referencedDirsBitSet)) {
+ int partitionColumnIndex = partitionMap.get(referencedDirsIndex);
+ ValueVector vv = vectors[partitionColumnIndex];
+ if (vv.getAccessor().getValueCount() > 0 &&
+ vv.getAccessor().getObject(recordCount) != null) {
+ String value = vv.getAccessor().getObject(recordCount).toString();
+ partition[partitionColumnIndex] = value;
+ maxIndex = Math.max(maxIndex, partitionColumnIndex);
+ }
+ }
+ return Pair.of(partition, maxIndex);
+ }
+
protected LogicalExpression materializePruneExpr(RexNode pruneCondition,
PlannerSettings settings,
RelNode scanRel,
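For reference, composePartition above visits only the filter-referenced directory bits, stores each non-null value under its partition column index, and tracks the deepest populated level. A simplified sketch of the same contract, with a plain Object[] per record standing in for Drill's value vectors:

    import java.util.BitSet;
    import java.util.Map;
    import org.apache.commons.lang3.tuple.Pair;

    class ComposePartitionSketch {
      // values[col] holds the partition value of column col for the current record,
      // or null when that directory level is absent (e.g. 2015/Q1 has no month).
      static Pair<String[], Integer> composePartition(BitSet referencedDirs,
                                                      Map<Integer, Integer> partitionMap,
                                                      Object[] values) {
        String[] partition = new String[values.length];
        int maxIndex = -1;
        // Equivalent of BitSets.toIter() in the hunk above: walk the set bits.
        for (int i = referencedDirs.nextSetBit(0); i >= 0; i = referencedDirs.nextSetBit(i + 1)) {
          int partitionColumnIndex = partitionMap.get(i);
          Object v = values[partitionColumnIndex];
          if (v != null) {
            partition[partitionColumnIndex] = v.toString();
            maxIndex = Math.max(maxIndex, partitionColumnIndex);
          }
        }
        return Pair.of(partition, maxIndex);
      }
    }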