aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical
diff options
context:
space:
mode:
authorAman Sinha <asinha@maprtech.com>2016-03-25 12:55:59 -0700
committerAman Sinha <asinha@maprtech.com>2016-07-18 17:01:24 -0700
commit4f818d074373f3572cb3c2e99d1c9c43df2090aa (patch)
treee58a37eeafa01c32f526fc9ac9373988e2d8e8c6 /exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical
parent70aba772a9434e0703078bddb47008f35cffb8bf (diff)
DRILL-4530: Optimize partition pruning with metadata caching for the single partition case.
- Enhance PruneScanRule to detect single partitions based on referenced dirs in the filter. - Keep a new status of EXPANDED_PARTIAL for FileSelection. - Create separate .directories metadata file to prune directories first before files. - Introduce cacheFileRoot attribute to keep track of the parent directory of the cache file after partition pruning. Check if prefix components are non-null the very first time single partition info is initialized. Add separate interface method to create scan using a cacheFileRoot. Create filenames list with unique names using fileSet if available. Add several unit tests. Populate only fileSet when expanding using the metadata cache. Remove cacheFileRoot parameter from FileGroupScan's clone() method and instead leverage it from FileSelection. Keep track of whether all partitions were previously pruned and process this state where needed. close apache/drill#519
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/FindPartitionConditions.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java114
2 files changed, 118 insertions, 8 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/FindPartitionConditions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/FindPartitionConditions.java
index d1446b6c7..620b6b266 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/FindPartitionConditions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/FindPartitionConditions.java
@@ -84,6 +84,10 @@ public class FindPartitionConditions extends RexVisitorImpl<Void> {
private final BitSet dirs;
+ // The Scan could be projecting several dirN columns but we are only interested in the
+ // ones that are referenced by the Filter, so keep track of such referenced dirN columns.
+ private final BitSet referencedDirs;
+
private final List<PushDirFilter> pushStatusStack = Lists.newArrayList();
private final Deque<OpState> opStack = new ArrayDeque<OpState>();
@@ -103,6 +107,7 @@ public class FindPartitionConditions extends RexVisitorImpl<Void> {
// go deep
super(true);
this.dirs = dirs;
+ this.referencedDirs = new BitSet(dirs.size());
}
public FindPartitionConditions(BitSet dirs, RexBuilder builder) {
@@ -110,6 +115,7 @@ public class FindPartitionConditions extends RexVisitorImpl<Void> {
super(true);
this.dirs = dirs;
this.builder = builder;
+ this.referencedDirs = new BitSet(dirs.size());
}
public void analyze(RexNode exp) {
@@ -131,6 +137,10 @@ public class FindPartitionConditions extends RexVisitorImpl<Void> {
return resultCondition;
}
+ public BitSet getReferencedDirs() {
+ return referencedDirs;
+ }
+
private Void pushVariable() {
pushStatusStack.add(PushDirFilter.NO_PUSH);
return null;
@@ -222,6 +232,8 @@ public class FindPartitionConditions extends RexVisitorImpl<Void> {
if(dirs.get(inputRef.getIndex())){
pushStatusStack.add(PushDirFilter.PUSH);
addResult(inputRef);
+ referencedDirs.set(inputRef.getIndex());
+
}else{
pushStatusStack.add(PushDirFilter.NO_PUSH);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index a9fb10188..209e03da5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -48,6 +48,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.FileSystemPartitionDescriptor;
import org.apache.drill.exec.planner.PartitionDescriptor;
import org.apache.drill.exec.planner.PartitionLocation;
+import org.apache.drill.exec.planner.SimplePartitionLocation;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.DrillScanRel;
@@ -60,7 +61,6 @@ import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
import org.apache.drill.exec.store.dfs.FormatSelection;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
import org.apache.drill.exec.vector.NullableBitVector;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptRule;
@@ -68,6 +68,7 @@ import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelOptRuleOperand;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rex.RexNode;
+import org.apache.commons.lang3.tuple.Pair;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -143,6 +144,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
+
final String pruningClassName = getClass().getName();
logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
Stopwatch totalPruningTime = Stopwatch.createStarted();
@@ -166,6 +168,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
List<String> fieldNames = scanRel.getRowType().getFieldNames();
BitSet columnBitset = new BitSet();
BitSet partitionColumnBitSet = new BitSet();
+ Map<Integer, Integer> partitionMap = Maps.newHashMap();
int relColIndex = 0;
for (String field : fieldNames) {
@@ -174,6 +177,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
fieldNameMap.put(partitionIndex, field);
partitionColumnBitSet.set(partitionIndex);
columnBitset.set(relColIndex);
+ // mapping between the relColIndex and partitionIndex
+ partitionMap.put(relColIndex, partitionIndex);
}
relColIndex++;
}
@@ -193,6 +198,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
c.analyze(condition);
RexNode pruneCondition = c.getFinalCondition();
+ BitSet referencedDirsBitSet = c.getReferencedDirs();
logger.info("Total elapsed time to build and analyze filter tree: {} ms",
miscTimer.elapsed(TimeUnit.MILLISECONDS));
@@ -210,6 +216,10 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
int batchIndex = 0;
PartitionLocation firstLocation = null;
LogicalExpression materializedExpr = null;
+ boolean checkForSingle = descriptor.supportsSinglePartOptimization();
+ boolean isSinglePartition = true;
+ String[] spInfo = null;
+ int maxIndex = -1;
// Outer loop: iterate over a list of batches of PartitionLocations
for (List<PartitionLocation> partitions : descriptor) {
@@ -269,13 +279,59 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
int recordCount = 0;
int qualifiedCount = 0;
- // Inner loop: within each batch iterate over the PartitionLocations
- for(PartitionLocation part: partitions){
- if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1){
- newPartitions.add(part);
- qualifiedCount++;
+ if (checkForSingle &&
+ partitions.get(0).isCompositePartition() /* apply single partition check only for composite partitions */) {
+ // Inner loop: within each batch iterate over the PartitionLocations
+ for (PartitionLocation part : partitions) {
+ assert part.isCompositePartition();
+ if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
+ newPartitions.add(part);
+ if (isSinglePartition) { // only need to do this if we are already single partition
+ // compose the array of partition values for the directories that are referenced by filter:
+ // e.g suppose the dir hierarchy is year/quarter/month and the query is:
+ // SELECT * FROM T WHERE dir0=2015 AND dir1 = 'Q1',
+ // then for 2015/Q1/Feb, this will have ['2015', 'Q1', null]
+ // Note that we are not using the PartitionLocation here but composing a different list because
+ // we are only interested in the directory columns that are referenced in the filter condition. not
+ // the SELECT list or other parts of the query.
+ Pair<String[], Integer> p = composePartition(referencedDirsBitSet, partitionMap, vectors, recordCount);
+ String[] parts = p.getLeft();
+ int tmpIndex = p.getRight();
+ if (spInfo == null) {
+ for (int j = 0; j <= tmpIndex; j++) {
+ if (parts[j] == null) { // prefixes should be non-null
+ isSinglePartition = false;
+ break;
+ }
+ }
+ spInfo = parts;
+ maxIndex = tmpIndex;
+ } else if (maxIndex != tmpIndex) {
+ isSinglePartition = false;
+ break;
+ } else {
+ // we only want to compare until the maxIndex inclusive since subsequent values would be null
+ for (int j = 0; j <= maxIndex; j++) {
+ if (!spInfo[j].equals(parts[j])) {
+ isSinglePartition = false;
+ break;
+ }
+ }
+ }
+ }
+ qualifiedCount++;
+ }
+ recordCount++;
+ }
+ } else {
+ // Inner loop: within each batch iterate over the PartitionLocations
+ for(PartitionLocation part: partitions){
+ if(!output.getAccessor().isNull(recordCount) && output.getAccessor().get(recordCount) == 1) {
+ newPartitions.add(part);
+ qualifiedCount++;
+ }
+ recordCount++;
}
- recordCount++;
}
logger.debug("Within batch {}: total records: {}, qualified records: {}", batchIndex, recordCount, qualifiedCount);
batchIndex++;
@@ -299,6 +355,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
// handle the case all partitions are filtered out.
boolean canDropFilter = true;
+ boolean wasAllPartitionsPruned = false;
+ String cacheFileRoot = null;
if (newPartitions.isEmpty()) {
assert firstLocation != null;
@@ -306,6 +364,16 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
// In such case, we should not drop filter.
newPartitions.add(firstLocation.getPartitionLocationRecursive().get(0));
canDropFilter = false;
+ // NOTE: with DRILL-4530, the PruneScanRule may be called with only a list of
+ // directories first and the non-composite partition location will still return
+ // directories, not files. So, additional processing is done depending on this flag
+ wasAllPartitionsPruned = true;
+ logger.info("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal);
+
+ // set the cacheFileRoot appropriately
+ if (firstLocation.isCompositePartition()) {
+ cacheFileRoot = descriptor.getBaseTableLocation() + firstLocation.getCompositePartitionPath();
+ }
}
logger.info("Pruned {} partitions down to {}", numTotal, newPartitions.size());
@@ -320,7 +388,18 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
condition = condition.accept(reverseVisitor);
pruneCondition = pruneCondition.accept(reverseVisitor);
- RelNode inputRel = descriptor.createTableScan(newPartitions);
+ if (checkForSingle && isSinglePartition && !wasAllPartitionsPruned) {
+ // if metadata cache file could potentially be used, then assign a proper cacheFileRoot
+ String path = "";
+ for (int j = 0; j <= maxIndex; j++) {
+ path += "/" + spInfo[j];
+ }
+ cacheFileRoot = descriptor.getBaseTableLocation() + path;
+ }
+
+ RelNode inputRel = descriptor.supportsSinglePartOptimization() ?
+ descriptor.createTableScan(newPartitions, cacheFileRoot, wasAllPartitionsPruned) :
+ descriptor.createTableScan(newPartitions, wasAllPartitionsPruned);
if (projectRel != null) {
inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
@@ -340,6 +419,25 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
}
+ private Pair<String[], Integer> composePartition(BitSet referencedDirsBitSet,
+ Map<Integer, Integer> partitionMap,
+ ValueVector[] vectors,
+ int recordCount) {
+ String[] partition = new String[vectors.length];
+ int maxIndex = -1;
+ for (int referencedDirsIndex : BitSets.toIter(referencedDirsBitSet)) {
+ int partitionColumnIndex = partitionMap.get(referencedDirsIndex);
+ ValueVector vv = vectors[partitionColumnIndex];
+ if (vv.getAccessor().getValueCount() > 0 &&
+ vv.getAccessor().getObject(recordCount) != null) {
+ String value = vv.getAccessor().getObject(recordCount).toString();
+ partition[partitionColumnIndex] = value;
+ maxIndex = Math.max(maxIndex, partitionColumnIndex);
+ }
+ }
+ return Pair.of(partition, maxIndex);
+ }
+
protected LogicalExpression materializePruneExpr(RexNode pruneCondition,
PlannerSettings settings,
RelNode scanRel,