Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store')
41 files changed, 348 insertions, 332 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index d5d9784b6..33b500018 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.LinkedHashMap;
@@ -133,20 +134,20 @@ public class ColumnExplorer {
   public static List<String> getPartitionColumnNames(FileSelection selection, SchemaConfig schemaConfig) {
     int partitionsCount = 0;
     // a depth of table root path
-    int rootDepth = new Path(selection.getSelectionRoot()).depth();
+    int rootDepth = selection.getSelectionRoot().depth();
 
-    for (String file : selection.getFiles()) {
+    for (Path file : selection.getFiles()) {
       // Calculates partitions count for the concrete file:
       // depth of file path - depth of table root path - 1.
       // The depth of file path includes file itself,
       // so we should subtract 1 to consider only directories.
-      int currentPartitionsCount = new Path(file).depth() - rootDepth - 1;
+      int currentPartitionsCount = file.depth() - rootDepth - 1;
       // max depth of files path should be used to handle all partitions
       partitionsCount = Math.max(partitionsCount, currentPartitionsCount);
     }
 
     String partitionColumnLabel = schemaConfig.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
-    List<String> partitions = Lists.newArrayList();
+    List<String> partitions = new ArrayList<>();
 
     // generates partition column names: dir0, dir1 etc.
     for (int i = 0; i < partitionsCount; i++) {
@@ -165,7 +166,7 @@ public class ColumnExplorer {
    * @param includeFileImplicitColumns if file implicit columns should be included into the result
    * @return implicit columns map
    */
-  public Map<String, String> populateImplicitColumns(String filePath,
+  public Map<String, String> populateImplicitColumns(Path filePath,
                                                      List<String> partitionValues,
                                                      boolean includeFileImplicitColumns) {
     Map<String, String> implicitValues = new LinkedHashMap<>();
@@ -177,7 +178,7 @@ public class ColumnExplorer {
     }
 
     if (includeFileImplicitColumns) {
-      Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
+      Path path = Path.getPathWithoutSchemeAndAuthority(filePath);
       for (Map.Entry<String, ImplicitFileColumns> entry : selectedImplicitColumns.entrySet()) {
         implicitValues.put(entry.getKey(), entry.getValue().getValue(path));
       }
@@ -189,15 +190,16 @@ public class ColumnExplorer {
   /**
    * Compares root and file path to determine directories
    * that are present in the file path but absent in root.
-   * Example: root - a/b/c, filePath - a/b/c/d/e/0_0_0.parquet, result - d/e.
+   * Example: root - a/b/c, file - a/b/c/d/e/0_0_0.parquet, result - d/e.
    * Stores different directory names in the list in successive order.
    *
-   * @param filePath file path
+   * @param file file path
    * @param root root directory
+   * @param hasDirsOnly whether it is file or directory
    * @return list of directory names
    */
-  public static List<String> listPartitionValues(String filePath, String root) {
-    String[] dirs = parsePartitions(filePath, root);
+  public static List<String> listPartitionValues(Path file, Path root, boolean hasDirsOnly) {
+    String[] dirs = parsePartitions(file, root, hasDirsOnly);
     if (dirs == null) {
       return Collections.emptyList();
     }
@@ -208,21 +210,23 @@ public class ColumnExplorer {
    * Low-level parse of partitions, returned as a string array. Returns a
    * null array for invalid values.
    *
-   * @param filePath file path
+   * @param file file path
    * @param root root directory
+   * @param hasDirsOnly whether it is file or directory
    * @return array of directory names, or null if the arguments are invalid
    */
-  public static String[] parsePartitions(String filePath, String root) {
-    if (filePath == null || root == null) {
+  public static String[] parsePartitions(Path file, Path root, boolean hasDirsOnly) {
+    if (file == null || root == null) {
       return null;
     }
 
-    int rootDepth = new Path(root).depth();
-    Path path = new Path(filePath);
-    int parentDepth = path.getParent().depth();
-
-    int diffCount = parentDepth - rootDepth;
+    if (!hasDirsOnly) {
+      file = file.getParent();
+    }
+    int rootDepth = root.depth();
+    int fileDepth = file.depth();
+    int diffCount = fileDepth - rootDepth;
     if (diffCount < 0) {
       return null;
     }
@@ -230,10 +234,10 @@ public class ColumnExplorer {
     String[] diffDirectoryNames = new String[diffCount];
 
     // start filling in array from the end
-    for (int i = rootDepth; parentDepth > i; i++) {
-      path = path.getParent();
+    for (int i = rootDepth; fileDepth > i; i++) {
       // place in the end of array
-      diffDirectoryNames[parentDepth - i - 1] = path.getName();
+      diffDirectoryNames[fileDepth - i - 1] = file.getName();
+      file = file.getParent();
     }
 
     return diffDirectoryNames;
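The reworked partition logic above compares Path depths directly instead of re-parsing Strings into Paths. A minimal standalone sketch of that walk (hadoop-common only; the class name is illustrative, the body mirrors the new parsePartitions):

    import java.util.Arrays;
    import org.apache.hadoop.fs.Path;

    // Mirrors the Path-based partition walk introduced above.
    public class PartitionWalkSketch {
      static String[] parsePartitions(Path file, Path root, boolean hasDirsOnly) {
        if (file == null || root == null) {
          return null;
        }
        if (!hasDirsOnly) {
          file = file.getParent(); // drop the file name, keep directories only
        }
        int rootDepth = root.depth();
        int fileDepth = file.depth();
        int diffCount = fileDepth - rootDepth;
        if (diffCount < 0) {
          return null;
        }
        String[] dirs = new String[diffCount];
        // walk upward through parents, filling the array from the end
        for (int i = rootDepth; fileDepth > i; i++) {
          dirs[fileDepth - i - 1] = file.getName();
          file = file.getParent();
        }
        return dirs;
      }

      public static void main(String[] args) {
        // matches the javadoc example: root a/b/c, file a/b/c/d/e/0_0_0.parquet -> [d, e]
        System.out.println(Arrays.toString(
            parsePartitions(new Path("/a/b/c/d/e/0_0_0.parquet"), new Path("/a/b/c"), false)));
      }
    }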
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
index 50d7ee8f9..10d90d4b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
@@ -54,8 +54,8 @@ public class AvroDrillTable extends DrillTable {
                        SchemaConfig schemaConfig,
                        FormatSelection selection) {
     super(storageEngineName, plugin, schemaConfig.getUserName(), selection);
-    List<String> asFiles = selection.getAsFiles();
-    Path path = new Path(asFiles.get(0));
+    List<Path> asFiles = selection.getAsFiles();
+    Path path = asFiles.get(0);
     this.schemaConfig = schemaConfig;
     try {
       reader = new DataFileReader<>(new FsInput(path, plugin.getFsConf()), new GenericDatumReader<GenericContainer>());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 1d7226ad1..07444e972 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -90,13 +90,13 @@ public class AvroRecordReader extends AbstractRecordReader {
 
   public AvroRecordReader(final FragmentContext fragmentContext,
-                          final String inputPath,
+                          final Path inputPath,
                           final long start,
                           final long length,
                           final FileSystem fileSystem,
                           final List<SchemaPath> projectedColumns,
                           final String userName) {
-    hadoop = new Path(inputPath);
+    hadoop = inputPath;
     this.start = start;
     this.end = start + length;
     buffer = fragmentContext.getManagedBuffer();
@@ -111,12 +111,8 @@ public class AvroRecordReader extends AbstractRecordReader {
   private DataFileReader<GenericContainer> getReader(final Path hadoop, final FileSystem fs) throws ExecutionSetupException {
     try {
       final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
-      return ugi.doAs(new PrivilegedExceptionAction<DataFileReader<GenericContainer>>() {
-        @Override
-        public DataFileReader<GenericContainer> run() throws Exception {
-          return new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>());
-        }
-      });
+      return ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericContainer>>) () ->
+          new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>()));
     } catch (IOException | InterruptedException e) {
       throw new ExecutionSetupException(
           String.format("Error in creating avro reader for file: %s", hadoop), e);
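The AvroRecordReader hunk above also collapses the anonymous PrivilegedExceptionAction into a lambda. Because UserGroupInformation.doAs is overloaded for both PrivilegedAction and PrivilegedExceptionAction, the lambda needs an explicit cast to select the checked-exception overload; a minimal sketch of the same pattern:

    import java.security.PrivilegedExceptionAction;
    import org.apache.hadoop.security.UserGroupInformation;

    public class DoAsSketch {
      public static void main(String[] args) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        // the cast disambiguates between the two doAs overloads
        String user = ugi.doAs((PrivilegedExceptionAction<String>) () ->
            UserGroupInformation.getCurrentUser().getUserName());
        System.out.println(user);
      }
    }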
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 2fa9558b0..902abdab0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -19,12 +19,15 @@ package org.apache.drill.exec.store.dfs;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.base.Strings;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 
 import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -41,15 +44,15 @@ public class FileSelection {
 
   private List<FileStatus> statuses;
 
-  public List<String> files;
+  public List<Path> files;
   /**
    * root path for the selections
    */
-  public final String selectionRoot;
+  public final Path selectionRoot;
   /**
    * root path for the metadata cache file (if any)
    */
-  public final String cacheFileRoot;
+  public final Path cacheFileRoot;
 
   /**
    * metadata context useful for metadata operations (if any)
@@ -82,17 +85,17 @@ public class FileSelection {
    * @param files list of files
    * @param selectionRoot root path for selections
    */
-  public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
+  public FileSelection(List<FileStatus> statuses, List<Path> files, Path selectionRoot) {
     this(statuses, files, selectionRoot, null, false, StatusType.NOT_CHECKED);
   }
 
-  public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot,
-      final String cacheFileRoot, final boolean wasAllPartitionsPruned) {
+  public FileSelection(List<FileStatus> statuses, List<Path> files, Path selectionRoot, Path cacheFileRoot,
+                       boolean wasAllPartitionsPruned) {
     this(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned, StatusType.NOT_CHECKED);
   }
 
-  public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot,
-      final String cacheFileRoot, final boolean wasAllPartitionsPruned, final StatusType dirStatus) {
+  public FileSelection(List<FileStatus> statuses, List<Path> files, Path selectionRoot, Path cacheFileRoot,
+                       boolean wasAllPartitionsPruned, StatusType dirStatus) {
     this.statuses = statuses;
     this.files = files;
     this.selectionRoot = selectionRoot;
@@ -104,7 +107,7 @@ public class FileSelection {
   /**
    * Copy constructor for convenience.
    */
-  protected FileSelection(final FileSelection selection) {
+  protected FileSelection(FileSelection selection) {
     Preconditions.checkNotNull(selection, "selection cannot be null");
     this.statuses = selection.statuses;
     this.files = selection.files;
@@ -116,17 +119,17 @@ public class FileSelection {
     this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
   }
 
-  public String getSelectionRoot() {
+  public Path getSelectionRoot() {
     return selectionRoot;
   }
 
-  public List<FileStatus> getStatuses(final DrillFileSystem fs) throws IOException {
+  public List<FileStatus> getStatuses(DrillFileSystem fs) throws IOException {
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
 
     if (statuses == null) {
-      final List<FileStatus> newStatuses = Lists.newArrayList();
-      for (final String pathStr:files) {
-        newStatuses.add(fs.getFileStatus(new Path(pathStr)));
+      List<FileStatus> newStatuses = Lists.newArrayList();
+      for (Path pathStr : files) {
+        newStatuses.add(fs.getFileStatus(pathStr));
       }
       statuses = newStatuses;
     }
@@ -139,11 +142,11 @@ public class FileSelection {
     return statuses;
   }
 
-  public List<String> getFiles() {
+  public List<Path> getFiles() {
     if (files == null) {
-      final List<String> newFiles = Lists.newArrayList();
-      for (final FileStatus status:statuses) {
-        newFiles.add(status.getPath().toString());
+      List<Path> newFiles = Lists.newArrayList();
+      for (FileStatus status:statuses) {
+        newFiles.add(status.getPath());
       }
       files = newFiles;
     }
@@ -153,7 +156,7 @@ public class FileSelection {
   public boolean containsDirectories(DrillFileSystem fs) throws IOException {
     if (dirStatus == StatusType.NOT_CHECKED) {
       dirStatus = StatusType.NO_DIRS;
-      for (final FileStatus status : getStatuses(fs)) {
+      for (FileStatus status : getStatuses(fs)) {
         if (status.isDirectory()) {
           dirStatus = StatusType.HAS_DIRS;
           break;
@@ -175,7 +178,7 @@ public class FileSelection {
       nonDirectories.addAll(DrillFileSystemUtil.listFiles(fs, status.getPath(), true));
     }
 
-    final FileSelection fileSel = create(nonDirectories, null, selectionRoot);
+    FileSelection fileSel = create(nonDirectories, null, selectionRoot);
     if (timer != null) {
       logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}", timer.elapsed(TimeUnit.MILLISECONDS), statuses.size());
       timer.stop();
@@ -223,38 +226,36 @@ public class FileSelection {
    * @param files list of files.
    * @return longest common path
    */
-  private static String commonPathForFiles(final List<String> files) {
+  private static Path commonPathForFiles(List<Path> files) {
     if (files == null || files.isEmpty()) {
-      return "";
+      return new Path("/");
     }
 
-    final int total = files.size();
-    final String[][] folders = new String[total][];
+    int total = files.size();
+    String[][] folders = new String[total][];
     int shortest = Integer.MAX_VALUE;
     for (int i = 0; i < total; i++) {
-      final Path path = new Path(files.get(i));
-      folders[i] = Path.getPathWithoutSchemeAndAuthority(path).toString().split(Path.SEPARATOR);
+      folders[i] = files.get(i).toUri().getPath().split(Path.SEPARATOR);
      shortest = Math.min(shortest, folders[i].length);
     }
 
     int latest;
     out:
     for (latest = 0; latest < shortest; latest++) {
-      final String current = folders[0][latest];
+      String current = folders[0][latest];
       for (int i = 1; i < folders.length; i++) {
         if (!current.equals(folders[i][latest])) {
           break out;
         }
       }
     }
-    final Path path = new Path(files.get(0));
-    final URI uri = path.toUri();
-    final String pathString = buildPath(folders[0], latest);
-    return new Path(uri.getScheme(), uri.getAuthority(), pathString).toString();
+    URI uri = files.get(0).toUri();
+    String pathString = buildPath(folders[0], latest);
+    return new Path(uri.getScheme(), uri.getAuthority(), pathString);
   }
 
-  private static String buildPath(final String[] path, final int folderIndex) {
-    final StringBuilder builder = new StringBuilder();
+  private static String buildPath(String[] path, int folderIndex) {
+    StringBuilder builder = new StringBuilder();
     for (int i=0; i<folderIndex; i++) {
       builder.append(path[i]).append(Path.SEPARATOR);
     }
@@ -262,20 +263,20 @@ public class FileSelection {
     return builder.toString();
   }
 
-  public static FileSelection create(final DrillFileSystem fs, final String parent, final String path,
-      final boolean allowAccessOutsideWorkspace) throws IOException {
+  public static FileSelection create(DrillFileSystem fs, String parent, String path,
+                                     boolean allowAccessOutsideWorkspace) throws IOException {
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
     boolean hasWildcard = path.contains(WILD_CARD);
 
-    final Path combined = new Path(parent, removeLeadingSlash(path));
+    Path combined = new Path(parent, removeLeadingSlash(path));
     if (!allowAccessOutsideWorkspace) {
       checkBackPaths(new Path(parent).toUri().getPath(), combined.toUri().getPath(), path);
     }
-    final FileStatus[] statuses = fs.globStatus(combined); // note: this would expand wildcards
+    FileStatus[] statuses = fs.globStatus(combined); // note: this would expand wildcards
     if (statuses == null) {
       return null;
     }
 
-    final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().getPath());
+    FileSelection fileSel = create(Arrays.asList(statuses), null, combined);
     if (timer != null) {
       logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
@@ -298,62 +299,51 @@ public class FileSelection {
   * @return  null if creation of {@link FileSelection} fails with an {@link IllegalArgumentException}
   *          otherwise a new selection.
   *
-   * @see FileSelection#FileSelection(List, List, String)
+   * @see FileSelection#FileSelection(List, List, Path)
   */
-  public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root,
-      final String cacheFileRoot, final boolean wasAllPartitionsPruned) {
-    final boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0);
-    final boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0);
+  public static FileSelection create(List<FileStatus> statuses, List<Path> files, Path root,
+                                     Path cacheFileRoot, boolean wasAllPartitionsPruned) {
+    boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0);
+    boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0);
 
     if (bothNonEmptySelection || bothEmptySelection) {
       return null;
     }
 
-    final String selectionRoot;
+    Path selectionRoot;
     if (statuses == null || statuses.isEmpty()) {
       selectionRoot = commonPathForFiles(files);
     } else {
-      if (Strings.isNullOrEmpty(root)) {
-        throw new DrillRuntimeException("Selection root is null or empty" + root);
-      }
-      final Path rootPath = handleWildCard(root);
-      final URI uri = statuses.get(0).getPath().toUri();
-      final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
-      selectionRoot = path.toString();
+      Objects.requireNonNull(root, "Selection root is null");
+      Path rootPath = handleWildCard(root);
+      URI uri = statuses.get(0).getPath().toUri();
+      selectionRoot = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
     }
     return new FileSelection(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned);
   }
 
-  public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root) {
+  public static FileSelection create(List<FileStatus> statuses, List<Path> files, Path root) {
     return FileSelection.create(statuses, files, root, null, false);
   }
 
-  public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection,
-      final String cacheFileRoot) {
+  public static FileSelection createFromDirectories(List<Path> dirPaths, FileSelection selection,
+                                                    Path cacheFileRoot) {
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    final String root = selection.getSelectionRoot();
-    if (Strings.isNullOrEmpty(root)) {
-      throw new DrillRuntimeException("Selection root is null or empty" + root);
-    }
+    Path root = selection.getSelectionRoot();
+    Objects.requireNonNull(root, "Selection root is null");
     if (dirPaths == null || dirPaths.isEmpty()) {
       throw new DrillRuntimeException("List of directories is null or empty");
     }
 
-    List<String> dirs = Lists.newArrayList();
+    // for wildcard the directory list should have already been expanded
+    List<Path> dirs = selection.hadWildcard() ? selection.getFileStatuses().stream()
+        .map(FileStatus::getPath)
+        .collect(Collectors.toList()) : new ArrayList<>(dirPaths);
 
-    if (selection.hadWildcard()) { // for wildcard the directory list should have already been expanded
-      for (FileStatus status : selection.getFileStatuses()) {
-        dirs.add(status.getPath().toString());
-      }
-    } else {
-      dirs.addAll(dirPaths);
-    }
-
-    final Path rootPath = handleWildCard(root);
-    // final URI uri = dirPaths.get(0).toUri();
-    final URI uri = selection.getFileStatuses().get(0).getPath().toUri();
-    final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
-    FileSelection fileSel = new FileSelection(null, dirs, path.toString(), cacheFileRoot, false);
+    Path rootPath = handleWildCard(root);
+    URI uri = selection.getFileStatuses().get(0).getPath().toUri();
+    Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
+    FileSelection fileSel = new FileSelection(null, dirs, path, cacheFileRoot, false);
     fileSel.setHadWildcard(selection.hadWildcard());
     if (timer != null) {
       logger.debug("FileSelection.createFromDirectories() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
@@ -362,18 +352,15 @@ public class FileSelection {
     return fileSel;
   }
 
-  private static Path handleWildCard(final String root) {
-    if (root.contains(WILD_CARD)) {
-      int idx = root.indexOf(WILD_CARD); // first wild card in the path
-      idx = root.lastIndexOf('/', idx); // file separator right before the first wild card
-      final String newRoot = root.substring(0, idx);
-      if (newRoot.length() == 0) {
-        // Ensure that we always return a valid root.
-        return new Path("/");
-      }
-      return new Path(newRoot);
+  private static Path handleWildCard(Path root) {
+    String stringRoot = root.toUri().getPath();
+    if (stringRoot.contains(WILD_CARD)) {
+      int idx = stringRoot.indexOf(WILD_CARD); // first wild card in the path
+      idx = stringRoot.lastIndexOf('/', idx); // file separator right before the first wild card
+      String newRoot = stringRoot.substring(0, idx);
+      return DrillFileSystemUtil.createPathSafe(newRoot);
     } else {
-      return new Path(root);
+      return new Path(stringRoot);
     }
   }
@@ -426,7 +413,7 @@ public class FileSelection {
     return this.hadWildcard;
   }
 
-  public String getCacheFileRoot() {
+  public Path getCacheFileRoot() {
     return cacheFileRoot;
   }
@@ -456,12 +443,12 @@ public class FileSelection {
 
   @Override
   public String toString() {
-    final StringBuilder sb = new StringBuilder();
+    StringBuilder sb = new StringBuilder();
     sb.append("root=").append(this.selectionRoot);
 
     sb.append("files=[");
     boolean isFirst = true;
-    for (final String file : this.files) {
+    for (Path file : this.files) {
       if (isFirst) {
         isFirst = false;
         sb.append(file);
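FileSelection now strips and restores the URI scheme with Path methods rather than String surgery: commonPathForFiles splits toUri().getPath() and rebuilds the result with the three-argument Path constructor. A small sketch of that round trip (host name and paths invented for illustration):

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    public class SchemeRoundTripSketch {
      public static void main(String[] args) {
        Path file = new Path("hdfs://nn:8020/a/b/c/0_0_0.parquet");
        // bare path with scheme and authority stripped: /a/b/c/0_0_0.parquet
        System.out.println(file.toUri().getPath());
        // reattach scheme and authority to a computed common prefix
        URI uri = file.toUri();
        System.out.println(new Path(uri.getScheme(), uri.getAuthority(), "/a/b")); // hdfs://nn:8020/a/b
      }
    }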
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
index 40549cc92..7d7bcfaa0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
 
 public class FormatSelection {
@@ -32,16 +33,15 @@ public class FormatSelection {
   private FormatPluginConfig format;
   private FileSelection selection;
 
-  public FormatSelection(){}
+  public FormatSelection() {}
 
   @JsonCreator
-  public FormatSelection(@JsonProperty("format") FormatPluginConfig format, @JsonProperty("files") List<String> files){
+  public FormatSelection(@JsonProperty("format") FormatPluginConfig format, @JsonProperty("files") List<Path> files){
     this.format = format;
     this.selection = FileSelection.create(null, files, null);
   }
 
   public FormatSelection(FormatPluginConfig format, FileSelection selection) {
-    super();
     this.format = format;
     this.selection = selection;
   }
@@ -52,7 +52,7 @@ public class FormatSelection {
   }
 
   @JsonProperty("files")
-  public List<String> getAsFiles(){
+  public List<Path> getAsFiles(){
     return selection.getFiles();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
index 877ceb64c..073847812 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.dfs;
 import java.util.Map;
 
 import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
 
 /**
  * A metadata context that holds state across multiple invocations of
@@ -32,17 +33,17 @@ public class MetadataContext {
    * Note: the #directories is typically a small percentage of the #files, so the memory footprint
    * is expected to be relatively small.
    */
-  private Map<String, Boolean> dirModifCheckMap = Maps.newHashMap();
+  private Map<Path, Boolean> dirModifCheckMap = Maps.newHashMap();
 
   private PruneStatus pruneStatus = PruneStatus.NOT_STARTED;
 
   private boolean metadataCacheCorrupted;
 
-  public void setStatus(String dir) {
+  public void setStatus(Path dir) {
     dirModifCheckMap.put(dir, true);
   }
 
-  public void clearStatus(String dir) {
+  public void clearStatus(Path dir) {
     dirModifCheckMap.put(dir, false);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
index 15107ac18..ec925e37a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
 
 public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork {
@@ -28,7 +29,7 @@ public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork {
   private long length;
 
   @JsonCreator
-  public ReadEntryFromHDFS(@JsonProperty("path") String path,@JsonProperty("start") long start, @JsonProperty("length") long length) {
+  public ReadEntryFromHDFS(@JsonProperty("path") Path path, @JsonProperty("start") long start, @JsonProperty("length") long length) {
     super(path);
     this.start = start;
     this.length = length;
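With fields such as @JsonProperty("path") Path now appearing in serialized read entries and selections, plan (de)serialization needs Jackson to understand Hadoop Path, which it does not handle out of the box; the commit presumably registers a Path serializer elsewhere in the codebase. A minimal sketch of what such a module could look like (class and module names hypothetical):

    import java.io.IOException;
    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.core.JsonParser;
    import com.fasterxml.jackson.databind.DeserializationContext;
    import com.fasterxml.jackson.databind.JsonDeserializer;
    import com.fasterxml.jackson.databind.JsonSerializer;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.SerializerProvider;
    import com.fasterxml.jackson.databind.module.SimpleModule;
    import org.apache.hadoop.fs.Path;

    // Hypothetical module mapping Path to/from its String form.
    public class PathJsonSketch {
      public static void main(String[] args) throws IOException {
        SimpleModule module = new SimpleModule()
            .addSerializer(Path.class, new JsonSerializer<Path>() {
              @Override
              public void serialize(Path value, JsonGenerator gen, SerializerProvider provider) throws IOException {
                gen.writeString(value.toString());
              }
            })
            .addDeserializer(Path.class, new JsonDeserializer<Path>() {
              @Override
              public Path deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
                return new Path(p.getValueAsString());
              }
            });
        ObjectMapper mapper = new ObjectMapper().registerModule(module);
        String json = mapper.writeValueAsString(new Path("/a/b/c")); // "/a/b/c"
        System.out.println(mapper.readValue(json, Path.class));
      }
    }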
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
index 4564e159c..88bd9fbad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
@@ -18,19 +18,20 @@
 package org.apache.drill.exec.store.dfs;
 
+import org.apache.hadoop.fs.Path;
+
 public class ReadEntryWithPath {
 
-  protected String path;
+  protected Path path;
 
+  // Default constructor is needed for deserialization
+  public ReadEntryWithPath() {}
 
-  public ReadEntryWithPath(String path) {
-    super();
+  public ReadEntryWithPath(Path path) {
     this.path = path;
   }
 
-  public ReadEntryWithPath(){}
-
-  public String getPath(){
+  public Path getPath() {
     return path;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d3bed8feb..d76c6489e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -156,7 +156,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
     for (FileWork work : scan.getWorkUnits()){
       RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName());
       readers.add(recordReader);
-      List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot());
+      List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot(), false);
       Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns);
       implicitColumns.add(implicitValues);
       if (implicitValues.size() > mapWithMaxColumns.size()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 759d07ff9..4449ec054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -18,7 +18,6 @@ package org.apache.drill.exec.store.dfs.easy;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -52,6 +51,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
 import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
 import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
 import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
 
 @JsonTypeName("fs-scan")
 public class EasyGroupScan extends AbstractFileGroupScan {
@@ -65,17 +65,17 @@ public class EasyGroupScan extends AbstractFileGroupScan {
   private ListMultimap<Integer, CompleteFileWork> mappings;
   private List<CompleteFileWork> chunks;
   private List<EndpointAffinity> endpointAffinities;
-  private String selectionRoot;
+  private Path selectionRoot;
 
   @JsonCreator
   public EasyGroupScan(
       @JsonProperty("userName") String userName,
-      @JsonProperty("files") List<String> files, //
-      @JsonProperty("storage") StoragePluginConfig storageConfig, //
-      @JsonProperty("format") FormatPluginConfig formatConfig, //
-      @JacksonInject StoragePluginRegistry engineRegistry, //
+      @JsonProperty("files") List<Path> files,
+      @JsonProperty("storage") StoragePluginConfig storageConfig,
+      @JsonProperty("format") FormatPluginConfig formatConfig,
+      @JacksonInject StoragePluginRegistry engineRegistry,
       @JsonProperty("columns") List<SchemaPath> columns,
-      @JsonProperty("selectionRoot") String selectionRoot
+      @JsonProperty("selectionRoot") Path selectionRoot
       ) throws IOException, ExecutionSetupException {
     this(ImpersonationUtil.resolveUserName(userName), FileSelection.create(null, files, selectionRoot),
@@ -84,17 +84,17 @@ public class EasyGroupScan extends AbstractFileGroupScan {
         selectionRoot);
   }
 
-  public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
+  public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, Path selectionRoot)
       throws IOException {
     this(userName, selection, formatPlugin, ALL_COLUMNS, selectionRoot);
   }
 
   public EasyGroupScan(
       String userName,
-      FileSelection selection, //
-      EasyFormatPlugin<?> formatPlugin, //
+      FileSelection selection,
+      EasyFormatPlugin<?> formatPlugin,
       List<SchemaPath> columns,
-      String selectionRoot
+      Path selectionRoot
       ) throws IOException{
     super(userName);
     this.selection = Preconditions.checkNotNull(selection);
@@ -106,12 +106,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
 
   @JsonIgnore
   public Iterable<CompleteFileWork> getWorkIterable() {
-    return new Iterable<CompleteFileWork>() {
-      @Override
-      public Iterator<CompleteFileWork> iterator() {
-        return Iterators.unmodifiableIterator(chunks.iterator());
-      }
-    };
+    return () -> Iterators.unmodifiableIterator(chunks.iterator());
   }
 
   private EasyGroupScan(final EasyGroupScan that) {
@@ -136,7 +131,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
     this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
   }
 
-  public String getSelectionRoot() {
+  public Path getSelectionRoot() {
     return selectionRoot;
   }
@@ -158,7 +153,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
 
   @JsonProperty("files")
   @Override
-  public List<String> getFiles() {
+  public List<Path> getFiles() {
     return selection.getFiles();
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 0dbae1e40..fbb3f475c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -17,7 +17,6 @@
  */
 package org.apache.drill.exec.store.dfs.easy;
 
-import java.io.IOException;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -34,6 +33,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
 
 @JsonTypeName("fs-sub-scan")
 public class EasySubScan extends AbstractSubScan{
@@ -42,18 +42,18 @@ public class EasySubScan extends AbstractSubScan{
   private final List<FileWorkImpl> files;
   private final EasyFormatPlugin<?> formatPlugin;
   private final List<SchemaPath> columns;
-  private String selectionRoot;
+  private Path selectionRoot;
 
   @JsonCreator
   public EasySubScan(
       @JsonProperty("userName") String userName,
-      @JsonProperty("files") List<FileWorkImpl> files, //
-      @JsonProperty("storage") StoragePluginConfig storageConfig, //
-      @JsonProperty("format") FormatPluginConfig formatConfig, //
-      @JacksonInject StoragePluginRegistry engineRegistry, //
-      @JsonProperty("columns") List<SchemaPath> columns, //
-      @JsonProperty("selectionRoot") String selectionRoot
-      ) throws IOException, ExecutionSetupException {
+      @JsonProperty("files") List<FileWorkImpl> files,
+      @JsonProperty("storage") StoragePluginConfig storageConfig,
+      @JsonProperty("format") FormatPluginConfig formatConfig,
+      @JacksonInject StoragePluginRegistry engineRegistry,
+      @JsonProperty("columns") List<SchemaPath> columns,
+      @JsonProperty("selectionRoot") Path selectionRoot
+      ) throws ExecutionSetupException {
     super(userName);
     this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
     Preconditions.checkNotNull(this.formatPlugin);
@@ -63,7 +63,7 @@ public class EasySubScan extends AbstractSubScan{
   }
 
   public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns,
-      String selectionRoot){
+      Path selectionRoot){
     super(userName);
     this.formatPlugin = plugin;
     this.files = files;
@@ -72,7 +72,7 @@ public class EasySubScan extends AbstractSubScan{
   }
 
   @JsonProperty
-  public String getSelectionRoot() {
+  public Path getSelectionRoot() {
     return selectionRoot;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
index 587201ea9..3aeb2c26d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
@@ -17,8 +17,14 @@
  */
 package org.apache.drill.exec.store.dfs.easy;
 
+
+import org.apache.hadoop.fs.Path;
+
 public interface FileWork {
-  String getPath();
+
+  Path getPath();
+
   long getStart();
+
   long getLength();
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
index 505d68e78..6de8842cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
@@ -18,12 +18,12 @@ package org.apache.drill.exec.store.direct;
 
 import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.ScanStats;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.hadoop.fs.Path;
 
 import java.util.Collection;
 import java.util.List;
@@ -37,20 +37,20 @@ import java.util.List;
 @JsonTypeName("metadata-direct-scan")
 public class MetadataDirectGroupScan extends DirectGroupScan {
 
-  private final Collection<String> files;
+  private final Collection<Path> files;
 
-  public MetadataDirectGroupScan(RecordReader reader, Collection<String> files) {
+  public MetadataDirectGroupScan(RecordReader reader, Collection<Path> files) {
     super(reader);
     this.files = files;
   }
 
-  public MetadataDirectGroupScan(RecordReader reader, Collection<String> files, ScanStats stats) {
+  public MetadataDirectGroupScan(RecordReader reader, Collection<Path> files, ScanStats stats) {
     super(reader, stats);
     this.files = files;
   }
 
   @Override
-  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+  public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
     assert children == null || children.isEmpty();
     return new MetadataDirectGroupScan(reader, files, stats);
   }
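Alongside the type changes, EasyGroupScan.getWorkIterable() above shrinks from an anonymous class to a lambda: Iterable is a functional interface whose single abstract method is iterator(). A generic standalone sketch of the same rewrite:

    import java.util.Arrays;
    import java.util.List;

    public class IterableLambdaSketch {
      public static void main(String[] args) {
        List<String> chunks = Arrays.asList("a", "b", "c");
        // equivalent to an anonymous Iterable whose iterator() delegates to chunks
        Iterable<String> view = () -> chunks.iterator();
        for (String s : view) {
          System.out.println(s);
        }
      }
    }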
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 428a4e1bd..d3fcc5aab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -77,7 +77,7 @@ public class JSONRecordReader extends AbstractRecordReader {
    * @param columns  pathnames of columns/subfields to read
    * @throws OutOfMemoryException
    */
-  public JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final DrillFileSystem fileSystem,
+  public JSONRecordReader(final FragmentContext fragmentContext, final Path inputPath, final DrillFileSystem fileSystem,
       final List<SchemaPath> columns) throws OutOfMemoryException {
     this(fragmentContext, inputPath, null, fileSystem, columns);
   }
@@ -90,14 +90,13 @@ public class JSONRecordReader extends AbstractRecordReader {
    * @param columns  pathnames of columns/subfields to read
    * @throws OutOfMemoryException
    */
-  public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent,
-      final DrillFileSystem fileSystem, final List<SchemaPath> columns) throws OutOfMemoryException {
+  public JSONRecordReader(FragmentContext fragmentContext, JsonNode embeddedContent, DrillFileSystem fileSystem,
+      List<SchemaPath> columns) throws OutOfMemoryException {
     this(fragmentContext, null, embeddedContent, fileSystem, columns);
   }
 
-  private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath,
-      final JsonNode embeddedContent, final DrillFileSystem fileSystem,
-      final List<SchemaPath> columns) {
+  private JSONRecordReader(FragmentContext fragmentContext, Path inputPath, JsonNode embeddedContent,
+      DrillFileSystem fileSystem, List<SchemaPath> columns) {
 
     Preconditions.checkArgument(
         (inputPath == null && embeddedContent != null) ||
@@ -106,7 +105,7 @@ public class JSONRecordReader extends AbstractRecordReader {
     );
 
     if (inputPath != null) {
-      this.hadoopPath = new Path(inputPath);
+      this.hadoopPath = inputPath;
     } else {
       this.embeddedContent = embeddedContent;
     }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
index 9dbe715b1..ec4bb1255 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -87,7 +87,7 @@ public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileForma
                                       FileWork fileWork,
                                       List<SchemaPath> columns,
                                       String userName) throws ExecutionSetupException {
-    final Path path = dfs.makeQualified(new Path(fileWork.getPath()));
+    final Path path = dfs.makeQualified(fileWork.getPath());
     final FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
     return new SequenceFileRecordReader(split, dfs, context.getQueryUserName(), userName);
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 1c53a3774..03ae6f554 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -82,7 +82,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
                                       FileWork fileWork,
                                       List<SchemaPath> columns,
                                       String userName) {
-    Path path = dfs.makeQualified(new Path(fileWork.getPath()));
+    Path path = dfs.makeQualified(fileWork.getPath());
     FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
 
     if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index 3958a3270..5a78732af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -192,7 +192,7 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatPlugin.
           HttpdLogFormatPlugin.this.getConfig().getTimestampFormat(),
           fieldMapping);
 
-      final Path path = fs.makeQualified(new Path(work.getPath()));
+      final Path path = fs.makeQualified(work.getPath());
       FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""});
       TextInputFormat inputFormat = new TextInputFormat();
       JobConf job = new JobConf(fs.getConf());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
index 15ea1b462..048aa8230 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -57,11 +56,9 @@ public class ImageFormatPlugin extends EasyFormatPlugin<ImageFormatConfig> {
 
   @Override
   public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
-      List<SchemaPath> columns, String userName) throws ExecutionSetupException {
-    return new ImageRecordReader(context, dfs, fileWork.getPath(),
-        ((ImageFormatConfig)formatConfig).hasFileSystemMetadata(),
-        ((ImageFormatConfig)formatConfig).isDescriptive(),
-        ((ImageFormatConfig)formatConfig).getTimeZone());
+      List<SchemaPath> columns, String userName) {
+    return new ImageRecordReader(context, dfs, fileWork.getPath(), formatConfig.hasFileSystemMetadata(),
+        formatConfig.isDescriptive(), formatConfig.getTimeZone());
   }
 
   @Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
index 2a4b4fb48..08ed4fd09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
@@ -99,10 +99,10 @@ public class ImageRecordReader extends AbstractRecordReader {
   private DrillBuf managedBuffer;
   private boolean finish;
 
-  public ImageRecordReader(FragmentContext context, DrillFileSystem fs, String inputPath,
+  public ImageRecordReader(FragmentContext context, DrillFileSystem fs, Path inputPath,
                            boolean fileSystemMetadata, boolean descriptive, String timeZone) {
     this.fs = fs;
-    hadoopPath = fs.makeQualified(new Path(inputPath));
+    hadoopPath = fs.makeQualified(inputPath);
     this.fileSystemMetadata = fileSystemMetadata;
     this.descriptive = descriptive;
     this.timeZone = (timeZone != null) ? TimeZone.getTimeZone(timeZone) : TimeZone.getDefault();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
index e5d1dc438..ae0b733ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
@@ -41,9 +41,6 @@ import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableTimeVector;
 
-
-import org.apache.hadoop.fs.Path;
-
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
@@ -522,7 +519,7 @@ public class LogRecordReader extends AbstractRecordReader {
   private void openFile() {
     InputStream in;
     try {
-      in = dfs.open(new Path(fileWork.getPath()));
+      in = dfs.open(fileWork.getPath());
     } catch (Exception e) {
       throw UserException
           .dataReadError(e)
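The text-oriented readers above (sequence file, text, httpd log) share one pattern: the FileWork path, now already a Path, is qualified against the target file system and wrapped in a FileSplit. A standalone sketch against the local file system (file name invented for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileSplit;

    public class SplitSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration());
        // no more new Path(work.getPath()): the Path is qualified directly
        Path path = fs.makeQualified(new Path("/tmp/data.csv"));
        FileSplit split = new FileSplit(path, 0, 1024, new String[]{""});
        System.out.println(split.getPath() + " [" + split.getStart() + ", " + split.getLength() + "]");
      }
    }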
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index e2d356953..5528d028c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
 import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.hadoop.fs.Path;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -80,7 +81,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   protected ParquetTableMetadataBase parquetTableMetadata;
   protected List<RowGroupInfo> rowGroupInfos;
   protected ListMultimap<Integer, RowGroupInfo> mappings;
-  protected Set<String> fileSet;
+  protected Set<Path> fileSet;
   protected ParquetReaderConfig readerConfig;
 
   private List<EndpointAffinity> endpointAffinities;
@@ -146,7 +147,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
 
   @JsonIgnore
   @Override
-  public Collection<String> getFiles() {
+  public Collection<Path> getFiles() {
     return fileSet;
   }
@@ -428,12 +429,12 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   }
 
   @JsonIgnore
-  public <T> T getPartitionValue(String path, SchemaPath column, Class<T> clazz) {
+  public <T> T getPartitionValue(Path path, SchemaPath column, Class<T> clazz) {
     return clazz.cast(parquetGroupScanStatistics.getPartitionValue(path, column));
   }
 
   @JsonIgnore
-  public Set<String> getFileSet() {
+  public Set<Path> getFileSet() {
     return fileSet;
   }
   // partition pruning methods end
 
@@ -441,7 +442,7 @@
   // helper method used for partition pruning and filter push down
   @Override
   public void modifyFileSelection(FileSelection selection) {
-    List<String> files = selection.getFiles();
+    List<Path> files = selection.getFiles();
     fileSet = new HashSet<>(files);
     entries = new ArrayList<>(files.size());
@@ -464,7 +465,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
     if (fileSet == null) {
       fileSet = new HashSet<>();
       fileSet.addAll(parquetTableMetadata.getFiles().stream()
-          .map((Function<ParquetFileMetadata, String>) ParquetFileMetadata::getPath)
+          .map((Function<ParquetFileMetadata, Path>) ParquetFileMetadata::getPath)
          .collect(Collectors.toSet()));
     }
@@ -505,7 +506,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   // abstract methods block start
   protected abstract void initInternal() throws IOException;
   protected abstract Collection<CoordinationProtos.DrillbitEndpoint> getDrillbits();
-  protected abstract AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException;
+  protected abstract AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException;
   protected abstract boolean supportsFileImplicitColumns();
   protected abstract List<String> getPartitionValues(RowGroupInfo rowGroupInfo);
   // abstract methods block end
@@ -520,7 +521,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
    * @return new parquet group scan
    */
   private AbstractParquetGroupScan cloneWithRowGroupInfos(List<RowGroupInfo> rowGroupInfos) throws IOException {
-    Set<String> filePaths = rowGroupInfos.stream()
+    Set<Path> filePaths = rowGroupInfos.stream()
         .map(ReadEntryWithPath::getPath)
         .collect(Collectors.toSet()); // set keeps file names unique
     AbstractParquetGroupScan scan = cloneWithFileSelection(filePaths);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index 99161fd59..b1819e639 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -52,7 +52,8 @@ public abstract class AbstractParquetScanBatchCreator {
 
   private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
 
-  protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
+  protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan,
+                               OperatorContext oContext) throws ExecutionSetupException {
     final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
 
     if (!columnExplorer.isStarQuery()) {
@@ -63,7 +64,7 @@ public abstract class AbstractParquetScanBatchCreator {
     AbstractDrillFileSystemManager fsManager = getDrillFileSystemCreator(oContext, context.getOptions());
 
     // keep footers in a map to avoid re-reading them
-    Map<String, ParquetMetadata> footers = new HashMap<>();
+    Map<Path, ParquetMetadata> footers = new HashMap<>();
     List<RecordReader> readers = new LinkedList<>();
     List<Map<String, String>> implicitColumns = new ArrayList<>();
     Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
@@ -150,8 +151,8 @@ public abstract class AbstractParquetScanBatchCreator {
 
   protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
 
-  private ParquetMetadata readFooter(Configuration conf, String path, ParquetReaderConfig readerConfig) throws IOException {
-    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path),
+  private ParquetMetadata readFooter(Configuration conf, Path path, ParquetReaderConfig readerConfig) throws IOException {
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path,
       readerConfig.addCountersToConf(conf)), readerConfig.toReadOptions())) {
       return reader.getFooter();
     }
@@ -168,6 +169,6 @@ public abstract class AbstractParquetScanBatchCreator {
       this.operatorContext = operatorContext;
     }
 
-    protected abstract DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException;
+    protected abstract DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException;
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index a1d9f518d..cb9e9b96d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -68,8 +68,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
   private final MetadataContext metaContext;
   private boolean usedMetadataCache; // false by default
   // may change when filter push down / partition pruning is applied
-  private String selectionRoot;
-  private String cacheFileRoot;
+  private Path selectionRoot;
+  private Path cacheFileRoot;
 
   @JsonCreator
   public ParquetGroupScan(@JacksonInject StoragePluginRegistry engineRegistry,
@@ -78,8 +78,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
                           @JsonProperty("storage") StoragePluginConfig storageConfig,
                           @JsonProperty("format") FormatPluginConfig formatConfig,
                           @JsonProperty("columns") List<SchemaPath> columns,
-                          @JsonProperty("selectionRoot") String selectionRoot,
-                          @JsonProperty("cacheFileRoot") String cacheFileRoot,
+                          @JsonProperty("selectionRoot") Path selectionRoot,
+                          @JsonProperty("cacheFileRoot") Path cacheFileRoot,
                           @JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
                           @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
     super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, filter);
@@ -127,7 +127,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
       // The fully expanded list is already stored as part of the fileSet
       entries.add(new ReadEntryWithPath(fileSelection.getSelectionRoot()));
     } else {
-      for (String fileName : fileSelection.getFiles()) {
+      for (Path fileName : fileSelection.getFiles()) {
         entries.add(new ReadEntryWithPath(fileName));
       }
     }
@@ -169,12 +169,12 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
   }
 
   @JsonProperty
-  public String getSelectionRoot() {
+  public Path getSelectionRoot() {
     return selectionRoot;
   }
 
   @JsonProperty
-  public String getCacheFileRoot() {
+  public Path getCacheFileRoot() {
     return cacheFileRoot;
   }
   // getters for serialization / deserialization end
@@ -224,8 +224,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
       // For EXPLAIN, remove the URI prefix from cacheFileRoot.  If cacheFileRoot is null, we
      // would have read the cache file from selectionRoot
       String cacheFileRootString = (cacheFileRoot == null) ?
-          Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString() :
-          Path.getPathWithoutSchemeAndAuthority(new Path(cacheFileRoot)).toString();
+          Path.getPathWithoutSchemeAndAuthority(selectionRoot).toString() :
+          Path.getPathWithoutSchemeAndAuthority(cacheFileRoot).toString();
       builder.append(", cacheFileRoot=").append(cacheFileRootString);
     }
@@ -241,7 +241,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
     FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf());
     Path metaPath = null;
     if (entries.size() == 1 && parquetTableMetadata == null) {
-      Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
+      Path p = Path.getPathWithoutSchemeAndAuthority(entries.get(0).getPath());
       if (fs.isDirectory(p)) {
         // Using the metadata file makes sense when querying a directory; otherwise
         // if querying a single file we can look up the metadata directly from the file
@@ -257,9 +257,9 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
         parquetTableMetadata = Metadata.getParquetTableMetadata(processUserFileSystem, p.toString(), readerConfig);
       }
     } else {
-      Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
+      Path p = Path.getPathWithoutSchemeAndAuthority(selectionRoot);
       metaPath = new Path(p, Metadata.METADATA_FILENAME);
-      if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(new Path(selectionRoot))
+      if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(selectionRoot)
           && fs.exists(metaPath)) {
         if (parquetTableMetadata == null) {
           parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, readerConfig);
@@ -275,7 +275,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
     final List<FileStatus> fileStatuses = new ArrayList<>();
     for (ReadEntryWithPath entry : entries) {
       fileStatuses.addAll(
-          DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(new Path(entry.getPath())), true));
+          DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(entry.getPath()), true));
     }
 
     Map<FileStatus, FileSystem> statusMap = fileStatuses.stream()
@@ -292,7 +292,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
   }
 
   @Override
-  protected AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException {
+  protected AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException {
     FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), getSelectionRoot(), cacheFileRoot, false);
     return clone(newSelection);
   }
@@ -309,7 +309,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
 
   @Override
   protected List<String> getPartitionValues(RowGroupInfo rowGroupInfo) {
-    return ColumnExplorer.listPartitionValues(rowGroupInfo.getPath(), selectionRoot);
+    return ColumnExplorer.listPartitionValues(rowGroupInfo.getPath(), selectionRoot, false);
   }
   // overridden protected methods block end
@@ -425,7 +425,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
         }
       } else {
         final Path path = Path.getPathWithoutSchemeAndAuthority(cacheFileRoot);
-        fileSet.add(path.toString());
+        fileSet.add(path);
       }
     }
   }
@@ -436,21 +436,21 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
       return null;
     }
 
-    List<String> fileNames = new ArrayList<>(fileSet);
+    List<Path> fileNames = new ArrayList<>(fileSet);
 
     // when creating the file selection, set the selection root without the URI prefix
     // The reason is that the file names above have been created in the form
     // /a/b/c.parquet and the format of the selection root must match that of the file names
     // otherwise downstream operations such as partition pruning can break.
-    final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(new Path(selection.getSelectionRoot()));
-    this.selectionRoot = metaRootPath.toString();
+    final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(selection.getSelectionRoot());
+    this.selectionRoot = metaRootPath;
 
     // Use the FileSelection constructor directly here instead of the FileSelection.create() method
     // because create() changes the root to include the scheme and authority; In future, if create()
     // is the preferred way to instantiate a file selection, we may need to do something different...
     // WARNING: file statuses and file names are inconsistent
-    FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString(),
-        cacheFileRoot, selection.wasAllPartitionsPruned());
+    FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath, cacheFileRoot,
+        selection.wasAllPartitionsPruned());
 
     newSelection.setExpandedFully();
     newSelection.setMetaContext(metaContext);
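ParquetGroupScan repeatedly normalizes paths with Path.getPathWithoutSchemeAndAuthority before comparing selection roots against file names from the metadata cache; with Path fields the extra new Path(...) wrapping disappears. A one-method sketch of what that normalization does:

    import org.apache.hadoop.fs.Path;

    public class StripSchemeSketch {
      public static void main(String[] args) {
        Path qualified = new Path("hdfs://nn:8020/a/b/c");
        // prints /a/b/c -- scheme and authority removed, path part kept
        System.out.println(Path.getPathWithoutSchemeAndAuthority(qualified));
      }
    }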
ArrayList<>(fileSet); + List<Path> fileNames = new ArrayList<>(fileSet); // when creating the file selection, set the selection root without the URI prefix // The reason is that the file names above have been created in the form // /a/b/c.parquet and the format of the selection root must match that of the file names // otherwise downstream operations such as partition pruning can break. - final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(new Path(selection.getSelectionRoot())); - this.selectionRoot = metaRootPath.toString(); + final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(selection.getSelectionRoot()); + this.selectionRoot = metaRootPath; // Use the FileSelection constructor directly here instead of the FileSelection.create() method // because create() changes the root to include the scheme and authority; In future, if create() // is the preferred way to instantiate a file selection, we may need to do something different... // WARNING: file statuses and file names are inconsistent - FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString(), - cacheFileRoot, selection.wasAllPartitionsPruned()); + FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath, cacheFileRoot, + selection.wasAllPartitionsPruned()); newSelection.setExpandedFully(); newSelection.setMetaContext(metaContext); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java index 9381043b5..915652413 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.mutable.MutableLong; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.hadoop.fs.Path; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -41,7 +42,7 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTa public class ParquetGroupScanStatistics { // map from file names to maps of column name to partition value mappings - private Map<String, Map<SchemaPath, Object>> partitionValueMap; + private Map<Path, Map<SchemaPath, Object>> partitionValueMap; // only for partition columns : value is unique for each partition private Map<SchemaPath, TypeProtos.MajorType> partitionColTypeMap; // total number of non-null value for each column in parquet files @@ -78,7 +79,7 @@ public class ParquetGroupScanStatistics { return rowCount; } - public Object getPartitionValue(String path, SchemaPath column) { + public Object getPartitionValue(Path path, SchemaPath column) { return partitionValueMap.get(path).get(column); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java index eabe2df14..2513772c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java @@ -37,6 +37,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import 
com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; // Class containing information for reading a single parquet row group from HDFS @JsonTypeName("parquet-row-group-scan") @@ -45,7 +46,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { private final ParquetFormatPlugin formatPlugin; private final ParquetFormatConfig formatConfig; - private final String selectionRoot; + private final Path selectionRoot; @JsonCreator public ParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry, @@ -55,7 +56,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { @JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries, @JsonProperty("columns") List<SchemaPath> columns, @JsonProperty("readerConfig") ParquetReaderConfig readerConfig, - @JsonProperty("selectionRoot") String selectionRoot, + @JsonProperty("selectionRoot") Path selectionRoot, @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException { this(userName, (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)), @@ -71,7 +72,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { List<RowGroupReadEntry> rowGroupReadEntries, List<SchemaPath> columns, ParquetReaderConfig readerConfig, - String selectionRoot, + Path selectionRoot, LogicalExpression filter) { super(userName, rowGroupReadEntries, columns, readerConfig, filter); this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration"); @@ -90,7 +91,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { } @JsonProperty - public String getSelectionRoot() { + public Path getSelectionRoot() { return selectionRoot; } @@ -127,7 +128,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { @Override public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) { - return ColumnExplorer.listPartitionValues(rowGroupReadEntry.getPath(), selectionRoot); + return ColumnExplorer.listPartitionValues(rowGroupReadEntry.getPath(), selectionRoot, false); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index f0ef63989..8c91200d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.List; @@ -61,7 +62,7 @@ public class ParquetScanBatchCreator extends AbstractParquetScanBatchCreator imp } @Override - protected DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException { + protected DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException { if (fs == null) { try { fs = useAsyncPageReader ? 
operatorContext.newNonTrackingFileSystem(config) : operatorContext.newFileSystem(config); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java index 1c9ce107c..5fbadd6ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.store.schedule.CompleteWork; import org.apache.drill.exec.store.schedule.EndpointByteMap; +import org.apache.hadoop.fs.Path; import java.util.List; @@ -37,7 +38,7 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil private long numRecordsToRead; @JsonCreator - public RowGroupInfo(@JsonProperty("path") String path, + public RowGroupInfo(@JsonProperty("path") Path path, @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java index 665179f6b..be3f50c30 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java @@ -22,6 +22,7 @@ import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hadoop.fs.Path; public class RowGroupReadEntry extends ReadEntryFromHDFS { @@ -29,7 +30,7 @@ public class RowGroupReadEntry extends ReadEntryFromHDFS { private long numRecordsToRead; @JsonCreator - public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start, + public RowGroupReadEntry(@JsonProperty("path") Path path, @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex, @JsonProperty("numRecordsToRead") long numRecordsToRead) { super(path, start, length); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 17cf8c44d..ba4c49330 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -90,7 +90,7 @@ public class ParquetRecordReader extends AbstractRecordReader { public boolean useBulkReader; @SuppressWarnings("unused") - private String name; + private Path name; public ParquetReaderStats parquetReaderStats = new ParquetReaderStats(); private BatchReader batchReader; @@ -123,44 +123,42 @@ public class ParquetRecordReader extends AbstractRecordReader { } public ParquetRecordReader(FragmentContext fragmentContext, - String path, + Path path, int rowGroupIndex, long numRecordsToRead, FileSystem fs, CodecFactory codecFactory, ParquetMetadata footer, List<SchemaPath> columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException { - this(fragmentContext, 
numRecordsToRead, - path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { + this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); } public ParquetRecordReader(FragmentContext fragmentContext, - String path, + Path path, int rowGroupIndex, FileSystem fs, CodecFactory codecFactory, ParquetMetadata footer, List<SchemaPath> columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) - throws ExecutionSetupException { - this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(), - path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { + this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(), path, rowGroupIndex, fs, codecFactory, + footer, columns, dateCorruptionStatus); } public ParquetRecordReader( FragmentContext fragmentContext, long numRecordsToRead, - String path, + Path path, int rowGroupIndex, FileSystem fs, CodecFactory codecFactory, ParquetMetadata footer, List<SchemaPath> columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException { + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { this.name = path; - this.hadoopPath = new Path(path); + this.hadoopPath = path; this.fileSystem = fs; this.codecFactory = codecFactory; this.rowGroupIndex = rowGroupIndex; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index 0db007ab6..d0e2734d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; +import org.apache.drill.exec.serialization.PathSerDe; import org.apache.drill.exec.store.parquet.ParquetReaderConfig; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -82,6 +83,9 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetFi import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3; import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3; +/** + * This is a utility class, a holder for Parquet Table Metadata and {@link ParquetReaderConfig} + */ public class Metadata { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class); @@ -106,7 +110,7 @@ public class Metadata { * @param path path * @param readerConfig parquet reader configuration */ - public static void createMeta(FileSystem fs, String path, ParquetReaderConfig readerConfig) throws IOException { + public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig) throws IOException { Metadata metadata = new Metadata(readerConfig); metadata.createMetaFilesRecursively(path, fs); } @@ -208,13 +212,13 @@ public class Metadata { * {@code path} directory). 
* @throws IOException if parquet metadata can't be serialized and written to the json file */ - private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final String path, FileSystem fs) throws IOException { + private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final Path path, FileSystem fs) throws IOException { Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList(); - List<String> directoryList = Lists.newArrayList(); + List<Path> directoryList = Lists.newArrayList(); ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfoSet = new ConcurrentHashMap<>(); - Path p = new Path(path); + Path p = path; FileStatus fileStatus = fs.getFileStatus(p); assert fileStatus.isDirectory() : "Expected directory"; @@ -222,10 +226,10 @@ public class Metadata { for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) { if (file.isDirectory()) { - ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString(), fs)).getLeft(); + ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs)).getLeft(); metaDataList.addAll(subTableMetadata.files); directoryList.addAll(subTableMetadata.directories); - directoryList.add(file.getPath().toString()); + directoryList.add(file.getPath()); // Merge the schema from the child level into the current level //TODO: We need a merge method that merges two columns with the same name but different types columnTypeInfoSet.putAll(subTableMetadata.columnTypeInfo); @@ -268,7 +272,7 @@ public class Metadata { ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList); return Pair.of(parquetTableMetadata, parquetTableMetadataDirs); } - List<String> emptyDirList = Lists.newArrayList(); + List<Path> emptyDirList = new ArrayList<>(); if (timer != null) { logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS)); timer.stop(); @@ -495,7 +499,7 @@ public class Metadata { rowGroupMetadataList.add(rowGroupMeta); } - String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString(); + Path path = Path.getPathWithoutSchemeAndAuthority(file.getPath()); return new ParquetFileMetadata_v3(path, file.getLen(), rowGroupMetadataList); } @@ -535,6 +539,8 @@ public class Metadata { * * @param parquetTableMetadata parquet table metadata * @param p file path + * @param fs Drill file system + * @throws IOException if metadata can't be serialized */ private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p, FileSystem fs) throws IOException { JsonFactory jsonFactory = new JsonFactory(); @@ -542,6 +548,7 @@ public class Metadata { jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); ObjectMapper mapper = new ObjectMapper(jsonFactory); SimpleModule module = new SimpleModule(); + module.addSerializer(Path.class, new PathSerDe.Se()); module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer()); mapper.registerModule(module); OutputStream os = fs.create(p); @@ -556,6 +563,7 @@ public class Metadata { jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); ObjectMapper mapper = new ObjectMapper(jsonFactory); SimpleModule module = new SimpleModule(); + module.addSerializer(Path.class, new PathSerDe.Se()); mapper.registerModule(module); OutputStream os = fs.create(p); 
mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs); @@ -602,7 +610,7 @@ public class Metadata { parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath); if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) { parquetTableMetadataDirs = - (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getRight(); + (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getRight(); newMetadata = true; } } else { @@ -616,7 +624,7 @@ public class Metadata { } if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) { parquetTableMetadata = - (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getLeft(); + (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getLeft(); newMetadata = true; } @@ -647,9 +655,10 @@ public class Metadata { * @return true if metadata needs to be updated, false otherwise * @throws IOException if some resources are not accessible */ - private boolean tableModified(List<String> directories, Path metaFilePath, Path parentDir, MetadataContext metaContext, FileSystem fs) throws IOException { + private boolean tableModified(List<Path> directories, Path metaFilePath, Path parentDir, + MetadataContext metaContext, FileSystem fs) throws IOException { Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; - metaContext.setStatus(parentDir.toUri().getPath()); + metaContext.setStatus(parentDir); long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime(); FileStatus directoryStatus = fs.getFileStatus(parentDir); int numDirs = 1; @@ -661,10 +670,10 @@ public class Metadata { } return true; } - for (String directory : directories) { + for (Path directory : directories) { numDirs++; metaContext.setStatus(directory); - directoryStatus = fs.getFileStatus(new Path(directory)); + directoryStatus = fs.getFileStatus(directory); if (directoryStatus.getModificationTime() > metaFileModifyTime) { if (timer != null) { logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories", diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java index bed8be67a..ee07470d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet.metadata; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.hadoop.fs.Path; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -57,7 +58,7 @@ public class MetadataBase { public static abstract class ParquetTableMetadataBase { @JsonIgnore - public abstract List<String> getDirectories(); + public abstract List<Path> getDirectories(); @JsonIgnore public abstract List<? 
extends ParquetFileMetadata> getFiles(); @@ -83,7 +84,7 @@ public class MetadataBase { } public static abstract class ParquetFileMetadata { - @JsonIgnore public abstract String getPath(); + @JsonIgnore public abstract Path getPath(); @JsonIgnore public abstract Long getLength(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java index 3e7c2ffcc..2794e2b14 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java @@ -21,6 +21,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.common.util.DrillVersionInfo; import org.apache.hadoop.fs.Path; +import java.util.ArrayList; import java.util.List; import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.SUPPORTED_VERSIONS; @@ -39,12 +40,12 @@ public class MetadataPathUtils { * @param baseDir base parent directory * @return list of absolute paths */ - public static List<String> convertToAbsolutePaths(List<String> paths, String baseDir) { + public static List<Path> convertToAbsolutePaths(List<Path> paths, String baseDir) { if (!paths.isEmpty()) { - List<String> absolutePaths = Lists.newArrayList(); - for (String relativePath : paths) { - String absolutePath = (new Path(relativePath).isAbsolute()) ? relativePath - : new Path(baseDir, relativePath).toUri().getPath(); + List<Path> absolutePaths = Lists.newArrayList(); + for (Path relativePath : paths) { + Path absolutePath = (relativePath.isAbsolute()) ? relativePath + : new Path(baseDir, relativePath); absolutePaths.add(absolutePath); } return absolutePaths; @@ -64,10 +65,10 @@ public class MetadataPathUtils { if (!files.isEmpty()) { List<ParquetFileMetadata_v3> filesWithAbsolutePaths = Lists.newArrayList(); for (ParquetFileMetadata_v3 file : files) { - Path relativePath = new Path(file.getPath()); + Path relativePath = file.getPath(); // create a new file if old one contains a relative path, otherwise use an old file ParquetFileMetadata_v3 fileWithAbsolutePath = (relativePath.isAbsolute()) ? 
file - : new ParquetFileMetadata_v3(new Path(baseDir, relativePath).toUri().getPath(), file.length, file.rowGroups); + : new ParquetFileMetadata_v3(new Path(baseDir, relativePath), file.length, file.rowGroups); filesWithAbsolutePaths.add(fileWithAbsolutePath); } return filesWithAbsolutePaths; @@ -84,9 +85,9 @@ public class MetadataPathUtils { * @return parquet table metadata with relative paths for the files and directories */ public static ParquetTableMetadata_v3 createMetadataWithRelativePaths( - ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, String baseDir) { - List<String> directoriesWithRelativePaths = Lists.newArrayList(); - for (String directory : tableMetadataWithAbsolutePaths.getDirectories()) { + ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, Path baseDir) { + List<Path> directoriesWithRelativePaths = new ArrayList<>(); + for (Path directory : tableMetadataWithAbsolutePaths.getDirectories()) { directoriesWithRelativePaths.add(relativize(baseDir, directory)); } List<ParquetFileMetadata_v3> filesWithRelativePaths = Lists.newArrayList(); @@ -105,9 +106,9 @@ public class MetadataPathUtils { * @param baseDir base path (the part of the Path, which should be cut off from child path) * @return relative path */ - public static String relativize(String baseDir, String childPath) { - Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(childPath)); - Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(baseDir)); + public static Path relativize(Path baseDir, Path childPath) { + Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(childPath); + Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(baseDir); // Since hadoop Path hasn't relativize() we use uri.relativize() to get relative path Path relativeFilePath = new Path(basePathWithoutSchemeAndAuthority.toUri() @@ -116,7 +117,7 @@ public class MetadataPathUtils { throw new IllegalStateException(String.format("Path %s is not a subpath of %s.", basePathWithoutSchemeAndAuthority.toUri().getPath(), fullPathWithoutSchemeAndAuthority.toUri().getPath())); } - return relativeFilePath.toUri().getPath(); + return relativeFilePath; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java index 92feb5f8a..4b0dca803 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.common.expression.SchemaPath; +import org.apache.hadoop.fs.Path; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -43,19 +44,19 @@ public class Metadata_V1 { @JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY) private String metadataVersion; @JsonProperty List<ParquetFileMetadata_v1> files; - @JsonProperty List<String> directories; + @JsonProperty List<Path> directories; public ParquetTableMetadata_v1() { } - public ParquetTableMetadata_v1(String metadataVersion, List<ParquetFileMetadata_v1> files, List<String> directories) { + public 
ParquetTableMetadata_v1(String metadataVersion, List<ParquetFileMetadata_v1> files, List<Path> directories) { this.metadataVersion = metadataVersion; this.files = files; this.directories = directories; } @JsonIgnore - @Override public List<String> getDirectories() { + @Override public List<Path> getDirectories() { return directories; } @@ -114,7 +115,7 @@ public class Metadata_V1 { */ public static class ParquetFileMetadata_v1 extends ParquetFileMetadata { @JsonProperty - public String path; + public Path path; @JsonProperty public Long length; @JsonProperty @@ -123,7 +124,7 @@ public class Metadata_V1 { public ParquetFileMetadata_v1() { } - public ParquetFileMetadata_v1(String path, Long length, List<RowGroupMetadata_v1> rowGroups) { + public ParquetFileMetadata_v1(Path path, Long length, List<RowGroupMetadata_v1> rowGroups) { this.path = path; this.length = length; this.rowGroups = rowGroups; @@ -134,7 +135,7 @@ public class Metadata_V1 { return String.format("path: %s rowGroups: %s", path, rowGroups); } - @JsonIgnore @Override public String getPath() { + @JsonIgnore @Override public Path getPath() { return path; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java index 7eddc1279..a78fca4bf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.KeyDeserializer; import com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.hadoop.fs.Path; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -59,7 +60,7 @@ public class Metadata_V2 { @JsonProperty public ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo; @JsonProperty List<ParquetFileMetadata_v2> files; - @JsonProperty List<String> directories; + @JsonProperty List<Path> directories; @JsonProperty String drillVersion; public ParquetTableMetadata_v2() { @@ -71,7 +72,7 @@ public class Metadata_V2 { } public ParquetTableMetadata_v2(String metadataVersion, ParquetTableMetadataBase parquetTable, - List<ParquetFileMetadata_v2> files, List<String> directories, String drillVersion) { + List<ParquetFileMetadata_v2> files, List<Path> directories, String drillVersion) { this.metadataVersion = metadataVersion; this.files = files; this.directories = directories; @@ -79,7 +80,7 @@ public class Metadata_V2 { this.drillVersion = drillVersion; } - public ParquetTableMetadata_v2(String metadataVersion, List<ParquetFileMetadata_v2> files, List<String> directories, + public ParquetTableMetadata_v2(String metadataVersion, List<ParquetFileMetadata_v2> files, List<Path> directories, ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo, String drillVersion) { this.metadataVersion = metadataVersion; this.files = files; @@ -93,7 +94,7 @@ public class Metadata_V2 { } @JsonIgnore - @Override public List<String> getDirectories() { + @Override public List<Path> getDirectories() { return directories; } @@ -152,14 +153,14 @@ public class Metadata_V2 { * Struct which contains the metadata for a single parquet file */ public static class ParquetFileMetadata_v2 
extends ParquetFileMetadata { - @JsonProperty public String path; + @JsonProperty public Path path; @JsonProperty public Long length; @JsonProperty public List<RowGroupMetadata_v2> rowGroups; public ParquetFileMetadata_v2() { } - public ParquetFileMetadata_v2(String path, Long length, List<RowGroupMetadata_v2> rowGroups) { + public ParquetFileMetadata_v2(Path path, Long length, List<RowGroupMetadata_v2> rowGroups) { this.path = path; this.length = length; this.rowGroups = rowGroups; @@ -169,7 +170,7 @@ public class Metadata_V2 { return String.format("path: %s rowGroups: %s", path, rowGroups); } - @JsonIgnore @Override public String getPath() { + @JsonIgnore @Override public Path getPath() { return path; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java index 4bb07f78f..a5ff89795 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.KeyDeserializer; import com.fasterxml.jackson.databind.SerializerProvider; import org.apache.drill.common.expression.SchemaPath; +import org.apache.hadoop.fs.Path; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -54,7 +55,7 @@ public class Metadata_V3 { @JsonProperty public ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo; @JsonProperty List<ParquetFileMetadata_v3> files; - @JsonProperty List<String> directories; + @JsonProperty List<Path> directories; @JsonProperty String drillVersion; /** @@ -74,7 +75,7 @@ public class Metadata_V3 { } public ParquetTableMetadata_v3(String metadataVersion, ParquetTableMetadataBase parquetTable, - List<ParquetFileMetadata_v3> files, List<String> directories, String drillVersion) { + List<ParquetFileMetadata_v3> files, List<Path> directories, String drillVersion) { this.metadataVersion = metadataVersion; this.files = files; this.directories = directories; @@ -82,7 +83,7 @@ public class Metadata_V3 { this.drillVersion = drillVersion; } - public ParquetTableMetadata_v3(String metadataVersion, List<ParquetFileMetadata_v3> files, List<String> directories, + public ParquetTableMetadata_v3(String metadataVersion, List<ParquetFileMetadata_v3> files, List<Path> directories, ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo, String drillVersion) { this.metadataVersion = metadataVersion; @@ -97,7 +98,7 @@ public class Metadata_V3 { } @JsonIgnore - @Override public List<String> getDirectories() { + @Override public List<Path> getDirectories() { return directories; } @@ -168,14 +169,14 @@ public class Metadata_V3 { * Struct which contains the metadata for a single parquet file */ public static class ParquetFileMetadata_v3 extends ParquetFileMetadata { - @JsonProperty public String path; + @JsonProperty public Path path; @JsonProperty public Long length; @JsonProperty public List<RowGroupMetadata_v3> rowGroups; public ParquetFileMetadata_v3() { } - public ParquetFileMetadata_v3(String path, Long length, List<RowGroupMetadata_v3> rowGroups) { + public ParquetFileMetadata_v3(Path path, Long length, List<RowGroupMetadata_v3> rowGroups) { this.path = path; this.length = length; this.rowGroups 
= rowGroups; @@ -185,7 +186,7 @@ public class Metadata_V3 { return String.format("path: %s rowGroups: %s", path, rowGroups); } - @JsonIgnore @Override public String getPath() { + @JsonIgnore @Override public Path getPath() { return path; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java index 186f53415..b1fd7f23e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java @@ -19,24 +19,25 @@ package org.apache.drill.exec.store.parquet.metadata; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hadoop.fs.Path; import java.util.List; public class ParquetTableMetadataDirs { @JsonProperty - List<String> directories; + List<Path> directories; public ParquetTableMetadataDirs() { // default constructor needed for deserialization } - public ParquetTableMetadataDirs(List<String> directories) { + public ParquetTableMetadataDirs(List<Path> directories) { this.directories = directories; } @JsonIgnore - public List<String> getDirectories() { + public List<Path> getDirectories() { return directories; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 09c016a5f..5c49e040a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -235,7 +235,7 @@ public class DrillParquetReader extends AbstractRecordReader { paths.put(md.getPath(), md); } - Path filePath = new Path(entry.getPath()); + Path filePath = entry.getPath(); BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java index d688f3b07..aef56a307 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java @@ -92,11 +92,11 @@ public class PcapRecordReader extends AbstractRecordReader { .build(); } - public PcapRecordReader(final String pathToFile, + public PcapRecordReader(final Path pathToFile, final FileSystem fileSystem, final List<SchemaPath> projectedColumns) { this.fs = fileSystem; - this.pathToFile = fs.makeQualified(new Path(pathToFile)); + this.pathToFile = fs.makeQualified(pathToFile); this.projectedColumns = projectedColumns; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java index b1c5f2427..0ad234de7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java @@ -65,11 +65,11 @@ public class PcapngRecordReader extends AbstractRecordReader { private Iterator<IPcapngType> it; - public PcapngRecordReader(final String pathToFile, + public 
PcapngRecordReader(final Path pathToFile, final FileSystem fileSystem, final List<SchemaPath> columns) { this.fs = fileSystem; - this.pathToFile = fs.makeQualified(new Path(pathToFile)); + this.pathToFile = fs.makeQualified(pathToFile); this.columns = columns; setColumns(columns); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java index 7de31a063..6bc7bb02a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java @@ -116,8 +116,8 @@ public class BlockMapBuilder { try { ImmutableRangeMap<Long, BlockLocation> rangeMap = getBlockMap(status); for (Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()) { - work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), l.getValue().getOffset(), l.getValue().getLength(), status.getPath() - .toString())); + work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), + l.getValue().getOffset(), l.getValue().getLength(), status.getPath())); } } catch (IOException e) { logger.warn("failure while generating file work.", e); @@ -127,7 +127,8 @@ public class BlockMapBuilder { if (!blockify || error || compressed(status)) { - work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, status.getLen(), status.getPath().toString())); + work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, + status.getLen(), status.getPath())); } // This if-condition is specific for empty CSV file @@ -135,7 +136,8 @@ public class BlockMapBuilder { // And if this CSV file is empty, rangeMap would be empty also // Therefore, at the point before this if-condition, work would not be populated if(work.isEmpty()) { - work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, 0, status.getPath().toString())); + work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, 0, + status.getPath())); } if (noDrillbitHosts != null) { @@ -162,8 +164,8 @@ public class BlockMapBuilder { } @Override - public String getPath() { - return status.getPath().toString(); + public Path getPath() { + return status.getPath(); } @Override @@ -231,20 +233,20 @@ public class BlockMapBuilder { */ public EndpointByteMap getEndpointByteMap(Set<String> noDrillbitHosts, FileWork work) throws IOException { Stopwatch watch = Stopwatch.createStarted(); - Path fileName = new Path(work.getPath()); + Path fileName = work.getPath(); - ImmutableRangeMap<Long,BlockLocation> blockMap = getBlockMap(fileName); + ImmutableRangeMap<Long, BlockLocation> blockMap = getBlockMap(fileName); EndpointByteMapImpl endpointByteMap = new EndpointByteMapImpl(); long start = work.getStart(); long end = start + work.getLength(); Range<Long> rowGroupRange = Range.closedOpen(start, end); // Find submap of ranges that intersect with the rowGroup - ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange); + ImmutableRangeMap<Long, BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange); // Iterate through each block in this submap and get the host for the block location - for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) { + for 
(Map.Entry<Range<Long>, BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) { String[] hosts; Range<Long> blockRange = block.getKey(); try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java index 04c4eb0db..4b0402bd8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java @@ -21,16 +21,17 @@ import org.apache.drill.exec.store.dfs.easy.FileWork; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hadoop.fs.Path; public class CompleteFileWork implements FileWork, CompleteWork { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteFileWork.class); private long start; private long length; - private String path; + private Path path; private EndpointByteMap byteMap; - public CompleteFileWork(EndpointByteMap byteMap, long start, long length, String path) { + public CompleteFileWork(EndpointByteMap byteMap, long start, long length, Path path) { super(); this.start = start; this.length = length; @@ -69,7 +70,7 @@ public class CompleteFileWork implements FileWork, CompleteWork { } @Override - public String getPath() { + public Path getPath() { return path; } @@ -87,22 +88,28 @@ public class CompleteFileWork implements FileWork, CompleteWork { return new FileWorkImpl(start, length, path); } - public static class FileWorkImpl implements FileWork{ + @Override + public String toString() { + return String.format("File: %s start: %d length: %d", path, start, length); + } + + public static class FileWorkImpl implements FileWork { + + private long start; + private long length; + private Path path; @JsonCreator - public FileWorkImpl(@JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("path") String path) { - super(); + public FileWorkImpl(@JsonProperty("start") long start, + @JsonProperty("length") long length, + @JsonProperty("path") Path path) { this.start = start; this.length = length; this.path = path; } - public long start; - public long length; - public String path; - @Override - public String getPath() { + public Path getPath() { return path; } @@ -116,10 +123,13 @@ public class CompleteFileWork implements FileWork, CompleteWork { return length; } - } - - @Override - public String toString() { - return String.format("File: %s start: %d length: %d", path, start, length); + @Override + public String toString() { + return "FileWorkImpl{" + + "start=" + start + + ", length=" + length + + ", path=" + path + + '}'; + } } }
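To ground the refactoring, here is a quick illustration of the two Hadoop Path helpers this changeset leans on at nearly all of the call sites above. The paths are made-up examples: getPathWithoutSchemeAndAuthority() strips the "hdfs://host" prefix so that cached metadata paths compare equal regardless of scheme and authority, and depth() counts path segments.

import org.apache.hadoop.fs.Path;

public class PathHelpersSketch {
  public static void main(String[] args) {
    Path qualified = new Path("hdfs://namenode:8020/a/b/c/0_0_0.parquet");
    // Drop scheme and authority, as done before comparing or persisting paths.
    Path bare = Path.getPathWithoutSchemeAndAuthority(qualified);
    System.out.println(bare);              // /a/b/c/0_0_0.parquet
    System.out.println(bare.depth());      // 4: a, b, c, 0_0_0.parquet
    System.out.println(bare.getParent());  // /a/b/c
  }
}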
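The reworked MetadataPathUtils.relativize(Path, Path) goes through java.net.URI because Hadoop's Path has no relativize() of its own. A minimal sketch of that trick, again with made-up table paths:

import org.apache.hadoop.fs.Path;

public class RelativizeSketch {
  public static void main(String[] args) {
    Path base = Path.getPathWithoutSchemeAndAuthority(new Path("hdfs://nn/a/b/c"));
    Path child = Path.getPathWithoutSchemeAndAuthority(new Path("hdfs://nn/a/b/c/d/e/0_0_0.parquet"));
    // URI.relativize() strips the base prefix; when base is not a prefix of child
    // it returns child unchanged, which is what the IllegalStateException guards against.
    Path relative = new Path(base.toUri().relativize(child.toUri()).getPath());
    System.out.println(relative); // d/e/0_0_0.parquet
  }
}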
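Because the metadata cache files now carry Path fields instead of String, writeFile() registers a Path serializer (PathSerDe.Se) on the ObjectMapper before writing JSON. The stand-in serializer below shows only the registration pattern; its body is an assumption for illustration, not Drill's PathSerDe code.

import java.io.IOException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.hadoop.fs.Path;

public class PathJsonSketch {
  // Hypothetical stand-in for PathSerDe.Se: write a Path as its plain string form.
  static class PathSerializer extends StdSerializer<Path> {
    PathSerializer() { super(Path.class); }
    @Override
    public void serialize(Path path, JsonGenerator gen, SerializerProvider provider) throws IOException {
      gen.writeString(path.toString());
    }
  }

  public static void main(String[] args) throws IOException {
    ObjectMapper mapper = new ObjectMapper();
    SimpleModule module = new SimpleModule();
    module.addSerializer(Path.class, new PathSerializer());
    mapper.registerModule(module);
    // Serializes as "/a/b/c/0_0_0.parquet" instead of introspecting Path as a bean.
    System.out.println(mapper.writeValueAsString(new Path("/a/b/c/0_0_0.parquet")));
  }
}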
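Finally, BlockMapBuilder.getEndpointByteMap() intersects a row group's byte range with the file's block locations through Guava's ImmutableRangeMap, as touched by the whitespace cleanup above. A simplified sketch of that intersection, using host names as values instead of BlockLocation objects and made-up offsets:

import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableRangeMap;
import org.apache.drill.shaded.guava.com.google.common.collect.Range;

public class RangeMapSketch {
  public static void main(String[] args) {
    // Two 128 MB "blocks" keyed by their byte ranges.
    ImmutableRangeMap<Long, String> blockMap = ImmutableRangeMap.<Long, String>builder()
        .put(Range.closedOpen(0L, 134_217_728L), "host-a")
        .put(Range.closedOpen(134_217_728L, 268_435_456L), "host-b")
        .build();
    // A row group spanning the block boundary intersects both blocks,
    // so its bytes are credited to both hosts' endpoint byte maps.
    Range<Long> rowGroup = Range.closedOpen(100_000_000L, 150_000_000L);
    System.out.println(blockMap.subRangeMap(rowGroup).asMapOfRanges());
    // {[100000000..134217728)=host-a, [134217728..150000000)=host-b}
  }
}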