diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java | 78 |
1 files changed, 45 insertions, 33 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index d0e2734d2..18cbb6627 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import org.apache.drill.exec.serialization.PathSerDe; +import java.util.Set; import org.apache.drill.exec.store.parquet.ParquetReaderConfig; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -105,14 +106,15 @@ public class Metadata { /** * Create the parquet metadata file for the directory at the given path, and for any subdirectories. - * * @param fs file system * @param path path * @param readerConfig parquet reader configuration + * @param allColumns if set, store column metadata for all the columns + * @param columnSet Set of columns for which column metadata has to be stored */ - public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig) throws IOException { + public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig, boolean allColumns, Set<String> columnSet) throws IOException { Metadata metadata = new Metadata(readerConfig); - metadata.createMetaFilesRecursively(path, fs); + metadata.createMetaFilesRecursively(path, fs, allColumns, columnSet); } /** @@ -207,12 +209,14 @@ public class Metadata { * * @param path to the directory of the parquet table * @param fs file system + * @param allColumns if set, store column metadata for all the columns + * @param columnSet Set of columns for which column metadata has to be stored * @return Pair of parquet metadata. The left one is a parquet metadata for the table. The right one of the Pair is * a metadata for all subdirectories (if they are present and there are no any parquet files in the * {@code path} directory). * @throws IOException if parquet metadata can't be serialized and written to the json file */ - private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final Path path, FileSystem fs) throws IOException { + private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final Path path, FileSystem fs, boolean allColumns, Set<String> columnSet) throws IOException { Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList(); List<Path> directoryList = Lists.newArrayList(); @@ -226,7 +230,7 @@ public class Metadata { for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) { if (file.isDirectory()) { - ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs)).getLeft(); + ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs, allColumns, columnSet)).getLeft(); metaDataList.addAll(subTableMetadata.files); directoryList.addAll(subTableMetadata.directories); directoryList.add(file.getPath()); @@ -240,7 +244,7 @@ public class Metadata { ParquetTableMetadata_v3 parquetTableMetadata = new ParquetTableMetadata_v3(SUPPORTED_VERSIONS.last().toString(), DrillVersionInfo.getVersion()); if (childFiles.size() > 0) { - List<ParquetFileMetadata_v3 > childFilesMetadata = getParquetFileMetadata_v3(parquetTableMetadata, childFiles); + List<ParquetFileMetadata_v3 > childFilesMetadata = getParquetFileMetadata_v3(parquetTableMetadata, childFiles, allColumns, columnSet); metaDataList.addAll(childFilesMetadata); // Note that we do not need to merge the columnInfo at this point. The columnInfo is already added // to the parquetTableMetadata. @@ -330,7 +334,7 @@ public class Metadata { throws IOException { ParquetTableMetadata_v3 tableMetadata = new ParquetTableMetadata_v3(SUPPORTED_VERSIONS.last().toString(), DrillVersionInfo.getVersion()); - tableMetadata.files = getParquetFileMetadata_v3(tableMetadata, fileStatusMap); + tableMetadata.files = getParquetFileMetadata_v3(tableMetadata, fileStatusMap, true, null); tableMetadata.directories = new ArrayList<>(); return tableMetadata; } @@ -341,14 +345,15 @@ public class Metadata { * @param parquetTableMetadata_v3 can store column schema info from all the files and row groups * @param fileStatusMap parquet files statuses and corresponding file systems * + * @param allColumns if set, store column metadata for all the columns + * @param columnSet Set of columns for which column metadata has to be stored * @return list of the parquet file metadata with absolute paths * @throws IOException is thrown in case of issues while executing the list of runnables */ - private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3( - ParquetTableMetadata_v3 parquetTableMetadata_v3, Map<FileStatus, FileSystem> fileStatusMap) throws IOException { + private List<ParquetFileMetadata_v3> getParquetFileMetadata_v3(ParquetTableMetadata_v3 parquetTableMetadata_v3, Map<FileStatus, FileSystem> fileStatusMap, boolean allColumns, Set<String> columnSet) throws IOException { return TimedCallable.run("Fetch parquet metadata", logger, Collectors.toList(fileStatusMap, - (fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v3, fileStatus, fileSystem)), + (fileStatus, fileSystem) -> new MetadataGatherer(parquetTableMetadata_v3, fileStatus, fileSystem, allColumns, columnSet)), 16 ); } @@ -361,16 +366,20 @@ public class Metadata { private final ParquetTableMetadata_v3 parquetTableMetadata; private final FileStatus fileStatus; private final FileSystem fs; + private final boolean allColumns; + private final Set<String> columnSet; - MetadataGatherer(ParquetTableMetadata_v3 parquetTableMetadata, FileStatus fileStatus, FileSystem fs) { + MetadataGatherer(ParquetTableMetadata_v3 parquetTableMetadata, FileStatus fileStatus, FileSystem fs, boolean allColumns, Set<String> columnSet) { this.parquetTableMetadata = parquetTableMetadata; this.fileStatus = fileStatus; this.fs = fs; + this.allColumns = allColumns; + this.columnSet = columnSet; } @Override protected ParquetFileMetadata_v3 runInner() throws Exception { - return getParquetFileMetadata_v3(parquetTableMetadata, fileStatus, fs); + return getParquetFileMetadata_v3(parquetTableMetadata, fileStatus, fs, allColumns, columnSet); } public String toString() { @@ -417,7 +426,7 @@ public class Metadata { * Get the metadata for a single file */ private ParquetFileMetadata_v3 getParquetFileMetadata_v3(ParquetTableMetadata_v3 parquetTableMetadata, - final FileStatus file, final FileSystem fs) throws IOException, InterruptedException { + final FileStatus file, final FileSystem fs, boolean allColumns, Set<String> columnSet) throws IOException, InterruptedException { final ParquetMetadata metadata; final UserGroupInformation processUserUgi = ImpersonationUtil.getProcessUserUGI(); final Configuration conf = new Configuration(fs.getConf()); @@ -453,7 +462,6 @@ public class Metadata { List<ColumnMetadata_v3> columnMetadataList = new ArrayList<>(); long length = 0; for (ColumnChunkMetaData col : rowGroup.getColumns()) { - Statistics<?> stats = col.getStatistics(); String[] columnName = col.getPath().toArray(); SchemaPath columnSchemaName = SchemaPath.getCompoundPath(columnName); ColTypeInfo colTypeInfo = colTypeInfoMap.get(columnSchemaName); @@ -466,25 +474,28 @@ public class Metadata { parquetTableMetadata.columnTypeInfo = new ConcurrentHashMap<>(); } parquetTableMetadata.columnTypeInfo.put(new ColumnTypeMetadata_v3.Key(columnTypeMetadata.name), columnTypeMetadata); - - // Save the column schema info. We'll merge it into one list - Object minValue = null; - Object maxValue = null; - long numNulls = -1; - boolean statsAvailable = stats != null && !stats.isEmpty(); - if (statsAvailable) { - if (stats.hasNonNullValue()) { - minValue = stats.genericGetMin(); - maxValue = stats.genericGetMax(); - if (containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION && columnTypeMetadata.originalType == OriginalType.DATE) { - minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue); - maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue); + // Store column metadata only if allColumns is set to true or if the column belongs to the subset of columns specified in the refresh command + if (allColumns || columnSet == null || !allColumns && columnSet != null && columnSet.size() > 0 && columnSet.contains(columnSchemaName.getRootSegmentPath())) { + Statistics<?> stats = col.getStatistics(); + // Save the column schema info. We'll merge it into one list + Object minValue = null; + Object maxValue = null; + long numNulls = -1; + boolean statsAvailable = stats != null && !stats.isEmpty(); + if (statsAvailable) { + if (stats.hasNonNullValue()) { + minValue = stats.genericGetMin(); + maxValue = stats.genericGetMax(); + if (containsCorruptDates == ParquetReaderUtility.DateCorruptionStatus.META_SHOWS_CORRUPTION && columnTypeMetadata.originalType == OriginalType.DATE) { + minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue); + maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue); + } } + numNulls = stats.getNumNulls(); } - numNulls = stats.getNumNulls(); + ColumnMetadata_v3 columnMetadata = new ColumnMetadata_v3(columnTypeMetadata.name, col.getPrimitiveType().getPrimitiveTypeName(), minValue, maxValue, numNulls); + columnMetadataList.add(columnMetadata); } - ColumnMetadata_v3 columnMetadata = new ColumnMetadata_v3(columnTypeMetadata.name, col.getPrimitiveType().getPrimitiveTypeName(), minValue, maxValue, numNulls); - columnMetadataList.add(columnMetadata); length += col.getTotalSize(); } @@ -610,7 +621,7 @@ public class Metadata { parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath); if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) { parquetTableMetadataDirs = - (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getRight(); + (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getRight(); newMetadata = true; } } else { @@ -622,9 +633,10 @@ public class Metadata { if (new MetadataVersion(parquetTableMetadata.getMetadataVersion()).compareTo(new MetadataVersion(3, 0)) >= 0) { ((ParquetTableMetadata_v3) parquetTableMetadata).updateRelativePaths(metadataParentDirPath); } - if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) { + if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) { + // TODO change with current columns in existing metadata (auto refresh feature) parquetTableMetadata = - (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getLeft(); + (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs, true, null)).getLeft(); newMetadata = true; } |