diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java | 50 |
1 file changed, 39 insertions(+), 11 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 876cd5b72..df84a9d1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -17,13 +17,16 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
-
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
@@ -35,6 +38,8 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.base.SchemalessScan;
 import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
 import org.apache.drill.exec.planner.logical.DrillTable;
 import org.apache.drill.exec.planner.logical.DynamicDrillTable;
 import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -48,26 +53,28 @@ import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
 import org.apache.drill.exec.store.dfs.FileSelection;
 import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.parquet.metadata.Metadata;
-import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.exec.store.dfs.FormatMatcher;
 import org.apache.drill.exec.store.dfs.FormatPlugin;
 import org.apache.drill.exec.store.dfs.FormatSelection;
 import org.apache.drill.exec.store.dfs.MagicString;
 import org.apache.drill.exec.store.dfs.MetadataContext;
 import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
 import org.apache.drill.exec.store.parquet.metadata.ParquetTableMetadataDirs;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
 public class ParquetFormatPlugin implements FormatPlugin {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MockStorageEngine.class);
 
   public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
@@ -126,15 +133,14 @@ public class ParquetFormatPlugin implements FormatPlugin {
   }
 
   @Override
-  public AbstractWriter getWriter(PhysicalOperator child, String location, boolean append, List<String> partitionColumns) throws IOException {
-    return new ParquetWriter(child, location, append, partitionColumns, this);
+  public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException {
+    return new ParquetWriter(child, location, partitionColumns, this);
   }
 
   public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter writer) throws IOException, OutOfMemoryException {
     Map<String, String> options = new HashMap<>();
     options.put("location", writer.getLocation());
-    options.put("append", Boolean.toString(writer.getAppend()));
     FragmentHandle handle = context.getHandle();
     String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
@@ -194,6 +200,28 @@ public class ParquetFormatPlugin implements FormatPlugin {
   }
 
   @Override
+  public boolean supportsStatistics() {
+    return true;
+  }
+
+  @Override
+  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+    Stopwatch timer = Stopwatch.createStarted();
+    ObjectMapper mapper = DrillStatsTable.getMapper();
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    FSDataInputStream is = fs.open(statsTablePath);
+    TableStatistics statistics = mapper.readValue((InputStream) is, TableStatistics.class);
+    logger.info("Took {} ms to read statistics from {} format plugin", timer.elapsed(TimeUnit.MILLISECONDS), name);
+    timer.stop();
+    return statistics;
+  }
+
+  @Override
+  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+    throw new UnsupportedOperationException("unimplemented");
+  }
+
+  @Override
   public StoragePluginConfig getStorageConfig() {
     return storageConfig;
   }
@@ -262,9 +290,9 @@ public class ParquetFormatPlugin implements FormatPlugin {
           new FormatSelection(plugin.getConfig(), selection));
       }
     }
-    if (!super.supportDirectoryReads() && selection.containsDirectories(fs)) {
+    /*if (!super.supportDirectoryReads() && selection.containsDirectories(fs)) {
       return null;
-    }
+    }*/
     return super.isReadable(fs, selection, fsPlugin, storageEngineName, schemaConfig);
   }