diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java | 102 |
1 files changed, 46 insertions, 56 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java index 65268923d..188049003 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java @@ -26,6 +26,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.calcite.plan.RelOptTable; +import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.common.util.GuavaUtils; import org.apache.drill.shaded.guava.com.google.common.base.Charsets; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; @@ -37,7 +39,6 @@ import org.apache.calcite.adapter.enumerable.EnumerableTableScan; import org.apache.calcite.prepare.RelOptTableImpl; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.util.BitSets; -import org.apache.calcite.util.Pair; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; @@ -54,6 +55,7 @@ import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.dfs.MetadataContext; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.hadoop.fs.Path; // partition descriptor for file system based tables @@ -145,18 +147,18 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor { } @Override - public String getBaseTableLocation() { + public Path getBaseTableLocation() { final FormatSelection origSelection = (FormatSelection) table.getSelection(); - return origSelection.getSelection().selectionRoot; + return origSelection.getSelection().getSelectionRoot(); } @Override protected void createPartitionSublists() { - final Pair<Collection<String>, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus(); + final Pair<Collection<Path>, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus(); List<PartitionLocation> locations = new LinkedList<>(); - boolean hasDirsOnly = fileLocationsAndStatus.right; + boolean hasDirsOnly = fileLocationsAndStatus.getRight(); - final String selectionRoot = getBaseTableLocation(); + final Path selectionRoot = getBaseTableLocation(); // map used to map the partition keys (dir0, dir1, ..), to the list of partitions that share the same partition keys. // For example, @@ -166,35 +168,31 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor { // Figure out the list of leaf subdirectories. For each leaf subdirectory, find the list of files (DFSFilePartitionLocation) // it contains. - for (String file: fileLocationsAndStatus.left) { - DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, selectionRoot, file, hasDirsOnly); + for (Path file: fileLocationsAndStatus.getLeft()) { + DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, + selectionRoot, file, hasDirsOnly); + List<String> dirList = Arrays.asList(dfsFilePartitionLocation.getDirs()); - final String[] dirs = dfsFilePartitionLocation.getDirs(); - final List<String> dirList = Arrays.asList(dirs); - - if (!dirToFileMap.containsKey(dirList)) { - dirToFileMap.put(dirList, new ArrayList<PartitionLocation>()); - } + dirToFileMap.putIfAbsent(dirList, new ArrayList<>()); dirToFileMap.get(dirList).add(dfsFilePartitionLocation); } // build a list of DFSDirPartitionLocation. - for (final List<String> dirs : dirToFileMap.keySet()) { - locations.add( new DFSDirPartitionLocation(dirs.toArray(new String[dirs.size()]), dirToFileMap.get(dirs))); - } + dirToFileMap.keySet().stream() + .map(dirs -> new DFSDirPartitionLocation(dirs.toArray(new String[dirs.size()]), dirToFileMap.get(dirs))) + .forEach(locations::add); locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE); sublistsCreated = true; } - protected Pair<Collection<String>, Boolean> getFileLocationsAndStatus() { - Collection<String> fileLocations = null; - Pair<Collection<String>, Boolean> fileLocationsAndStatus = null; + protected Pair<Collection<Path>, Boolean> getFileLocationsAndStatus() { + Collection<Path> fileLocations = null; boolean isExpandedPartial = false; if (scanRel instanceof DrillScanRel) { // If a particular GroupScan provides files, get the list of files from there rather than // DrillTable because GroupScan would have the updated version of the selection - final DrillScanRel drillScan = (DrillScanRel) scanRel; + DrillScanRel drillScan = (DrillScanRel) scanRel; if (drillScan.getGroupScan().hasFiles()) { fileLocations = drillScan.getGroupScan().getFiles(); isExpandedPartial = false; @@ -208,67 +206,59 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor { fileLocations = selection.getFiles(); isExpandedPartial = selection.isExpandedPartial(); } - fileLocationsAndStatus = Pair.of(fileLocations, isExpandedPartial); - return fileLocationsAndStatus; + return Pair.of(fileLocations, isExpandedPartial); } @Override - public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot, + public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, Path cacheFileRoot, boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception { - List<String> newFiles = Lists.newArrayList(); - for (final PartitionLocation location : newPartitionLocation) { + List<Path> newFiles = new ArrayList<>(); + for (PartitionLocation location : newPartitionLocation) { if (!location.isCompositePartition()) { newFiles.add(location.getEntirePartitionLocation()); } else { final Collection<SimplePartitionLocation> subPartitions = location.getPartitionLocationRecursive(); - for (final PartitionLocation subPart : subPartitions) { + for (PartitionLocation subPart : subPartitions) { newFiles.add(subPart.getEntirePartitionLocation()); } } } + FormatSelection formatSelection = (FormatSelection) table.getSelection(); + FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(), + cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus()); + newFileSelection.setMetaContext(metaContext); + RelOptTable relOptTable = scanRel.getTable(); + if (scanRel instanceof DrillScanRel) { - final FormatSelection formatSelection = (FormatSelection)table.getSelection(); - final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(), - cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus()); - newFileSelection.setMetaContext(metaContext); - final FileGroupScan newGroupScan = - ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection); + FileGroupScan newGroupScan = + ((FileGroupScan) ((DrillScanRel) scanRel).getGroupScan()).clone(newFileSelection); return new DrillScanRel(scanRel.getCluster(), scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - scanRel.getTable(), + relOptTable, newGroupScan, scanRel.getRowType(), ((DrillScanRel) scanRel).getColumns(), true /*filter pushdown*/); } else if (scanRel instanceof EnumerableTableScan) { - return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot, - wasAllPartitionsPruned, metaContext); + FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection); + + DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(), + table.getUserName(), newFormatSelection); + /* Copy statistics from the original relOptTable */ + dynamicDrillTable.setStatsTable(table.getStatsTable()); + DrillTranslatableTable newTable = new DrillTranslatableTable(dynamicDrillTable); + + RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(relOptTable.getRelOptSchema(), relOptTable.getRowType(), + newTable, GuavaUtils.convertToUnshadedImmutableList(ImmutableList.of())); + + // return an EnumerableTableScan with fileSelection being part of digest of TableScan node. + return DirPrunedEnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl, newFileSelection.toString()); } else { throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!"); } } - private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles, String cacheFileRoot, - boolean wasAllPartitionsPruned, MetadataContext metaContext) { - final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable(); - final FormatSelection formatSelection = (FormatSelection) table.getSelection(); - final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(), - cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus()); - newFileSelection.setMetaContext(metaContext); - final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection); - final DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(), - table.getUserName(), newFormatSelection); - /* Copy statistics from the original table */ - dynamicDrillTable.setStatsTable(table.getStatsTable()); - final DrillTranslatableTable newTable = new DrillTranslatableTable(dynamicDrillTable); - final RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable, - GuavaUtils.convertToUnshadedImmutableList(ImmutableList.of())); - - // return an EnumerableTableScan with fileSelection being part of digest of TableScan node. - return DirPrunedEnumerableTableScan.create(oldScan.getCluster(), newOptTableImpl, newFileSelection.toString()); - } - @Override public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, boolean wasAllPartitionsPruned) throws Exception { |