Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java  50
1 file changed, 39 insertions(+), 11 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
index 876cd5b72..df84a9d1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFormatPlugin.java
@@ -17,13 +17,16 @@
*/
package org.apache.drill.exec.store.parquet;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
+import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
-
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -35,6 +38,8 @@ import org.apache.drill.exec.physical.base.AbstractWriter;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.SchemalessScan;
import org.apache.drill.exec.physical.impl.WriterRecordBatch;
+import org.apache.drill.exec.planner.common.DrillStatsTable;
+import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
@@ -48,26 +53,28 @@ import org.apache.drill.exec.store.dfs.BasicFormatMatcher;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.FileSystemPlugin;
-import org.apache.drill.exec.store.parquet.metadata.Metadata;
-import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.store.dfs.FormatMatcher;
import org.apache.drill.exec.store.dfs.FormatPlugin;
import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MagicString;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.store.mock.MockStorageEngine;
+import org.apache.drill.exec.store.parquet.metadata.Metadata;
import org.apache.drill.exec.store.parquet.metadata.ParquetTableMetadataDirs;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-
public class ParquetFormatPlugin implements FormatPlugin {
+
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetFormatPlugin.class);
public static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
@@ -126,15 +133,14 @@ public class ParquetFormatPlugin implements FormatPlugin {
}
@Override
- public AbstractWriter getWriter(PhysicalOperator child, String location, boolean append, List<String> partitionColumns) throws IOException {
- return new ParquetWriter(child, location, append, partitionColumns, this);
+ public AbstractWriter getWriter(PhysicalOperator child, String location, List<String> partitionColumns) throws IOException {
+ return new ParquetWriter(child, location, partitionColumns, this);
}
public RecordWriter getRecordWriter(FragmentContext context, ParquetWriter writer) throws IOException, OutOfMemoryException {
Map<String, String> options = new HashMap<>();
options.put("location", writer.getLocation());
- options.put("append", Boolean.toString(writer.getAppend()));
FragmentHandle handle = context.getHandle();
String fragmentId = String.format("%d_%d", handle.getMajorFragmentId(), handle.getMinorFragmentId());
@@ -194,6 +200,28 @@ public class ParquetFormatPlugin implements FormatPlugin {
}
@Override
+ public boolean supportsStatistics() {
+ return true;
+ }
+
+ @Override
+ public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+ Stopwatch timer = Stopwatch.createStarted();
+ ObjectMapper mapper = DrillStatsTable.getMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ FSDataInputStream is = fs.open(statsTablePath);
+ TableStatistics statistics = mapper.readValue((InputStream) is, TableStatistics.class);
+ logger.info("Took {} ms to read statistics from {} format plugin", timer.elapsed(TimeUnit.MILLISECONDS), name);
+ timer.stop();
+ return statistics;
+ }
+
+ @Override
+ public void writeStatistics(TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+ throw new UnsupportedOperationException("unimplemented");
+ }
+
+ @Override
public StoragePluginConfig getStorageConfig() {
return storageConfig;
}
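
Note on the readStatistics() hunk above: the added code opens an FSDataInputStream but never closes it, so the descriptor leaks if readValue() throws. A minimal leak-free sketch, not part of the commit, reusing only the imports, the name field, and the DrillStatsTable.getMapper() helper that this diff itself introduces:

@Override
public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
  Stopwatch timer = Stopwatch.createStarted();
  ObjectMapper mapper = DrillStatsTable.getMapper();
  mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
  // try-with-resources closes the stream even when deserialization fails
  try (InputStream is = fs.open(statsTablePath)) {
    TableStatistics statistics = mapper.readValue(is, TableStatistics.class);
    logger.info("Took {} ms to read statistics from {} format plugin",
        timer.elapsed(TimeUnit.MILLISECONDS), name);
    return statistics;
  } finally {
    timer.stop();
  }
}
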
@@ -262,9 +290,9 @@ public class ParquetFormatPlugin implements FormatPlugin {
new FormatSelection(plugin.getConfig(), selection));
}
}
- if (!super.supportDirectoryReads() && selection.containsDirectories(fs)) {
+ /*if (!super.supportDirectoryReads() && selection.containsDirectories(fs)) {
return null;
- }
+ }*/
return super.isReadable(fs, selection, fsPlugin, storageEngineName, schemaConfig);
}
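
The same deserialization path can also be exercised outside a Drillbit to inspect a statistics file by hand. A self-contained sketch; the class name and the command-line path argument are illustrative assumptions, and only DrillStatsTable.getMapper(), the FAIL_ON_UNKNOWN_PROPERTIES setting, and TableStatistics come from the diff above:

import java.io.InputStream;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StatsFileProbe {
  public static void main(String[] args) throws Exception {
    // args[0]: path to a statistics file on the local filesystem (assumed layout)
    FileSystem fs = FileSystem.getLocal(new Configuration());
    ObjectMapper mapper = DrillStatsTable.getMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    try (InputStream in = fs.open(new Path(args[0]))) {
      TableStatistics stats = mapper.readValue(in, TableStatistics.class);
      System.out.println("Parsed statistics: " + stats);
    }
  }
}
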