author | Steven Phillips <smp@apache.org> | 2015-08-18 13:24:33 -0700
committer | adeneche <adeneche@gmail.com> | 2015-09-18 16:19:13 -0700
commit | 0ee609581423b9649391be02013da86fe9b0e2f2 (patch)
tree | fd72a9db0f32318a8f4b2b1181caf75fa5329995 /exec/java-exec/src/main/java/org/apache/drill
parent | 3f5ebafcaa8fc0ed08d5964081e4c22a6906d46b (diff)
DRILL-2743: Parquet file metadata caching
Rebasing on top of master required conflict resolution in Parser.tdd and parserImpls.ftl.
this closes #114
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill')
7 files changed, 896 insertions, 201 deletions
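Before the diff itself, a quick orientation: the patch adds a `REFRESH TABLE METADATA <table>` SQL statement (see `SqlRefreshMetadata.unparse` below) whose handler walks the table's directory tree, reads every Parquet footer once, and caches the result in a `.drill.parquet_metadata` file per directory; `ParquetGroupScan` then reads that cache instead of the footers. A minimal sketch of how the new public API fits together, assuming a reachable Hadoop `FileSystem`; the demo class and table path are hypothetical, and it lives in the same package so the package-private `files` field is visible:

```java
package org.apache.drill.exec.store.parquet; // hypothetical demo, same package as Metadata

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MetadataCacheDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    String tableDir = "/data/parquet_table"; // hypothetical table directory

    // What the REFRESH TABLE METADATA handler ends up calling: walk the
    // directory tree and write a .drill.parquet_metadata file per directory.
    Metadata.createMeta(fs, tableDir);

    // What ParquetGroupScan.init() does when it finds a cache file: read the
    // serialized footer metadata instead of opening every Parquet file.
    // readBlockMeta transparently rebuilds the cache if a directory is newer.
    Metadata.ParquetTableMetadata_v1 cached =
        Metadata.readBlockMeta(fs, new Path(tableDir, Metadata.METADATA_FILENAME).toString());
    System.out.println("cached files: " + cached.files.size());
  }
}
```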
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
new file mode 100644
index 000000000..ce4059b08
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import java.io.IOException;
+import java.util.List;
+
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillScreenRel;
+import org.apache.drill.exec.planner.logical.DrillStoreRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillWriterRel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.DrillSqlWorker;
+import org.apache.drill.exec.planner.sql.parser.SqlRefreshMetadata;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.Metadata;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
+
+public class RefreshMetadataHandler extends DefaultSqlHandler {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RefreshMetadataHandler.class);
+
+  public RefreshMetadataHandler(SqlHandlerConfig config) {
+    super(config);
+  }
+
+  private PhysicalPlan direct(boolean outcome, String message, Object... values) {
+    return DirectPlan.createDirectPlan(context, outcome, String.format(message, values));
+  }
+
+  private PhysicalPlan notSupported(String tbl) {
+    return direct(false, "Table %s does not support metadata refresh. Support is currently limited to single-directory-based Parquet tables.", tbl);
+  }
+
+  @Override
+  public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
+    final SqlRefreshMetadata refreshTable = unwrap(sqlNode, SqlRefreshMetadata.class);
+
+    try {
+
+      final SchemaPlus schema = findSchema(context.getNewDefaultSchema(),
+          refreshTable.getSchemaPath());
+
+      final String tableName = refreshTable.getName();
+
+      if (tableName.contains("*") || tableName.contains("?")) {
+        return direct(false, "Glob path %s not supported for metadata refresh", tableName);
+      }
+
+      final Table table = schema.getTable(tableName);
+
+      if (table == null) {
+        return direct(false, "Table %s does not exist.", tableName);
+      }
+
+      if (!(table instanceof DrillTable)) {
+        return notSupported(tableName);
+      }
+
+      final DrillTable drillTable = (DrillTable) table;
+
+      final Object selection = drillTable.getSelection();
+      if (!(selection instanceof FormatSelection)) {
+        return notSupported(tableName);
+      }
+
+      FormatSelection formatSelection = (FormatSelection) selection;
+
+      FormatPluginConfig formatConfig = formatSelection.getFormat();
+      if (!((formatConfig instanceof ParquetFormatConfig) ||
+          ((formatConfig instanceof NamedFormatPluginConfig) && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
+        return notSupported(tableName);
+      }
+
+      FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
+      DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
+
+      String selectionRoot = formatSelection.getSelection().selectionRoot;
+      if (!fs.getFileStatus(new Path(selectionRoot)).isDirectory()) {
+        return notSupported(tableName);
+      }
+
+      Metadata.createMeta(fs, selectionRoot);
+      return direct(true, "Successfully updated metadata for table %s.", tableName);
+
+    } catch (Exception e) {
+      logger.error("Failed to update metadata for table '{}'", refreshTable.getName(), e);
+      return DirectPlan.createDirectPlan(context, false, String.format("Error: %s", e.getMessage()));
+    }
+  }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java
new file mode 100644
index 000000000..01050b83c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import java.util.List;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.RefreshMetadataHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Sql parse tree node to represent statement:
+ * REFRESH TABLE METADATA tblname
+ */
+public class SqlRefreshMetadata extends DrillSqlCall {
+  public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("REFRESH_TABLE_METADATA", SqlKind.OTHER) {
+    @Override
+    public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+      return new SqlRefreshMetadata(pos, (SqlIdentifier) operands[0]);
+    }
+  };
+
+  private SqlIdentifier tblName;
+
+  public SqlRefreshMetadata(SqlParserPos pos, SqlIdentifier tblName) {
+    super(pos);
+    this.tblName = tblName;
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    List<SqlNode> ops = Lists.newArrayList();
+    ops.add(tblName);
+    return ops;
+  }
+
+  @Override
+  public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+    writer.keyword("REFRESH");
+    writer.keyword("TABLE");
+    writer.keyword("METADATA");
+    tblName.unparse(writer, leftPrec, rightPrec);
+  }
+
+  public String getName() {
+    if (tblName.isSimple()) {
+      return tblName.getSimple();
+    }
+
+    return tblName.names.get(tblName.names.size() - 1);
+  }
+
+  public List<String> getSchemaPath() {
+    if (tblName.isSimple()) {
+      return ImmutableList.of();
+    }
+
+    return tblName.names.subList(0, tblName.names.size() - 1);
+  }
+
+  @Override
+  public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+    return new RefreshMetadataHandler(config);
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
index 00f463d1c..5c2d71a1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
@@ -29,6 +29,9 @@ public class DrillPathFilter extends Utils.OutputFileUtils.OutputFilesFilter {
     if (path.getName().startsWith(DrillFileSystem.DOT_FILE_PREFIX)) {
       return false;
     }
+    if (path.getName().startsWith(".")) {
+      return false;
+    }
     return super.accept(path);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
new file mode 100644
index 000000000..02414a4b4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator.Feature;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.SchemaPath.De;
+import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.DrillPathFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.column.statistics.Statistics;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class Metadata {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
+
+  public static final String METADATA_FILENAME = ".drill.parquet_metadata";
+
+  private final FileSystem fs;
+
+  /**
+   * Create the parquet metadata file for the directory at the given path, and for any subdirectories
+   * @param fs
+   * @param path
+   * @throws IOException
+   */
+  public static void createMeta(FileSystem fs, String path) throws IOException {
+    Metadata metadata = new Metadata(fs);
+    metadata.createMetaFilesRecursively(path);
+  }
+
+  /**
+   * Get the parquet metadata for the parquet files in the given directory, including those in subdirectories
+   * @param fs
+   * @param path
+   * @return
+   * @throws IOException
+   */
+  public static ParquetTableMetadata_v1 getParquetTableMetadata(FileSystem fs, String path) throws IOException {
+    Metadata metadata = new Metadata(fs);
+    return metadata.getParquetTableMetadata(path);
+  }
+
+  /**
+   * Get the parquet metadata for a list of parquet files
+   * @param fs
+   * @param fileStatuses
+   * @return
+   * @throws IOException
+   */
+  public static ParquetTableMetadata_v1 getParquetTableMetadata(FileSystem fs,
+      List<FileStatus> fileStatuses) throws IOException {
+    Metadata metadata = new Metadata(fs);
+    return metadata.getParquetTableMetadata(fileStatuses);
+  }
+
+  /**
+   * Get the parquet metadata for a directory by reading the metadata file
+   * @param fs
+   * @param path The path to the metadata file, located in the directory that contains the parquet files
+   * @return
+   * @throws IOException
+   */
+  public static ParquetTableMetadata_v1 readBlockMeta(FileSystem fs, String path) throws IOException {
+    Metadata metadata = new Metadata(fs);
+    return metadata.readBlockMeta(path);
+  }
+
+  private Metadata(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  /**
+   * Create the parquet metadata file for the directory at the given path, and for any subdirectories
+   * @param path
+   * @throws IOException
+   */
+  private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) throws IOException {
+    List<ParquetFileMetadata> metaDataList = Lists.newArrayList();
+    List<String> directoryList = Lists.newArrayList();
+    Path p = new Path(path);
+    FileStatus fileStatus = fs.getFileStatus(p);
+    assert fileStatus.isDirectory() : "Expected directory";
+
+    final List<FileStatus> childFiles = Lists.newArrayList();
+
+    for (final FileStatus file : fs.listStatus(p, new DrillPathFilter())) {
+      if (file.isDirectory()) {
+        ParquetTableMetadata_v1 subTableMetadata = createMetaFilesRecursively(file.getPath().toString());
+        metaDataList.addAll(subTableMetadata.files);
+        directoryList.addAll(subTableMetadata.directories);
+        directoryList.add(file.getPath().toString());
+      } else {
+        childFiles.add(file);
+      }
+    }
+    if (childFiles.size() > 0) {
+      metaDataList.addAll(getParquetFileMetadata(childFiles));
+    }
+    ParquetTableMetadata_v1 parquetTableMetadata = new ParquetTableMetadata_v1(metaDataList, directoryList);
+    writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME));
+    return parquetTableMetadata;
+  }
+
+  /**
+   * Get the parquet metadata for the parquet files in a directory
+   * @param path the path of the directory
+   * @return
+   * @throws IOException
+   */
+  private ParquetTableMetadata_v1 getParquetTableMetadata(String path) throws IOException {
+    Path p = new Path(path);
+    FileStatus fileStatus = fs.getFileStatus(p);
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    List<FileStatus> fileStatuses = getFileStatuses(fileStatus);
+    logger.info("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS));
+    return getParquetTableMetadata(fileStatuses);
+  }
+
+  /**
+   * Get the parquet metadata for a list of parquet files
+   * @param fileStatuses
+   * @return
+   * @throws IOException
+   */
+  private ParquetTableMetadata_v1 getParquetTableMetadata(List<FileStatus> fileStatuses) throws IOException {
+    List<ParquetFileMetadata> fileMetadataList = getParquetFileMetadata(fileStatuses);
+    return new ParquetTableMetadata_v1(fileMetadataList, new ArrayList<String>());
+  }
+
+  /**
+   * Get a list of file metadata for a list of parquet files
+   * @param fileStatuses
+   * @return
+   * @throws IOException
+   */
+  private List<ParquetFileMetadata> getParquetFileMetadata(List<FileStatus> fileStatuses) throws IOException {
+    List<TimedRunnable<ParquetFileMetadata>> gatherers = Lists.newArrayList();
+    for (FileStatus file : fileStatuses) {
+      gatherers.add(new MetadataGatherer(file));
+    }
+
+    List<ParquetFileMetadata> metaDataList = Lists.newArrayList();
+    metaDataList.addAll(TimedRunnable.run("Fetch parquet metadata", logger, gatherers, 16));
+    return metaDataList;
+  }
+
+  /**
+   * Recursively get a list of files
+   * @param fileStatus
+   * @return
+   * @throws IOException
+   */
+  private List<FileStatus> getFileStatuses(FileStatus fileStatus) throws IOException {
+    List<FileStatus> statuses = Lists.newArrayList();
+    if (fileStatus.isDirectory()) {
+      for (FileStatus child : fs.listStatus(fileStatus.getPath(), new DrillPathFilter())) {
+        statuses.addAll(getFileStatuses(child));
+      }
+    } else {
+      statuses.add(fileStatus);
+    }
+    return statuses;
+  }
+
+  /**
+   * TimedRunnable that reads the footer from parquet and collects file metadata
+   */
+  private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata> {
+
+    private FileStatus fileStatus;
+
+    public MetadataGatherer(FileStatus fileStatus) {
+      this.fileStatus = fileStatus;
+    }
+
+    @Override
+    protected ParquetFileMetadata runInner() throws Exception {
+      return getParquetFileMetadata(fileStatus);
+    }
+
+    @Override
+    protected IOException convertToIOException(Exception e) {
+      if (e instanceof IOException) {
+        return (IOException) e;
+      } else {
+        return new IOException(e);
+      }
+    }
+  }
+
+  private OriginalType getOriginalType(Type type, String[] path, int depth) {
+    if (type.isPrimitive()) {
+      return type.getOriginalType();
+    }
+    Type t = ((GroupType) type).getType(path[path.length - depth - 1]);
+    return getOriginalType(t, path, depth + 1);
+  }
+
+  /**
+   * Get the metadata for a single file
+   * @param file
+   * @return
+   * @throws IOException
+   */
+  private ParquetFileMetadata getParquetFileMetadata(FileStatus file) throws IOException {
+    ParquetMetadata metadata = ParquetFileReader.readFooter(fs.getConf(), file);
+    MessageType schema = metadata.getFileMetaData().getSchema();
+
+    Map<SchemaPath,OriginalType> originalTypeMap = Maps.newHashMap();
+    schema.getPaths();
+    for (String[] path : schema.getPaths()) {
+      originalTypeMap.put(SchemaPath.getCompoundPath(path), getOriginalType(schema, path, 0));
+    }
+
+    List<RowGroupMetadata> rowGroupMetadataList = Lists.newArrayList();
+
+    for (BlockMetaData rowGroup : metadata.getBlocks()) {
+      List<ColumnMetadata> columnMetadataList = Lists.newArrayList();
+      long length = 0;
+      for (ColumnChunkMetaData col : rowGroup.getColumns()) {
+        ColumnMetadata columnMetadata;
+
+        boolean statsAvailable = (col.getStatistics() != null && !col.getStatistics().isEmpty());
+
+        Statistics stats = col.getStatistics();
+        SchemaPath columnName = SchemaPath.getCompoundPath(col.getPath().toArray());
+        if (statsAvailable) {
+          columnMetadata = new ColumnMetadata(columnName, col.getType(), originalTypeMap.get(columnName),
+              stats.genericGetMax(), stats.genericGetMin(), stats.getNumNulls());
+        } else {
+          columnMetadata = new ColumnMetadata(columnName, col.getType(), originalTypeMap.get(columnName),
+              null, null, null);
+        }
+        columnMetadataList.add(columnMetadata);
+        length += col.getTotalSize();
+      }
+
+      RowGroupMetadata rowGroupMeta = new RowGroupMetadata(rowGroup.getStartingPos(), length, rowGroup.getRowCount(),
+          getHostAffinity(file, rowGroup.getStartingPos(), length), columnMetadataList);
+
+      rowGroupMetadataList.add(rowGroupMeta);
+    }
+    String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString();
+
+    return new ParquetFileMetadata(path, file.getLen(), rowGroupMetadataList);
+  }
+
+  /**
+   * Get the host affinity for a row group
+   * @param fileStatus the parquet file
+   * @param start the start of the row group
+   * @param length the length of the row group
+   * @return
+   * @throws IOException
+   */
+  private Map<String,Float> getHostAffinity(FileStatus fileStatus, long start, long length) throws IOException {
+    BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
+    Map<String,Float> hostAffinityMap = Maps.newHashMap();
+    for (BlockLocation blockLocation : blockLocations) {
+      for (String host : blockLocation.getHosts()) {
+        Float currentAffinity = hostAffinityMap.get(host);
+        float blockStart = blockLocation.getOffset();
+        float blockEnd = blockStart + blockLocation.getLength();
+        float rowGroupEnd = start + length;
+        Float newAffinity = (blockLocation.getLength() - (blockStart < start ? start - blockStart : 0) -
+            (blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0)) / length;
+        if (currentAffinity != null) {
+          hostAffinityMap.put(host, currentAffinity + newAffinity);
+        } else {
+          hostAffinityMap.put(host, newAffinity);
+        }
+      }
+    }
+    return hostAffinityMap;
+  }
+
+  /**
+   * Serialize parquet metadata to json and write to a file
+   * @param parquetTableMetadata
+   * @param p
+   * @throws IOException
+   */
+  private void writeFile(ParquetTableMetadata_v1 parquetTableMetadata, Path p) throws IOException {
+    JsonFactory jsonFactory = new JsonFactory();
+    jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
+    jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
+    ObjectMapper mapper = new ObjectMapper(jsonFactory);
+    FSDataOutputStream os = fs.create(p);
+    mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata);
+    os.flush();
+    os.close();
+  }
+
+  /**
+   * Read the parquet metadata from a file
+   * @param path
+   * @return
+   * @throws IOException
+   */
+  private ParquetTableMetadata_v1 readBlockMeta(String path) throws IOException {
+    Path p = new Path(path);
+    ObjectMapper mapper = new ObjectMapper();
+    SimpleModule module = new SimpleModule();
+    module.addDeserializer(SchemaPath.class, new De());
+    mapper.registerModule(module);
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    FSDataInputStream is = fs.open(p);
+    ParquetTableMetadata_v1 parquetTableMetadata = mapper.readValue(is, ParquetTableMetadata_v1.class);
+    if (tableModified(parquetTableMetadata, p)) {
+      parquetTableMetadata = createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString());
+    }
+    return parquetTableMetadata;
+  }
+
+  /**
+   * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with
+   * the modification time of the metadata file
+   * @param tableMetadata
+   * @param metaFilePath
+   * @return
+   * @throws IOException
+   */
+  private boolean tableModified(ParquetTableMetadata_v1 tableMetadata, Path metaFilePath) throws IOException {
+    long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
+    FileStatus directoryStatus = fs.getFileStatus(metaFilePath.getParent());
+    if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+      return true;
+    }
+    for (String directory : tableMetadata.directories) {
+      directoryStatus = fs.getFileStatus(new Path(directory));
+      if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "metadata_version")
+  public static class ParquetTableMetadataBase {
+
+  }
+
+  /**
+   * Struct which contains the metadata for an entire parquet directory structure
+   */
+  @JsonTypeName("v1")
+  public static class ParquetTableMetadata_v1 extends ParquetTableMetadataBase {
+    @JsonProperty
+    List<ParquetFileMetadata> files;
+    @JsonProperty
+    List<String> directories;
+
+    public ParquetTableMetadata_v1() {
+      super();
+    }
+
+    public ParquetTableMetadata_v1(List<ParquetFileMetadata> files, List<String> directories) {
+      this.files = files;
+      this.directories = directories;
+    }
+  }
+
+  /**
+   * Struct which contains the metadata for a single parquet file
+   */
+  public static class ParquetFileMetadata {
+    @JsonProperty
+    public String path;
+    @JsonProperty
+    public Long length;
+    @JsonProperty
+    public List<RowGroupMetadata> rowGroups;
+
+    public ParquetFileMetadata() {
+      super();
+    }
+
+    public ParquetFileMetadata(String path, Long length, List<RowGroupMetadata> rowGroups) {
+      this.path = path;
+      this.length = length;
+      this.rowGroups = rowGroups;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("path: %s rowGroups: %s", path, rowGroups);
+    }
+  }
+
+  /**
+   * A struct that contains the metadata for a parquet row group
+   */
+  public static class RowGroupMetadata {
+    @JsonProperty
+    public Long start;
+    @JsonProperty
+    public Long length;
+    @JsonProperty
+    public Long rowCount;
+    @JsonProperty
+    public Map<String, Float> hostAffinity;
+    @JsonProperty
+    public List<ColumnMetadata> columns;
+
+    public RowGroupMetadata() {
+      super();
+    }
+
+    public RowGroupMetadata(Long start, Long length, Long rowCount,
+        Map<String, Float> hostAffinity, List<ColumnMetadata> columns) {
+      this.start = start;
+      this.length = length;
+      this.rowCount = rowCount;
+      this.hostAffinity = hostAffinity;
+      this.columns = columns;
+    }
+  }
+
+  /**
+   * A struct that contains the metadata for a column in a parquet file
+   */
+  public static class ColumnMetadata {
+    @JsonProperty
+    public SchemaPath name;
+    @JsonProperty
+    public PrimitiveTypeName primitiveType;
+    @JsonProperty
+    public OriginalType originalType;
+    @JsonProperty
+    public Object max;
+    @JsonProperty
+    public Object min;
+    @JsonProperty
+    public Long nulls;
+
+    public ColumnMetadata() {
+      super();
+    }
+
+    public ColumnMetadata(SchemaPath name, PrimitiveTypeName primitiveType, OriginalType originalType,
+        Object max, Object min, Long nulls) {
+      this.name = name;
+      this.primitiveType = primitiveType;
+      this.originalType = originalType;
+      this.max = max;
+      this.min = min;
+      this.nulls = nulls;
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 446e12aa3..eeb522ab6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -217,6 +217,9 @@ public class ParquetFormatPlugin implements FormatPlugin{
       return true;
     } else {
+      if (fs.exists(new Path(dir.getPath(), Metadata.METADATA_FILENAME))) {
+        return true;
+      }
       PathFilter filter = new DrillPathFilter();
 
       FileStatus[] files = fs.listStatus(dir.getPath(), filter);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 845bce95c..00d36ffaa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,30 +18,23 @@ package org.apache.drill.exec.store.parquet;
 
 import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.expr.holders.IntervalHolder;
 import org.apache.drill.exec.metrics.DrillMetrics;
 import org.apache.drill.exec.physical.EndpointAffinity;
 import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
@@ -56,27 +49,28 @@ import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.store.StoragePluginRegistry;
 import org.apache.drill.exec.store.TimedRunnable;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillPathFilter;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.parquet.Metadata.ColumnMetadata;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1;
+import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata;
 import org.apache.drill.exec.store.schedule.AffinityCreator;
 import org.apache.drill.exec.store.schedule.AssignmentCreator;
 import org.apache.drill.exec.store.schedule.BlockMapBuilder;
 import org.apache.drill.exec.store.schedule.CompleteWork;
 import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
 import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.Float4Vector;
-import org.apache.drill.exec.vector.Float8Vector;
-import org.apache.drill.exec.vector.IntVector;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableDateVector;
 import org.apache.drill.exec.vector.NullableDecimal18Vector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableIntervalVector;
 import org.apache.drill.exec.vector.NullableSmallIntVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
 import org.apache.drill.exec.vector.NullableTimeVector;
@@ -87,27 +81,14 @@ import org.apache.drill.exec.vector.NullableUInt4Vector;
 import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.joda.time.DateTimeUtils;
-import parquet.column.statistics.Statistics;
-import parquet.format.ConvertedType;
-import parquet.format.FileMetaData;
-import parquet.format.SchemaElement;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
 import parquet.io.api.Binary;
 import parquet.org.codehaus.jackson.annotate.JsonCreator;
 
 import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -118,7 +99,6 @@ import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 
 import parquet.schema.OriginalType;
 import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
 
 @JsonTypeName("parquet-scan")
 public class ParquetGroupScan extends AbstractFileGroupScan {
@@ -126,6 +106,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   static final MetricRegistry metrics = DrillMetrics.getInstance();
   static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
 
+
   private final List<ReadEntryWithPath> entries;
   private final Stopwatch watch = new Stopwatch();
   private final ParquetFormatPlugin formatPlugin;
@@ -171,7 +152,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.formatConfig = formatPlugin.getConfig();
     this.entries = entries;
     this.selectionRoot = selectionRoot;
-    this.readFooterFromEntries();
+
+    init();
   }
 
   public ParquetGroupScan( //
@@ -195,7 +177,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     this.selectionRoot = selectionRoot;
 
-    readFooter(files);
+    init();
   }
 
   /*
@@ -238,137 +220,11 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return selectionRoot;
   }
 
-  private void readFooterFromEntries() throws IOException {
-    List<FileStatus> files = Lists.newArrayList();
-    for (ReadEntryWithPath e : entries) {
-      files.add(fs.getFileStatus(new Path(e.getPath())));
-    }
-    readFooter(files);
-  }
-
-  private void readFooter(final List<FileStatus> statuses) {
-    final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(getUserName());
-    try {
-      ugi.doAs(new PrivilegedExceptionAction<Void>() {
-        public Void run() throws Exception {
-          readFooterHelper(statuses);
-          return null;
-        }
-      });
-    } catch (InterruptedException | IOException e) {
-      final String errMsg = String.format("Failed to read footer entries from parquet input files: %s", e.getMessage());
-      logger.error(errMsg, e);
-      throw new DrillRuntimeException(errMsg, e);
-    }
-  }
-
   public Set<String> getFileSet() {
     return fileSet;
   }
 
-  private Set<String> fileSet = Sets.newHashSet();
-
-  private void readFooterHelper(List<FileStatus> statuses) throws IOException {
-    watch.reset();
-    watch.start();
-    Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time();
-
-    columnTypeMap.clear();
-    fileSet.clear();
-    partitionValueMap.clear();
-
-    rowGroupInfos = Lists.newArrayList();
-    long start = 0, length = 0;
-    rowCount = 0;
-    columnValueCounts = new HashMap<SchemaPath, Long>();
-
-    ColumnChunkMetaData columnChunkMetaData;
-
-    List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getFsConf(), statuses, 16);
-    boolean first = true;
-    ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
-    for (Footer footer : footers) {
-      int index = 0;
-      ParquetMetadata metadata = footer.getParquetMetadata();
-      FileMetaData fileMetaData = metadataConverter.toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, metadata);
-      HashMap<String, SchemaElement> schemaElements = new HashMap<>();
-      for (SchemaElement se : fileMetaData.getSchema()) {
-        schemaElements.put(se.getName(), se);
-      }
-      for (BlockMetaData rowGroup : metadata.getBlocks()) {
-        String file = Path.getPathWithoutSchemeAndAuthority(footer.getFile()).toString();
-        fileSet.add(file);
-        long valueCountInGrp = 0;
-        // need to grab block information from HDFS
-        columnChunkMetaData = rowGroup.getColumns().iterator().next();
-        start = columnChunkMetaData.getFirstDataPageOffset();
-        // this field is not being populated correctly, but the column chunks know their sizes, just summing them for
-        // now
-        // end = start + rowGroup.getTotalByteSize();
-        length = 0;
-        for (ColumnChunkMetaData col : rowGroup.getColumns()) {
-          length += col.getTotalSize();
-          valueCountInGrp = Math.max(col.getValueCount(), valueCountInGrp);
-          SchemaPath schemaPath = SchemaPath.getSimplePath(col.getPath().toString().replace("[", "").replace("]", "").toLowerCase());
-
-          long previousCount = 0;
-          long currentCount = 0;
-
-          if (! columnValueCounts.containsKey(schemaPath)) {
-            // create an entry for this column
-            columnValueCounts.put(schemaPath, previousCount /* initialize to 0 */);
-          } else {
-            previousCount = columnValueCounts.get(schemaPath);
-          }
-
-          boolean statsAvail = (col.getStatistics() != null && !col.getStatistics().isEmpty());
-
-          if (statsAvail && previousCount != GroupScan.NO_COLUMN_STATS) {
-            currentCount = col.getValueCount() - col.getStatistics().getNumNulls(); // only count non-nulls
-            columnValueCounts.put(schemaPath, previousCount + currentCount);
-          } else {
-            // even if 1 chunk does not have stats, we cannot rely on the value count for this column
-            columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
-          }
-
-          // check if this column can be used for partition pruning
-          SchemaElement se = schemaElements.get(schemaPath.getAsUnescapedPath());
-          boolean partitionColumn = checkForPartitionColumn(schemaPath, col, se, first);
-          if (partitionColumn) {
-            Map<SchemaPath,Object> map = partitionValueMap.get(file);
-            if (map == null) {
-              map = Maps.newHashMap();
-              partitionValueMap.put(file, map);
-            }
-            Object value = map.get(schemaPath);
-            Object currentValue = col.getStatistics().genericGetMax();
-            if (value != null) {
-              if (value != currentValue) {
-                columnTypeMap.remove(schemaPath);
-              }
-            } else {
-              map.put(schemaPath, currentValue);
-            }
-          } else {
-            columnTypeMap.remove(schemaPath);
-          }
-        }
-
-        String filePath = footer.getFile().toUri().getPath();
-        rowGroupInfos.add(new ParquetGroupScan.RowGroupInfo(filePath, start, length, index));
-        logger.debug("rowGroupInfo path: {} start: {} length {}", filePath, start, length);
-        index++;
-
-        rowCount += rowGroup.getRowCount();
-        first = false;
-      }
-
-    }
-    Preconditions.checkState(!rowGroupInfos.isEmpty(), "No row groups found");
-    tContext.stop();
-    watch.stop();
-    logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
-  }
+  private Set<String> fileSet;
 
   @JsonIgnore
   private Map<SchemaPath,MajorType> columnTypeMap = Maps.newHashMap();
@@ -378,30 +234,27 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    * every column to see if it is single valued, and if so, add it to the list of potential partition columns. For the
    * remaining footers, we will not find any new partition columns, but we may discover that what was previously a
    * potential partition column now no longer qualifies, so it needs to be removed from the list.
-   * @param column
-   * @param columnChunkMetaData
-   * @param se
-   * @param first
    * @return whether column is a potential partition column
    */
-  private boolean checkForPartitionColumn(SchemaPath column, ColumnChunkMetaData columnChunkMetaData, SchemaElement se, boolean first) {
+  private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean first) {
+    SchemaPath schemaPath = columnMetadata.name;
     if (first) {
-      if (hasSingleValue(columnChunkMetaData)) {
-        columnTypeMap.put(column, getType(columnChunkMetaData, se));
+      if (hasSingleValue(columnMetadata)) {
+        columnTypeMap.put(schemaPath, getType(columnMetadata.primitiveType, columnMetadata.originalType));
         return true;
       } else {
        return false;
      }
    } else {
-      if (!columnTypeMap.keySet().contains(column)) {
+      if (!columnTypeMap.keySet().contains(schemaPath)) {
        return false;
      } else {
-        if (!hasSingleValue(columnChunkMetaData)) {
-          columnTypeMap.remove(column);
+        if (!hasSingleValue(columnMetadata)) {
+          columnTypeMap.remove(schemaPath);
          return false;
        }
-        if (!getType(columnChunkMetaData, se).equals(columnTypeMap.get(column))) {
-          columnTypeMap.remove(column);
+        if (!getType(columnMetadata.primitiveType, columnMetadata.originalType).equals(columnTypeMap.get(schemaPath))) {
+          columnTypeMap.remove(schemaPath);
          return false;
        }
      }
@@ -409,9 +262,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     return true;
   }
 
-  private MajorType getType(ColumnChunkMetaData columnChunkMetaData, SchemaElement schemaElement) {
-    ConvertedType originalType = schemaElement == null ? null : schemaElement.getConverted_type();
-
+  private MajorType getType(PrimitiveTypeName type, OriginalType originalType) {
     if (originalType != null) {
       switch (originalType) {
       case DECIMAL:
@@ -439,7 +290,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
       }
     }
 
-    PrimitiveTypeName type = columnChunkMetaData.getType();
     switch (type) {
     case BOOLEAN:
       return Types.optional(MinorType.BIT);
@@ -460,25 +310,28 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
   }
 
-  private boolean hasSingleValue(ColumnChunkMetaData columnChunkMetaData) {
-    Statistics stats = columnChunkMetaData.getStatistics();
-    boolean hasStats = stats != null && !stats.isEmpty();
-    if (hasStats) {
-      if (stats.genericGetMin() == null || stats.genericGetMax() == null) {
-        return false;
-      }
-      return stats.genericGetMax().equals(stats.genericGetMin());
-    } else {
-      return false;
-    }
+  private boolean hasSingleValue(ColumnMetadata columnChunkMetaData) {
+    Object max = columnChunkMetaData.max;
+    Object min = columnChunkMetaData.min;
+    return max != null && max.equals(min);
   }
 
   @Override
   public void modifyFileSelection(FileSelection selection) {
     entries.clear();
+    fileSet = Sets.newHashSet();
     for (String fileName : selection.getAsFiles()) {
       entries.add(new ReadEntryWithPath(fileName));
+      fileSet.add(fileName);
+    }
+
+    List<RowGroupInfo> newRowGroupList = Lists.newArrayList();
+    for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+      if (fileSet.contains(rowGroupInfo.getPath())) {
+        newRowGroupList.add(rowGroupInfo);
+      }
     }
+    this.rowGroupInfos = newRowGroupList;
   }
 
   public MajorType getTypeForColumn(SchemaPath schemaPath) {
@@ -629,6 +482,125 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
     }
   }
 
+  private void init() throws IOException {
+    ParquetTableMetadata_v1 parquetTableMetadata;
+    List<FileStatus> fileStatuses = null;
+    if (entries.size() == 1) {
+      Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
+      Path metaPath = new Path(p, Metadata.METADATA_FILENAME);
+      if (fs.exists(metaPath)) {
+        parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
+      } else {
+        parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString());
+      }
+    } else {
+      Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
+      Path metaPath = new Path(p, Metadata.METADATA_FILENAME);
+      if (fs.exists(metaPath)) {
+        parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString()));
+      } else {
+        fileStatuses = Lists.newArrayList();
+        for (ReadEntryWithPath entry : entries) {
+          getFiles(entry.getPath(), fileStatuses);
+        }
+        parquetTableMetadata = Metadata.getParquetTableMetadata(fs, fileStatuses);
+      }
+    }
+
+    if (fileSet == null) {
+      fileSet = Sets.newHashSet();
+      for (ParquetFileMetadata file : parquetTableMetadata.files) {
+        fileSet.add(file.path);
+      }
+    }
+
+    Map<String,DrillbitEndpoint> hostEndpointMap = Maps.newHashMap();
+
+    for (DrillbitEndpoint endpoint : formatPlugin.getContext().getBits()) {
+      hostEndpointMap.put(endpoint.getAddress(), endpoint);
+    }
+
+    rowGroupInfos = Lists.newArrayList();
+    for (ParquetFileMetadata file : parquetTableMetadata.files) {
+      int rgIndex = 0;
+      for (RowGroupMetadata rg : file.rowGroups) {
+        RowGroupInfo rowGroupInfo = new RowGroupInfo(file.path, rg.start, rg.length, rgIndex);
+        EndpointByteMap endpointByteMap = new EndpointByteMapImpl();
+        for (String host : rg.hostAffinity.keySet()) {
+          if (hostEndpointMap.containsKey(host)) {
+            endpointByteMap.add(hostEndpointMap.get(host), (long) (rg.hostAffinity.get(host) * rg.length));
+          }
+        }
+        rowGroupInfo.setEndpointByteMap(endpointByteMap);
+        rgIndex++;
+        rowGroupInfos.add(rowGroupInfo);
+      }
+    }
+
+    this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
+
+    columnValueCounts = Maps.newHashMap();
+    this.rowCount = 0;
+    boolean first = true;
+    for (ParquetFileMetadata file : parquetTableMetadata.files) {
+      for (RowGroupMetadata rowGroup : file.rowGroups) {
+        long rowCount = rowGroup.rowCount;
+        for (ColumnMetadata column : rowGroup.columns) {
+          SchemaPath schemaPath = column.name;
+          Long previousCount = columnValueCounts.get(schemaPath);
+          if (previousCount != null) {
+            if (previousCount != GroupScan.NO_COLUMN_STATS) {
+              if (column.nulls != null) {
+                Long newCount = rowCount - column.nulls;
+                columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
+              } else {
+
+              }
+            }
+          } else {
+            if (column.nulls != null) {
+              Long newCount = rowCount - column.nulls;
+              columnValueCounts.put(schemaPath, newCount);
+            } else {
+              columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
+            }
+          }
+          boolean partitionColumn = checkForPartitionColumn(column, first);
+          if (partitionColumn) {
+            Map<SchemaPath,Object> map = partitionValueMap.get(file.path);
+            if (map == null) {
+              map = Maps.newHashMap();
+              partitionValueMap.put(file.path, map);
+            }
+            Object value = map.get(schemaPath);
+            Object currentValue = column.max;
+            if (value != null) {
+              if (value != currentValue) {
+                columnTypeMap.remove(schemaPath);
+              }
+            } else {
+              map.put(schemaPath, currentValue);
+            }
+          } else {
+            columnTypeMap.remove(schemaPath);
+          }
+        }
+        this.rowCount += rowGroup.rowCount;
+        first = false;
+      }
+    }
+  }
+
+  private ParquetTableMetadata_v1 removeUnneededRowGroups(ParquetTableMetadata_v1 parquetTableMetadata) {
+    List<ParquetFileMetadata> newFileMetadataList = Lists.newArrayList();
+    for (ParquetFileMetadata file : parquetTableMetadata.files) {
+      if (fileSet.contains(file.path)) {
+        newFileMetadataList.add(file);
+      }
+    }
+    return new ParquetTableMetadata_v1(newFileMetadataList, new ArrayList<String>());
+  }
+
   /**
    * Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each
    * rowGroup
@@ -637,23 +609,19 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
    */
   @Override
   public List<EndpointAffinity> getOperatorAffinity() {
+    return this.endpointAffinities;
+  }
 
-    if (this.endpointAffinities == null) {
-      BlockMapBuilder bmb = new BlockMapBuilder(fs, formatPlugin.getContext().getBits());
-      try {
-        List<TimedRunnable<Void>> blockMappers = Lists.newArrayList();
-        for (RowGroupInfo rgi : rowGroupInfos) {
-          blockMappers.add(new BlockMapper(bmb, rgi));
-        }
-        TimedRunnable.run("Load Parquet RowGroup block maps", logger, blockMappers, 16);
-      } catch (IOException e) {
-        logger.warn("Failure while determining operator affinity.", e);
-        return Collections.emptyList();
+  private void getFiles(String path, List<FileStatus> fileStatuses) throws IOException {
+    Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
+    FileStatus fileStatus = fs.getFileStatus(p);
+    if (fileStatus.isDirectory()) {
+      for (FileStatus f : fs.listStatus(p, new DrillPathFilter())) {
+        getFiles(f.getPath().toString(), fileStatuses);
       }
-
-      this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
+    } else {
+      fileStatuses.add(fileStatus);
     }
-    return this.endpointAffinities;
   }
 
   private class BlockMapper extends TimedRunnable<Void> {
@@ -704,9 +672,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) {
     List<RowGroupReadEntry> entries = Lists.newArrayList();
     for (RowGroupInfo rgi : rowGroups) {
-      RowGroupReadEntry rgre = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(),
-          rgi.getRowGroupIndex());
-      entries.add(rgre);
+      RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex());
+      entries.add(entry);
     }
     return entries;
   }
@@ -757,7 +724,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
   public FileGroupScan clone(FileSelection selection) throws IOException {
     ParquetGroupScan newScan = new ParquetGroupScan(this);
     newScan.modifyFileSelection(selection);
-    newScan.readFooterFromEntries();
+    newScan.init();
     return newScan;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
index fea0875bb..22d4ef590 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
@@ -45,7 +45,6 @@ public class AffinityCreator {
       for (ObjectLongCursor<DrillbitEndpoint> cursor : entry.getByteMap()) {
         long bytes = cursor.value;
         float affinity = (float)bytes / (float)totalBytes;
-        logger.debug("Work: {} Endpoint: {} Bytes: {}", work, cursor.key.getAddress(), bytes);
         affinities.putOrAdd(cursor.key, affinity, affinity);
       }
     }
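One piece of the patch worth unpacking is `Metadata.getHostAffinity`: each HDFS block contributes the fraction of the row group's bytes it physically holds, clipped to the row group's `[start, start + length)` range, and `ParquetGroupScan.init()` later multiplies that fraction by the row-group length to rebuild an `EndpointByteMap` without re-querying block locations. A standalone restatement of the same overlap arithmetic, with hypothetical sizes (the original computes in `float`; this sketch keeps the intermediate math in `long` before the final division):

```java
public class AffinityMath {
  // Fraction of the row group [start, start + length) that falls inside the
  // HDFS block [blockStart, blockStart + blockLength): the block's length,
  // minus whatever hangs off either end of the row group, divided by the
  // row group's total size.
  static float affinityFraction(long start, long length, long blockStart, long blockLength) {
    long blockEnd = blockStart + blockLength;
    long rowGroupEnd = start + length;
    long overlap = blockLength
        - (blockStart < start ? start - blockStart : 0)      // bytes before the row group
        - (blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0); // bytes after it
    return (float) overlap / length;
  }

  public static void main(String[] args) {
    long mb = 1 << 20;
    // A 128 MB block at offset 0 covers the first half of a 128 MB row group
    // starting at 64 MB, so each host holding that block gets affinity 0.5.
    System.out.println(affinityFraction(64 * mb, 128 * mb, 0, 128 * mb)); // 0.5
  }
}
```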