diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java | 37 |
1 files changed, 23 insertions, 14 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index 0db007ab6..d0e2734d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; +import org.apache.drill.exec.serialization.PathSerDe; import org.apache.drill.exec.store.parquet.ParquetReaderConfig; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -82,6 +83,9 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetFi import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3; import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3; +/** + * This is an utility class, holder for Parquet Table Metadata and {@link ParquetReaderConfig} + */ public class Metadata { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class); @@ -106,7 +110,7 @@ public class Metadata { * @param path path * @param readerConfig parquet reader configuration */ - public static void createMeta(FileSystem fs, String path, ParquetReaderConfig readerConfig) throws IOException { + public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig) throws IOException { Metadata metadata = new Metadata(readerConfig); metadata.createMetaFilesRecursively(path, fs); } @@ -208,13 +212,13 @@ public class Metadata { * {@code path} directory). * @throws IOException if parquet metadata can't be serialized and written to the json file */ - private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final String path, FileSystem fs) throws IOException { + private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final Path path, FileSystem fs) throws IOException { Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList(); - List<String> directoryList = Lists.newArrayList(); + List<Path> directoryList = Lists.newArrayList(); ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfoSet = new ConcurrentHashMap<>(); - Path p = new Path(path); + Path p = path; FileStatus fileStatus = fs.getFileStatus(p); assert fileStatus.isDirectory() : "Expected directory"; @@ -222,10 +226,10 @@ public class Metadata { for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) { if (file.isDirectory()) { - ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString(), fs)).getLeft(); + ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs)).getLeft(); metaDataList.addAll(subTableMetadata.files); directoryList.addAll(subTableMetadata.directories); - directoryList.add(file.getPath().toString()); + directoryList.add(file.getPath()); // Merge the schema from the child level into the current level //TODO: We need a merge method that merges two columns with the same name but different types columnTypeInfoSet.putAll(subTableMetadata.columnTypeInfo); @@ -268,7 +272,7 @@ public class Metadata { ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList); return Pair.of(parquetTableMetadata, parquetTableMetadataDirs); } - List<String> emptyDirList = Lists.newArrayList(); + List<Path> emptyDirList = new ArrayList<>(); if (timer != null) { logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS)); timer.stop(); @@ -495,7 +499,7 @@ public class Metadata { rowGroupMetadataList.add(rowGroupMeta); } - String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString(); + Path path = Path.getPathWithoutSchemeAndAuthority(file.getPath()); return new ParquetFileMetadata_v3(path, file.getLen(), rowGroupMetadataList); } @@ -535,6 +539,8 @@ public class Metadata { * * @param parquetTableMetadata parquet table metadata * @param p file path + * @param fs Drill file system + * @throws IOException if metadata can't be serialized */ private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p, FileSystem fs) throws IOException { JsonFactory jsonFactory = new JsonFactory(); @@ -542,6 +548,7 @@ public class Metadata { jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); ObjectMapper mapper = new ObjectMapper(jsonFactory); SimpleModule module = new SimpleModule(); + module.addSerializer(Path.class, new PathSerDe.Se()); module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer()); mapper.registerModule(module); OutputStream os = fs.create(p); @@ -556,6 +563,7 @@ public class Metadata { jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); ObjectMapper mapper = new ObjectMapper(jsonFactory); SimpleModule module = new SimpleModule(); + module.addSerializer(Path.class, new PathSerDe.Se()); mapper.registerModule(module); OutputStream os = fs.create(p); mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs); @@ -602,7 +610,7 @@ public class Metadata { parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath); if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) { parquetTableMetadataDirs = - (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getRight(); + (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getRight(); newMetadata = true; } } else { @@ -616,7 +624,7 @@ public class Metadata { } if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) { parquetTableMetadata = - (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getLeft(); + (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getLeft(); newMetadata = true; } @@ -647,9 +655,10 @@ public class Metadata { * @return true if metadata needs to be updated, false otherwise * @throws IOException if some resources are not accessible */ - private boolean tableModified(List<String> directories, Path metaFilePath, Path parentDir, MetadataContext metaContext, FileSystem fs) throws IOException { + private boolean tableModified(List<Path> directories, Path metaFilePath, Path parentDir, + MetadataContext metaContext, FileSystem fs) throws IOException { Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; - metaContext.setStatus(parentDir.toUri().getPath()); + metaContext.setStatus(parentDir); long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime(); FileStatus directoryStatus = fs.getFileStatus(parentDir); int numDirs = 1; @@ -661,10 +670,10 @@ public class Metadata { } return true; } - for (String directory : directories) { + for (Path directory : directories) { numDirs++; metaContext.setStatus(directory); - directoryStatus = fs.getFileStatus(new Path(directory)); + directoryStatus = fs.getFileStatus(directory); if (directoryStatus.getModificationTime() > metaFileModifyTime) { if (timer != null) { logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories", |