aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical
diff options
context:
space:
mode:
authorArina Ielchiieva <arina.yelchiyeva@gmail.com>2018-03-20 18:29:45 +0000
committerArina Ielchiieva <arina.yelchiyeva@gmail.com>2018-04-27 11:41:22 +0300
commitc6549e58859397c88cb1de61b4f6eee52a07ed0c (patch)
tree60a4a3d48d095f5afe7d9e07a86a8114be4e85b2 /exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical
parent84cd83495adf7b0a80932535809c58a1cd3324e9 (diff)
DRILL-6331: Revisit Hive Drill native parquet implementation to be exposed to Drill optimizations (filter / limit push down, count to direct scan)
1. Factored out common logic for Drill parquet reader and Hive Drill native parquet readers: AbstractParquetGroupScan, AbstractParquetRowGroupScan, AbstractParquetScanBatchCreator. 2. Rules that worked previously only with ParquetGroupScan, now can be applied for any class that extends AbstractParquetGroupScan: DrillFilterItemStarReWriterRule, ParquetPruneScanRule, PruneScanRule. 3. Hive populated partition values based on information returned from Hive metastore. Drill populates partition values based on path difference between selection root and actual file path. Before ColumnExplorer populated partition values based on Drill approach. Since now ColumnExplorer populates values for parquet files from Hive tables, `populateImplicitColumns` method logic was changed to populated partition columns only based on given partition values. 4. Refactored ParquetPartitionDescriptor to be responsible for populating partition values rather than storing this logic in parquet group scan class. 5. Metadata class was moved to separate metadata package (org.apache.drill.exec.store.parquet.metadata). Factored out several inner classed to improve code readability. 6. Collected all Drill native parquet reader unit tests into one class TestHiveDrillNativeParquetReader, also added new tests to cover new functionality. 7. Reduced excessive logging when parquet files metadata is read closes #1214
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java77
3 files changed, 57 insertions, 44 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java
index 27f8c49c6..b7cdcfc9b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillFilterItemStarReWriterRule.java
@@ -32,7 +32,7 @@ import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.planner.types.RelDataTypeDrillImpl;
import org.apache.drill.exec.planner.types.RelDataTypeHolder;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
import java.util.ArrayList;
import java.util.Collection;
@@ -73,7 +73,7 @@ public class DrillFilterItemStarReWriterRule {
@Override
public boolean matches(RelOptRuleCall call) {
DrillScanRel scan = call.rel(1);
- return scan.getGroupScan() instanceof ParquetGroupScan && super.matches(call);
+ return scan.getGroupScan() instanceof AbstractParquetGroupScan && super.matches(call);
}
@Override
@@ -129,7 +129,7 @@ public class DrillFilterItemStarReWriterRule {
@Override
public boolean matches(RelOptRuleCall call) {
DrillScanRel scan = call.rel(1);
- return scan.getGroupScan() instanceof ParquetGroupScan && super.matches(call);
+ return scan.getGroupScan() instanceof AbstractParquetGroupScan && super.matches(call);
}
@Override
@@ -149,7 +149,7 @@ public class DrillFilterItemStarReWriterRule {
@Override
public boolean matches(RelOptRuleCall call) {
DrillScanRel scan = call.rel(2);
- return scan.getGroupScan() instanceof ParquetGroupScan && super.matches(call);
+ return scan.getGroupScan() instanceof AbstractParquetGroupScan && super.matches(call);
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
index 3153b9d48..6e44383c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/ParquetPruneScanRule.java
@@ -19,10 +19,8 @@ package org.apache.drill.exec.planner.logical.partition;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
-import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.exec.ops.OptimizerRulesContext;
-import org.apache.drill.exec.physical.base.FileGroupScan;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.ParquetPartitionDescriptor;
import org.apache.drill.exec.planner.PartitionDescriptor;
@@ -32,11 +30,11 @@ import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
-import org.apache.drill.exec.store.parquet.ParquetGroupScan;
+import org.apache.drill.exec.store.parquet.AbstractParquetGroupScan;
public class ParquetPruneScanRule {
- public static final RelOptRule getFilterOnProjectParquet(OptimizerRulesContext optimizerRulesContext) {
+ public static RelOptRule getFilterOnProjectParquet(OptimizerRulesContext optimizerRulesContext) {
return new PruneScanRule(
RelOptHelper.some(DrillFilterRel.class, RelOptHelper.some(DrillProjectRel.class, RelOptHelper.any(DrillScanRel.class))),
"PruneScanRule:Filter_On_Project_Parquet",
@@ -53,9 +51,9 @@ public class ParquetPruneScanRule {
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for parquet based partition pruning
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
- return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
+ return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
} else {
- return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
+ return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
}
}
@@ -69,7 +67,7 @@ public class ParquetPruneScanRule {
};
}
- public static final RelOptRule getFilterOnScanParquet(OptimizerRulesContext optimizerRulesContext) {
+ public static RelOptRule getFilterOnScanParquet(OptimizerRulesContext optimizerRulesContext) {
return new PruneScanRule(
RelOptHelper.some(DrillFilterRel.class, RelOptHelper.any(DrillScanRel.class)),
"PruneScanRule:Filter_On_Scan_Parquet", optimizerRulesContext) {
@@ -85,9 +83,9 @@ public class ParquetPruneScanRule {
GroupScan groupScan = scan.getGroupScan();
// this rule is applicable only for parquet based partition pruning
if (PrelUtil.getPlannerSettings(scan.getCluster().getPlanner()).isHepPartitionPruningEnabled()) {
- return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
+ return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown() && !scan.partitionFilterPushdown();
} else {
- return groupScan instanceof ParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
+ return groupScan instanceof AbstractParquetGroupScan && groupScan.supportsPartitionFilterPushdown();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 5f679a4e6..7fa17940b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Stopwatch;
-
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
@@ -147,8 +146,8 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
final String pruningClassName = getClass().getName();
- logger.info("Beginning partition pruning, pruning class: {}", pruningClassName);
- Stopwatch totalPruningTime = Stopwatch.createStarted();
+ logger.debug("Beginning partition pruning, pruning class: {}", pruningClassName);
+ Stopwatch totalPruningTime = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
final PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
PartitionDescriptor descriptor = getPartitionDescriptor(settings, scanRel);
@@ -191,30 +190,33 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
if (partitionColumnBitSet.isEmpty()) {
- logger.info("No partition columns are projected from the scan..continue. " +
- "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ if (totalPruningTime != null) {
+ logger.debug("No partition columns are projected from the scan..continue. Total pruning elapsed time: {} ms",
+ totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
return;
}
// stop watch to track how long we spend in different phases of pruning
- Stopwatch miscTimer = Stopwatch.createUnstarted();
-
- // track how long we spend building the filter tree
- miscTimer.start();
+ // first track how long we spend building the filter tree
+ Stopwatch miscTimer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
FindPartitionConditions c = new FindPartitionConditions(columnBitset, filterRel.getCluster().getRexBuilder());
c.analyze(condition);
RexNode pruneCondition = c.getFinalCondition();
BitSet referencedDirsBitSet = c.getReferencedDirs();
- logger.info("Total elapsed time to build and analyze filter tree: {} ms",
- miscTimer.elapsed(TimeUnit.MILLISECONDS));
- miscTimer.reset();
+ if (miscTimer != null) {
+ logger.debug("Total elapsed time to build and analyze filter tree: {} ms", miscTimer.elapsed(TimeUnit.MILLISECONDS));
+ miscTimer.reset();
+ }
if (pruneCondition == null) {
- logger.info("No conditions were found eligible for partition pruning." +
- "Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ if (totalPruningTime != null) {
+ logger.debug("No conditions were found eligible for partition pruning. Total pruning elapsed time: {} ms",
+ totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
return;
}
@@ -251,15 +253,19 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
container.add(v);
}
- // track how long we spend populating partition column vectors
- miscTimer.start();
+ if (miscTimer != null) {
+ // track how long we spend populating partition column vectors
+ miscTimer.start();
+ }
// populate partition vectors.
descriptor.populatePartitionVectors(vectors, partitions, partitionColumnBitSet, fieldNameMap);
- logger.info("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
- miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
- miscTimer.reset();
+ if (miscTimer != null) {
+ logger.debug("Elapsed time to populate partitioning column vectors: {} ms within batchIndex: {}",
+ miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex);
+ miscTimer.reset();
+ }
// materialize the expression; only need to do this once
if (batchIndex == 0) {
@@ -267,8 +273,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
if (materializedExpr == null) {
// continue without partition pruning; no need to log anything here since
// materializePruneExpr logs it already
- logger.info("Total pruning elapsed time: {} ms",
- totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ if (totalPruningTime != null) {
+ logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
return;
}
@@ -276,14 +283,18 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
output.allocateNew(partitions.size());
- // start the timer to evaluate how long we spend in the interpreter evaluation
- miscTimer.start();
+ if (miscTimer != null) {
+ // start the timer to evaluate how long we spend in the interpreter evaluation
+ miscTimer.start();
+ }
InterpreterEvaluator.evaluate(partitions.size(), optimizerContext, container, output, materializedExpr);
- logger.info("Elapsed time in interpreter evaluation: {} ms within batchIndex: {} with # of partitions : {}",
- miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex, partitions.size());
- miscTimer.reset();
+ if (miscTimer != null) {
+ logger.debug("Elapsed time in interpreter evaluation: {} ms within batchIndex: {} with # of partitions : {}",
+ miscTimer.elapsed(TimeUnit.MILLISECONDS), batchIndex, partitions.size());
+ miscTimer.reset();
+ }
int recordCount = 0;
int qualifiedCount = 0;
@@ -338,7 +349,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
batchIndex++;
} catch (Exception e) {
logger.warn("Exception while trying to prune partition.", e);
- logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ if (totalPruningTime != null) {
+ logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
setPruneStatus(metaContext, PruneStatus.NOT_PRUNED);
return; // continue without partition pruning
@@ -352,7 +365,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
try {
if (newPartitions.size() == numTotal) {
- logger.info("No partitions were eligible for pruning");
+ logger.debug("No partitions were eligible for pruning");
return;
}
@@ -371,7 +384,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
// directories first and the non-composite partition location will still return
// directories, not files. So, additional processing is done depending on this flag
wasAllPartitionsPruned = true;
- logger.info("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal);
+ logger.debug("All {} partitions were pruned; added back a single partition to allow creating a schema", numTotal);
// set the cacheFileRoot appropriately
if (firstLocation.isCompositePartition()) {
@@ -379,7 +392,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
}
- logger.info("Pruned {} partitions down to {}", numTotal, newPartitions.size());
+ logger.debug("Pruned {} partitions down to {}", numTotal, newPartitions.size());
List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
@@ -439,7 +452,9 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
} catch (Exception e) {
logger.warn("Exception while using the pruned partitions.", e);
} finally {
- logger.info("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ if (totalPruningTime != null) {
+ logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
}
}