aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java102
1 files changed, 46 insertions, 56 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 65268923d..188049003 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -26,6 +26,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.util.GuavaUtils;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -37,7 +39,6 @@ import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets;
-import org.apache.calcite.util.Pair;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
@@ -54,6 +55,7 @@ import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.Path;
// partition descriptor for file system based tables
@@ -145,18 +147,18 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
}
@Override
- public String getBaseTableLocation() {
+ public Path getBaseTableLocation() {
final FormatSelection origSelection = (FormatSelection) table.getSelection();
- return origSelection.getSelection().selectionRoot;
+ return origSelection.getSelection().getSelectionRoot();
}
@Override
protected void createPartitionSublists() {
- final Pair<Collection<String>, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus();
+ final Pair<Collection<Path>, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus();
List<PartitionLocation> locations = new LinkedList<>();
- boolean hasDirsOnly = fileLocationsAndStatus.right;
+ boolean hasDirsOnly = fileLocationsAndStatus.getRight();
- final String selectionRoot = getBaseTableLocation();
+ final Path selectionRoot = getBaseTableLocation();
// map used to map the partition keys (dir0, dir1, ..), to the list of partitions that share the same partition keys.
// For example,
@@ -166,35 +168,31 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
// Figure out the list of leaf subdirectories. For each leaf subdirectory, find the list of files (DFSFilePartitionLocation)
// it contains.
- for (String file: fileLocationsAndStatus.left) {
- DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, selectionRoot, file, hasDirsOnly);
+ for (Path file: fileLocationsAndStatus.getLeft()) {
+ DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS,
+ selectionRoot, file, hasDirsOnly);
+ List<String> dirList = Arrays.asList(dfsFilePartitionLocation.getDirs());
- final String[] dirs = dfsFilePartitionLocation.getDirs();
- final List<String> dirList = Arrays.asList(dirs);
-
- if (!dirToFileMap.containsKey(dirList)) {
- dirToFileMap.put(dirList, new ArrayList<PartitionLocation>());
- }
+ dirToFileMap.putIfAbsent(dirList, new ArrayList<>());
dirToFileMap.get(dirList).add(dfsFilePartitionLocation);
}
// build a list of DFSDirPartitionLocation.
- for (final List<String> dirs : dirToFileMap.keySet()) {
- locations.add( new DFSDirPartitionLocation(dirs.toArray(new String[dirs.size()]), dirToFileMap.get(dirs)));
- }
+ dirToFileMap.keySet().stream()
+ .map(dirs -> new DFSDirPartitionLocation(dirs.toArray(new String[dirs.size()]), dirToFileMap.get(dirs)))
+ .forEach(locations::add);
locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
sublistsCreated = true;
}
- protected Pair<Collection<String>, Boolean> getFileLocationsAndStatus() {
- Collection<String> fileLocations = null;
- Pair<Collection<String>, Boolean> fileLocationsAndStatus = null;
+ protected Pair<Collection<Path>, Boolean> getFileLocationsAndStatus() {
+ Collection<Path> fileLocations = null;
boolean isExpandedPartial = false;
if (scanRel instanceof DrillScanRel) {
// If a particular GroupScan provides files, get the list of files from there rather than
// DrillTable because GroupScan would have the updated version of the selection
- final DrillScanRel drillScan = (DrillScanRel) scanRel;
+ DrillScanRel drillScan = (DrillScanRel) scanRel;
if (drillScan.getGroupScan().hasFiles()) {
fileLocations = drillScan.getGroupScan().getFiles();
isExpandedPartial = false;
@@ -208,67 +206,59 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
fileLocations = selection.getFiles();
isExpandedPartial = selection.isExpandedPartial();
}
- fileLocationsAndStatus = Pair.of(fileLocations, isExpandedPartial);
- return fileLocationsAndStatus;
+ return Pair.of(fileLocations, isExpandedPartial);
}
@Override
- public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
+ public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, Path cacheFileRoot,
boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception {
- List<String> newFiles = Lists.newArrayList();
- for (final PartitionLocation location : newPartitionLocation) {
+ List<Path> newFiles = new ArrayList<>();
+ for (PartitionLocation location : newPartitionLocation) {
if (!location.isCompositePartition()) {
newFiles.add(location.getEntirePartitionLocation());
} else {
final Collection<SimplePartitionLocation> subPartitions = location.getPartitionLocationRecursive();
- for (final PartitionLocation subPart : subPartitions) {
+ for (PartitionLocation subPart : subPartitions) {
newFiles.add(subPart.getEntirePartitionLocation());
}
}
}
+ FormatSelection formatSelection = (FormatSelection) table.getSelection();
+ FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
+ cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
+ newFileSelection.setMetaContext(metaContext);
+ RelOptTable relOptTable = scanRel.getTable();
+
if (scanRel instanceof DrillScanRel) {
- final FormatSelection formatSelection = (FormatSelection)table.getSelection();
- final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
- cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
- newFileSelection.setMetaContext(metaContext);
- final FileGroupScan newGroupScan =
- ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
+ FileGroupScan newGroupScan =
+ ((FileGroupScan) ((DrillScanRel) scanRel).getGroupScan()).clone(newFileSelection);
return new DrillScanRel(scanRel.getCluster(),
scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
- scanRel.getTable(),
+ relOptTable,
newGroupScan,
scanRel.getRowType(),
((DrillScanRel) scanRel).getColumns(),
true /*filter pushdown*/);
} else if (scanRel instanceof EnumerableTableScan) {
- return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot,
- wasAllPartitionsPruned, metaContext);
+ FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
+
+ DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
+ table.getUserName(), newFormatSelection);
+ /* Copy statistics from the original relOptTable */
+ dynamicDrillTable.setStatsTable(table.getStatsTable());
+ DrillTranslatableTable newTable = new DrillTranslatableTable(dynamicDrillTable);
+
+ RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(relOptTable.getRelOptSchema(), relOptTable.getRowType(),
+ newTable, GuavaUtils.convertToUnshadedImmutableList(ImmutableList.of()));
+
+ // return an EnumerableTableScan with fileSelection being part of digest of TableScan node.
+ return DirPrunedEnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl, newFileSelection.toString());
} else {
throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
}
}
- private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles, String cacheFileRoot,
- boolean wasAllPartitionsPruned, MetadataContext metaContext) {
- final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
- final FormatSelection formatSelection = (FormatSelection) table.getSelection();
- final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
- cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
- newFileSelection.setMetaContext(metaContext);
- final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
- final DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
- table.getUserName(), newFormatSelection);
- /* Copy statistics from the original table */
- dynamicDrillTable.setStatsTable(table.getStatsTable());
- final DrillTranslatableTable newTable = new DrillTranslatableTable(dynamicDrillTable);
- final RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable,
- GuavaUtils.convertToUnshadedImmutableList(ImmutableList.of()));
-
- // return an EnumerableTableScan with fileSelection being part of digest of TableScan node.
- return DirPrunedEnumerableTableScan.create(oldScan.getCluster(), newOptTableImpl, newFileSelection.toString());
- }
-
@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
boolean wasAllPartitionsPruned) throws Exception {