author    Steven Phillips <smp@apache.org>    2015-08-18 13:24:33 -0700
committer adeneche <adeneche@gmail.com>       2015-09-18 16:19:13 -0700
commit    0ee609581423b9649391be02013da86fe9b0e2f2 (patch)
tree      fd72a9db0f32318a8f4b2b1181caf75fa5329995 /exec/java-exec/src/main/java/org/apache/drill
parent    3f5ebafcaa8fc0ed08d5964081e4c22a6906d46b (diff)
DRILL-2743: Parquet file metadata caching
Rebasing on top of master required conflict resolution in Parser.tdd and parserImpls.ftl. This closes #114.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java | 128
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java       |  97
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java                   |   3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java                      | 498
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java           |   3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java              | 367
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java              |   1
7 files changed, 896 insertions(+), 201 deletions(-)
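
The core of the change: a new REFRESH TABLE METADATA command (parsed by SqlRefreshMetadata, executed by RefreshMetadataHandler) walks a single-directory-based Parquet table and writes a .drill.parquet_metadata cache file into the table root and each subdirectory; ParquetGroupScan then plans from that cache instead of opening every Parquet footer, falling back to footer reads when no cache file exists. A hypothetical invocation (the workspace and table name below are illustrative, not taken from this patch):

    REFRESH TABLE METADATA dfs.tmp.lineitem;
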
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
new file mode 100644
index 000000000..ce4059b08
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.handlers;
+
+import java.io.IOException;
+import java.util.List;
+
+
+import org.apache.calcite.schema.SchemaPlus;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.tools.RelConversionException;
+import org.apache.calcite.tools.ValidationException;
+import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.logical.DrillRel;
+import org.apache.drill.exec.planner.logical.DrillScreenRel;
+import org.apache.drill.exec.planner.logical.DrillStoreRel;
+import org.apache.drill.exec.planner.logical.DrillTable;
+import org.apache.drill.exec.planner.logical.DrillWriterRel;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.sql.DirectPlan;
+import org.apache.drill.exec.planner.sql.DrillSqlWorker;
+import org.apache.drill.exec.planner.sql.parser.SqlRefreshMetadata;
+import org.apache.drill.exec.store.AbstractSchema;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.FileSystemPlugin;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.store.dfs.NamedFormatPluginConfig;
+import org.apache.drill.exec.store.parquet.Metadata;
+import org.apache.drill.exec.store.parquet.ParquetFormatConfig;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.drill.exec.planner.sql.SchemaUtilites.findSchema;
+
+public class RefreshMetadataHandler extends DefaultSqlHandler {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RefreshMetadataHandler.class);
+
+ public RefreshMetadataHandler(SqlHandlerConfig config) {
+ super(config);
+ }
+
+ private PhysicalPlan direct(boolean outcome, String message, Object... values){
+ return DirectPlan.createDirectPlan(context, outcome, String.format(message, values));
+ }
+
+ private PhysicalPlan notSupported(String tbl){
+ return direct(false, "Table %s does not support metadata refresh. Support is currently limited to single-directory-based Parquet tables.", tbl);
+ }
+
+ @Override
+ public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
+ final SqlRefreshMetadata refreshTable = unwrap(sqlNode, SqlRefreshMetadata.class);
+
+ try {
+
+ final SchemaPlus schema = findSchema(context.getNewDefaultSchema(),
+ refreshTable.getSchemaPath());
+
+ final String tableName = refreshTable.getName();
+
+ if (tableName.contains("*") || tableName.contains("?")) {
+ return direct(false, "Glob path %s not supported for metadata refresh", tableName);
+ }
+
+ final Table table = schema.getTable(tableName);
+
+ if(table == null){
+ return direct(false, "Table %s does not exist.", tableName);
+ }
+
+ if(! (table instanceof DrillTable) ){
+ return notSupported(tableName);
+ }
+
+
+ final DrillTable drillTable = (DrillTable) table;
+
+ final Object selection = drillTable.getSelection();
+ if( !(selection instanceof FormatSelection) ){
+ return notSupported(tableName);
+ }
+
+ FormatSelection formatSelection = (FormatSelection) selection;
+
+ FormatPluginConfig formatConfig = formatSelection.getFormat();
+ if (!((formatConfig instanceof ParquetFormatConfig) ||
+ ((formatConfig instanceof NamedFormatPluginConfig) && ((NamedFormatPluginConfig) formatConfig).name.equals("parquet")))) {
+ return notSupported(tableName);
+ }
+
+ FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
+ DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
+
+ String selectionRoot = formatSelection.getSelection().selectionRoot;
+ if (!fs.getFileStatus(new Path(selectionRoot)).isDirectory()) {
+ return notSupported(tableName);
+ }
+
+ Metadata.createMeta(fs, selectionRoot);
+ return direct(true, "Successfully updated metadata for table %s.", tableName);
+
+ } catch(Exception e) {
+ logger.error("Failed to update metadata for table '{}'", refreshTable.getName(), e);
+ return DirectPlan.createDirectPlan(context, false, String.format("Error: %s", e.getMessage()));
+ }
+ }
+
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java
new file mode 100644
index 000000000..01050b83c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/parser/SqlRefreshMetadata.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.sql.parser;
+
+import java.util.List;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
+import org.apache.drill.exec.planner.sql.handlers.RefreshMetadataHandler;
+import org.apache.drill.exec.planner.sql.handlers.SqlHandlerConfig;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
+/**
+ * Sql parse tree node to represent statement:
+ * REFRESH TABLE METADATA tblname
+ */
+public class SqlRefreshMetadata extends DrillSqlCall {
+ public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("REFRESH_TABLE_METADATA", SqlKind.OTHER) {
+ @Override
+ public SqlCall createCall(SqlLiteral functionQualifier, SqlParserPos pos, SqlNode... operands) {
+ return new SqlRefreshMetadata(pos, (SqlIdentifier) operands[0]);
+ }
+ };
+
+ private SqlIdentifier tblName;
+
+ public SqlRefreshMetadata(SqlParserPos pos, SqlIdentifier tblName){
+ super(pos);
+ this.tblName = tblName;
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ List<SqlNode> ops = Lists.newArrayList();
+ ops.add(tblName);
+ return ops;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("REFRESH");
+ writer.keyword("TABLE");
+ writer.keyword("METADATA");
+ tblName.unparse(writer, leftPrec, rightPrec);
+ }
+
+ public String getName() {
+ if (tblName.isSimple()) {
+ return tblName.getSimple();
+ }
+
+ return tblName.names.get(tblName.names.size() - 1);
+ }
+
+ public List<String> getSchemaPath() {
+ if (tblName.isSimple()) {
+ return ImmutableList.of();
+ }
+
+ return tblName.names.subList(0, tblName.names.size() - 1);
+ }
+
+ @Override
+ public AbstractSqlHandler getSqlHandler(SqlHandlerConfig config) {
+ return new RefreshMetadataHandler(config);
+ }
+}
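
As a quick illustration of how the parse node above splits a compound table identifier, here is a minimal sketch. It assumes the same Calcite and Guava imports used in SqlRefreshMetadata.java, and the table name is made up:

    SqlIdentifier id = new SqlIdentifier(ImmutableList.of("dfs", "tmp", "events"), SqlParserPos.ZERO);
    SqlRefreshMetadata node = new SqlRefreshMetadata(SqlParserPos.ZERO, id);
    node.getSchemaPath();  // ["dfs", "tmp"] -- passed to findSchema() by the handler
    node.getName();        // "events"       -- the table whose metadata is refreshed
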
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
index 00f463d1c..5c2d71a1d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillPathFilter.java
@@ -29,6 +29,9 @@ public class DrillPathFilter extends Utils.OutputFileUtils.OutputFilesFilter {
if (path.getName().startsWith(DrillFileSystem.DOT_FILE_PREFIX)) {
return false;
}
+ if (path.getName().startsWith(".")) {
+ return false;
+ }
return super.accept(path);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
new file mode 100644
index 000000000..02414a4b4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/Metadata.java
@@ -0,0 +1,498 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator.Feature;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.expression.SchemaPath.De;
+import org.apache.drill.exec.store.TimedRunnable;
+import org.apache.drill.exec.store.dfs.DrillPathFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.column.statistics.Statistics;
+import parquet.hadoop.ParquetFileReader;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.OriginalType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class Metadata {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
+
+ public static final String METADATA_FILENAME = ".drill.parquet_metadata";
+
+ private final FileSystem fs;
+
+ /**
+ * Create the parquet metadata file for the directory at the given path, and for any subdirectories
+ * @param fs file system the table directory lives on
+ * @param path path to the root directory of the parquet table
+ * @throws IOException if the directory cannot be listed or a metadata file cannot be written
+ */
+ public static void createMeta(FileSystem fs, String path) throws IOException {
+ Metadata metadata = new Metadata(fs);
+ metadata.createMetaFilesRecursively(path);
+ }
+
+ /**
+ * Get the parquet metadata for the parquet files in the given directory, including those in subdirectories
+ * @param fs file system the files live on
+ * @param path path to the directory to scan
+ * @return metadata for every parquet file found under the directory
+ * @throws IOException
+ */
+ public static ParquetTableMetadata_v1 getParquetTableMetadata(FileSystem fs, String path) throws IOException {
+ Metadata metadata = new Metadata(fs);
+ return metadata.getParquetTableMetadata(path);
+ }
+
+ /**
+ * Get the parquet metadata for a list of parquet files
+ * @param fs file system the files live on
+ * @param fileStatuses statuses of the parquet files to read footers from
+ * @return metadata for the given files
+ * @throws IOException
+ */
+ public static ParquetTableMetadata_v1 getParquetTableMetadata(FileSystem fs,
+ List<FileStatus> fileStatuses) throws IOException {
+ Metadata metadata = new Metadata(fs);
+ return metadata.getParquetTableMetadata(fileStatuses);
+ }
+
+ /**
+ * Get the parquet metadata for a directory by reading the metadata file
+ * @param fs file system the metadata file lives on
+ * @param path The path to the metadata file, located in the directory that contains the parquet files
+ * @return the cached table metadata, regenerated first if any directory is newer than the metadata file
+ * @throws IOException
+ */
+ public static ParquetTableMetadata_v1 readBlockMeta(FileSystem fs, String path) throws IOException {
+ Metadata metadata = new Metadata(fs);
+ return metadata.readBlockMeta(path);
+ }
+
+ private Metadata(FileSystem fs) {
+ this.fs = fs;
+ }
+
+ /**
+ * Create the parquet metadata file for the directory at the given path, and for any subdirectories
+ * @param path path to the directory to create metadata for
+ * @throws IOException
+ */
+ private ParquetTableMetadata_v1 createMetaFilesRecursively(final String path) throws IOException {
+ List<ParquetFileMetadata> metaDataList = Lists.newArrayList();
+ List<String> directoryList = Lists.newArrayList();
+ Path p = new Path(path);
+ FileStatus fileStatus = fs.getFileStatus(p);
+ assert fileStatus.isDirectory() : "Expected directory";
+
+ final List<FileStatus> childFiles = Lists.newArrayList();
+
+ for (final FileStatus file : fs.listStatus(p, new DrillPathFilter())) {
+ if (file.isDirectory()) {
+ ParquetTableMetadata_v1 subTableMetadata = createMetaFilesRecursively(file.getPath().toString());
+ metaDataList.addAll(subTableMetadata.files);
+ directoryList.addAll(subTableMetadata.directories);
+ directoryList.add(file.getPath().toString());
+ } else {
+ childFiles.add(file);
+ }
+ }
+ if (childFiles.size() > 0) {
+ metaDataList.addAll(getParquetFileMetadata(childFiles));
+ }
+ ParquetTableMetadata_v1 parquetTableMetadata = new ParquetTableMetadata_v1(metaDataList, directoryList);
+ writeFile(parquetTableMetadata, new Path(p, METADATA_FILENAME));
+ return parquetTableMetadata;
+ }
+
+ /**
+ * Get the parquet metadata for the parquet files in a directory
+ * @param path the path of the directory
+ * @return metadata for every parquet file under the directory
+ * @throws IOException
+ */
+ private ParquetTableMetadata_v1 getParquetTableMetadata(String path) throws IOException {
+ Path p = new Path(path);
+ FileStatus fileStatus = fs.getFileStatus(p);
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ List<FileStatus> fileStatuses = getFileStatuses(fileStatus);
+ logger.info("Took {} ms to get file statuses", watch.elapsed(TimeUnit.MILLISECONDS));
+ return getParquetTableMetadata(fileStatuses);
+ }
+
+ /**
+ * Get the parquet metadata for a list of parquet files
+ * @param fileStatuses statuses of the parquet files to read footers from
+ * @return metadata for the given files
+ * @throws IOException
+ */
+ private ParquetTableMetadata_v1 getParquetTableMetadata(List<FileStatus> fileStatuses) throws IOException {
+ List<ParquetFileMetadata> fileMetadataList = getParquetFileMetadata(fileStatuses);
+ return new ParquetTableMetadata_v1(fileMetadataList, new ArrayList<String>());
+ }
+
+ /**
+ * Get a list of file metadata for a list of parquet files
+ * @param fileStatuses statuses of the parquet files to read
+ * @return a list of file metadata, one entry per input file
+ * @throws IOException
+ */
+ private List<ParquetFileMetadata> getParquetFileMetadata(List<FileStatus> fileStatuses) throws IOException {
+ List<TimedRunnable<ParquetFileMetadata>> gatherers = Lists.newArrayList();
+ for (FileStatus file : fileStatuses) {
+ gatherers.add(new MetadataGatherer(file));
+ }
+
+ List<ParquetFileMetadata> metaDataList = Lists.newArrayList();
+ metaDataList.addAll(TimedRunnable.run("Fetch parquet metadata", logger, gatherers, 16));
+ return metaDataList;
+ }
+
+ /**
+ * Recursively get a list of files
+ * @param fileStatus starting file or directory
+ * @return all plain files found at or below fileStatus
+ * @throws IOException
+ */
+ private List<FileStatus> getFileStatuses(FileStatus fileStatus) throws IOException {
+ List<FileStatus> statuses = Lists.newArrayList();
+ if (fileStatus.isDirectory()) {
+ for (FileStatus child : fs.listStatus(fileStatus.getPath(), new DrillPathFilter())) {
+ statuses.addAll(getFileStatuses(child));
+ }
+ } else {
+ statuses.add(fileStatus);
+ }
+ return statuses;
+ }
+
+ /**
+ * TimedRunnable that reads the footer from parquet and collects file metadata
+ */
+ private class MetadataGatherer extends TimedRunnable<ParquetFileMetadata> {
+
+ private FileStatus fileStatus;
+
+ public MetadataGatherer(FileStatus fileStatus) {
+ this.fileStatus = fileStatus;
+ }
+
+ @Override
+ protected ParquetFileMetadata runInner() throws Exception {
+ return getParquetFileMetadata(fileStatus);
+ }
+
+ @Override
+ protected IOException convertToIOException(Exception e) {
+ if (e instanceof IOException) {
+ return (IOException) e;
+ } else {
+ return new IOException(e);
+ }
+ }
+ }
+
+ private OriginalType getOriginalType(Type type, String[] path, int depth) {
+ if (type.isPrimitive()) {
+ return type.getOriginalType();
+ }
+ Type t = ((GroupType) type).getType(path[path.length - depth - 1]);
+ return getOriginalType(t, path, depth + 1);
+ }
+
+ /**
+ * Get the metadata for a single file
+ * @param file the parquet file whose footer is read
+ * @return row group and column metadata for the file
+ * @throws IOException
+ */
+ private ParquetFileMetadata getParquetFileMetadata(FileStatus file) throws IOException {
+ ParquetMetadata metadata = ParquetFileReader.readFooter(fs.getConf(), file);
+ MessageType schema = metadata.getFileMetaData().getSchema();
+
+ Map<SchemaPath,OriginalType> originalTypeMap = Maps.newHashMap();
+ schema.getPaths();
+ for (String[] path : schema.getPaths()) {
+ originalTypeMap.put(SchemaPath.getCompoundPath(path), getOriginalType(schema, path, 0));
+ }
+
+ List<RowGroupMetadata> rowGroupMetadataList = Lists.newArrayList();
+
+ for (BlockMetaData rowGroup : metadata.getBlocks()) {
+ List<ColumnMetadata> columnMetadataList = Lists.newArrayList();
+ long length = 0;
+ for (ColumnChunkMetaData col : rowGroup.getColumns()) {
+ ColumnMetadata columnMetadata;
+
+ boolean statsAvailable = (col.getStatistics() != null && !col.getStatistics().isEmpty());
+
+ Statistics stats = col.getStatistics();
+ SchemaPath columnName = SchemaPath.getCompoundPath(col.getPath().toArray());
+ if (statsAvailable) {
+ columnMetadata = new ColumnMetadata(columnName, col.getType(), originalTypeMap.get(columnName),
+ stats.genericGetMax(), stats.genericGetMin(), stats.getNumNulls());
+ } else {
+ columnMetadata = new ColumnMetadata(columnName, col.getType(), originalTypeMap.get(columnName),
+ null, null, null);
+ }
+ columnMetadataList.add(columnMetadata);
+ length += col.getTotalSize();
+ }
+
+ RowGroupMetadata rowGroupMeta = new RowGroupMetadata(rowGroup.getStartingPos(), length, rowGroup.getRowCount(),
+ getHostAffinity(file, rowGroup.getStartingPos(), length), columnMetadataList);
+
+ rowGroupMetadataList.add(rowGroupMeta);
+ }
+ String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString();
+
+ return new ParquetFileMetadata(path, file.getLen(), rowGroupMetadataList);
+ }
+
+ /**
+ * Get the host affinity for a row group
+ * @param fileStatus the parquet file
+ * @param start the start of the row group
+ * @param length the length of the row group
+ * @return a map from host name to the fraction of the row group's bytes stored on that host
+ * @throws IOException if block locations cannot be fetched
+ */
+ private Map<String,Float> getHostAffinity(FileStatus fileStatus, long start, long length) throws IOException {
+ BlockLocation[] blockLocations = fs.getFileBlockLocations(fileStatus, start, length);
+ Map<String,Float> hostAffinityMap = Maps.newHashMap();
+ for (BlockLocation blockLocation : blockLocations) {
+ for (String host : blockLocation.getHosts()) {
+ Float currentAffinity = hostAffinityMap.get(host);
+ float blockStart = blockLocation.getOffset();
+ float blockEnd = blockStart + blockLocation.getLength();
+ float rowGroupEnd = start + length;
+ Float newAffinity = (blockLocation.getLength() - (blockStart < start ? start - blockStart : 0) -
+ (blockEnd > rowGroupEnd ? blockEnd - rowGroupEnd : 0)) / length;
+ if (currentAffinity != null) {
+ hostAffinityMap.put(host, currentAffinity + newAffinity);
+ } else {
+ hostAffinityMap.put(host, newAffinity);
+ }
+ }
+ }
+ return hostAffinityMap;
+ }
+
+ /**
+ * Serialize parquet metadata to json and write to a file
+ * @param parquetTableMetadata the table metadata to serialize
+ * @param p path of the metadata file to write
+ * @throws IOException
+ */
+ private void writeFile(ParquetTableMetadata_v1 parquetTableMetadata, Path p) throws IOException {
+ JsonFactory jsonFactory = new JsonFactory();
+ jsonFactory.configure(Feature.AUTO_CLOSE_TARGET, false);
+ jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
+ ObjectMapper mapper = new ObjectMapper(jsonFactory);
+ FSDataOutputStream os = fs.create(p);
+ mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadata);
+ os.flush();
+ os.close();
+ }
+
+ /**
+ * Read the parquet metadata from a file
+ * @param path path to the metadata cache file
+ * @return the deserialized table metadata, regenerated if the table was modified after the file was written
+ * @throws IOException
+ */
+ private ParquetTableMetadata_v1 readBlockMeta(String path) throws IOException {
+ Path p = new Path(path);
+ ObjectMapper mapper = new ObjectMapper();
+ SimpleModule module = new SimpleModule();
+ module.addDeserializer(SchemaPath.class, new De());
+ mapper.registerModule(module);
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ FSDataInputStream is = fs.open(p);
+ ParquetTableMetadata_v1 parquetTableMetadata = mapper.readValue(is, ParquetTableMetadata_v1.class);
+ if (tableModified(parquetTableMetadata, p)) {
+ parquetTableMetadata = createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(p.getParent()).toString());
+ }
+ return parquetTableMetadata;
+ }
+
+ /**
+ * Check if the parquet metadata needs to be updated by comparing the modification time of the directories with
+ * the modification time of the metadata file
+ * @param tableMetadata the cached table metadata, which lists the directories to check
+ * @param metaFilePath path to the metadata cache file
+ * @return true if any directory was modified after the metadata file was written
+ * @throws IOException
+ */
+ private boolean tableModified(ParquetTableMetadata_v1 tableMetadata, Path metaFilePath) throws IOException {
+ long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
+ FileStatus directoryStatus = fs.getFileStatus(metaFilePath.getParent());
+ if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+ return true;
+ }
+ for (String directory : tableMetadata.directories) {
+ directoryStatus = fs.getFileStatus(new Path(directory));
+ if (directoryStatus.getModificationTime() > metaFileModifyTime) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "metadata_version")
+ public static class ParquetTableMetadataBase {
+
+ }
+
+ /**
+ * Struct which contains the metadata for an entire parquet directory structure
+ */
+ @JsonTypeName("v1")
+ public static class ParquetTableMetadata_v1 extends ParquetTableMetadataBase {
+ @JsonProperty
+ List<ParquetFileMetadata> files;
+ @JsonProperty
+ List<String> directories;
+
+ public ParquetTableMetadata_v1() {
+ super();
+ }
+
+ public ParquetTableMetadata_v1(List<ParquetFileMetadata> files, List<String> directories) {
+ this.files = files;
+ this.directories = directories;
+ }
+ }
+
+ /**
+ * Struct which contains the metadata for a single parquet file
+ */
+ public static class ParquetFileMetadata {
+ @JsonProperty
+ public String path;
+ @JsonProperty
+ public Long length;
+ @JsonProperty
+ public List<RowGroupMetadata> rowGroups;
+
+ public ParquetFileMetadata() {
+ super();
+ }
+
+ public ParquetFileMetadata(String path, Long length, List<RowGroupMetadata> rowGroups) {
+ this.path = path;
+ this.length = length;
+ this.rowGroups = rowGroups;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("path: %s rowGroups: %s", path, rowGroups);
+ }
+ }
+
+ /**
+ * A struct that contains the metadata for a parquet row group
+ */
+ public static class RowGroupMetadata {
+ @JsonProperty
+ public Long start;
+ @JsonProperty
+ public Long length;
+ @JsonProperty
+ public Long rowCount;
+ @JsonProperty
+ public Map<String, Float> hostAffinity;
+ @JsonProperty
+ public List<ColumnMetadata> columns;
+
+ public RowGroupMetadata() {
+ super();
+ }
+
+ public RowGroupMetadata(Long start, Long length, Long rowCount,
+ Map<String, Float> hostAffinity, List<ColumnMetadata> columns) {
+ this.start = start;
+ this.length = length;
+ this.rowCount = rowCount;
+ this.hostAffinity = hostAffinity;
+ this.columns = columns;
+ }
+ }
+
+ /**
+ * A struct that contains the metadata for a column in a parquet file
+ */
+ public static class ColumnMetadata {
+ @JsonProperty
+ public SchemaPath name;
+ @JsonProperty
+ public PrimitiveTypeName primitiveType;
+ @JsonProperty
+ public OriginalType originalType;
+ @JsonProperty
+ public Object max;
+ @JsonProperty
+ public Object min;
+ @JsonProperty
+ public Long nulls;
+
+ public ColumnMetadata() {
+ super();
+ }
+
+ public ColumnMetadata(SchemaPath name, PrimitiveTypeName primitiveType, OriginalType originalType,
+ Object max, Object min, Long nulls) {
+ this.name = name;
+ this.primitiveType = primitiveType;
+ this.originalType = originalType;
+ this.max = max;
+ this.min = min;
+ this.nulls = nulls;
+ }
+ }
+}
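
The static entry points above are what the rest of this patch calls into. A minimal sketch of driving them directly; the local filesystem, the table path, and the class name are assumptions for illustration, and the class sits in the same package only so it can reach the package-visible files field:

    package org.apache.drill.exec.store.parquet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class MetadataCacheSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.getLocal(new Configuration()); // assumption: local fs
        String tableRoot = "/tmp/parquet_table";                  // hypothetical directory of parquet files

        // Writes a .drill.parquet_metadata file into tableRoot and every subdirectory.
        Metadata.createMeta(fs, tableRoot);

        // Reads the cache back; readBlockMeta() regenerates it first if any directory has a
        // newer modification time than the cache file (see tableModified()).
        Metadata.ParquetTableMetadata_v1 cache =
            Metadata.readBlockMeta(fs, tableRoot + "/" + Metadata.METADATA_FILENAME);
        System.out.println(cache.files.size() + " parquet files in cache");
      }
    }
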
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 446e12aa3..eeb522ab6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -217,6 +217,9 @@ public class ParquetFormatPlugin implements FormatPlugin{
return true;
} else {
+ if (fs.exists(new Path(dir.getPath(), Metadata.METADATA_FILENAME))) {
+ return true;
+ }
PathFilter filter = new DrillPathFilter();
FileStatus[] files = fs.listStatus(dir.getPath(), filter);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index 845bce95c..00d36ffaa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -18,30 +18,23 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.expr.holders.IntervalHolder;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.physical.EndpointAffinity;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
@@ -56,27 +49,28 @@ import org.apache.drill.exec.store.ParquetOutputRecordWriter;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.TimedRunnable;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.DrillPathFilter;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.parquet.Metadata.ColumnMetadata;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetFileMetadata;
+import org.apache.drill.exec.store.parquet.Metadata.ParquetTableMetadata_v1;
+import org.apache.drill.exec.store.parquet.Metadata.RowGroupMetadata;
import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.store.schedule.BlockMapBuilder;
import org.apache.drill.exec.store.schedule.CompleteWork;
import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
import org.apache.drill.exec.util.ImpersonationUtil;
-import org.apache.drill.exec.vector.BigIntVector;
-import org.apache.drill.exec.vector.Float4Vector;
-import org.apache.drill.exec.vector.Float8Vector;
-import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.NullableBigIntVector;
import org.apache.drill.exec.vector.NullableDateVector;
import org.apache.drill.exec.vector.NullableDecimal18Vector;
import org.apache.drill.exec.vector.NullableFloat4Vector;
import org.apache.drill.exec.vector.NullableFloat8Vector;
import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableIntervalVector;
import org.apache.drill.exec.vector.NullableSmallIntVector;
import org.apache.drill.exec.vector.NullableTimeStampVector;
import org.apache.drill.exec.vector.NullableTimeVector;
@@ -87,27 +81,14 @@ import org.apache.drill.exec.vector.NullableUInt4Vector;
import org.apache.drill.exec.vector.NullableVarBinaryVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.VarBinaryVector;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
import org.joda.time.DateTimeUtils;
-import parquet.column.statistics.Statistics;
-import parquet.format.ConvertedType;
-import parquet.format.FileMetaData;
-import parquet.format.SchemaElement;
-import parquet.format.converter.ParquetMetadataConverter;
-import parquet.hadoop.Footer;
-import parquet.hadoop.ParquetFileWriter;
-import parquet.hadoop.metadata.BlockMetaData;
-import parquet.hadoop.metadata.ColumnChunkMetaData;
-import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.api.Binary;
import parquet.org.codehaus.jackson.annotate.JsonCreator;
import com.codahale.metrics.MetricRegistry;
-import com.codahale.metrics.Timer;
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
@@ -118,7 +99,6 @@ import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import parquet.schema.OriginalType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
-import parquet.schema.Type;
@JsonTypeName("parquet-scan")
public class ParquetGroupScan extends AbstractFileGroupScan {
@@ -126,6 +106,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
static final MetricRegistry metrics = DrillMetrics.getInstance();
static final String READ_FOOTER_TIMER = MetricRegistry.name(ParquetGroupScan.class, "readFooter");
+
private final List<ReadEntryWithPath> entries;
private final Stopwatch watch = new Stopwatch();
private final ParquetFormatPlugin formatPlugin;
@@ -171,7 +152,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
this.formatConfig = formatPlugin.getConfig();
this.entries = entries;
this.selectionRoot = selectionRoot;
- this.readFooterFromEntries();
+
+ init();
}
public ParquetGroupScan( //
@@ -195,7 +177,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
this.selectionRoot = selectionRoot;
- readFooter(files);
+ init();
}
/*
@@ -238,137 +220,11 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
return selectionRoot;
}
- private void readFooterFromEntries() throws IOException {
- List<FileStatus> files = Lists.newArrayList();
- for (ReadEntryWithPath e : entries) {
- files.add(fs.getFileStatus(new Path(e.getPath())));
- }
- readFooter(files);
- }
-
- private void readFooter(final List<FileStatus> statuses) {
- final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(getUserName());
- try {
- ugi.doAs(new PrivilegedExceptionAction<Void>() {
- public Void run() throws Exception {
- readFooterHelper(statuses);
- return null;
- }
- });
- } catch (InterruptedException | IOException e) {
- final String errMsg = String.format("Failed to read footer entries from parquet input files: %s", e.getMessage());
- logger.error(errMsg, e);
- throw new DrillRuntimeException(errMsg, e);
- }
- }
-
public Set<String> getFileSet() {
return fileSet;
}
- private Set<String> fileSet = Sets.newHashSet();
-
- private void readFooterHelper(List<FileStatus> statuses) throws IOException {
- watch.reset();
- watch.start();
- Timer.Context tContext = metrics.timer(READ_FOOTER_TIMER).time();
-
- columnTypeMap.clear();
- fileSet.clear();
- partitionValueMap.clear();
-
- rowGroupInfos = Lists.newArrayList();
- long start = 0, length = 0;
- rowCount = 0;
- columnValueCounts = new HashMap<SchemaPath, Long>();
-
- ColumnChunkMetaData columnChunkMetaData;
-
- List<Footer> footers = FooterGatherer.getFooters(formatPlugin.getFsConf(), statuses, 16);
- boolean first = true;
- ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
- for (Footer footer : footers) {
- int index = 0;
- ParquetMetadata metadata = footer.getParquetMetadata();
- FileMetaData fileMetaData = metadataConverter.toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, metadata);
- HashMap<String, SchemaElement> schemaElements = new HashMap<>();
- for (SchemaElement se : fileMetaData.getSchema()) {
- schemaElements.put(se.getName(), se);
- }
- for (BlockMetaData rowGroup : metadata.getBlocks()) {
- String file = Path.getPathWithoutSchemeAndAuthority(footer.getFile()).toString();
- fileSet.add(file);
- long valueCountInGrp = 0;
- // need to grab block information from HDFS
- columnChunkMetaData = rowGroup.getColumns().iterator().next();
- start = columnChunkMetaData.getFirstDataPageOffset();
- // this field is not being populated correctly, but the column chunks know their sizes, just summing them for
- // now
- // end = start + rowGroup.getTotalByteSize();
- length = 0;
- for (ColumnChunkMetaData col : rowGroup.getColumns()) {
- length += col.getTotalSize();
- valueCountInGrp = Math.max(col.getValueCount(), valueCountInGrp);
- SchemaPath schemaPath = SchemaPath.getSimplePath(col.getPath().toString().replace("[", "").replace("]", "").toLowerCase());
-
- long previousCount = 0;
- long currentCount = 0;
-
- if (! columnValueCounts.containsKey(schemaPath)) {
- // create an entry for this column
- columnValueCounts.put(schemaPath, previousCount /* initialize to 0 */);
- } else {
- previousCount = columnValueCounts.get(schemaPath);
- }
-
- boolean statsAvail = (col.getStatistics() != null && !col.getStatistics().isEmpty());
-
- if (statsAvail && previousCount != GroupScan.NO_COLUMN_STATS) {
- currentCount = col.getValueCount() - col.getStatistics().getNumNulls(); // only count non-nulls
- columnValueCounts.put(schemaPath, previousCount + currentCount);
- } else {
- // even if 1 chunk does not have stats, we cannot rely on the value count for this column
- columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
- }
-
- // check if this column can be used for partition pruning
- SchemaElement se = schemaElements.get(schemaPath.getAsUnescapedPath());
- boolean partitionColumn = checkForPartitionColumn(schemaPath, col, se, first);
- if (partitionColumn) {
- Map<SchemaPath,Object> map = partitionValueMap.get(file);
- if (map == null) {
- map = Maps.newHashMap();
- partitionValueMap.put(file, map);
- }
- Object value = map.get(schemaPath);
- Object currentValue = col.getStatistics().genericGetMax();
- if (value != null) {
- if (value != currentValue) {
- columnTypeMap.remove(schemaPath);
- }
- } else {
- map.put(schemaPath, currentValue);
- }
- } else {
- columnTypeMap.remove(schemaPath);
- }
- }
-
- String filePath = footer.getFile().toUri().getPath();
- rowGroupInfos.add(new ParquetGroupScan.RowGroupInfo(filePath, start, length, index));
- logger.debug("rowGroupInfo path: {} start: {} length {}", filePath, start, length);
- index++;
-
- rowCount += rowGroup.getRowCount();
- first = false;
- }
-
- }
- Preconditions.checkState(!rowGroupInfos.isEmpty(), "No row groups found");
- tContext.stop();
- watch.stop();
- logger.debug("Took {} ms to get row group infos", watch.elapsed(TimeUnit.MILLISECONDS));
- }
+ private Set<String> fileSet;
@JsonIgnore
private Map<SchemaPath,MajorType> columnTypeMap = Maps.newHashMap();
@@ -378,30 +234,27 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
* every column to see if it is single valued, and if so, add it to the list of potential partition columns. For the
* remaining footers, we will not find any new partition columns, but we may discover that what was previously a
* potential partition column now no longer qualifies, so it needs to be removed from the list.
- * @param column
- * @param columnChunkMetaData
- * @param se
- * @param first
* @return whether column is a potential partition column
*/
- private boolean checkForPartitionColumn(SchemaPath column, ColumnChunkMetaData columnChunkMetaData, SchemaElement se, boolean first) {
+ private boolean checkForPartitionColumn(ColumnMetadata columnMetadata, boolean first) {
+ SchemaPath schemaPath = columnMetadata.name;
if (first) {
- if (hasSingleValue(columnChunkMetaData)) {
- columnTypeMap.put(column, getType(columnChunkMetaData, se));
+ if (hasSingleValue(columnMetadata)) {
+ columnTypeMap.put(schemaPath, getType(columnMetadata.primitiveType, columnMetadata.originalType));
return true;
} else {
return false;
}
} else {
- if (!columnTypeMap.keySet().contains(column)) {
+ if (!columnTypeMap.keySet().contains(schemaPath)) {
return false;
} else {
- if (!hasSingleValue(columnChunkMetaData)) {
- columnTypeMap.remove(column);
+ if (!hasSingleValue(columnMetadata)) {
+ columnTypeMap.remove(schemaPath);
return false;
}
- if (!getType(columnChunkMetaData, se).equals(columnTypeMap.get(column))) {
- columnTypeMap.remove(column);
+ if (!getType(columnMetadata.primitiveType, columnMetadata.originalType).equals(columnTypeMap.get(schemaPath))) {
+ columnTypeMap.remove(schemaPath);
return false;
}
}
@@ -409,9 +262,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
return true;
}
- private MajorType getType(ColumnChunkMetaData columnChunkMetaData, SchemaElement schemaElement) {
- ConvertedType originalType = schemaElement == null ? null : schemaElement.getConverted_type();
-
+ private MajorType getType(PrimitiveTypeName type, OriginalType originalType) {
if (originalType != null) {
switch (originalType) {
case DECIMAL:
@@ -439,7 +290,6 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
}
- PrimitiveTypeName type = columnChunkMetaData.getType();
switch (type) {
case BOOLEAN:
return Types.optional(MinorType.BIT);
@@ -460,25 +310,28 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
}
- private boolean hasSingleValue(ColumnChunkMetaData columnChunkMetaData) {
- Statistics stats = columnChunkMetaData.getStatistics();
- boolean hasStats = stats != null && !stats.isEmpty();
- if (hasStats) {
- if (stats.genericGetMin() == null || stats.genericGetMax() == null) {
- return false;
- }
- return stats.genericGetMax().equals(stats.genericGetMin());
- } else {
- return false;
- }
+ private boolean hasSingleValue(ColumnMetadata columnChunkMetaData) {
+ Object max = columnChunkMetaData.max;
+ Object min = columnChunkMetaData.min;
+ return max != null && max.equals(min);
}
@Override
public void modifyFileSelection(FileSelection selection) {
entries.clear();
+ fileSet = Sets.newHashSet();
for (String fileName : selection.getAsFiles()) {
entries.add(new ReadEntryWithPath(fileName));
+ fileSet.add(fileName);
+ }
+
+ List<RowGroupInfo> newRowGroupList = Lists.newArrayList();
+ for (RowGroupInfo rowGroupInfo : rowGroupInfos) {
+ if (fileSet.contains(rowGroupInfo.getPath())) {
+ newRowGroupList.add(rowGroupInfo);
+ }
}
+ this.rowGroupInfos = newRowGroupList;
}
public MajorType getTypeForColumn(SchemaPath schemaPath) {
@@ -629,6 +482,125 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
}
}
+ private void init() throws IOException {
+ ParquetTableMetadata_v1 parquetTableMetadata;
+ List<FileStatus> fileStatuses = null;
+ if (entries.size() == 1) {
+ Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
+ Path metaPath = new Path(p, Metadata.METADATA_FILENAME);
+ if (fs.exists(metaPath)) {
+ parquetTableMetadata = Metadata.readBlockMeta(fs, metaPath.toString());
+ } else {
+ parquetTableMetadata = Metadata.getParquetTableMetadata(fs, p.toString());
+ }
+ } else {
+ Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
+ Path metaPath = new Path(p, Metadata.METADATA_FILENAME);
+ if (fs.exists(metaPath)) {
+ parquetTableMetadata = removeUnneededRowGroups(Metadata.readBlockMeta(fs, metaPath.toString()));
+ } else {
+ fileStatuses = Lists.newArrayList();
+ for (ReadEntryWithPath entry : entries) {
+ getFiles(entry.getPath(), fileStatuses);
+ }
+ parquetTableMetadata = Metadata.getParquetTableMetadata(fs, fileStatuses);
+ }
+ }
+
+ if (fileSet == null) {
+ fileSet = Sets.newHashSet();
+ for (ParquetFileMetadata file : parquetTableMetadata.files) {
+ fileSet.add(file.path);
+ }
+ }
+
+ Map<String,DrillbitEndpoint> hostEndpointMap = Maps.newHashMap();
+
+ for (DrillbitEndpoint endpoint : formatPlugin.getContext().getBits()) {
+ hostEndpointMap.put(endpoint.getAddress(), endpoint);
+ }
+
+ rowGroupInfos = Lists.newArrayList();
+ for (ParquetFileMetadata file : parquetTableMetadata.files) {
+ int rgIndex = 0;
+ for (RowGroupMetadata rg : file.rowGroups) {
+ RowGroupInfo rowGroupInfo = new RowGroupInfo(file.path, rg.start, rg.length, rgIndex);
+ EndpointByteMap endpointByteMap = new EndpointByteMapImpl();
+ for (String host : rg.hostAffinity.keySet()) {
+ if (hostEndpointMap.containsKey(host)) {
+ endpointByteMap.add(hostEndpointMap.get(host), (long) (rg.hostAffinity.get(host) * rg.length));
+ }
+ }
+ rowGroupInfo.setEndpointByteMap(endpointByteMap);
+ rgIndex++;
+ rowGroupInfos.add(rowGroupInfo);
+ }
+ }
+
+ this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
+
+ columnValueCounts = Maps.newHashMap();
+ this.rowCount = 0;
+ boolean first = true;
+ for (ParquetFileMetadata file : parquetTableMetadata.files) {
+ for (RowGroupMetadata rowGroup : file.rowGroups) {
+ long rowCount = rowGroup.rowCount;
+ for (ColumnMetadata column : rowGroup.columns) {
+ SchemaPath schemaPath = column.name;
+ Long previousCount = columnValueCounts.get(schemaPath);
+ if (previousCount != null) {
+ if (previousCount != GroupScan.NO_COLUMN_STATS) {
+ if (column.nulls != null) {
+ Long newCount = rowCount - column.nulls;
+ columnValueCounts.put(schemaPath, columnValueCounts.get(schemaPath) + newCount);
+ } else {
+
+ }
+ }
+ } else {
+ if (column.nulls != null) {
+ Long newCount = rowCount - column.nulls;
+ columnValueCounts.put(schemaPath, newCount);
+ } else {
+ columnValueCounts.put(schemaPath, GroupScan.NO_COLUMN_STATS);
+ }
+ }
+ boolean partitionColumn = checkForPartitionColumn(column, first);
+ if (partitionColumn) {
+ Map<SchemaPath,Object> map = partitionValueMap.get(file.path);
+ if (map == null) {
+ map = Maps.newHashMap();
+ partitionValueMap.put(file.path, map);
+ }
+ Object value = map.get(schemaPath);
+ Object currentValue = column.max;
+ if (value != null) {
+ if (value != currentValue) {
+ columnTypeMap.remove(schemaPath);
+ }
+ } else {
+ map.put(schemaPath, currentValue);
+ }
+ } else {
+ columnTypeMap.remove(schemaPath);
+ }
+ }
+ this.rowCount += rowGroup.rowCount;
+ first = false;
+ }
+ }
+ }
+
+ private ParquetTableMetadata_v1 removeUnneededRowGroups(ParquetTableMetadata_v1 parquetTableMetadata) {
+ List<ParquetFileMetadata> newFileMetadataList = Lists.newArrayList();
+ for (ParquetFileMetadata file : parquetTableMetadata.files) {
+ if (fileSet.contains(file.path)) {
+ newFileMetadataList.add(file);
+ }
+ }
+ return new ParquetTableMetadata_v1(newFileMetadataList, new ArrayList<String>());
+ }
+
/**
* Calculates the affinity each endpoint has for this scan, by adding up the affinity each endpoint has for each
* rowGroup
@@ -637,23 +609,19 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
*/
@Override
public List<EndpointAffinity> getOperatorAffinity() {
+ return this.endpointAffinities;
+ }
- if (this.endpointAffinities == null) {
- BlockMapBuilder bmb = new BlockMapBuilder(fs, formatPlugin.getContext().getBits());
- try {
- List<TimedRunnable<Void>> blockMappers = Lists.newArrayList();
- for (RowGroupInfo rgi : rowGroupInfos) {
- blockMappers.add(new BlockMapper(bmb, rgi));
- }
- TimedRunnable.run("Load Parquet RowGroup block maps", logger, blockMappers, 16);
- } catch (IOException e) {
- logger.warn("Failure while determining operator affinity.", e);
- return Collections.emptyList();
+ private void getFiles(String path, List<FileStatus> fileStatuses) throws IOException {
+ Path p = Path.getPathWithoutSchemeAndAuthority(new Path(path));
+ FileStatus fileStatus = fs.getFileStatus(p);
+ if (fileStatus.isDirectory()) {
+ for (FileStatus f : fs.listStatus(p, new DrillPathFilter())) {
+ getFiles(f.getPath().toString(), fileStatuses);
}
-
- this.endpointAffinities = AffinityCreator.getAffinityMap(rowGroupInfos);
+ } else {
+ fileStatuses.add(fileStatus);
}
- return this.endpointAffinities;
}
private class BlockMapper extends TimedRunnable<Void> {
@@ -704,9 +672,8 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
private List<RowGroupReadEntry> convertToReadEntries(List<RowGroupInfo> rowGroups) {
List<RowGroupReadEntry> entries = Lists.newArrayList();
for (RowGroupInfo rgi : rowGroups) {
- RowGroupReadEntry rgre = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(),
- rgi.getRowGroupIndex());
- entries.add(rgre);
+ RowGroupReadEntry entry = new RowGroupReadEntry(rgi.getPath(), rgi.getStart(), rgi.getLength(), rgi.getRowGroupIndex());
+ entries.add(entry);
}
return entries;
}
@@ -757,7 +724,7 @@ public class ParquetGroupScan extends AbstractFileGroupScan {
public FileGroupScan clone(FileSelection selection) throws IOException {
ParquetGroupScan newScan = new ParquetGroupScan(this);
newScan.modifyFileSelection(selection);
- newScan.readFooterFromEntries();
+ newScan.init();
return newScan;
}
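
One detail worth spelling out: init() builds each row group's EndpointByteMap from Metadata's hostAffinity map, where getHostAffinity() credits a host with the fraction of the row group's byte range covered by blocks stored on that host. A small sketch of the per-block contribution, with made-up block and row-group sizes (the helper name is hypothetical):

    // Per-block contribution used by Metadata.getHostAffinity(): the block's length,
    // clipped to the row group's byte range, divided by the row group's length.
    static float contribution(long blockStart, long blockLen, long rgStart, long rgLen) {
      long blockEnd = blockStart + blockLen;
      long rgEnd = rgStart + rgLen;
      long front = blockStart < rgStart ? rgStart - blockStart : 0;  // bytes before the row group
      long back = blockEnd > rgEnd ? blockEnd - rgEnd : 0;           // bytes past the row group
      return (float) (blockLen - front - back) / rgLen;
    }

    // Hypothetical numbers: a 256 MB row group spanning two 128 MB blocks.
    // Block 0 (hosts: A)    -> contribution 0.5
    // Block 1 (hosts: A, B) -> contribution 0.5
    // hostAffinity = {A: 1.0, B: 0.5}; init() then adds affinity * rg.length bytes
    // to each matching Drillbit endpoint in the row group's EndpointByteMap.
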
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
index fea0875bb..22d4ef590 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AffinityCreator.java
@@ -45,7 +45,6 @@ public class AffinityCreator {
for (ObjectLongCursor<DrillbitEndpoint> cursor : entry.getByteMap()) {
long bytes = cursor.value;
float affinity = (float)bytes / (float)totalBytes;
- logger.debug("Work: {} Endpoint: {} Bytes: {}", work, cursor.key.getAddress(), bytes);
affinities.putOrAdd(cursor.key, affinity, affinity);
}
}