aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBohdan Kazydub <bohdan.kazydub@gmail.com>2019-02-13 01:14:14 +0200
committerSorabh Hamirwasia <sorabh@apache.org>2019-03-14 15:36:10 -0700
commita99db5fe53084a18a83e589ae529ffe5edbcc0a8 (patch)
treea3c48a0311f248a550cc6c2377936d384e03322e
parente2619f6e09da53730fb3281fe9fad663f564a2c2 (diff)
DRILL-7038: Queries on partitioned columns scan the entire datasets
- Added new optimizer rule which checks if query references directory columns only and has DISTINCT or GROUP BY operation. If the condition holds, instead of scanning full file set the following will be performed: 1) if there is cache metadata file, these directories will be read from it, 2) otherwise directories will be gathered from selection object (PartitionLocation). In the end Scan node will be transformed to DrillValuesRel (containing constant literals) with gathered values so no scan will be performed. closes #1640
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java246
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java1
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java25
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/TestDirScanToValuesConversion.java73
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java20
7 files changed, 354 insertions, 28 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index b6b0f58a0..5b032d061 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -406,7 +406,8 @@ public enum PlannerPhase {
ParquetPruneScanRule.getFilterOnScanParquet(optimizerRulesContext),
// Include LIMIT_ON_PROJECT since LIMIT_ON_SCAN may not work without it
DrillPushLimitToScanRule.LIMIT_ON_PROJECT,
- DrillPushLimitToScanRule.LIMIT_ON_SCAN
+ DrillPushLimitToScanRule.LIMIT_ON_SCAN,
+ PruneScanRule.getConvertAggScanToValuesRule(optimizerRulesContext)
)
.build();
@@ -472,7 +473,8 @@ public enum PlannerPhase {
.addAll(getItemStarRules())
.add(
PruneScanRule.getDirFilterOnProject(optimizerRulesContext),
- PruneScanRule.getDirFilterOnScan(optimizerRulesContext)
+ PruneScanRule.getDirFilterOnScan(optimizerRulesContext),
+ PruneScanRule.getConvertAggScanToValuesRule(optimizerRulesContext)
)
.build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 8d3a8cec9..901592f6a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -17,19 +17,31 @@
*/
package org.apache.drill.exec.planner.logical.partition;
+import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.TableScan;
+import org.apache.calcite.rel.logical.LogicalValues;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
import org.apache.calcite.rex.RexUtil;
+import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.BitSets;
import org.apache.drill.common.expression.ErrorCollectorImpl;
import org.apache.drill.common.expression.LogicalExpression;
@@ -50,15 +62,20 @@ import org.apache.drill.exec.planner.PartitionDescriptor;
import org.apache.drill.exec.planner.PartitionLocation;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillRelFactories;
import org.apache.drill.exec.planner.logical.DrillScanRel;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DrillTranslatableTable;
+import org.apache.drill.exec.planner.logical.DrillValuesRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;
+import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.dfs.MetadataContext.PruneStatus;
@@ -71,9 +88,6 @@ import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rex.RexNode;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.Path;
@@ -144,6 +158,10 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
return new DirPruneScanFilterOnScanRule(optimizerRulesContext);
}
+ public static RelOptRule getConvertAggScanToValuesRule(OptimizerRulesContext optimizerRulesContext) {
+ return new ConvertAggScanToValuesRule(optimizerRulesContext);
+ }
+
protected void doOnMatch(RelOptRuleCall call, Filter filterRel, Project projectRel, TableScan scanRel) {
final String pruningClassName = getClass().getName();
@@ -171,11 +189,11 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
RewriteAsBinaryOperators visitor = new RewriteAsBinaryOperators(true, filterRel.getCluster().getRexBuilder());
condition = condition.accept(visitor);
- Map<Integer, String> fieldNameMap = Maps.newHashMap();
+ Map<Integer, String> fieldNameMap = new HashMap<>();
List<String> fieldNames = scanRel.getRowType().getFieldNames();
BitSet columnBitset = new BitSet();
BitSet partitionColumnBitSet = new BitSet();
- Map<Integer, Integer> partitionMap = Maps.newHashMap();
+ Map<Integer, Integer> partitionMap = new HashMap<>();
int relColIndex = 0;
for (String field : fieldNames) {
@@ -223,7 +241,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
// set up the partitions
- List<PartitionLocation> newPartitions = Lists.newArrayList();
+ List<PartitionLocation> newPartitions = new ArrayList<>();
long numTotal = 0; // total number of partitions
int batchIndex = 0;
PartitionLocation firstLocation = null;
@@ -553,4 +571,220 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
}
}
+ /**
+ * A rule which converts {@link Aggregate} on {@link TableScan} with directory columns
+ * (see {@link org.apache.drill.exec.ExecConstants#FILESYSTEM_PARTITION_COLUMN_LABEL}) only
+ * into {@link DrillValuesRel} to avoid scanning at all.
+ *
+ * Resulting {@link DrillValuesRel} will be populated with constant literals obtained from:
+ * <ol>
+ * <li>metadata directory file if it exists</li>
+ * <li>or from file selection</li>
+ * </ol>
+ */
+ private static class ConvertAggScanToValuesRule extends PruneScanRule {
+
+ private final Pattern dirPattern;
+
+ private ConvertAggScanToValuesRule(OptimizerRulesContext optimizerRulesContext) {
+ super(RelOptHelper.some(Aggregate.class, DrillRel.DRILL_LOGICAL, RelOptHelper.any(TableScan.class)),
+ "PartitionColumnScanPruningRule:Prune_On_Scan", optimizerRulesContext);
+ String partitionColumnLabel = optimizerRulesContext.getPlannerSettings().getFsPartitionColumnLabel();
+ dirPattern = Pattern.compile(partitionColumnLabel + "\\d+");
+ }
+
+ @Override
+ public PartitionDescriptor getPartitionDescriptor(PlannerSettings settings, TableScan scanRel) {
+ return new FileSystemPartitionDescriptor(settings, scanRel);
+ }
+
+ // Checks if query references directory columns only and has DISTINCT or GROUP BY operation
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Aggregate aggregate = call.rel(0);
+ TableScan scan = call.rel(1);
+
+ if (!isQualifiedFilePruning(scan)
+ || scan.getRowType().getFieldCount() != aggregate.getRowType().getFieldCount()) {
+ return false;
+ }
+
+ List<String> fieldNames = scan.getRowType().getFieldNames();
+ // Check if select contains partition columns (dir0, dir1, dir2,..., dirN) only
+ for (String field : fieldNames) {
+ if (!dirPattern.matcher(field).matches()) {
+ return false;
+ }
+ }
+
+ return scan.isDistinct() || aggregate.getGroupCount() > 0;
+ }
+
+ /*
+ Transforms Scan node to DrillValuesRel node to avoid unnecessary scanning of selected files.
+ If cache metadata directory file exists, directory columns will be read from it,
+ otherwise directories will be gathered from selection (PartitionLocations).
+ DrillValuesRel will contain gathered constant literals.
+
+ For example, plan for "select dir0, dir1 from `t` group by 1, 2", where table `t` has directory structure year/quarter
+
+ 00-00 Screen
+ 00-01 Project(dir0=[$0], dir1=[$1])
+ 00-02 HashAgg(group=[{0, 1}])
+ 00-03 Scan(table=[[t]], groupscan=[ParquetGroupScan [entries=[ReadEntryWithPath [path=file:/path/t/1996/Q4/orders_96_q4.parquet],
+ ReadEntryWithPath [path=file:/path/t/1996/Q1/file_96_q1.parquet], ReadEntryWithPath [path=file:/path/t/1996/Q3/file_96_q3.parquet],
+ ReadEntryWithPath [path=file:/path/t/1996/Q2/file_96_q2.parquet], ReadEntryWithPath [path=file:/path/t/1994/Q4/file_94_q4.parquet],
+ ReadEntryWithPath [path=file:/path/t/1994/Q1/file_94_q1.parquet], ReadEntryWithPath [path=file:/path/t/1994/Q3/file_94_q3.parquet],
+ ReadEntryWithPath [path=file:/path/t/1994/Q2/file_94_q2.parquet], ReadEntryWithPath [path=file:/path/t/1995/Q4/file_95_q4.parquet],
+ ReadEntryWithPath [path=file:/path/t/1995/Q1/file_95_q1.parquet], ReadEntryWithPath [path=file:/path/t/1995/Q3/file_95_q3.parquet],
+ ReadEntryWithPath [path=file:/path/t/1995/Q2/file_95_q2.parquet]], selectionRoot=file:/path/t, ..., columns=[`dir0`, `dir1`]]])
+
+ will be changed to
+
+ 00-00 Screen
+ 00-01 Project(dir0=[$0], dir1=[$1])
+ 00-02 HashAgg(group=[{0, 1}])
+ 00-03 Values(tuples=[[{ '1995', 'Q1' }, { '1994', 'Q4' }, { '1996', 'Q3' }, { '1996', 'Q2' }, { '1994', 'Q2' },
+ { '1995', 'Q4' }, { '1996', 'Q1' }, { '1995', 'Q3' }, { '1996', 'Q4' }, { '1994', 'Q3' }, { '1994', 'Q1' }, { '1995', 'Q2' }]])
+ */
+ @Override
+ public void onMatch(RelOptRuleCall call) {
+ TableScan scan = call.rel(1);
+
+ String pruningClassName = getClass().getName();
+ logger.debug("Beginning file partition pruning, pruning class: {}", pruningClassName);
+ Stopwatch totalPruningTime = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
+
+ Object selection = getDrillTable(scan).getSelection();
+ MetadataContext metaContext = null;
+ FileSelection fileSelection = null;
+ if (selection instanceof FormatSelection) {
+ fileSelection = ((FormatSelection) selection).getSelection();
+ metaContext = fileSelection.getMetaContext();
+ }
+
+ PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ PartitionDescriptor descriptor = getPartitionDescriptor(settings, scan);
+
+ List<String> fieldNames = scan.getRowType().getFieldNames();
+ List<String> values = Collections.emptyList();
+ List<Integer> indexes = new ArrayList<>(fieldNames.size());
+ for (String field : fieldNames) {
+ int index = descriptor.getPartitionHierarchyIndex(field);
+ indexes.add(index);
+ }
+
+ if (metaContext != null && metaContext.getDirectories() != null) {
+ // Dir metadata cache file exists
+ logger.debug("Using Metadata Directories cache");
+ values = getValues(fileSelection.getSelectionRoot(), metaContext.getDirectories(), indexes);
+ }
+
+ if (values.isEmpty()) {
+ logger.debug("Not using Metadata Directories cache");
+ int batchIndex = 0;
+ // Outer loop: iterate over a list of batches of PartitionLocations
+ values = new ArrayList<>();
+ for (List<PartitionLocation> partitions : descriptor) {
+ logger.debug("Evaluating file partition pruning for batch {}", batchIndex);
+
+ try {
+ values.addAll(getValues(partitions, indexes));
+ } catch (Exception e) {
+ logger.warn("Exception while trying to prune files.", e);
+ if (totalPruningTime != null) {
+ logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
+
+ // continue without partition pruning
+ return;
+ }
+ batchIndex++;
+ }
+
+ if (values.isEmpty()) {
+ // No changes are required
+ return;
+ }
+ }
+
+ try {
+ // Transform Scan node to DrillValuesRel node
+ List<RelDataTypeField> typeFields = new ArrayList<>(fieldNames.size());
+ RelDataTypeFactory typeFactory = scan.getCluster().getTypeFactory();
+
+ int i = 0;
+ for (String field : fieldNames) {
+ RelDataType dataType = typeFactory.createTypeWithNullability(
+ typeFactory.createSqlType(SqlTypeName.VARCHAR, Types.MAX_VARCHAR_LENGTH), true);
+ typeFields.add(new RelDataTypeFieldImpl(field, i++, dataType));
+ }
+ RelRecordType t = new RelRecordType(scan.getRowType().getStructKind(), typeFields);
+ RelNode newInput = DrillRelFactories.LOGICAL_BUILDER.create(scan.getCluster(), null)
+ .values(t, values.toArray())
+ .build();
+
+ RelTraitSet traits = newInput.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+ newInput = new DrillValuesRel(
+ newInput.getCluster(),
+ newInput.getRowType(),
+ ((LogicalValues) newInput).getTuples(), traits
+ );
+
+ Aggregate aggregate = call.rel(0);
+ Aggregate newAggregate = aggregate.copy(
+ aggregate.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ newInput,
+ aggregate.indicator,
+ aggregate.getGroupSet(),
+ aggregate.getGroupSets(),
+ aggregate.getAggCallList()
+ );
+ call.transformTo(newAggregate);
+ } catch (Exception e) {
+ logger.warn("Exception while using the pruned partitions.", e);
+ } finally {
+ if (totalPruningTime != null) {
+ logger.debug("Total pruning elapsed time: {} ms", totalPruningTime.elapsed(TimeUnit.MILLISECONDS));
+ }
+ }
+ }
+
+ private List<String> getValues(Path selectionRoot, List<Path> directories, List<Integer> indexes) {
+ List<String> values = new ArrayList<>();
+ for (Path dir : directories) {
+ List<String> parts = ColumnExplorer.listPartitionValues(dir, selectionRoot, true);
+ for (int index : indexes) {
+ if (index < parts.size()) {
+ values.add(parts.get(index));
+ } else {
+ // No partition value for given index - set null value
+ values.add(null);
+ }
+ }
+ }
+
+ return values;
+ }
+
+ private List<String> getValues(List<PartitionLocation> partitions, List<Integer> indexes) {
+ List<String> values = new ArrayList<>(partitions.size() * indexes.size());
+ partitions.forEach(partition -> indexes.forEach(
+ index -> values.add(partition.getPartitionValue(index)))
+ );
+ return values;
+ }
+
+ private static boolean isQualifiedFilePruning(final TableScan scan) {
+ if (scan instanceof EnumerableTableScan) {
+ Object selection = getDrillTable(scan).getSelection();
+ return selection instanceof FormatSelection;
+ } else if (scan instanceof DrillScanRel) {
+ GroupScan groupScan = ((DrillScanRel) scan).getGroupScan();
+ // this rule is applicable only for dfs based partition pruning in Drill Logical
+ return groupScan instanceof FileGroupScan;
+ }
+ return false;
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
index 073847812..5d3c18e92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store.dfs;
+import java.util.List;
import java.util.Map;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
@@ -39,6 +40,8 @@ public class MetadataContext {
private boolean metadataCacheCorrupted;
+ private List<Path> directories;
+
public void setStatus(Path dir) {
dirModifCheckMap.put(dir, true);
}
@@ -83,6 +86,14 @@ public class MetadataContext {
this.metadataCacheCorrupted = metadataCacheCorrupted;
}
+ public void setDirectories(List<Path> directories) {
+ this.directories = directories;
+ }
+
+ public List<Path> getDirectories() {
+ return directories;
+ }
+
public enum PruneStatus {
NOT_STARTED, // initial state
PRUNED, // partitions were pruned
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index df84a9d1c..17a950644 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -276,6 +276,7 @@ public class ParquetFormatPlugin implements FormatPlugin {
ParquetReaderConfig readerConfig = ParquetReaderConfig.builder().withFormatConfig(formatConfig).build();
ParquetTableMetadataDirs mDirs = Metadata.readMetadataDirs(fs, dirMetaPath, metaContext, readerConfig);
if (mDirs != null && mDirs.getDirectories().size() > 0) {
+ metaContext.setDirectories(mDirs.getDirectories());
FileSelection dirSelection = FileSelection.createFromDirectories(mDirs.getDirectories(), selection,
selection.getSelectionRoot() /* cacheFileRoot initially points to selectionRoot */);
dirSelection.setExpandedPartial();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
index 247d7842e..da257c02c 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -20,15 +20,20 @@ package org.apache.drill;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.util.List;
import java.util.Stack;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.record.RecordBatchLoader;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.calcite.sql.SqlExplain.Depth;
@@ -317,6 +322,26 @@ public class PlanTestBase extends BaseTestQuery {
testPhysical(plan);
}
+ /**
+ * Helper method for checking the metadata file existence
+ *
+ * @param table table name or table path
+ */
+ public static void checkForMetadataFile(String table) {
+ final String tmpDir;
+
+ try {
+ tmpDir = dirTestWatcher.getRootDir().getCanonicalPath();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ File metaFile = table.startsWith(tmpDir) ? FileUtils.getFile(table, Metadata.METADATA_FILENAME)
+ : FileUtils.getFile(tmpDir, table, Metadata.METADATA_FILENAME);
+ assertTrue(String.format("There is no metadata cache file for the %s table", table),
+ Files.exists(metaFile.toPath()));
+ }
+
/*
* This will get the plan (either logical or physical) in Optiq RelNode
* format, based on SqlExplainLevel and Depth.
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDirScanToValuesConversion.java b/exec/java-exec/src/test/java/org/apache/drill/TestDirScanToValuesConversion.java
new file mode 100644
index 000000000..1aeeb5ffb
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDirScanToValuesConversion.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import org.apache.drill.categories.PlannerTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.nio.file.Paths;
+
+@Category({PlannerTest.class})
+public class TestDirScanToValuesConversion extends PlanTestBase {
+
+ private static final String TABLE_WITH_METADATA = "parquetTable1";
+
+ @BeforeClass
+ public static void setUp() {
+ dirTestWatcher.copyResourceToRoot(Paths.get("multilevel"));
+ dirTestWatcher.copyResourceToRoot(Paths.get("multilevel/parquet"), Paths.get(TABLE_WITH_METADATA));
+ }
+
+ @Test
+ public void testDirScanToValuesConversion() throws Exception {
+ String[] tableNames = {"multilevel/parquet", "multilevel/json", "multilevel/csv"};
+ String[] queries = {
+ "select distinct dir0, dir1 from dfs.`%s`",
+ "select dir0, dir1 from dfs.`%s` group by 1, 2"
+ };
+ for (String tableName : tableNames) {
+ for (String query : queries) {
+ testPlanMatchingPatterns(String.format(query, tableName), new String[]{"Values\\(tuples="}, new String[]{"Scan\\(table="});
+ }
+ }
+ }
+
+ @Test
+ public void testDirScanToValuesConversionWithMetadataCache() throws Exception {
+ test("refresh table metadata dfs.`%s`", TABLE_WITH_METADATA);
+ checkForMetadataFile(TABLE_WITH_METADATA);
+ String query = String.format("select distinct dir0, dir1 from dfs.`%s`", TABLE_WITH_METADATA);
+ PlanTestBase.testPlanMatchingPatterns(query, new String[]{"Values\\(tuples="}, null);
+ }
+
+ @Test
+ public void testDirScanToValuesConversionIsNotApplied() throws Exception {
+ String[] tableNames = {"multilevel/parquet", "multilevel/json", "multilevel/csv"};
+ String[] queries = {
+ "select dir0, dir1 from dfs.`%s`", // no aggregation
+ "select dir0, dir1, o_custkey from dfs.`%s` group by 1, 2, 3" // not only partition columns present
+ };
+ for (String tableName : tableNames) {
+ for (String query : queries) {
+ testPlanMatchingPatterns(String.format(query, tableName), new String[]{"Scan\\(table="}, new String[]{"Values\\(tuples="});
+ }
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
index 3500f6f30..38b39446d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetMetadataCache.java
@@ -956,24 +956,4 @@ public class TestParquetMetadataCache extends PlanTestBase {
int actualRowCount = testSql(query);
assertEquals(expectedRowCount, actualRowCount);
}
-
- /**
- * Helper method for checking the metadata file existence
- *
- * @param table table name or table path
- */
- private void checkForMetadataFile(String table) {
- final String tmpDir;
-
- try {
- tmpDir = dirTestWatcher.getRootDir().getCanonicalPath();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- File metaFile = table.startsWith(tmpDir) ? FileUtils.getFile(table, Metadata.METADATA_FILENAME)
- : FileUtils.getFile(tmpDir, table, Metadata.METADATA_FILENAME);
- assertTrue(String.format("There is no metadata cache file for the %s table", table),
- Files.exists(metaFile.toPath()));
- }
}