From 3d29faf81da593035f6bd38dd56d48e719afe7d4 Mon Sep 17 00:00:00 2001
From: Vitalii Diravka
Date: Mon, 18 Feb 2019 22:30:36 +0200
Subject: DRILL-5603: Replace String file paths with Hadoop Path

- replaced all String path representations with org.apache.hadoop.fs.Path
- added PathSerDe.Se JSON serializer
- refactored DFSPartitionLocation code by leveraging the existing
  listPartitionValues() functionality

closes #1657
---
 .../exec/store/mapr/db/MapRDBFormatPlugin.java | 4 +-
 .../store/mapr/streams/StreamsFormatPlugin.java | 12 +-
 .../exec/store/syslog/SyslogRecordReader.java | 3 +-
 .../exec/planner/sql/HivePartitionDescriptor.java | 9 +-
 .../exec/planner/sql/HivePartitionLocation.java | 8 +-
 .../hive/HiveDrillNativeParquetRowGroupScan.java | 2 +-
 .../store/hive/HiveDrillNativeParquetScan.java | 6 +-
 .../HiveDrillNativeParquetScanBatchCreator.java | 5 +-
 .../drill/exec/store/hive/HivePartitionHolder.java | 11 +-
 .../apache/drill/exec/physical/PhysicalPlan.java | 3 +-
 .../exec/physical/base/AbstractGroupScan.java | 3 +-
 .../apache/drill/exec/physical/base/GroupScan.java | 5 +-
 .../drill/exec/physical/base/SchemalessScan.java | 9 +-
 .../impl/scan/file/BaseFileScanFramework.java | 2 +-
 .../exec/physical/impl/scan/file/FileMetadata.java | 2 +-
 .../exec/planner/AbstractPartitionDescriptor.java | 3 +-
 .../exec/planner/DFSDirPartitionLocation.java | 18 ++-
 .../exec/planner/DFSFilePartitionLocation.java | 25 +---
 .../planner/FileSystemPartitionDescriptor.java | 102 ++++++-------
 .../exec/planner/ParquetPartitionDescriptor.java | 18 +--
 .../exec/planner/ParquetPartitionLocation.java | 8 +-
 .../drill/exec/planner/PartitionDescriptor.java | 44 ++++--
 .../drill/exec/planner/PartitionLocation.java | 14 +-
 .../drill/exec/planner/PhysicalPlanReader.java | 53 +++----
 .../exec/planner/SimplePartitionLocation.java | 5 +-
 .../planner/logical/partition/PruneScanRule.java | 17 ++-
 .../planner/sql/handlers/AnalyzeTableHandler.java | 28 ++--
 .../sql/handlers/RefreshMetadataHandler.java | 4 +-
 .../apache/drill/exec/serialization/PathSerDe.java | 39 +++++
 .../apache/drill/exec/store/ColumnExplorer.java | 46 +++---
 .../drill/exec/store/avro/AvroDrillTable.java | 4 +-
 .../drill/exec/store/avro/AvroRecordReader.java | 12 +-
 .../apache/drill/exec/store/dfs/FileSelection.java | 161 ++++++++++-----------
 .../drill/exec/store/dfs/FormatSelection.java | 8 +-
 .../drill/exec/store/dfs/MetadataContext.java | 7 +-
 .../drill/exec/store/dfs/ReadEntryFromHDFS.java | 3 +-
 .../drill/exec/store/dfs/ReadEntryWithPath.java | 13 +-
 .../exec/store/dfs/easy/EasyFormatPlugin.java | 2 +-
 .../drill/exec/store/dfs/easy/EasyGroupScan.java | 33 ++---
 .../drill/exec/store/dfs/easy/EasySubScan.java | 22 +--
 .../apache/drill/exec/store/dfs/easy/FileWork.java | 8 +-
 .../exec/store/direct/MetadataDirectGroupScan.java | 10 +-
 .../exec/store/easy/json/JSONRecordReader.java | 13 +-
 .../sequencefile/SequenceFileFormatPlugin.java | 2 +-
 .../exec/store/easy/text/TextFormatPlugin.java | 2 +-
 .../exec/store/httpd/HttpdLogFormatPlugin.java | 2 +-
 .../drill/exec/store/image/ImageFormatPlugin.java | 9 +-
 .../drill/exec/store/image/ImageRecordReader.java | 4 +-
 .../drill/exec/store/log/LogRecordReader.java | 5 +-
 .../store/parquet/AbstractParquetGroupScan.java | 17 ++-
 .../parquet/AbstractParquetScanBatchCreator.java | 11 +-
 .../drill/exec/store/parquet/ParquetGroupScan.java | 42 +++---
 .../store/parquet/ParquetGroupScanStatistics.java | 5 +-
 .../exec/store/parquet/ParquetRowGroupScan.java | 11 +-
 .../store/parquet/ParquetScanBatchCreator.java | 3 +-
.../drill/exec/store/parquet/RowGroupInfo.java | 3 +- .../exec/store/parquet/RowGroupReadEntry.java | 3 +- .../parquet/columnreaders/ParquetRecordReader.java | 24 ++- .../exec/store/parquet/metadata/Metadata.java | 37 +++-- .../exec/store/parquet/metadata/MetadataBase.java | 5 +- .../store/parquet/metadata/MetadataPathUtils.java | 29 ++-- .../exec/store/parquet/metadata/Metadata_V1.java | 13 +- .../exec/store/parquet/metadata/Metadata_V2.java | 15 +- .../exec/store/parquet/metadata/Metadata_V3.java | 15 +- .../parquet/metadata/ParquetTableMetadataDirs.java | 7 +- .../exec/store/parquet2/DrillParquetReader.java | 2 +- .../drill/exec/store/pcap/PcapRecordReader.java | 4 +- .../exec/store/pcapng/PcapngRecordReader.java | 4 +- .../drill/exec/store/schedule/BlockMapBuilder.java | 22 +-- .../exec/store/schedule/CompleteFileWork.java | 42 ++++-- .../drill/exec/util/DrillFileSystemUtil.java | 11 ++ .../test/java/org/apache/drill/PlanTestBase.java | 3 +- .../apache/drill/exec/TestPathSerialization.java | 65 +++++++++ .../physical/impl/scan/TestFileScanFramework.java | 2 +- .../exec/physical/unit/MiniPlanUnitTestBase.java | 13 +- .../drill/exec/physical/unit/TestMiniPlan.java | 5 +- .../exec/physical/unit/TestNullInputMiniPlan.java | 8 +- .../drill/exec/store/CachedSingleFileSystem.java | 8 +- .../exec/store/dfs/TestDFSPartitionLocation.java | 54 +++++++ .../drill/exec/store/dfs/TestFileSelection.java | 7 +- .../store/parquet/ParquetRecordReaderTest.java | 4 +- .../drill/exec/store/store/TestAssignment.java | 3 +- .../apache/drill/test/PhysicalOpUnitTestBase.java | 5 +- .../java/org/apache/drill/test/QueryBuilder.java | 2 +- 84 files changed, 765 insertions(+), 567 deletions(-) create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java index 3011f4efb..6501f8c10 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java @@ -180,9 +180,9 @@ public class MapRDBFormatPlugin extends TableFormatPlugin { */ @JsonIgnore public String getTableName(FileSelection selection) { - List files = selection.getFiles(); + List files = selection.getFiles(); assert (files.size() == 1); - return files.get(0); + return files.get(0).toUri().getPath(); } } diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java index 92f134fb7..2ddf75296 100644 --- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java +++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java @@ -36,7 +36,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; public class StreamsFormatPlugin extends TableFormatPlugin { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamsFormatPlugin.class); + private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(StreamsFormatPlugin.class); private StreamsFormatMatcher matcher; public StreamsFormatPlugin(String name, DrillbitContext context, Configuration fsConf, @@ -72,9 +72,8 @@ public class StreamsFormatPlugin extends TableFormatPlugin { } @Override - public AbstractGroupScan getGroupScan(String userName, FileSelection selection, - List columns) throws IOException { - List files = selection.getFiles(); + public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List columns) { + List files = selection.getFiles(); assert (files.size() == 1); //TableProperties props = getMaprFS().getTableProperties(new Path(files.get(0))); throw UserException.unsupportedError().message("MapR streams can not be querried at this time.").build(logger); @@ -86,13 +85,12 @@ public class StreamsFormatPlugin extends TableFormatPlugin { } @Override - public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException { + public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) { throw new UnsupportedOperationException("unimplemented"); } @Override - public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException { + public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) { throw new UnsupportedOperationException("unimplemented"); } - } diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java index a198e3444..4b2831c2d 100644 --- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java +++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java @@ -32,7 +32,6 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter; import org.apache.drill.exec.vector.complex.writer.BaseWriter; -import org.apache.hadoop.fs.Path; import org.realityforge.jsyslog.message.StructuredDataParameter; import org.realityforge.jsyslog.message.SyslogMessage; @@ -95,7 +94,7 @@ public class SyslogRecordReader extends AbstractRecordReader { private void openFile() { InputStream in; try { - in = fileSystem.open(new Path(fileWork.getPath())); + in = fileSystem.open(fileWork.getPath()); } catch (Exception e) { throw UserException .dataReadError(e) diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java index 25a0c080e..a52b48ded 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java @@ -36,6 +36,7 @@ import org.apache.drill.exec.store.hive.HiveUtilities; import org.apache.drill.exec.store.hive.HiveReadEntry; import org.apache.drill.exec.store.hive.HiveScan; import org.apache.drill.exec.vector.ValueVector; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.Partition; import java.util.BitSet; @@ -87,9 +88,9 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { } @Override - public String 
getBaseTableLocation() { + public Path getBaseTableLocation() { HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry(); - return origEntry.table.getTable().getSd().getLocation(); + return new Path(origEntry.table.getTable().getSd().getLocation()); } @Override @@ -145,7 +146,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { List locations = new LinkedList<>(); HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry(); for (Partition partition: origEntry.getPartitions()) { - locations.add(new HivePartitionLocation(partition.getValues(), partition.getSd().getLocation())); + locations.add(new HivePartitionLocation(partition.getValues(), new Path(partition.getSd().getLocation()))); } locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE); sublistsCreated = true; @@ -170,7 +171,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor { List newPartitions = Lists.newLinkedList(); for (HiveTableWrapper.HivePartitionWrapper part: oldPartitions) { - String partitionLocation = part.getPartition().getSd().getLocation(); + Path partitionLocation = new Path(part.getPartition().getSd().getLocation()); for (PartitionLocation newPartitionLocation: newPartitionLocations) { if (partitionLocation.equals(newPartitionLocation.getEntirePartitionLocation())) { newPartitions.add(part); diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java index bb0efe841..25821f58a 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java @@ -19,17 +19,19 @@ package org.apache.drill.exec.planner.sql; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.drill.exec.planner.SimplePartitionLocation; +import org.apache.hadoop.fs.Path; import java.util.List; public class HivePartitionLocation extends SimplePartitionLocation { - private final String partitionLocation; + private final Path partitionLocation; private final List partitionValues; - public HivePartitionLocation(final List partitionValues, final String partitionLocation) { + public HivePartitionLocation(List partitionValues, Path partitionLocation) { this.partitionValues = ImmutableList.copyOf(partitionValues); this.partitionLocation = partitionLocation; } + @Override public String getPartitionValue(int index) { assert index < partitionValues.size(); @@ -37,7 +39,7 @@ public class HivePartitionLocation extends SimplePartitionLocation { } @Override - public String getEntirePartitionLocation() { + public Path getEntirePartitionLocation() { return partitionLocation; } } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java index bea06e073..78e107a85 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java @@ -124,7 +124,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS @Override 
public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException { - Path path = new Path(rowGroupReadEntry.getPath()).getParent(); + Path path = rowGroupReadEntry.getPath().getParent(); return new ProjectionPusher().pushProjectionsAndFilters( new JobConf(HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties)), path.getParent()); diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java index 617f6a59f..79a07f17b 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java @@ -114,7 +114,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan { assert split instanceof FileSplit; FileSplit fileSplit = (FileSplit) split; Path finalPath = fileSplit.getPath(); - String pathString = Path.getPathWithoutSchemeAndAuthority(finalPath).toString(); + Path pathString = Path.getPathWithoutSchemeAndAuthority(finalPath); entries.add(new ReadEntryWithPath(pathString)); // store partition values per path @@ -205,7 +205,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan { protected void initInternal() throws IOException { Map fileStatusConfMap = new LinkedHashMap<>(); for (ReadEntryWithPath entry : entries) { - Path path = new Path(entry.getPath()); + Path path = entry.getPath(); Configuration conf = new ProjectionPusher().pushProjectionsAndFilters( new JobConf(hiveStoragePlugin.getHiveConf()), path.getParent()); @@ -221,7 +221,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan { } @Override - protected AbstractParquetGroupScan cloneWithFileSelection(Collection filePaths) throws IOException { + protected AbstractParquetGroupScan cloneWithFileSelection(Collection filePaths) throws IOException { FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), null, null, false); return clone(newSelection); } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java index 339e1bda4..be906febe 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.HashMap; @@ -54,7 +55,7 @@ public class HiveDrillNativeParquetScanBatchCreator extends AbstractParquetScanB */ private class HiveDrillNativeParquetDrillFileSystemManager extends AbstractDrillFileSystemManager { - private final Map fileSystems; + private final Map fileSystems; HiveDrillNativeParquetDrillFileSystemManager(OperatorContext operatorContext) { super(operatorContext); @@ -62,7 +63,7 @@ public class HiveDrillNativeParquetScanBatchCreator extends AbstractParquetScanB } @Override - 
protected DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException { + protected DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException { DrillFileSystem fs = fileSystems.get(path); if (fs == null) { try { diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java index 803144e0b..3e16acba4 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.store.hive; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hadoop.fs.Path; import java.util.ArrayList; import java.util.Collections; @@ -34,11 +35,11 @@ import java.util.Map; */ public class HivePartitionHolder { - private final Map keyToIndexMapper; + private final Map keyToIndexMapper; private final List> partitionValues; @JsonCreator - public HivePartitionHolder(@JsonProperty("keyToIndexMapper") Map keyToIndexMapper, + public HivePartitionHolder(@JsonProperty("keyToIndexMapper") Map keyToIndexMapper, @JsonProperty("partitionValues") List> partitionValues) { this.keyToIndexMapper = keyToIndexMapper; this.partitionValues = partitionValues; @@ -50,7 +51,7 @@ public class HivePartitionHolder { } @JsonProperty - public Map getKeyToIndexMapper() { + public Map getKeyToIndexMapper() { return keyToIndexMapper; } @@ -67,7 +68,7 @@ public class HivePartitionHolder { * @param key mapper key * @param values partition values */ - public void add(String key, List values) { + public void add(Path key, List values) { int index = partitionValues.indexOf(values); if (index == -1) { index = partitionValues.size(); @@ -84,7 +85,7 @@ public class HivePartitionHolder { * @param key mapper key * @return list of partition values */ - public List get(String key) { + public List get(Path key) { Integer index = keyToIndexMapper.get(key); if (index == null) { return Collections.emptyList(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java index f73afb117..6eba65833 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java @@ -45,7 +45,8 @@ public class PhysicalPlan { Graph graph; @JsonCreator - public PhysicalPlan(@JsonProperty("head") PlanProperties properties, @JsonProperty("graph") List operators){ + public PhysicalPlan(@JsonProperty("head") PlanProperties properties, + @JsonProperty("graph") List operators) { this.properties = properties; this.graph = Graph.newGraph(operators, Root.class, Leaf.class); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java index 7e2623a19..03b53a4f7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java @@ -34,6 +34,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings; import com.fasterxml.jackson.annotation.JsonIgnore; import 
org.apache.drill.exec.server.options.OptionManager; +import org.apache.hadoop.fs.Path; public abstract class AbstractGroupScan extends AbstractBase implements GroupScan { @@ -171,7 +172,7 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca } @Override - public Collection getFiles() { + public Collection getFiles() { return null; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java index e42ae2dc8..6dbad224c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java @@ -30,6 +30,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import org.apache.hadoop.fs.Path; /** * A GroupScan operator represents all data which will be scanned by a given physical @@ -142,8 +143,10 @@ public interface GroupScan extends Scan, HasAffinity{ /** * Returns a collection of file names associated with this GroupScan. This should be called after checking * hasFiles(). If this GroupScan cannot provide file names, it returns null. + * + * @return collection of files paths */ - Collection getFiles(); + Collection getFiles(); @JsonIgnore LogicalExpression getFilter(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java index 999c417fc..fb60ddddb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.hadoop.fs.Path; import java.util.List; @@ -32,17 +33,17 @@ import java.util.List; @JsonTypeName("schemaless-scan") public class SchemalessScan extends AbstractFileGroupScan implements SubScan { - private final String selectionRoot; + private final Path selectionRoot; @JsonCreator public SchemalessScan(@JsonProperty("userName") String userName, - @JsonProperty("selectionRoot") String selectionRoot, + @JsonProperty("selectionRoot") Path selectionRoot, @JsonProperty("columns") List columns) { this(userName, selectionRoot); } public SchemalessScan(@JsonProperty("userName") String userName, - @JsonProperty("selectionRoot") String selectionRoot) { + @JsonProperty("selectionRoot") Path selectionRoot) { super(userName); this.selectionRoot = selectionRoot; } @@ -53,7 +54,7 @@ public class SchemalessScan extends AbstractFileGroupScan implements SubScan { } @JsonProperty - public String getSelectionRoot() { + public Path getSelectionRoot() { return selectionRoot; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java index 5a8c526b5..8352dfa07 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java @@ -110,7 +110,7 @@ public abstract class BaseFileScanFramework paths = new ArrayList<>(); for (FileWork work : files) { - Path path = dfs.makeQualified(new Path(work.getPath())); + Path path = dfs.makeQualified(work.getPath()); paths.add(path); FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""}); spilts.add(split); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java index ff449d44e..2eb8af932 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java @@ -52,7 +52,7 @@ public class FileMetadata { return; } - dirPath = ColumnExplorer.parsePartitions(filePath.toString(), rootPath.toString()); + dirPath = ColumnExplorer.parsePartitions(filePath, rootPath, false); if (dirPath == null) { throw new IllegalArgumentException( String.format("Selection root of \"%s\" is not a leading path of \"%s\"", diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java index cf0125692..d4148278d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.calcite.rel.core.TableScan; import org.apache.drill.exec.store.dfs.MetadataContext; +import org.apache.hadoop.fs.Path; /** * Abstract base class for file system based partition descriptors and Hive partition descriptors. 
@@ -65,7 +66,7 @@ public abstract class AbstractPartitionDescriptor implements PartitionDescriptor @Override - public TableScan createTableScan(List newPartitions, String cacheFileRoot, + public TableScan createTableScan(List newPartitions, Path cacheFileRoot, boolean isAllPruned, MetadataContext metaContext) throws Exception { throw new UnsupportedOperationException(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java index 46afb30f5..440178b47 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java @@ -21,7 +21,9 @@ package org.apache.drill.exec.planner; +import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; import java.util.Collection; import java.util.List; @@ -46,7 +48,7 @@ public class DFSDirPartitionLocation implements PartitionLocation { } @Override - public String getEntirePartitionLocation() { + public Path getEntirePartitionLocation() { throw new UnsupportedOperationException("Should not call getEntirePartitionLocation for composite partition location!"); } @@ -67,15 +69,17 @@ public class DFSDirPartitionLocation implements PartitionLocation { } @Override - public String getCompositePartitionPath() { - String path = ""; - for (int i=0; i < dirs.length; i++) { - if (dirs[i] == null) { // get the prefix + public Path getCompositePartitionPath() { + StringBuilder path = new StringBuilder(); + for (String dir : dirs) { + if (dir == null) { // get the prefix break; } - path += "/" + dirs[i]; + path.append("/") + .append(dir); } - return path; + + return DrillFileSystemUtil.createPathSafe(path.toString()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java index ecfa6220d..0dfe31634 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.planner; +import org.apache.drill.exec.store.ColumnExplorer; import org.apache.hadoop.fs.Path; /** @@ -24,28 +25,14 @@ import org.apache.hadoop.fs.Path; */ public class DFSFilePartitionLocation extends SimplePartitionLocation { private final String[] dirs; - private final String file; + private final Path file; - public DFSFilePartitionLocation(int max, String selectionRoot, String file, boolean hasDirsOnly) { + public DFSFilePartitionLocation(int max, Path selectionRoot, Path file, boolean hasDirsOnly) { this.file = file; this.dirs = new String[max]; - // strip the scheme and authority if they exist - selectionRoot = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString(); - - int start = file.indexOf(selectionRoot) + selectionRoot.length(); - String postPath = file.substring(start); - if (postPath.length() == 0) { - return; - } - if(postPath.charAt(0) == '/'){ - postPath = postPath.substring(1); - } - String[] mostDirs = postPath.split("/"); - int maxLoop = Math.min(max, hasDirsOnly ? 
mostDirs.length : mostDirs.length - 1); - for(int i =0; i < maxLoop; i++) { - this.dirs[i] = mostDirs[i]; - } + String[] dirs = ColumnExplorer.parsePartitions(this.file, selectionRoot, hasDirsOnly); + System.arraycopy(dirs, 0, this.dirs, 0, Math.min(max, dirs.length)); } /** @@ -64,7 +51,7 @@ public class DFSFilePartitionLocation extends SimplePartitionLocation { * @return The partition location. */ @Override - public String getEntirePartitionLocation() { + public Path getEntirePartitionLocation() { return file; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java index 65268923d..188049003 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java @@ -26,6 +26,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import org.apache.calcite.plan.RelOptTable; +import org.apache.commons.lang3.tuple.Pair; import org.apache.drill.common.util.GuavaUtils; import org.apache.drill.shaded.guava.com.google.common.base.Charsets; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; @@ -37,7 +39,6 @@ import org.apache.calcite.adapter.enumerable.EnumerableTableScan; import org.apache.calcite.prepare.RelOptTableImpl; import org.apache.calcite.rel.core.TableScan; import org.apache.calcite.util.BitSets; -import org.apache.calcite.util.Pair; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; @@ -54,6 +55,7 @@ import org.apache.drill.exec.store.dfs.FormatSelection; import org.apache.drill.exec.store.dfs.MetadataContext; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; +import org.apache.hadoop.fs.Path; // partition descriptor for file system based tables @@ -145,18 +147,18 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor { } @Override - public String getBaseTableLocation() { + public Path getBaseTableLocation() { final FormatSelection origSelection = (FormatSelection) table.getSelection(); - return origSelection.getSelection().selectionRoot; + return origSelection.getSelection().getSelectionRoot(); } @Override protected void createPartitionSublists() { - final Pair, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus(); + final Pair, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus(); List locations = new LinkedList<>(); - boolean hasDirsOnly = fileLocationsAndStatus.right; + boolean hasDirsOnly = fileLocationsAndStatus.getRight(); - final String selectionRoot = getBaseTableLocation(); + final Path selectionRoot = getBaseTableLocation(); // map used to map the partition keys (dir0, dir1, ..), to the list of partitions that share the same partition keys. // For example, @@ -166,35 +168,31 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor { // Figure out the list of leaf subdirectories. For each leaf subdirectory, find the list of files (DFSFilePartitionLocation) // it contains. 
- for (String file: fileLocationsAndStatus.left) { - DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, selectionRoot, file, hasDirsOnly); + for (Path file: fileLocationsAndStatus.getLeft()) { + DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, + selectionRoot, file, hasDirsOnly); + List dirList = Arrays.asList(dfsFilePartitionLocation.getDirs()); - final String[] dirs = dfsFilePartitionLocation.getDirs(); - final List dirList = Arrays.asList(dirs); - - if (!dirToFileMap.containsKey(dirList)) { - dirToFileMap.put(dirList, new ArrayList()); - } + dirToFileMap.putIfAbsent(dirList, new ArrayList<>()); dirToFileMap.get(dirList).add(dfsFilePartitionLocation); } // build a list of DFSDirPartitionLocation. - for (final List dirs : dirToFileMap.keySet()) { - locations.add( new DFSDirPartitionLocation(dirs.toArray(new String[dirs.size()]), dirToFileMap.get(dirs))); - } + dirToFileMap.keySet().stream() + .map(dirs -> new DFSDirPartitionLocation(dirs.toArray(new String[dirs.size()]), dirToFileMap.get(dirs))) + .forEach(locations::add); locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE); sublistsCreated = true; } - protected Pair, Boolean> getFileLocationsAndStatus() { - Collection fileLocations = null; - Pair, Boolean> fileLocationsAndStatus = null; + protected Pair, Boolean> getFileLocationsAndStatus() { + Collection fileLocations = null; boolean isExpandedPartial = false; if (scanRel instanceof DrillScanRel) { // If a particular GroupScan provides files, get the list of files from there rather than // DrillTable because GroupScan would have the updated version of the selection - final DrillScanRel drillScan = (DrillScanRel) scanRel; + DrillScanRel drillScan = (DrillScanRel) scanRel; if (drillScan.getGroupScan().hasFiles()) { fileLocations = drillScan.getGroupScan().getFiles(); isExpandedPartial = false; @@ -208,67 +206,59 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor { fileLocations = selection.getFiles(); isExpandedPartial = selection.isExpandedPartial(); } - fileLocationsAndStatus = Pair.of(fileLocations, isExpandedPartial); - return fileLocationsAndStatus; + return Pair.of(fileLocations, isExpandedPartial); } @Override - public TableScan createTableScan(List newPartitionLocation, String cacheFileRoot, + public TableScan createTableScan(List newPartitionLocation, Path cacheFileRoot, boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception { - List newFiles = Lists.newArrayList(); - for (final PartitionLocation location : newPartitionLocation) { + List newFiles = new ArrayList<>(); + for (PartitionLocation location : newPartitionLocation) { if (!location.isCompositePartition()) { newFiles.add(location.getEntirePartitionLocation()); } else { final Collection subPartitions = location.getPartitionLocationRecursive(); - for (final PartitionLocation subPart : subPartitions) { + for (PartitionLocation subPart : subPartitions) { newFiles.add(subPart.getEntirePartitionLocation()); } } } + FormatSelection formatSelection = (FormatSelection) table.getSelection(); + FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(), + cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus()); + newFileSelection.setMetaContext(metaContext); + RelOptTable relOptTable = scanRel.getTable(); + if (scanRel instanceof DrillScanRel) { - final FormatSelection formatSelection 
= (FormatSelection)table.getSelection(); - final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(), - cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus()); - newFileSelection.setMetaContext(metaContext); - final FileGroupScan newGroupScan = - ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection); + FileGroupScan newGroupScan = + ((FileGroupScan) ((DrillScanRel) scanRel).getGroupScan()).clone(newFileSelection); return new DrillScanRel(scanRel.getCluster(), scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL), - scanRel.getTable(), + relOptTable, newGroupScan, scanRel.getRowType(), ((DrillScanRel) scanRel).getColumns(), true /*filter pushdown*/); } else if (scanRel instanceof EnumerableTableScan) { - return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot, - wasAllPartitionsPruned, metaContext); + FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection); + + DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(), + table.getUserName(), newFormatSelection); + /* Copy statistics from the original relOptTable */ + dynamicDrillTable.setStatsTable(table.getStatsTable()); + DrillTranslatableTable newTable = new DrillTranslatableTable(dynamicDrillTable); + + RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(relOptTable.getRelOptSchema(), relOptTable.getRowType(), + newTable, GuavaUtils.convertToUnshadedImmutableList(ImmutableList.of())); + + // return an EnumerableTableScan with fileSelection being part of digest of TableScan node. + return DirPrunedEnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl, newFileSelection.toString()); } else { throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!"); } } - private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List newFiles, String cacheFileRoot, - boolean wasAllPartitionsPruned, MetadataContext metaContext) { - final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable(); - final FormatSelection formatSelection = (FormatSelection) table.getSelection(); - final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(), - cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus()); - newFileSelection.setMetaContext(metaContext); - final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection); - final DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(), - table.getUserName(), newFormatSelection); - /* Copy statistics from the original table */ - dynamicDrillTable.setStatsTable(table.getStatsTable()); - final DrillTranslatableTable newTable = new DrillTranslatableTable(dynamicDrillTable); - final RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable, - GuavaUtils.convertToUnshadedImmutableList(ImmutableList.of())); - - // return an EnumerableTableScan with fileSelection being part of digest of TableScan node. 
- return DirPrunedEnumerableTableScan.create(oldScan.getCluster(), newOptTableImpl, newFileSelection.toString()); - } - @Override public TableScan createTableScan(List newPartitionLocation, boolean wasAllPartitionsPruned) throws Exception { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java index df6deccb0..23675d1db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java @@ -134,17 +134,17 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor { } @Override - public String getBaseTableLocation() { + public Path getBaseTableLocation() { final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection(); return origSelection.getSelection().selectionRoot; } @Override public TableScan createTableScan(List newPartitionLocation, - String cacheFileRoot, + Path cacheFileRoot, boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception { - List newFiles = new ArrayList<>(); + List newFiles = new ArrayList<>(); for (final PartitionLocation location : newPartitionLocation) { newFiles.add(location.getEntirePartitionLocation()); } @@ -172,17 +172,17 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor { @Override protected void createPartitionSublists() { - Set fileLocations = groupScan.getFileSet(); + Set fileLocations = groupScan.getFileSet(); List locations = new LinkedList<>(); - for (String file : fileLocations) { + for (Path file : fileLocations) { locations.add(new ParquetPartitionLocation(file)); } locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE); sublistsCreated = true; } - private GroupScan createNewGroupScan(List newFiles, - String cacheFileRoot, + private GroupScan createNewGroupScan(List newFiles, + Path cacheFileRoot, boolean wasAllPartitionsPruned, MetadataContext metaContext) throws IOException { @@ -194,8 +194,8 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor { return groupScan.clone(newSelection); } - private void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) { - String path = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString(); + private void populatePruningVector(ValueVector v, int index, SchemaPath column, Path file) { + Path path = Path.getPathWithoutSchemeAndAuthority(file); TypeProtos.MajorType majorType = getVectorType(column, null); TypeProtos.MinorType type = majorType.getMinorType(); switch (type) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java index 49e51094d..eec0d5ebe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.planner; +import org.apache.hadoop.fs.Path; + /* * PartitionLocation for the parquet auto partitioning scheme. We just store * the location of each partition within this class. Since the partition value @@ -25,9 +27,9 @@ package org.apache.drill.exec.planner; * invoked. 
*/ public class ParquetPartitionLocation extends SimplePartitionLocation { - private final String file; + private final Path file; - public ParquetPartitionLocation(String file) { + public ParquetPartitionLocation(Path file) { this.file = file; } @@ -48,7 +50,7 @@ public class ParquetPartitionLocation extends SimplePartitionLocation { * @return String location of the partition */ @Override - public String getEntirePartitionLocation() { + public Path getEntirePartitionLocation() { return file; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java index 220bf291e..8759078ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java @@ -23,36 +23,52 @@ import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.store.dfs.MetadataContext; import org.apache.drill.exec.vector.ValueVector; +import org.apache.hadoop.fs.Path; import java.util.BitSet; import java.util.List; import java.util.Map; -// Interface used to describe partitions. Currently used by file system based partitions and hive partitions +/** + * Interface used to describe partitions. Currently used by file system based partitions and hive partitions + */ public interface PartitionDescriptor extends Iterable> { - public static final int PARTITION_BATCH_SIZE = Character.MAX_VALUE; + int PARTITION_BATCH_SIZE = Character.MAX_VALUE; - /* Get the hierarchy index of the given partition + /** + * Get the hierarchy index of the given partition * For eg: if we have the partition laid out as follows * 1997/q1/jan - * * then getPartitionHierarchyIndex("jan") => 2 + * + * @param partitionName Partition name + * @return the index of specified partition name in the hierarchy */ - public int getPartitionHierarchyIndex(String partitionName); + int getPartitionHierarchyIndex(String partitionName); - // Given a column name return boolean to indicate if its a partition column or not - public boolean isPartitionName(String name); + /** + * Given a column name return boolean to indicate if its a partition column or not + * + * @param name of Partition + * @return true, if this is the partition name and vise versa. + */ + boolean isPartitionName(String name); /** * Check to see if the name is a partition name. + * * @param name The field name you want to compare to partition names. * @return Return index if valid, otherwise return null; */ - public Integer getIdIfValid(String name); + Integer getIdIfValid(String name); - // Maximum level of partition nesting/ hierarchy supported - public int getMaxHierarchyLevel(); + /** + * Maximum level of partition nesting/ hierarchy supported + * + * @return maximum supported level number of partition hierarchy + */ + int getMaxHierarchyLevel(); /** * Method creates an in memory representation of all the partitions. 
For each level of partitioning we @@ -79,7 +95,7 @@ public interface PartitionDescriptor extends Iterable> { * @param wasAllPartitionsPruned * @throws Exception */ - public TableScan createTableScan(List newPartitions, + TableScan createTableScan(List newPartitions, boolean wasAllPartitionsPruned) throws Exception; /** @@ -91,11 +107,11 @@ public interface PartitionDescriptor extends Iterable> { * @param metaContext * @throws Exception */ - public TableScan createTableScan(List newPartitions, String cacheFileRoot, + TableScan createTableScan(List newPartitions, Path cacheFileRoot, boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception; - public boolean supportsMetadataCachePruning(); + boolean supportsMetadataCachePruning(); - public String getBaseTableLocation(); + Path getBaseTableLocation(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java index 22088f7ee..2ef4e173d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.planner; +import org.apache.hadoop.fs.Path; + import java.util.List; /** @@ -37,27 +39,27 @@ public interface PartitionLocation { /** * Returns the value of the 'index' partition column */ - public String getPartitionValue(int index); + String getPartitionValue(int index); /** - * Returns the string representation of this partition. + * Returns the path of this partition. * Only a non-composite partition supports this. */ - public String getEntirePartitionLocation(); + Path getEntirePartitionLocation(); /** * Returns the list of the non-composite partitions that this partition consists of. */ - public List getPartitionLocationRecursive(); + List getPartitionLocationRecursive(); /** * Returns if this is a simple or composite partition. 
*/ - public boolean isCompositePartition(); + boolean isCompositePartition(); /** * Returns the path string of directory names only for composite partition */ - public String getCompositePartitionPath(); + Path getCompositePartitionPath(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java index 20b06c430..920f9b2bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java @@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalOperatorUtil; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.record.MajorTypeSerDe; +import org.apache.drill.exec.serialization.PathSerDe; import org.apache.drill.exec.server.options.OptionList; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader; @@ -44,43 +45,43 @@ import com.fasterxml.jackson.databind.ObjectReader; import com.fasterxml.jackson.databind.deser.std.StdDelegatingDeserializer; import com.fasterxml.jackson.databind.module.SimpleModule; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.fs.Path; public class PhysicalPlanReader { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class); private final ObjectReader physicalPlanReader; private final ObjectMapper mapper; private final ObjectReader operatorReader; private final ObjectReader logicalPlanReader; - public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance, final DrillbitEndpoint endpoint, - final StoragePluginRegistry pluginRegistry) { + public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance, + final DrillbitEndpoint endpoint, final StoragePluginRegistry pluginRegistry) { ObjectMapper lpMapper = lpPersistance.getMapper(); // Endpoint serializer/deserializer. 
- SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") // - .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se()) // - .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) // + SimpleModule serDeModule = new SimpleModule("PhysicalOperatorModule") + .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se()) + .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) .addSerializer(MajorType.class, new MajorTypeSerDe.Se()) .addDeserializer(MajorType.class, new MajorTypeSerDe.De()) .addDeserializer(DynamicPojoRecordReader.class, - new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper))); + new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper))) + .addSerializer(Path.class, new PathSerDe.Se()); - lpMapper.registerModule(deserModule); + lpMapper.registerModule(serDeModule); Set> subTypes = PhysicalOperatorUtil.getSubTypes(scanResult); - for (Class subType : subTypes) { - lpMapper.registerSubtypes(subType); - } + subTypes.forEach(lpMapper::registerSubtypes); lpMapper.registerSubtypes(DynamicPojoRecordReader.class); - InjectableValues injectables = new InjectableValues.Std() // - .addValue(StoragePluginRegistry.class, pluginRegistry) // - .addValue(DrillbitEndpoint.class, endpoint); // + InjectableValues injectables = new InjectableValues.Std() + .addValue(StoragePluginRegistry.class, pluginRegistry) + .addValue(DrillbitEndpoint.class, endpoint); this.mapper = lpMapper; - this.physicalPlanReader = mapper.reader(PhysicalPlan.class).with(injectables); - this.operatorReader = mapper.reader(PhysicalOperator.class).with(injectables); - this.logicalPlanReader = mapper.reader(LogicalPlan.class).with(injectables); + this.physicalPlanReader = mapper.readerFor(PhysicalPlan.class).with(injectables); + this.operatorReader = mapper.readerFor(PhysicalOperator.class).with(injectables); + this.logicalPlanReader = mapper.readerFor(LogicalPlan.class).with(injectables); } public String writeJson(OptionList list) throws JsonProcessingException{ @@ -91,33 +92,35 @@ public class PhysicalPlanReader { return mapper.writeValueAsString(op); } - public PhysicalPlan readPhysicalPlan(String json) throws JsonProcessingException, IOException { + public PhysicalPlan readPhysicalPlan(String json) throws IOException { logger.debug("Reading physical plan {}", json); return physicalPlanReader.readValue(json); } - public FragmentRoot readFragmentRoot(String json) throws JsonProcessingException, IOException { + public FragmentRoot readFragmentRoot(String json) throws IOException { logger.debug("Attempting to read {}", json); PhysicalOperator op = operatorReader.readValue(json); - if(op instanceof FragmentRoot){ + if (op instanceof FragmentRoot) { return (FragmentRoot) op; - }else{ - throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot as its root operator. The operator was %s.", op.getClass().getCanonicalName())); + } else { + throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot " + + "as its root operator. 
The operator was %s.", op.getClass().getCanonicalName())); } } @VisibleForTesting - public FragmentLeaf readFragmentLeaf(String json) throws JsonProcessingException, IOException { + public FragmentLeaf readFragmentLeaf(String json) throws IOException { logger.debug("Attempting to read {}", json); PhysicalOperator op = operatorReader.readValue(json); if (op instanceof FragmentLeaf){ return (FragmentLeaf) op; } else { - throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. The operator was %s.", op.getClass().getCanonicalName())); + throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. " + + "The operator was %s.", op.getClass().getCanonicalName())); } } - public LogicalPlan readLogicalPlan(String json) throws JsonProcessingException, IOException{ + public LogicalPlan readLogicalPlan(String json) throws IOException{ logger.debug("Reading logical plan {}", json); return logicalPlanReader.readValue(json); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java index 7c9afb088..ca00446fc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.planner; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import org.apache.hadoop.fs.Path; import java.util.List; @@ -26,14 +27,14 @@ import java.util.List; * location of the entire partition and also stores the * value of the individual partition keys for this partition. */ -public abstract class SimplePartitionLocation implements PartitionLocation{ +public abstract class SimplePartitionLocation implements PartitionLocation{ @Override public boolean isCompositePartition() { return false; } @Override - public String getCompositePartitionPath() { + public Path getCompositePartitionPath() { throw new UnsupportedOperationException(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java index 6f48b8508..8d3a8cec9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import org.apache.drill.exec.util.DrillFileSystemUtil; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.calcite.adapter.enumerable.EnumerableTableScan; import org.apache.calcite.rel.core.Filter; @@ -74,7 +75,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; import org.apache.drill.exec.vector.ValueVector; - +import org.apache.hadoop.fs.Path; public abstract class PruneScanRule extends StoragePluginOptimizerRule { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class); @@ -372,7 +373,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { // handle the case all partitions are filtered out. 
boolean canDropFilter = true; boolean wasAllPartitionsPruned = false; - String cacheFileRoot = null; + Path cacheFileRoot = null; if (newPartitions.isEmpty()) { assert firstLocation != null; @@ -388,7 +389,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { // set the cacheFileRoot appropriately if (firstLocation.isCompositePartition()) { - cacheFileRoot = descriptor.getBaseTableLocation() + firstLocation.getCompositePartitionPath(); + cacheFileRoot = Path.mergePaths(descriptor.getBaseTableLocation(), firstLocation.getCompositePartitionPath()); } } @@ -408,7 +409,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { // if metadata cache file could potentially be used, then assign a proper cacheFileRoot int index = -1; if (!matchBitSet.isEmpty()) { - String path = ""; + StringBuilder path = new StringBuilder(); index = matchBitSet.length() - 1; for (int j = 0; j < matchBitSet.length(); j++) { @@ -419,10 +420,12 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule { break; } } - for (int j=0; j <= index; j++) { - path += "/" + spInfo[j]; + for (int j = 0; j <= index; j++) { + path.append("/") + .append(spInfo[j]); } - cacheFileRoot = descriptor.getBaseTableLocation() + path; + cacheFileRoot = Path.mergePaths(descriptor.getBaseTableLocation(), + DrillFileSystemUtil.createPathSafe(path.toString())); } if (index != maxIndex) { // if multiple partitions are being selected, we should not drop the filter diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java index f1acc71ba..8f62713c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java @@ -133,15 +133,13 @@ public class AnalyzeTableHandler extends DefaultSqlHandler { DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin( formatSelection.getFormat()).getFsConf()); - String selectionRoot = formatSelection.getSelection().getSelectionRoot(); - if (!selectionRoot.contains(tableName) - || !fs.getFileStatus(new Path(selectionRoot)).isDirectory()) { + Path selectionRoot = formatSelection.getSelection().getSelectionRoot(); + if (!selectionRoot.getName().equals(tableName) || !fs.getFileStatus(selectionRoot).isDirectory()) { return DrillStatsTable.notSupported(context, tableName); } // Do not recompute statistics, if stale - Path statsFilePath = new Path(new Path(selectionRoot), DotDrillType.STATS.getEnding()); - if (fs.exists(statsFilePath) - && !isStatsStale(fs, statsFilePath)) { + Path statsFilePath = new Path(selectionRoot, DotDrillType.STATS.getEnding()); + if (fs.exists(statsFilePath) && !isStatsStale(fs, statsFilePath)) { return DrillStatsTable.notRequired(context, tableName); } } @@ -165,13 +163,8 @@ public class AnalyzeTableHandler extends DefaultSqlHandler { Path parentPath = statsFilePath.getParent(); FileStatus directoryStatus = fs.getFileStatus(parentPath); // Parent directory modified after stats collection? 
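To make the cacheFileRoot construction above concrete, here is a sketch with invented values; DrillFileSystemUtil.createPathSafe is added elsewhere in this patch, and the sketch assumes it yields a Path equivalent to new Path(subPath) for plain directory names.

import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.hadoop.fs.Path;

public class CacheFileRootSketch {
  public static void main(String[] args) {
    Path baseTableLocation = new Path("/user/drill/sales"); // stands in for descriptor.getBaseTableLocation()
    String[] spInfo = {"2018", "Q4"};                       // matched partition directory names
    StringBuilder subPath = new StringBuilder();
    for (String dir : spInfo) {
      subPath.append("/").append(dir);
    }
    Path cacheFileRoot = Path.mergePaths(baseTableLocation,
        DrillFileSystemUtil.createPathSafe(subPath.toString()));
    System.out.println(cacheFileRoot); // /user/drill/sales/2018/Q4
  }
}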
- if (directoryStatus.getModificationTime() > statsFileModifyTime) { - return true; - } - if (tableModified(fs, parentPath, statsFileModifyTime)) { - return true; - } - return false; + return directoryStatus.getModificationTime() > statsFileModifyTime || + tableModified(fs, parentPath, statsFileModifyTime); } /* Determines if the table was modified after computing statistics based on @@ -185,10 +178,8 @@ public class AnalyzeTableHandler extends DefaultSqlHandler { return true; } // For a directory, we should recursively check sub-directories - if (file.isDirectory()) { - if (tableModified(fs, file.getPath(), statsModificationTime)) { - return true; - } + if (file.isDirectory() && tableModified(fs, file.getPath(), statsModificationTime)) { + return true; } } return false; @@ -215,8 +206,7 @@ public class AnalyzeTableHandler extends DefaultSqlHandler { /* Converts to Drill logical plan */ protected DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String analyzeTableName, - double samplePercent) - throws RelConversionException, SqlUnsupportedException { + double samplePercent) throws SqlUnsupportedException { DrillRel convertedRelNode = convertToRawDrel(relNode); if (convertedRelNode instanceof DrillStoreRel) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java index 4684251e9..3ca778706 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java @@ -108,8 +108,8 @@ public class RefreshMetadataHandler extends DefaultSqlHandler { FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin(); DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf()); - String selectionRoot = formatSelection.getSelection().selectionRoot; - if (!fs.getFileStatus(new Path(selectionRoot)).isDirectory()) { + Path selectionRoot = formatSelection.getSelection().getSelectionRoot(); + if (!fs.getFileStatus(selectionRoot).isDirectory()) { return notSupported(tableName); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java new file mode 100644 index 000000000..03bb37e22 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.serialization; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; + +/** + * Path serializer to simple String path. Without it the hadoop Path serialization creates a big JSON object. + */ +public class PathSerDe { + + public static class Se extends JsonSerializer { + + @Override + public void serialize(Path value, JsonGenerator gen, SerializerProvider serializers) throws IOException { + gen.writeString(value.toUri().getPath()); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java index d5d9784b6..33b500018 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; @@ -133,20 +134,20 @@ public class ColumnExplorer { public static List getPartitionColumnNames(FileSelection selection, SchemaConfig schemaConfig) { int partitionsCount = 0; // a depth of table root path - int rootDepth = new Path(selection.getSelectionRoot()).depth(); + int rootDepth = selection.getSelectionRoot().depth(); - for (String file : selection.getFiles()) { + for (Path file : selection.getFiles()) { // Calculates partitions count for the concrete file: // depth of file path - depth of table root path - 1. // The depth of file path includes file itself, // so we should subtract 1 to consider only directories. - int currentPartitionsCount = new Path(file).depth() - rootDepth - 1; + int currentPartitionsCount = file.depth() - rootDepth - 1; // max depth of files path should be used to handle all partitions partitionsCount = Math.max(partitionsCount, currentPartitionsCount); } String partitionColumnLabel = schemaConfig.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val; - List partitions = Lists.newArrayList(); + List partitions = new ArrayList<>(); // generates partition column names: dir0, dir1 etc. for (int i = 0; i < partitionsCount; i++) { @@ -165,7 +166,7 @@ public class ColumnExplorer { * @param includeFileImplicitColumns if file implicit columns should be included into the result * @return implicit columns map */ - public Map populateImplicitColumns(String filePath, + public Map populateImplicitColumns(Path filePath, List partitionValues, boolean includeFileImplicitColumns) { Map implicitValues = new LinkedHashMap<>(); @@ -177,7 +178,7 @@ public class ColumnExplorer { } if (includeFileImplicitColumns) { - Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath)); + Path path = Path.getPathWithoutSchemeAndAuthority(filePath); for (Map.Entry entry : selectedImplicitColumns.entrySet()) { implicitValues.put(entry.getKey(), entry.getValue().getValue(path)); } @@ -189,15 +190,16 @@ public class ColumnExplorer { /** * Compares root and file path to determine directories * that are present in the file path but absent in root. - * Example: root - a/b/c, filePath - a/b/c/d/e/0_0_0.parquet, result - d/e. + * Example: root - a/b/c, file - a/b/c/d/e/0_0_0.parquet, result - d/e. * Stores different directory names in the list in successive order. 
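A short usage sketch of the reworked helper described above (paths are invented; the expected result follows the a/b/c example in the javadoc):

import java.util.List;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.hadoop.fs.Path;

public class ListPartitionValuesSketch {
  public static void main(String[] args) {
    Path root = new Path("/a/b/c");
    Path file = new Path("/a/b/c/d/e/0_0_0.parquet");
    // hasDirsOnly = false: the last segment is a file, so its parent directory is used
    // before collecting the directory names that lie between root and file.
    List<String> dirs = ColumnExplorer.listPartitionValues(file, root, false);
    System.out.println(dirs); // [d, e]
  }
}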
* - * @param filePath file path + * @param file file path * @param root root directory + * @param hasDirsOnly whether it is file or directory * @return list of directory names */ - public static List listPartitionValues(String filePath, String root) { - String[] dirs = parsePartitions(filePath, root); + public static List listPartitionValues(Path file, Path root, boolean hasDirsOnly) { + String[] dirs = parsePartitions(file, root, hasDirsOnly); if (dirs == null) { return Collections.emptyList(); } @@ -208,21 +210,23 @@ public class ColumnExplorer { * Low-level parse of partitions, returned as a string array. Returns a * null array for invalid values. * - * @param filePath file path + * @param file file path * @param root root directory + * @param hasDirsOnly whether it is file or directory * @return array of directory names, or null if the arguments are invalid */ - public static String[] parsePartitions(String filePath, String root) { - if (filePath == null || root == null) { + public static String[] parsePartitions(Path file, Path root, boolean hasDirsOnly) { + if (file == null || root == null) { return null; } - int rootDepth = new Path(root).depth(); - Path path = new Path(filePath); - int parentDepth = path.getParent().depth(); - - int diffCount = parentDepth - rootDepth; + if (!hasDirsOnly) { + file = file.getParent(); + } + int rootDepth = root.depth(); + int fileDepth = file.depth(); + int diffCount = fileDepth - rootDepth; if (diffCount < 0) { return null; } @@ -230,10 +234,10 @@ public class ColumnExplorer { String[] diffDirectoryNames = new String[diffCount]; // start filling in array from the end - for (int i = rootDepth; parentDepth > i; i++) { - path = path.getParent(); + for (int i = rootDepth; fileDepth > i; i++) { // place in the end of array - diffDirectoryNames[parentDepth - i - 1] = path.getName(); + diffDirectoryNames[fileDepth - i - 1] = file.getName(); + file = file.getParent(); } return diffDirectoryNames; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java index 50d7ee8f9..10d90d4b7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java @@ -54,8 +54,8 @@ public class AvroDrillTable extends DrillTable { SchemaConfig schemaConfig, FormatSelection selection) { super(storageEngineName, plugin, schemaConfig.getUserName(), selection); - List asFiles = selection.getAsFiles(); - Path path = new Path(asFiles.get(0)); + List asFiles = selection.getAsFiles(); + Path path = asFiles.get(0); this.schemaConfig = schemaConfig; try { reader = new DataFileReader<>(new FsInput(path, plugin.getFsConf()), new GenericDatumReader()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java index 1d7226ad1..07444e972 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java @@ -90,13 +90,13 @@ public class AvroRecordReader extends AbstractRecordReader { public AvroRecordReader(final FragmentContext fragmentContext, - final String inputPath, + final Path inputPath, final long start, final long length, final FileSystem fileSystem, final List projectedColumns, final String userName) { - 
hadoop = new Path(inputPath); + hadoop = inputPath; this.start = start; this.end = start + length; buffer = fragmentContext.getManagedBuffer(); @@ -111,12 +111,8 @@ public class AvroRecordReader extends AbstractRecordReader { private DataFileReader getReader(final Path hadoop, final FileSystem fs) throws ExecutionSetupException { try { final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName); - return ugi.doAs(new PrivilegedExceptionAction>() { - @Override - public DataFileReader run() throws Exception { - return new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader()); - } - }); + return ugi.doAs((PrivilegedExceptionAction>) () -> + new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader())); } catch (IOException | InterruptedException e) { throw new ExecutionSetupException( String.format("Error in creating avro reader for file: %s", hadoop), e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java index 2fa9558b0..902abdab0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java @@ -19,12 +19,15 @@ package org.apache.drill.exec.store.dfs; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; -import org.apache.drill.shaded.guava.com.google.common.base.Strings; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.common.exceptions.DrillRuntimeException; @@ -41,15 +44,15 @@ public class FileSelection { private List statuses; - public List files; + public List files; /** * root path for the selections */ - public final String selectionRoot; + public final Path selectionRoot; /** * root path for the metadata cache file (if any) */ - public final String cacheFileRoot; + public final Path cacheFileRoot; /** * metadata context useful for metadata operations (if any) @@ -82,17 +85,17 @@ public class FileSelection { * @param files list of files * @param selectionRoot root path for selections */ - public FileSelection(final List statuses, final List files, final String selectionRoot) { + public FileSelection(List statuses, List files, Path selectionRoot) { this(statuses, files, selectionRoot, null, false, StatusType.NOT_CHECKED); } - public FileSelection(final List statuses, final List files, final String selectionRoot, - final String cacheFileRoot, final boolean wasAllPartitionsPruned) { + public FileSelection(List statuses, List files, Path selectionRoot, Path cacheFileRoot, + boolean wasAllPartitionsPruned) { this(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned, StatusType.NOT_CHECKED); } - public FileSelection(final List statuses, final List files, final String selectionRoot, - final String cacheFileRoot, final boolean wasAllPartitionsPruned, final StatusType dirStatus) { + public FileSelection(List statuses, List files, Path selectionRoot, Path cacheFileRoot, + boolean wasAllPartitionsPruned, StatusType dirStatus) { this.statuses = statuses; this.files = files; this.selectionRoot = 
selectionRoot; @@ -104,7 +107,7 @@ public class FileSelection { /** * Copy constructor for convenience. */ - protected FileSelection(final FileSelection selection) { + protected FileSelection(FileSelection selection) { Preconditions.checkNotNull(selection, "selection cannot be null"); this.statuses = selection.statuses; this.files = selection.files; @@ -116,17 +119,17 @@ public class FileSelection { this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned; } - public String getSelectionRoot() { + public Path getSelectionRoot() { return selectionRoot; } - public List getStatuses(final DrillFileSystem fs) throws IOException { + public List getStatuses(DrillFileSystem fs) throws IOException { Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; if (statuses == null) { - final List newStatuses = Lists.newArrayList(); - for (final String pathStr:files) { - newStatuses.add(fs.getFileStatus(new Path(pathStr))); + List newStatuses = Lists.newArrayList(); + for (Path pathStr : files) { + newStatuses.add(fs.getFileStatus(pathStr)); } statuses = newStatuses; } @@ -139,11 +142,11 @@ public class FileSelection { return statuses; } - public List getFiles() { + public List getFiles() { if (files == null) { - final List newFiles = Lists.newArrayList(); - for (final FileStatus status:statuses) { - newFiles.add(status.getPath().toString()); + List newFiles = Lists.newArrayList(); + for (FileStatus status:statuses) { + newFiles.add(status.getPath()); } files = newFiles; } @@ -153,7 +156,7 @@ public class FileSelection { public boolean containsDirectories(DrillFileSystem fs) throws IOException { if (dirStatus == StatusType.NOT_CHECKED) { dirStatus = StatusType.NO_DIRS; - for (final FileStatus status : getStatuses(fs)) { + for (FileStatus status : getStatuses(fs)) { if (status.isDirectory()) { dirStatus = StatusType.HAS_DIRS; break; @@ -175,7 +178,7 @@ public class FileSelection { nonDirectories.addAll(DrillFileSystemUtil.listFiles(fs, status.getPath(), true)); } - final FileSelection fileSel = create(nonDirectories, null, selectionRoot); + FileSelection fileSel = create(nonDirectories, null, selectionRoot); if (timer != null) { logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}", timer.elapsed(TimeUnit.MILLISECONDS), statuses.size()); timer.stop(); @@ -223,38 +226,36 @@ public class FileSelection { * @param files list of files. 
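The common-root derivation can be exercised directly through FileSelection.create; a sketch with invented file paths (when no statuses are supplied, the selection root becomes the longest common path of the files):

import java.util.Arrays;
import java.util.List;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.hadoop.fs.Path;

public class CommonRootSketch {
  public static void main(String[] args) {
    List<Path> files = Arrays.asList(
        new Path("/data/logs/2018/jan/0_0_0.parquet"),
        new Path("/data/logs/2018/feb/0_0_0.parquet"));
    FileSelection selection = FileSelection.create(null, files, null);
    System.out.println(selection.getSelectionRoot()); // /data/logs/2018
  }
}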
* @return longest common path */ - private static String commonPathForFiles(final List files) { + private static Path commonPathForFiles(List files) { if (files == null || files.isEmpty()) { - return ""; + return new Path("/"); } - final int total = files.size(); - final String[][] folders = new String[total][]; + int total = files.size(); + String[][] folders = new String[total][]; int shortest = Integer.MAX_VALUE; for (int i = 0; i < total; i++) { - final Path path = new Path(files.get(i)); - folders[i] = Path.getPathWithoutSchemeAndAuthority(path).toString().split(Path.SEPARATOR); + folders[i] = files.get(i).toUri().getPath().split(Path.SEPARATOR); shortest = Math.min(shortest, folders[i].length); } int latest; out: for (latest = 0; latest < shortest; latest++) { - final String current = folders[0][latest]; + String current = folders[0][latest]; for (int i = 1; i < folders.length; i++) { if (!current.equals(folders[i][latest])) { break out; } } } - final Path path = new Path(files.get(0)); - final URI uri = path.toUri(); - final String pathString = buildPath(folders[0], latest); - return new Path(uri.getScheme(), uri.getAuthority(), pathString).toString(); + URI uri = files.get(0).toUri(); + String pathString = buildPath(folders[0], latest); + return new Path(uri.getScheme(), uri.getAuthority(), pathString); } - private static String buildPath(final String[] path, final int folderIndex) { - final StringBuilder builder = new StringBuilder(); + private static String buildPath(String[] path, int folderIndex) { + StringBuilder builder = new StringBuilder(); for (int i=0; i statuses, final List files, final String root, - final String cacheFileRoot, final boolean wasAllPartitionsPruned) { - final boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0); - final boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0); + public static FileSelection create(List statuses, List files, Path root, + Path cacheFileRoot, boolean wasAllPartitionsPruned) { + boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0); + boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0); if (bothNonEmptySelection || bothEmptySelection) { return null; } - final String selectionRoot; + Path selectionRoot; if (statuses == null || statuses.isEmpty()) { selectionRoot = commonPathForFiles(files); } else { - if (Strings.isNullOrEmpty(root)) { - throw new DrillRuntimeException("Selection root is null or empty" + root); - } - final Path rootPath = handleWildCard(root); - final URI uri = statuses.get(0).getPath().toUri(); - final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath()); - selectionRoot = path.toString(); + Objects.requireNonNull(root, "Selection root is null"); + Path rootPath = handleWildCard(root); + URI uri = statuses.get(0).getPath().toUri(); + selectionRoot = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath()); } return new FileSelection(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned); } - public static FileSelection create(final List statuses, final List files, final String root) { + public static FileSelection create(List statuses, List files, Path root) { return FileSelection.create(statuses, files, root, null, false); } - public static FileSelection createFromDirectories(final List dirPaths, final 
FileSelection selection, - final String cacheFileRoot) { + public static FileSelection createFromDirectories(List dirPaths, FileSelection selection, + Path cacheFileRoot) { Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; - final String root = selection.getSelectionRoot(); - if (Strings.isNullOrEmpty(root)) { - throw new DrillRuntimeException("Selection root is null or empty" + root); - } + Path root = selection.getSelectionRoot(); + Objects.requireNonNull(root, "Selection root is null"); if (dirPaths == null || dirPaths.isEmpty()) { throw new DrillRuntimeException("List of directories is null or empty"); } - List dirs = Lists.newArrayList(); + // for wildcard the directory list should have already been expanded + List dirs = selection.hadWildcard() ? selection.getFileStatuses().stream() + .map(FileStatus::getPath) + .collect(Collectors.toList()) : new ArrayList<>(dirPaths); - if (selection.hadWildcard()) { // for wildcard the directory list should have already been expanded - for (FileStatus status : selection.getFileStatuses()) { - dirs.add(status.getPath().toString()); - } - } else { - dirs.addAll(dirPaths); - } - - final Path rootPath = handleWildCard(root); - // final URI uri = dirPaths.get(0).toUri(); - final URI uri = selection.getFileStatuses().get(0).getPath().toUri(); - final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath()); - FileSelection fileSel = new FileSelection(null, dirs, path.toString(), cacheFileRoot, false); + Path rootPath = handleWildCard(root); + URI uri = selection.getFileStatuses().get(0).getPath().toUri(); + Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath()); + FileSelection fileSel = new FileSelection(null, dirs, path, cacheFileRoot, false); fileSel.setHadWildcard(selection.hadWildcard()); if (timer != null) { logger.debug("FileSelection.createFromDirectories() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS)); @@ -362,18 +352,15 @@ public class FileSelection { return fileSel; } - private static Path handleWildCard(final String root) { - if (root.contains(WILD_CARD)) { - int idx = root.indexOf(WILD_CARD); // first wild card in the path - idx = root.lastIndexOf('/', idx); // file separator right before the first wild card - final String newRoot = root.substring(0, idx); - if (newRoot.length() == 0) { - // Ensure that we always return a valid root. 
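A standalone sketch of the wildcard-root trimming (it mirrors the rewritten handleWildCard helper in the following hunk; the sample root is invented, and the patch routes the trimmed string through DrillFileSystemUtil.createPathSafe rather than new Path):

import org.apache.hadoop.fs.Path;

public class WildCardRootSketch {
  public static void main(String[] args) {
    String stringRoot = "/user/drill/logs/*/*.json"; // selection root containing a wildcard
    int idx = stringRoot.indexOf('*');               // first wildcard in the path
    idx = stringRoot.lastIndexOf('/', idx);          // file separator right before it
    String newRoot = stringRoot.substring(0, idx);
    System.out.println(new Path(newRoot));           // /user/drill/logs
  }
}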
- return new Path("/"); - } - return new Path(newRoot); + private static Path handleWildCard(Path root) { + String stringRoot = root.toUri().getPath(); + if (stringRoot.contains(WILD_CARD)) { + int idx = stringRoot.indexOf(WILD_CARD); // first wild card in the path + idx = stringRoot.lastIndexOf('/', idx); // file separator right before the first wild card + String newRoot = stringRoot.substring(0, idx); + return DrillFileSystemUtil.createPathSafe(newRoot); } else { - return new Path(root); + return new Path(stringRoot); } } @@ -426,7 +413,7 @@ public class FileSelection { return this.hadWildcard; } - public String getCacheFileRoot() { + public Path getCacheFileRoot() { return cacheFileRoot; } @@ -456,12 +443,12 @@ public class FileSelection { @Override public String toString() { - final StringBuilder sb = new StringBuilder(); + StringBuilder sb = new StringBuilder(); sb.append("root=").append(this.selectionRoot); sb.append("files=["); boolean isFirst = true; - for (final String file : this.files) { + for (Path file : this.files) { if (isFirst) { isFirst = false; sb.append(file); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java index 40549cc92..7d7bcfaa0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java @@ -24,6 +24,7 @@ import org.apache.drill.common.logical.FormatPluginConfig; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hadoop.fs.Path; public class FormatSelection { @@ -32,16 +33,15 @@ public class FormatSelection { private FormatPluginConfig format; private FileSelection selection; - public FormatSelection(){} + public FormatSelection() {} @JsonCreator - public FormatSelection(@JsonProperty("format") FormatPluginConfig format, @JsonProperty("files") List files){ + public FormatSelection(@JsonProperty("format") FormatPluginConfig format, @JsonProperty("files") List files){ this.format = format; this.selection = FileSelection.create(null, files, null); } public FormatSelection(FormatPluginConfig format, FileSelection selection) { - super(); this.format = format; this.selection = selection; } @@ -52,7 +52,7 @@ public class FormatSelection { } @JsonProperty("files") - public List getAsFiles(){ + public List getAsFiles(){ return selection.getFiles(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java index 877ceb64c..073847812 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.dfs; import java.util.Map; import org.apache.drill.shaded.guava.com.google.common.collect.Maps; +import org.apache.hadoop.fs.Path; /** * A metadata context that holds state across multiple invocations of @@ -32,17 +33,17 @@ public class MetadataContext { * Note: the #directories is typically a small percentage of the #files, so the memory footprint * is expected to be relatively small. 
*/ - private Map dirModifCheckMap = Maps.newHashMap(); + private Map dirModifCheckMap = Maps.newHashMap(); private PruneStatus pruneStatus = PruneStatus.NOT_STARTED; private boolean metadataCacheCorrupted; - public void setStatus(String dir) { + public void setStatus(Path dir) { dirModifCheckMap.put(dir, true); } - public void clearStatus(String dir) { + public void clearStatus(Path dir) { dirModifCheckMap.put(dir, false); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java index 15107ac18..ec925e37a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java @@ -21,6 +21,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hadoop.fs.Path; public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork { @@ -28,7 +29,7 @@ public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork { private long length; @JsonCreator - public ReadEntryFromHDFS(@JsonProperty("path") String path,@JsonProperty("start") long start, @JsonProperty("length") long length) { + public ReadEntryFromHDFS(@JsonProperty("path") Path path, @JsonProperty("start") long start, @JsonProperty("length") long length) { super(path); this.start = start; this.length = length; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java index 4564e159c..88bd9fbad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java @@ -18,19 +18,20 @@ package org.apache.drill.exec.store.dfs; +import org.apache.hadoop.fs.Path; + public class ReadEntryWithPath { - protected String path; + protected Path path; + // Default constructor is needed for deserialization + public ReadEntryWithPath() {} - public ReadEntryWithPath(String path) { - super(); + public ReadEntryWithPath(Path path) { this.path = path; } - public ReadEntryWithPath(){} - - public String getPath(){ + public Path getPath() { return path; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java index d3bed8feb..d76c6489e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java @@ -156,7 +156,7 @@ public abstract class EasyFormatPlugin implements for (FileWork work : scan.getWorkUnits()){ RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName()); readers.add(recordReader); - List partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot()); + List partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot(), false); Map implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns); implicitColumns.add(implicitValues); if (implicitValues.size() > mapWithMaxColumns.size()) { diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java index 759d07ff9..4449ec054 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.store.dfs.easy; import java.io.IOException; -import java.util.Iterator; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -52,6 +51,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Iterators; import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.hadoop.fs.Path; @JsonTypeName("fs-scan") public class EasyGroupScan extends AbstractFileGroupScan { @@ -65,17 +65,17 @@ public class EasyGroupScan extends AbstractFileGroupScan { private ListMultimap mappings; private List chunks; private List endpointAffinities; - private String selectionRoot; + private Path selectionRoot; @JsonCreator public EasyGroupScan( @JsonProperty("userName") String userName, - @JsonProperty("files") List files, // - @JsonProperty("storage") StoragePluginConfig storageConfig, // - @JsonProperty("format") FormatPluginConfig formatConfig, // - @JacksonInject StoragePluginRegistry engineRegistry, // + @JsonProperty("files") List files, + @JsonProperty("storage") StoragePluginConfig storageConfig, + @JsonProperty("format") FormatPluginConfig formatConfig, + @JacksonInject StoragePluginRegistry engineRegistry, @JsonProperty("columns") List columns, - @JsonProperty("selectionRoot") String selectionRoot + @JsonProperty("selectionRoot") Path selectionRoot ) throws IOException, ExecutionSetupException { this(ImpersonationUtil.resolveUserName(userName), FileSelection.create(null, files, selectionRoot), @@ -84,17 +84,17 @@ public class EasyGroupScan extends AbstractFileGroupScan { selectionRoot); } - public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin formatPlugin, String selectionRoot) + public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin formatPlugin, Path selectionRoot) throws IOException { this(userName, selection, formatPlugin, ALL_COLUMNS, selectionRoot); } public EasyGroupScan( String userName, - FileSelection selection, // - EasyFormatPlugin formatPlugin, // + FileSelection selection, + EasyFormatPlugin formatPlugin, List columns, - String selectionRoot + Path selectionRoot ) throws IOException{ super(userName); this.selection = Preconditions.checkNotNull(selection); @@ -106,12 +106,7 @@ public class EasyGroupScan extends AbstractFileGroupScan { @JsonIgnore public Iterable getWorkIterable() { - return new Iterable() { - @Override - public Iterator iterator() { - return Iterators.unmodifiableIterator(chunks.iterator()); - } - }; + return () -> Iterators.unmodifiableIterator(chunks.iterator()); } private EasyGroupScan(final EasyGroupScan that) { @@ -136,7 +131,7 @@ public class EasyGroupScan extends AbstractFileGroupScan { this.endpointAffinities = AffinityCreator.getAffinityMap(chunks); } - public String getSelectionRoot() { + public Path getSelectionRoot() { return selectionRoot; } @@ -158,7 +153,7 @@ public class EasyGroupScan extends AbstractFileGroupScan { @JsonProperty("files") @Override - 
public List getFiles() { + public List getFiles() { return selection.getFiles(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java index 0dbae1e40..fbb3f475c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.dfs.easy; -import java.io.IOException; import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -34,6 +33,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.hadoop.fs.Path; @JsonTypeName("fs-sub-scan") public class EasySubScan extends AbstractSubScan{ @@ -42,18 +42,18 @@ public class EasySubScan extends AbstractSubScan{ private final List files; private final EasyFormatPlugin formatPlugin; private final List columns; - private String selectionRoot; + private Path selectionRoot; @JsonCreator public EasySubScan( @JsonProperty("userName") String userName, - @JsonProperty("files") List files, // - @JsonProperty("storage") StoragePluginConfig storageConfig, // - @JsonProperty("format") FormatPluginConfig formatConfig, // - @JacksonInject StoragePluginRegistry engineRegistry, // - @JsonProperty("columns") List columns, // - @JsonProperty("selectionRoot") String selectionRoot - ) throws IOException, ExecutionSetupException { + @JsonProperty("files") List files, + @JsonProperty("storage") StoragePluginConfig storageConfig, + @JsonProperty("format") FormatPluginConfig formatConfig, + @JacksonInject StoragePluginRegistry engineRegistry, + @JsonProperty("columns") List columns, + @JsonProperty("selectionRoot") Path selectionRoot + ) throws ExecutionSetupException { super(userName); this.formatPlugin = (EasyFormatPlugin) engineRegistry.getFormatPlugin(storageConfig, formatConfig); Preconditions.checkNotNull(this.formatPlugin); @@ -63,7 +63,7 @@ public class EasySubScan extends AbstractSubScan{ } public EasySubScan(String userName, List files, EasyFormatPlugin plugin, List columns, - String selectionRoot){ + Path selectionRoot){ super(userName); this.formatPlugin = plugin; this.files = files; @@ -72,7 +72,7 @@ public class EasySubScan extends AbstractSubScan{ } @JsonProperty - public String getSelectionRoot() { + public Path getSelectionRoot() { return selectionRoot; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java index 587201ea9..3aeb2c26d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java @@ -17,8 +17,14 @@ */ package org.apache.drill.exec.store.dfs.easy; + +import org.apache.hadoop.fs.Path; + public interface FileWork { - String getPath(); + + Path getPath(); + long getStart(); + long getLength(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java index 505d68e78..6de8842cd 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java @@ -18,12 +18,12 @@ package org.apache.drill.exec.store.direct; import com.fasterxml.jackson.annotation.JsonTypeName; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.ScanStats; import org.apache.drill.exec.store.RecordReader; +import org.apache.hadoop.fs.Path; import java.util.Collection; import java.util.List; @@ -37,20 +37,20 @@ import java.util.List; @JsonTypeName("metadata-direct-scan") public class MetadataDirectGroupScan extends DirectGroupScan { - private final Collection files; + private final Collection files; - public MetadataDirectGroupScan(RecordReader reader, Collection files) { + public MetadataDirectGroupScan(RecordReader reader, Collection files) { super(reader); this.files = files; } - public MetadataDirectGroupScan(RecordReader reader, Collection files, ScanStats stats) { + public MetadataDirectGroupScan(RecordReader reader, Collection files, ScanStats stats) { super(reader, stats); this.files = files; } @Override - public PhysicalOperator getNewWithChildren(List children) throws ExecutionSetupException { + public PhysicalOperator getNewWithChildren(List children) { assert children == null || children.isEmpty(); return new MetadataDirectGroupScan(reader, files, stats); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java index 428a4e1bd..d3fcc5aab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java @@ -77,7 +77,7 @@ public class JSONRecordReader extends AbstractRecordReader { * @param columns pathnames of columns/subfields to read * @throws OutOfMemoryException */ - public JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final DrillFileSystem fileSystem, + public JSONRecordReader(final FragmentContext fragmentContext, final Path inputPath, final DrillFileSystem fileSystem, final List columns) throws OutOfMemoryException { this(fragmentContext, inputPath, null, fileSystem, columns); } @@ -90,14 +90,13 @@ public class JSONRecordReader extends AbstractRecordReader { * @param columns pathnames of columns/subfields to read * @throws OutOfMemoryException */ - public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent, - final DrillFileSystem fileSystem, final List columns) throws OutOfMemoryException { + public JSONRecordReader(FragmentContext fragmentContext, JsonNode embeddedContent, DrillFileSystem fileSystem, + List columns) throws OutOfMemoryException { this(fragmentContext, null, embeddedContent, fileSystem, columns); } - private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, - final JsonNode embeddedContent, final DrillFileSystem fileSystem, - final List columns) { + private JSONRecordReader(FragmentContext fragmentContext, Path inputPath, JsonNode embeddedContent, + DrillFileSystem fileSystem, List columns) { Preconditions.checkArgument( (inputPath == null && 
embeddedContent != null) || @@ -106,7 +105,7 @@ public class JSONRecordReader extends AbstractRecordReader { ); if (inputPath != null) { - this.hadoopPath = new Path(inputPath); + this.hadoopPath = inputPath; } else { this.embeddedContent = embeddedContent; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java index 9dbe715b1..ec4bb1255 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java @@ -87,7 +87,7 @@ public class SequenceFileFormatPlugin extends EasyFormatPlugin columns, String userName) throws ExecutionSetupException { - final Path path = dfs.makeQualified(new Path(fileWork.getPath())); + final Path path = dfs.makeQualified(fileWork.getPath()); final FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""}); return new SequenceFileRecordReader(split, dfs, context.getQueryUserName(), userName); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java index 1c53a3774..03ae6f554 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java @@ -82,7 +82,7 @@ public class TextFormatPlugin extends EasyFormatPlugin columns, String userName) { - Path path = dfs.makeQualified(new Path(fileWork.getPath())); + Path path = dfs.makeQualified(fileWork.getPath()); FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""}); if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java index 3958a3270..5a78732af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java @@ -192,7 +192,7 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin { @Override public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork, - List columns, String userName) throws ExecutionSetupException { - return new ImageRecordReader(context, dfs, fileWork.getPath(), - ((ImageFormatConfig)formatConfig).hasFileSystemMetadata(), - ((ImageFormatConfig)formatConfig).isDescriptive(), - ((ImageFormatConfig)formatConfig).getTimeZone()); + List columns, String userName) { + return new ImageRecordReader(context, dfs, fileWork.getPath(), formatConfig.hasFileSystemMetadata(), + formatConfig.isDescriptive(), formatConfig.getTimeZone()); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java index 2a4b4fb48..08ed4fd09 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java @@ -99,10 
+99,10 @@ public class ImageRecordReader extends AbstractRecordReader { private DrillBuf managedBuffer; private boolean finish; - public ImageRecordReader(FragmentContext context, DrillFileSystem fs, String inputPath, + public ImageRecordReader(FragmentContext context, DrillFileSystem fs, Path inputPath, boolean fileSystemMetadata, boolean descriptive, String timeZone) { this.fs = fs; - hadoopPath = fs.makeQualified(new Path(inputPath)); + hadoopPath = fs.makeQualified(inputPath); this.fileSystemMetadata = fileSystemMetadata; this.descriptive = descriptive; this.timeZone = (timeZone != null) ? TimeZone.getTimeZone(timeZone) : TimeZone.getDefault(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java index e5d1dc438..ae0b733ff 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java @@ -41,9 +41,6 @@ import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.NullableTimeStampVector; import org.apache.drill.exec.vector.NullableTimeVector; - -import org.apache.hadoop.fs.Path; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -522,7 +519,7 @@ public class LogRecordReader extends AbstractRecordReader { private void openFile() { InputStream in; try { - in = dfs.open(new Path(fileWork.getPath())); + in = dfs.open(fileWork.getPath()); } catch (Exception e) { throw UserException .dataReadError(e) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java index e2d356953..5528d028c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java @@ -52,6 +52,7 @@ import org.apache.drill.exec.store.schedule.AffinityCreator; import org.apache.drill.exec.store.schedule.AssignmentCreator; import org.apache.drill.exec.store.schedule.EndpointByteMap; import org.apache.drill.exec.store.schedule.EndpointByteMapImpl; +import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.ArrayList; @@ -80,7 +81,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { protected ParquetTableMetadataBase parquetTableMetadata; protected List rowGroupInfos; protected ListMultimap mappings; - protected Set fileSet; + protected Set fileSet; protected ParquetReaderConfig readerConfig; private List endpointAffinities; @@ -146,7 +147,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { @JsonIgnore @Override - public Collection getFiles() { + public Collection getFiles() { return fileSet; } @@ -428,12 +429,12 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { } @JsonIgnore - public T getPartitionValue(String path, SchemaPath column, Class clazz) { + public T getPartitionValue(Path path, SchemaPath column, Class clazz) { return clazz.cast(parquetGroupScanStatistics.getPartitionValue(path, column)); } @JsonIgnore - public Set getFileSet() { + public Set getFileSet() { return fileSet; } // partition pruning methods end @@ -441,7 +442,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { 
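For illustration, the shape of the per-file bookkeeping once the file list is Path based; a sketch of what modifyFileSelection in the following hunk does with a (possibly pruned) selection, under the assumption that each file still becomes one ReadEntryWithPath:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.drill.exec.store.dfs.FileSelection;
import org.apache.drill.exec.store.dfs.ReadEntryWithPath;
import org.apache.hadoop.fs.Path;

public class FileSetSketch {
  // Rebuilds a group scan's file set and read entries from a selection.
  static void applySelection(FileSelection selection) {
    List<Path> files = selection.getFiles();
    Set<Path> fileSet = new HashSet<>(files);               // unique file paths used for pruning
    List<ReadEntryWithPath> entries = new ArrayList<>(files.size());
    for (Path file : files) {
      entries.add(new ReadEntryWithPath(file));
    }
    System.out.println(fileSet.size() + " files, " + entries.size() + " read entries");
  }
}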
// helper method used for partition pruning and filter push down @Override public void modifyFileSelection(FileSelection selection) { - List files = selection.getFiles(); + List files = selection.getFiles(); fileSet = new HashSet<>(files); entries = new ArrayList<>(files.size()); @@ -464,7 +465,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { if (fileSet == null) { fileSet = new HashSet<>(); fileSet.addAll(parquetTableMetadata.getFiles().stream() - .map((Function) ParquetFileMetadata::getPath) + .map((Function) ParquetFileMetadata::getPath) .collect(Collectors.toSet())); } @@ -505,7 +506,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { // abstract methods block start protected abstract void initInternal() throws IOException; protected abstract Collection getDrillbits(); - protected abstract AbstractParquetGroupScan cloneWithFileSelection(Collection filePaths) throws IOException; + protected abstract AbstractParquetGroupScan cloneWithFileSelection(Collection filePaths) throws IOException; protected abstract boolean supportsFileImplicitColumns(); protected abstract List getPartitionValues(RowGroupInfo rowGroupInfo); // abstract methods block end @@ -520,7 +521,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan { * @return new parquet group scan */ private AbstractParquetGroupScan cloneWithRowGroupInfos(List rowGroupInfos) throws IOException { - Set filePaths = rowGroupInfos.stream() + Set filePaths = rowGroupInfos.stream() .map(ReadEntryWithPath::getPath) .collect(Collectors.toSet()); // set keeps file names unique AbstractParquetGroupScan scan = cloneWithFileSelection(filePaths); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java index 99161fd59..b1819e639 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java @@ -52,7 +52,8 @@ public abstract class AbstractParquetScanBatchCreator { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class); - protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException { + protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, + OperatorContext oContext) throws ExecutionSetupException { final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns()); if (!columnExplorer.isStarQuery()) { @@ -63,7 +64,7 @@ public abstract class AbstractParquetScanBatchCreator { AbstractDrillFileSystemManager fsManager = getDrillFileSystemCreator(oContext, context.getOptions()); // keep footers in a map to avoid re-reading them - Map footers = new HashMap<>(); + Map footers = new HashMap<>(); List readers = new LinkedList<>(); List> implicitColumns = new ArrayList<>(); Map mapWithMaxColumns = new LinkedHashMap<>(); @@ -150,8 +151,8 @@ public abstract class AbstractParquetScanBatchCreator { protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager); - private ParquetMetadata readFooter(Configuration conf, String path, 
ParquetReaderConfig readerConfig) throws IOException { - try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path), + private ParquetMetadata readFooter(Configuration conf, Path path, ParquetReaderConfig readerConfig) throws IOException { + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, readerConfig.addCountersToConf(conf)), readerConfig.toReadOptions())) { return reader.getFooter(); } @@ -168,6 +169,6 @@ public abstract class AbstractParquetScanBatchCreator { this.operatorContext = operatorContext; } - protected abstract DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException; + protected abstract DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java index a1d9f518d..cb9e9b96d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java @@ -68,8 +68,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { private final MetadataContext metaContext; private boolean usedMetadataCache; // false by default // may change when filter push down / partition pruning is applied - private String selectionRoot; - private String cacheFileRoot; + private Path selectionRoot; + private Path cacheFileRoot; @JsonCreator public ParquetGroupScan(@JacksonInject StoragePluginRegistry engineRegistry, @@ -78,8 +78,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { @JsonProperty("storage") StoragePluginConfig storageConfig, @JsonProperty("format") FormatPluginConfig formatConfig, @JsonProperty("columns") List columns, - @JsonProperty("selectionRoot") String selectionRoot, - @JsonProperty("cacheFileRoot") String cacheFileRoot, + @JsonProperty("selectionRoot") Path selectionRoot, + @JsonProperty("cacheFileRoot") Path cacheFileRoot, @JsonProperty("readerConfig") ParquetReaderConfig readerConfig, @JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException { super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, filter); @@ -127,7 +127,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { // The fully expanded list is already stored as part of the fileSet entries.add(new ReadEntryWithPath(fileSelection.getSelectionRoot())); } else { - for (String fileName : fileSelection.getFiles()) { + for (Path fileName : fileSelection.getFiles()) { entries.add(new ReadEntryWithPath(fileName)); } } @@ -169,12 +169,12 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { } @JsonProperty - public String getSelectionRoot() { + public Path getSelectionRoot() { return selectionRoot; } @JsonProperty - public String getCacheFileRoot() { + public Path getCacheFileRoot() { return cacheFileRoot; } // getters for serialization / deserialization end @@ -224,8 +224,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { // For EXPLAIN, remove the URI prefix from cacheFileRoot. If cacheFileRoot is null, we // would have read the cache file from selectionRoot String cacheFileRootString = (cacheFileRoot == null) ? 
- Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString() : - Path.getPathWithoutSchemeAndAuthority(new Path(cacheFileRoot)).toString(); + Path.getPathWithoutSchemeAndAuthority(selectionRoot).toString() : + Path.getPathWithoutSchemeAndAuthority(cacheFileRoot).toString(); builder.append(", cacheFileRoot=").append(cacheFileRootString); } @@ -241,7 +241,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf()); Path metaPath = null; if (entries.size() == 1 && parquetTableMetadata == null) { - Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath())); + Path p = Path.getPathWithoutSchemeAndAuthority(entries.get(0).getPath()); if (fs.isDirectory(p)) { // Using the metadata file makes sense when querying a directory; otherwise // if querying a single file we can look up the metadata directly from the file @@ -257,9 +257,9 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { parquetTableMetadata = Metadata.getParquetTableMetadata(processUserFileSystem, p.toString(), readerConfig); } } else { - Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)); + Path p = Path.getPathWithoutSchemeAndAuthority(selectionRoot); metaPath = new Path(p, Metadata.METADATA_FILENAME); - if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(new Path(selectionRoot)) + if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(selectionRoot) && fs.exists(metaPath)) { if (parquetTableMetadata == null) { parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, readerConfig); @@ -275,7 +275,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { final List fileStatuses = new ArrayList<>(); for (ReadEntryWithPath entry : entries) { fileStatuses.addAll( - DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(new Path(entry.getPath())), true)); + DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(entry.getPath()), true)); } Map statusMap = fileStatuses.stream() @@ -292,7 +292,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { } @Override - protected AbstractParquetGroupScan cloneWithFileSelection(Collection filePaths) throws IOException { + protected AbstractParquetGroupScan cloneWithFileSelection(Collection filePaths) throws IOException { FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), getSelectionRoot(), cacheFileRoot, false); return clone(newSelection); } @@ -309,7 +309,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { @Override protected List getPartitionValues(RowGroupInfo rowGroupInfo) { - return ColumnExplorer.listPartitionValues(rowGroupInfo.getPath(), selectionRoot); + return ColumnExplorer.listPartitionValues(rowGroupInfo.getPath(), selectionRoot, false); } // overridden protected methods block end @@ -425,7 +425,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { } } else { final Path path = Path.getPathWithoutSchemeAndAuthority(cacheFileRoot); - fileSet.add(path.toString()); + fileSet.add(path); } } } @@ -436,21 +436,21 @@ public class ParquetGroupScan extends AbstractParquetGroupScan { return null; } - List fileNames = new ArrayList<>(fileSet); + List fileNames = new ArrayList<>(fileSet); // when creating the file selection, set the selection root without the URI prefix // The reason is that the file names above have been 
created in the form // /a/b/c.parquet and the format of the selection root must match that of the file names // otherwise downstream operations such as partition pruning can break. - final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(new Path(selection.getSelectionRoot())); - this.selectionRoot = metaRootPath.toString(); + final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(selection.getSelectionRoot()); + this.selectionRoot = metaRootPath; // Use the FileSelection constructor directly here instead of the FileSelection.create() method // because create() changes the root to include the scheme and authority; In future, if create() // is the preferred way to instantiate a file selection, we may need to do something different... // WARNING: file statuses and file names are inconsistent - FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString(), - cacheFileRoot, selection.wasAllPartitionsPruned()); + FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath, cacheFileRoot, + selection.wasAllPartitionsPruned()); newSelection.setExpandedFully(); newSelection.setMetaContext(metaContext); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java index 9381043b5..915652413 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java @@ -21,6 +21,7 @@ import org.apache.commons.lang3.mutable.MutableLong; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.hadoop.fs.Path; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -41,7 +42,7 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTa public class ParquetGroupScanStatistics { // map from file names to maps of column name to partition value mappings - private Map> partitionValueMap; + private Map> partitionValueMap; // only for partition columns : value is unique for each partition private Map partitionColTypeMap; // total number of non-null value for each column in parquet files @@ -78,7 +79,7 @@ public class ParquetGroupScanStatistics { return rowCount; } - public Object getPartitionValue(String path, SchemaPath column) { + public Object getPartitionValue(Path path, SchemaPath column) { return partitionValueMap.get(path).get(column); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java index eabe2df14..2513772c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java @@ -37,6 +37,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; // Class containing information for reading a single parquet row group from HDFS 
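A note on why org.apache.hadoop.fs.Path is safe as the element type of fileSet and as the key of partitionValueMap above: Path wraps a normalized URI and bases equals() and hashCode() on it, and Path.getPathWithoutSchemeAndAuthority(), used throughout this patch, strips the scheme and authority so that differently qualified references to the same file compare equal. A small illustration (the paths are only examples):

import org.apache.hadoop.fs.Path;

public class PathSemanticsDemo {
  public static void main(String[] args) {
    Path qualified = new Path("hdfs://namenode:8020/tmp/drill/table/0_0_0.parquet");
    Path plain = new Path("/tmp/drill/table/0_0_0.parquet");

    // Different URIs, so the two are not equal and would land in different map buckets...
    System.out.println(qualified.equals(plain));                                        // false
    // ...unless the scheme and authority are stripped first, as the group scan code does.
    System.out.println(Path.getPathWithoutSchemeAndAuthority(qualified).equals(plain)); // true
  }
}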
@JsonTypeName("parquet-row-group-scan") @@ -45,7 +46,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { private final ParquetFormatPlugin formatPlugin; private final ParquetFormatConfig formatConfig; - private final String selectionRoot; + private final Path selectionRoot; @JsonCreator public ParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry, @@ -55,7 +56,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { @JsonProperty("rowGroupReadEntries") LinkedList rowGroupReadEntries, @JsonProperty("columns") List columns, @JsonProperty("readerConfig") ParquetReaderConfig readerConfig, - @JsonProperty("selectionRoot") String selectionRoot, + @JsonProperty("selectionRoot") Path selectionRoot, @JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException { this(userName, (ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)), @@ -71,7 +72,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { List rowGroupReadEntries, List columns, ParquetReaderConfig readerConfig, - String selectionRoot, + Path selectionRoot, LogicalExpression filter) { super(userName, rowGroupReadEntries, columns, readerConfig, filter); this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration"); @@ -90,7 +91,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { } @JsonProperty - public String getSelectionRoot() { + public Path getSelectionRoot() { return selectionRoot; } @@ -127,7 +128,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan { @Override public List getPartitionValues(RowGroupReadEntry rowGroupReadEntry) { - return ColumnExplorer.listPartitionValues(rowGroupReadEntry.getPath(), selectionRoot); + return ColumnExplorer.listPartitionValues(rowGroupReadEntry.getPath(), selectionRoot, false); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index f0ef63989..8c91200d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.List; @@ -61,7 +62,7 @@ public class ParquetScanBatchCreator extends AbstractParquetScanBatchCreator imp } @Override - protected DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException { + protected DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException { if (fs == null) { try { fs = useAsyncPageReader ? 
operatorContext.newNonTrackingFileSystem(config) : operatorContext.newFileSystem(config); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java index 1c9ce107c..5fbadd6ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java @@ -23,6 +23,7 @@ import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.store.schedule.CompleteWork; import org.apache.drill.exec.store.schedule.EndpointByteMap; +import org.apache.hadoop.fs.Path; import java.util.List; @@ -37,7 +38,7 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil private long numRecordsToRead; @JsonCreator - public RowGroupInfo(@JsonProperty("path") String path, + public RowGroupInfo(@JsonProperty("path") Path path, @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java index 665179f6b..be3f50c30 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java @@ -22,6 +22,7 @@ import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hadoop.fs.Path; public class RowGroupReadEntry extends ReadEntryFromHDFS { @@ -29,7 +30,7 @@ public class RowGroupReadEntry extends ReadEntryFromHDFS { private long numRecordsToRead; @JsonCreator - public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start, + public RowGroupReadEntry(@JsonProperty("path") Path path, @JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex, @JsonProperty("numRecordsToRead") long numRecordsToRead) { super(path, start, length); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 17cf8c44d..ba4c49330 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -90,7 +90,7 @@ public class ParquetRecordReader extends AbstractRecordReader { public boolean useBulkReader; @SuppressWarnings("unused") - private String name; + private Path name; public ParquetReaderStats parquetReaderStats = new ParquetReaderStats(); private BatchReader batchReader; @@ -123,44 +123,42 @@ public class ParquetRecordReader extends AbstractRecordReader { } public ParquetRecordReader(FragmentContext fragmentContext, - String path, + Path path, int rowGroupIndex, long numRecordsToRead, FileSystem fs, CodecFactory codecFactory, ParquetMetadata footer, List columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException { - this(fragmentContext, numRecordsToRead, - 
path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { + this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); } public ParquetRecordReader(FragmentContext fragmentContext, - String path, + Path path, int rowGroupIndex, FileSystem fs, CodecFactory codecFactory, ParquetMetadata footer, List columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) - throws ExecutionSetupException { - this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(), - path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { + this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(), path, rowGroupIndex, fs, codecFactory, + footer, columns, dateCorruptionStatus); } public ParquetRecordReader( FragmentContext fragmentContext, long numRecordsToRead, - String path, + Path path, int rowGroupIndex, FileSystem fs, CodecFactory codecFactory, ParquetMetadata footer, List columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException { + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) { this.name = path; - this.hadoopPath = new Path(path); + this.hadoopPath = path; this.fileSystem = fs; this.codecFactory = codecFactory; this.rowGroupIndex = rowGroupIndex; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java index 0db007ab6..d0e2734d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; +import org.apache.drill.exec.serialization.PathSerDe; import org.apache.drill.exec.store.parquet.ParquetReaderConfig; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -82,6 +83,9 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetFi import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3; import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3; +/** + * This is a utility class, holder for Parquet Table Metadata and {@link ParquetReaderConfig} + */ public class Metadata { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class); @@ -106,7 +110,7 @@ public class Metadata { * @param path path * @param readerConfig parquet reader configuration */ - public static void createMeta(FileSystem fs, String path, ParquetReaderConfig readerConfig) throws IOException { + public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig) throws IOException { Metadata metadata = new Metadata(readerConfig); metadata.createMetaFilesRecursively(path, fs); } @@ -208,13 +212,13 @@ public class Metadata { * {@code path} directory).
* @throws IOException if parquet metadata can't be serialized and written to the json file */ - private Pair createMetaFilesRecursively(final String path, FileSystem fs) throws IOException { + private Pair createMetaFilesRecursively(final Path path, FileSystem fs) throws IOException { Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; List metaDataList = Lists.newArrayList(); - List directoryList = Lists.newArrayList(); + List directoryList = Lists.newArrayList(); ConcurrentHashMap columnTypeInfoSet = new ConcurrentHashMap<>(); - Path p = new Path(path); + Path p = path; FileStatus fileStatus = fs.getFileStatus(p); assert fileStatus.isDirectory() : "Expected directory"; @@ -222,10 +226,10 @@ public class Metadata { for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) { if (file.isDirectory()) { - ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString(), fs)).getLeft(); + ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs)).getLeft(); metaDataList.addAll(subTableMetadata.files); directoryList.addAll(subTableMetadata.directories); - directoryList.add(file.getPath().toString()); + directoryList.add(file.getPath()); // Merge the schema from the child level into the current level //TODO: We need a merge method that merges two columns with the same name but different types columnTypeInfoSet.putAll(subTableMetadata.columnTypeInfo); @@ -268,7 +272,7 @@ public class Metadata { ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList); return Pair.of(parquetTableMetadata, parquetTableMetadataDirs); } - List emptyDirList = Lists.newArrayList(); + List emptyDirList = new ArrayList<>(); if (timer != null) { logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS)); timer.stop(); @@ -495,7 +499,7 @@ public class Metadata { rowGroupMetadataList.add(rowGroupMeta); } - String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString(); + Path path = Path.getPathWithoutSchemeAndAuthority(file.getPath()); return new ParquetFileMetadata_v3(path, file.getLen(), rowGroupMetadataList); } @@ -535,6 +539,8 @@ public class Metadata { * * @param parquetTableMetadata parquet table metadata * @param p file path + * @param fs Drill file system + * @throws IOException if metadata can't be serialized */ private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p, FileSystem fs) throws IOException { JsonFactory jsonFactory = new JsonFactory(); @@ -542,6 +548,7 @@ public class Metadata { jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); ObjectMapper mapper = new ObjectMapper(jsonFactory); SimpleModule module = new SimpleModule(); + module.addSerializer(Path.class, new PathSerDe.Se()); module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer()); mapper.registerModule(module); OutputStream os = fs.create(p); @@ -556,6 +563,7 @@ public class Metadata { jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false); ObjectMapper mapper = new ObjectMapper(jsonFactory); SimpleModule module = new SimpleModule(); + module.addSerializer(Path.class, new PathSerDe.Se()); mapper.registerModule(module); OutputStream os = fs.create(p); mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs); @@ -602,7 +610,7 @@ public class Metadata { parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath); if (!alreadyCheckedModification && 
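The writeFile(...) methods above register PathSerDe.Se so that Path fields in the metadata cache file are written as plain JSON strings. The serializer class itself is added elsewhere in this patch and is not shown in this hunk; below is a minimal sketch of what such a serializer can look like (illustrative only, not the actual PathSerDe code). No custom deserializer is needed, since Jackson can rebuild a Path from a JSON string through the public Path(String) constructor, which is presumably also what the Path-typed @JsonCreator parameters elsewhere in the patch rely on.

import java.io.IOException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.hadoop.fs.Path;

// Illustrative only: writes a Hadoop Path as its string form.
public class PathSerializerSketch extends StdSerializer<Path> {

  public PathSerializerSketch() {
    super(Path.class);
  }

  @Override
  public void serialize(Path value, JsonGenerator gen, SerializerProvider provider) throws IOException {
    gen.writeString(value.toUri().getPath()); // or value.toString() to keep scheme and authority
  }
}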
tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) { parquetTableMetadataDirs = - (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getRight(); + (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getRight(); newMetadata = true; } } else { @@ -616,7 +624,7 @@ public class Metadata { } if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) { parquetTableMetadata = - (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getLeft(); + (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getLeft(); newMetadata = true; } @@ -647,9 +655,10 @@ public class Metadata { * @return true if metadata needs to be updated, false otherwise * @throws IOException if some resources are not accessible */ - private boolean tableModified(List directories, Path metaFilePath, Path parentDir, MetadataContext metaContext, FileSystem fs) throws IOException { + private boolean tableModified(List directories, Path metaFilePath, Path parentDir, + MetadataContext metaContext, FileSystem fs) throws IOException { Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; - metaContext.setStatus(parentDir.toUri().getPath()); + metaContext.setStatus(parentDir); long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime(); FileStatus directoryStatus = fs.getFileStatus(parentDir); int numDirs = 1; @@ -661,10 +670,10 @@ public class Metadata { } return true; } - for (String directory : directories) { + for (Path directory : directories) { numDirs++; metaContext.setStatus(directory); - directoryStatus = fs.getFileStatus(new Path(directory)); + directoryStatus = fs.getFileStatus(directory); if (directoryStatus.getModificationTime() > metaFileModifyTime) { if (timer != null) { logger.debug("Directory {} was modified. 
Took {} ms to check modification time of {} directories", diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java index bed8be67a..ee07470d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet.metadata; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.hadoop.fs.Path; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -57,7 +58,7 @@ public class MetadataBase { public static abstract class ParquetTableMetadataBase { @JsonIgnore - public abstract List getDirectories(); + public abstract List getDirectories(); @JsonIgnore public abstract List getFiles(); @@ -83,7 +84,7 @@ public class MetadataBase { } public static abstract class ParquetFileMetadata { - @JsonIgnore public abstract String getPath(); + @JsonIgnore public abstract Path getPath(); @JsonIgnore public abstract Long getLength(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java index 3e7c2ffcc..2794e2b14 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java @@ -21,6 +21,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.common.util.DrillVersionInfo; import org.apache.hadoop.fs.Path; +import java.util.ArrayList; import java.util.List; import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.SUPPORTED_VERSIONS; @@ -39,12 +40,12 @@ public class MetadataPathUtils { * @param baseDir base parent directory * @return list of absolute paths */ - public static List convertToAbsolutePaths(List paths, String baseDir) { + public static List convertToAbsolutePaths(List paths, String baseDir) { if (!paths.isEmpty()) { - List absolutePaths = Lists.newArrayList(); - for (String relativePath : paths) { - String absolutePath = (new Path(relativePath).isAbsolute()) ? relativePath - : new Path(baseDir, relativePath).toUri().getPath(); + List absolutePaths = Lists.newArrayList(); + for (Path relativePath : paths) { + Path absolutePath = (relativePath.isAbsolute()) ? relativePath + : new Path(baseDir, relativePath); absolutePaths.add(absolutePath); } return absolutePaths; @@ -64,10 +65,10 @@ public class MetadataPathUtils { if (!files.isEmpty()) { List filesWithAbsolutePaths = Lists.newArrayList(); for (ParquetFileMetadata_v3 file : files) { - Path relativePath = new Path(file.getPath()); + Path relativePath = file.getPath(); // create a new file if old one contains a relative path, otherwise use an old file ParquetFileMetadata_v3 fileWithAbsolutePath = (relativePath.isAbsolute()) ? 
file - : new ParquetFileMetadata_v3(new Path(baseDir, relativePath).toUri().getPath(), file.length, file.rowGroups); + : new ParquetFileMetadata_v3(new Path(baseDir, relativePath), file.length, file.rowGroups); filesWithAbsolutePaths.add(fileWithAbsolutePath); } return filesWithAbsolutePaths; @@ -84,9 +85,9 @@ public class MetadataPathUtils { * @return parquet table metadata with relative paths for the files and directories */ public static ParquetTableMetadata_v3 createMetadataWithRelativePaths( - ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, String baseDir) { - List directoriesWithRelativePaths = Lists.newArrayList(); - for (String directory : tableMetadataWithAbsolutePaths.getDirectories()) { + ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, Path baseDir) { + List directoriesWithRelativePaths = new ArrayList<>(); + for (Path directory : tableMetadataWithAbsolutePaths.getDirectories()) { directoriesWithRelativePaths.add(relativize(baseDir, directory)); } List filesWithRelativePaths = Lists.newArrayList(); @@ -105,9 +106,9 @@ public class MetadataPathUtils { * @param baseDir base path (the part of the Path, which should be cut off from child path) * @return relative path */ - public static String relativize(String baseDir, String childPath) { - Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(childPath)); - Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(baseDir)); + public static Path relativize(Path baseDir, Path childPath) { + Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(childPath); + Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(baseDir); // Since hadoop Path hasn't relativize() we use uri.relativize() to get relative path Path relativeFilePath = new Path(basePathWithoutSchemeAndAuthority.toUri() @@ -116,7 +117,7 @@ public class MetadataPathUtils { throw new IllegalStateException(String.format("Path %s is not a subpath of %s.", basePathWithoutSchemeAndAuthority.toUri().getPath(), fullPathWithoutSchemeAndAuthority.toUri().getPath())); } - return relativeFilePath.toUri().getPath(); + return relativeFilePath; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java index 92feb5f8a..4b0dca803 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import org.apache.drill.common.expression.SchemaPath; +import org.apache.hadoop.fs.Path; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -43,19 +44,19 @@ public class Metadata_V1 { @JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY) private String metadataVersion; @JsonProperty List files; - @JsonProperty List directories; + @JsonProperty List directories; public ParquetTableMetadata_v1() { } - public ParquetTableMetadata_v1(String metadataVersion, List files, List directories) { + public ParquetTableMetadata_v1(String metadataVersion, List files, List directories) { this.metadataVersion = metadataVersion; 
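With MetadataPathUtils.relativize(...) above now taking and returning Path, writing relative paths into the metadata cache and resolving them again is a plain round trip; a short usage example (the paths are made up):

import org.apache.drill.exec.store.parquet.metadata.MetadataPathUtils;
import org.apache.hadoop.fs.Path;

public class RelativePathsDemo {
  public static void main(String[] args) {
    Path baseDir = new Path("/tmp/drill/table");
    Path file = new Path("hdfs://namenode:8020/tmp/drill/table/1994/Q1/0_0_0.parquet");

    // Stored in the cache as a path relative to the cache's base directory...
    Path relative = MetadataPathUtils.relativize(baseDir, file);  // 1994/Q1/0_0_0.parquet
    // ...and resolved back against that base directory on read, as convertToAbsolutePaths() does.
    Path absolute = new Path(baseDir, relative);                  // /tmp/drill/table/1994/Q1/0_0_0.parquet

    System.out.println(relative + " -> " + absolute);
  }
}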
this.files = files; this.directories = directories; } @JsonIgnore - @Override public List getDirectories() { + @Override public List getDirectories() { return directories; } @@ -114,7 +115,7 @@ public class Metadata_V1 { */ public static class ParquetFileMetadata_v1 extends ParquetFileMetadata { @JsonProperty - public String path; + public Path path; @JsonProperty public Long length; @JsonProperty @@ -123,7 +124,7 @@ public class Metadata_V1 { public ParquetFileMetadata_v1() { } - public ParquetFileMetadata_v1(String path, Long length, List rowGroups) { + public ParquetFileMetadata_v1(Path path, Long length, List rowGroups) { this.path = path; this.length = length; this.rowGroups = rowGroups; @@ -134,7 +135,7 @@ public class Metadata_V1 { return String.format("path: %s rowGroups: %s", path, rowGroups); } - @JsonIgnore @Override public String getPath() { + @JsonIgnore @Override public Path getPath() { return path; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java index 7eddc1279..a78fca4bf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java @@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer; import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.KeyDeserializer; import com.fasterxml.jackson.databind.SerializerProvider; +import org.apache.hadoop.fs.Path; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -59,7 +60,7 @@ public class Metadata_V2 { @JsonProperty public ConcurrentHashMap columnTypeInfo; @JsonProperty List files; - @JsonProperty List directories; + @JsonProperty List directories; @JsonProperty String drillVersion; public ParquetTableMetadata_v2() { @@ -71,7 +72,7 @@ public class Metadata_V2 { } public ParquetTableMetadata_v2(String metadataVersion, ParquetTableMetadataBase parquetTable, - List files, List directories, String drillVersion) { + List files, List directories, String drillVersion) { this.metadataVersion = metadataVersion; this.files = files; this.directories = directories; @@ -79,7 +80,7 @@ public class Metadata_V2 { this.drillVersion = drillVersion; } - public ParquetTableMetadata_v2(String metadataVersion, List files, List directories, + public ParquetTableMetadata_v2(String metadataVersion, List files, List directories, ConcurrentHashMap columnTypeInfo, String drillVersion) { this.metadataVersion = metadataVersion; this.files = files; @@ -93,7 +94,7 @@ public class Metadata_V2 { } @JsonIgnore - @Override public List getDirectories() { + @Override public List getDirectories() { return directories; } @@ -152,14 +153,14 @@ public class Metadata_V2 { * Struct which contains the metadata for a single parquet file */ public static class ParquetFileMetadata_v2 extends ParquetFileMetadata { - @JsonProperty public String path; + @JsonProperty public Path path; @JsonProperty public Long length; @JsonProperty public List rowGroups; public ParquetFileMetadata_v2() { } - public ParquetFileMetadata_v2(String path, Long length, List rowGroups) { + public ParquetFileMetadata_v2(Path path, Long length, List rowGroups) { this.path = path; this.length = length; this.rowGroups = rowGroups; @@ -169,7 +170,7 @@ public class Metadata_V2 { return 
String.format("path: %s rowGroups: %s", path, rowGroups); } - @JsonIgnore @Override public String getPath() { + @JsonIgnore @Override public Path getPath() { return path; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java index 4bb07f78f..a5ff89795 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.JsonSerializer; import com.fasterxml.jackson.databind.KeyDeserializer; import com.fasterxml.jackson.databind.SerializerProvider; import org.apache.drill.common.expression.SchemaPath; +import org.apache.hadoop.fs.Path; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.PrimitiveType; @@ -54,7 +55,7 @@ public class Metadata_V3 { @JsonProperty public ConcurrentHashMap columnTypeInfo; @JsonProperty List files; - @JsonProperty List directories; + @JsonProperty List directories; @JsonProperty String drillVersion; /** @@ -74,7 +75,7 @@ public class Metadata_V3 { } public ParquetTableMetadata_v3(String metadataVersion, ParquetTableMetadataBase parquetTable, - List files, List directories, String drillVersion) { + List files, List directories, String drillVersion) { this.metadataVersion = metadataVersion; this.files = files; this.directories = directories; @@ -82,7 +83,7 @@ public class Metadata_V3 { this.drillVersion = drillVersion; } - public ParquetTableMetadata_v3(String metadataVersion, List files, List directories, + public ParquetTableMetadata_v3(String metadataVersion, List files, List directories, ConcurrentHashMap columnTypeInfo, String drillVersion) { this.metadataVersion = metadataVersion; @@ -97,7 +98,7 @@ public class Metadata_V3 { } @JsonIgnore - @Override public List getDirectories() { + @Override public List getDirectories() { return directories; } @@ -168,14 +169,14 @@ public class Metadata_V3 { * Struct which contains the metadata for a single parquet file */ public static class ParquetFileMetadata_v3 extends ParquetFileMetadata { - @JsonProperty public String path; + @JsonProperty public Path path; @JsonProperty public Long length; @JsonProperty public List rowGroups; public ParquetFileMetadata_v3() { } - public ParquetFileMetadata_v3(String path, Long length, List rowGroups) { + public ParquetFileMetadata_v3(Path path, Long length, List rowGroups) { this.path = path; this.length = length; this.rowGroups = rowGroups; @@ -185,7 +186,7 @@ public class Metadata_V3 { return String.format("path: %s rowGroups: %s", path, rowGroups); } - @JsonIgnore @Override public String getPath() { + @JsonIgnore @Override public Path getPath() { return path; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java index 186f53415..b1fd7f23e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java @@ -19,24 +19,25 @@ package org.apache.drill.exec.store.parquet.metadata; import com.fasterxml.jackson.annotation.JsonIgnore; import 
com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hadoop.fs.Path; import java.util.List; public class ParquetTableMetadataDirs { @JsonProperty - List directories; + List directories; public ParquetTableMetadataDirs() { // default constructor needed for deserialization } - public ParquetTableMetadataDirs(List directories) { + public ParquetTableMetadataDirs(List directories) { this.directories = directories; } @JsonIgnore - public List getDirectories() { + public List getDirectories() { return directories; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 09c016a5f..5c49e040a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -235,7 +235,7 @@ public class DrillParquetReader extends AbstractRecordReader { paths.put(md.getPath(), md); } - Path filePath = new Path(entry.getPath()); + Path filePath = entry.getPath(); BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java index d688f3b07..aef56a307 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java @@ -92,11 +92,11 @@ public class PcapRecordReader extends AbstractRecordReader { .build(); } - public PcapRecordReader(final String pathToFile, + public PcapRecordReader(final Path pathToFile, final FileSystem fileSystem, final List projectedColumns) { this.fs = fileSystem; - this.pathToFile = fs.makeQualified(new Path(pathToFile)); + this.pathToFile = fs.makeQualified(pathToFile); this.projectedColumns = projectedColumns; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java index b1c5f2427..0ad234de7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java @@ -65,11 +65,11 @@ public class PcapngRecordReader extends AbstractRecordReader { private Iterator it; - public PcapngRecordReader(final String pathToFile, + public PcapngRecordReader(final Path pathToFile, final FileSystem fileSystem, final List columns) { this.fs = fileSystem; - this.pathToFile = fs.makeQualified(new Path(pathToFile)); + this.pathToFile = fs.makeQualified(pathToFile); this.columns = columns; setColumns(columns); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java index 7de31a063..6bc7bb02a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java @@ -116,8 +116,8 @@ public class BlockMapBuilder { try { ImmutableRangeMap rangeMap = getBlockMap(status); for (Entry, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()) { - work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, 
new FileStatusWork(status)), l.getValue().getOffset(), l.getValue().getLength(), status.getPath() - .toString())); + work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), + l.getValue().getOffset(), l.getValue().getLength(), status.getPath())); } } catch (IOException e) { logger.warn("failure while generating file work.", e); @@ -127,7 +127,8 @@ public class BlockMapBuilder { if (!blockify || error || compressed(status)) { - work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, status.getLen(), status.getPath().toString())); + work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, + status.getLen(), status.getPath())); } // This if-condition is specific for empty CSV file @@ -135,7 +136,8 @@ public class BlockMapBuilder { // And if this CSV file is empty, rangeMap would be empty also // Therefore, at the point before this if-condition, work would not be populated if(work.isEmpty()) { - work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, 0, status.getPath().toString())); + work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, 0, + status.getPath())); } if (noDrillbitHosts != null) { @@ -162,8 +164,8 @@ public class BlockMapBuilder { } @Override - public String getPath() { - return status.getPath().toString(); + public Path getPath() { + return status.getPath(); } @Override @@ -231,20 +233,20 @@ public class BlockMapBuilder { */ public EndpointByteMap getEndpointByteMap(Set noDrillbitHosts, FileWork work) throws IOException { Stopwatch watch = Stopwatch.createStarted(); - Path fileName = new Path(work.getPath()); + Path fileName = work.getPath(); - ImmutableRangeMap blockMap = getBlockMap(fileName); + ImmutableRangeMap blockMap = getBlockMap(fileName); EndpointByteMapImpl endpointByteMap = new EndpointByteMapImpl(); long start = work.getStart(); long end = start + work.getLength(); Range rowGroupRange = Range.closedOpen(start, end); // Find submap of ranges that intersect with the rowGroup - ImmutableRangeMap subRangeMap = blockMap.subRangeMap(rowGroupRange); + ImmutableRangeMap subRangeMap = blockMap.subRangeMap(rowGroupRange); // Iterate through each block in this submap and get the host for the block location - for (Map.Entry,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) { + for (Map.Entry, BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) { String[] hosts; Range blockRange = block.getKey(); try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java index 04c4eb0db..4b0402bd8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java @@ -21,16 +21,17 @@ import org.apache.drill.exec.store.dfs.easy.FileWork; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.hadoop.fs.Path; public class CompleteFileWork implements FileWork, CompleteWork { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteFileWork.class); private long start; private long length; - private String path; + private Path path; private EndpointByteMap byteMap; - public CompleteFileWork(EndpointByteMap byteMap, long 
start, long length, String path) { + public CompleteFileWork(EndpointByteMap byteMap, long start, long length, Path path) { super(); this.start = start; this.length = length; @@ -69,7 +70,7 @@ public class CompleteFileWork implements FileWork, CompleteWork { } @Override - public String getPath() { + public Path getPath() { return path; } @@ -87,22 +88,28 @@ public class CompleteFileWork implements FileWork, CompleteWork { return new FileWorkImpl(start, length, path); } - public static class FileWorkImpl implements FileWork{ + @Override + public String toString() { + return String.format("File: %s start: %d length: %d", path, start, length); + } + + public static class FileWorkImpl implements FileWork { + + private long start; + private long length; + private Path path; @JsonCreator - public FileWorkImpl(@JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("path") String path) { - super(); + public FileWorkImpl(@JsonProperty("start") long start, + @JsonProperty("length") long length, + @JsonProperty("path") Path path) { this.start = start; this.length = length; this.path = path; } - public long start; - public long length; - public String path; - @Override - public String getPath() { + public Path getPath() { return path; } @@ -116,10 +123,13 @@ public class CompleteFileWork implements FileWork, CompleteWork { return length; } - } - - @Override - public String toString() { - return String.format("File: %s start: %d length: %d", path, start, length); + @Override + public String toString() { + return "FileWorkImpl{" + + "start=" + start + + ", length=" + length + + ", path=" + path + + '}'; + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java index fcee3b0e0..bfb83e07c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.util; import org.apache.drill.exec.store.dfs.DrillFileSystem; +import org.apache.drill.shaded.guava.com.google.common.base.Strings; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -133,4 +134,14 @@ public class DrillFileSystemUtil { return FileSystemUtil.listAllSafe(fs, path, recursive, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters)); } + /** + * Safely creates Hadoop Path for null and empty String paths + * + * @param path String path, which can be null or empty + * @return Hadoop Path. Root - for empty or null path + */ + public static Path createPathSafe(String path) { + return Strings.isNullOrEmpty(path) ? 
new Path("/") : new Path(path); + } + } diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java index 9e0d95c85..247d7842e 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java @@ -143,8 +143,7 @@ public class PlanTestBase extends BaseTestQuery { * planning process throws an exception */ public static void testPlanWithAttributesMatchingPatterns(String query, String[] expectedPatterns, - String[] excludedPatterns) - throws Exception { + String[] excludedPatterns) throws Exception { final String plan = getPlanInString("EXPLAIN PLAN INCLUDING ALL ATTRIBUTES for " + QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java new file mode 100644 index 000000000..cb9356568 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec; + + +import com.fasterxml.jackson.databind.module.SimpleModule; +import org.apache.drill.exec.serialization.PathSerDe; +import org.apache.drill.exec.store.schedule.CompleteFileWork; +import org.apache.drill.test.DrillTest; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +import java.io.IOException; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; + +public class TestPathSerialization extends DrillTest { + + @Test + public void testDeSerializingWithJsonCreator() throws IOException { + + String jsonString = "{\"start\": 1, \"length\": 2, \"path\": \"/tmp/drill/test\"}"; + + SimpleModule module = new SimpleModule(); + module.addSerializer(Path.class, new PathSerDe.Se()); + objectMapper.registerModule(module); + + CompleteFileWork.FileWorkImpl bean = objectMapper.readValue(jsonString, CompleteFileWork.FileWorkImpl.class); + + assertThat(bean.getStart() == 1, equalTo( true )); + assertThat(bean.getLength() == 2, equalTo( true )); + assertThat(bean.getPath().equals(new Path("/tmp/drill/test")), equalTo( true )); + } + + @Test + public void testHadoopPathSerDe() throws IOException { + CompleteFileWork.FileWorkImpl fileWork = new CompleteFileWork.FileWorkImpl(5, 6, new Path("/tmp")); + SimpleModule module = new SimpleModule(); + module.addSerializer(Path.class, new PathSerDe.Se()); + objectMapper.registerModule(module); + + CompleteFileWork.FileWorkImpl bean = + objectMapper.readValue(objectMapper.writeValueAsString(fileWork), CompleteFileWork.FileWorkImpl.class); + + assertThat(bean.getStart() == 5, equalTo( true )); + assertThat(bean.getLength() == 6, equalTo( true )); + assertThat(bean.getPath().equals(new Path("/tmp")), equalTo( true )); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java index 2fdf3e637..b05bb28f3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java @@ -85,7 +85,7 @@ public class TestFileScanFramework extends SubOperatorTest { } @Override - public String getPath() { return path.toString(); } + public Path getPath() { return path; } @Override public long getStart() { return 0; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java index 9a2bb1f85..911a09744 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator; import org.apache.drill.exec.store.parquet.ParquetReaderUtility; import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; +import org.apache.drill.test.LegacyOperatorTestBuilder; import org.apache.drill.test.PhysicalOpUnitTestBase; import org.apache.hadoop.fs.Path; import org.apache.parquet.hadoop.CodecFactory; @@ -358,7 +359,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { */ public class JsonScanBuilder extends ScanPopBuider { List jsonBatches = null; - List inputPaths = 
Collections.emptyList(); + List inputPaths = Collections.emptyList(); public JsonScanBuilder(PopBuilder parent) { super(parent); @@ -373,7 +374,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { return this; } - public JsonScanBuilder inputPaths(List inputPaths) { + public JsonScanBuilder inputPaths(List inputPaths) { this.inputPaths = inputPaths; return this; } @@ -412,7 +413,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { * Builder for parquet Scan RecordBatch. */ public class ParquetScanBuilder extends ScanPopBuider { - List inputPaths = Collections.emptyList(); + List inputPaths = Collections.emptyList(); public ParquetScanBuilder() { super(); @@ -422,7 +423,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { super(parent); } - public ParquetScanBuilder inputPaths(List inputPaths) { + public ParquetScanBuilder inputPaths(List inputPaths) { this.inputPaths = inputPaths; return this; } @@ -443,8 +444,8 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase { private RecordBatch getScanBatch() throws Exception { List readers = new LinkedList<>(); - for (String path : inputPaths) { - ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), new Path(path)); + for (Path path : inputPaths) { + ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), path); for (int i = 0; i < footer.getBlocks().size(); i++) { readers.add(new ParquetRecordReader(fragContext, diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java index cc259cc1b..a1b700153 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -59,11 +60,11 @@ public class TestMiniPlan extends MiniPlanUnitTestBase { @Test public void testSimpleParquetScan() throws Exception { String file = DrillFileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString(); - + List filePath = Collections.singletonList(new Path(file)); RecordBatch scanBatch = new ParquetScanBuilder() .fileSystem(fs) .columnsToRead("R_REGIONKEY") - .inputPaths(Lists.newArrayList(file)) + .inputPaths(filePath) .build(); BatchSchema expectedSchema = new SchemaBuilder() diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java index bdff80b14..69a421481 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java @@ -41,6 +41,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Test; @@ -272,11 +273,12 @@ public class 
TestNullInputMiniPlan extends MiniPlanUnitTestBase{ RecordBatch left = createScanBatchFromJson(SINGLE_EMPTY_JSON); String file = DrillFileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString(); + List filePath = Collections.singletonList(new Path(file)); RecordBatch scanBatch = new ParquetScanBuilder() .fileSystem(fs) .columnsToRead("R_REGIONKEY") - .inputPaths(Lists.newArrayList(file)) + .inputPaths(filePath) .build(); RecordBatch projectBatch = new PopBuilder() @@ -554,10 +556,10 @@ public class TestNullInputMiniPlan extends MiniPlanUnitTestBase{ } private RecordBatch createScanBatchFromJson(String... resourcePaths) throws Exception { - List inputPaths = new ArrayList<>(); + List inputPaths = new ArrayList<>(); for (String resource : resourcePaths) { - inputPaths.add(DrillFileUtils.getResourceAsFile(resource).toURI().toString()); + inputPaths.add(new Path(DrillFileUtils.getResourceAsFile(resource).toURI())); } RecordBatch scanBatch = new JsonScanBuilder() diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java index 6bd7e4de7..3db92564d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java @@ -41,16 +41,16 @@ import org.apache.hadoop.util.Progressable; public class CachedSingleFileSystem extends FileSystem { private ByteBuf file; - private String path; + private Path path; - public CachedSingleFileSystem(String path) throws IOException { + public CachedSingleFileSystem(Path path) throws IOException { this.path = path; - File f = new File(path); + File f = new File(path.toUri().getPath()); long length = f.length(); if (length > Integer.MAX_VALUE) { throw new UnsupportedOperationException("Cached file system only supports files of less than 2GB."); } - try (InputStream is = new BufferedInputStream(new FileInputStream(path))) { + try (InputStream is = new BufferedInputStream(new FileInputStream(path.toUri().getPath()))) { byte[] buffer = new byte[64*1024]; this.file = UnpooledByteBufAllocator.DEFAULT.directBuffer((int) length); int read; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java new file mode 100644 index 000000000..067abece3 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.drill.exec.planner.DFSFilePartitionLocation;
+import org.apache.drill.test.DrillTest;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestDFSPartitionLocation extends DrillTest {
+
+  private static final Path SELECTION_ROOT = new Path("/tmp/drill");
+  private static final Path PARTITION = new Path("/tmp/drill/test_table/first_dir/second_dir/");
+
+  @Test
+  public void testDFSFilePartitionLocation() {
+    Path file = new Path(PARTITION, "0_0_0.parquet");
+    DFSFilePartitionLocation dfsPartition = new DFSFilePartitionLocation(4, SELECTION_ROOT, file, false);
+    checkSubdirectories(dfsPartition, file);
+  }
+
+  @Test
+  public void testDFSDirectoryPartitionLocation() {
+    DFSFilePartitionLocation dfsPartition = new DFSFilePartitionLocation(4, SELECTION_ROOT, PARTITION, true);
+    checkSubdirectories(dfsPartition, PARTITION);
+  }
+
+  private void checkSubdirectories(DFSFilePartitionLocation dfsPartition, Path partition) {
+    assertArrayEquals("Wrong partition dirs", new String[]{"test_table", "first_dir", "second_dir", null}, dfsPartition.getDirs());
+    assertEquals("Wrong partition value", "test_table", dfsPartition.getPartitionValue(0));
+    assertEquals("Wrong partition value", "first_dir", dfsPartition.getPartitionValue(1));
+    assertEquals("Wrong partition value", "second_dir", dfsPartition.getPartitionValue(2));
+    assertEquals("Wrong partition location", partition, dfsPartition.getEntirePartitionLocation());
+  }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
index b1f233e94..b853b9b8a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
 
 import java.util.List;
 
+import org.apache.drill.exec.util.DrillFileSystemUtil;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.test.BaseTestQuery;
 import org.apache.hadoop.fs.FileStatus;
@@ -30,7 +31,7 @@ import org.junit.Test;
 
 public class TestFileSelection extends BaseTestQuery {
   private static final List<FileStatus> EMPTY_STATUSES = ImmutableList.of();
-  private static final List<String> EMPTY_FILES = ImmutableList.of();
+  private static final List<Path> EMPTY_FILES = ImmutableList.of();
   private static final String EMPTY_ROOT = "";
 
   @Test
@@ -38,8 +39,8 @@ {
     for (final Object statuses : new Object[] { null, EMPTY_STATUSES}) {
       for (final Object files : new Object[]{null, EMPTY_FILES}) {
         for (final Object root : new Object[]{null, EMPTY_ROOT}) {
-          final FileSelection selection = FileSelection.create((List<FileStatus>) statuses, (List<String>) files,
-              (String)root);
+          FileSelection selection = FileSelection.create((List<FileStatus>) statuses, (List<Path>) files,
+              DrillFileSystemUtil.createPathSafe((String) root));
           assertNull(selection);
         }
       }
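The new TestDFSPartitionLocation above pins down what is expected from DFSFilePartitionLocation once it operates on Path objects: the partition values are the path segments between the selection root and the leaf, and unused levels stay null. The helper below is only a sketch of that contract, written to make the asserted values easier to follow; its name, signature, and null-padding loop are assumptions, not the actual DFSFilePartitionLocation implementation.

import org.apache.hadoop.fs.Path;

// Illustration only, not part of the patch; mirrors what TestDFSPartitionLocation asserts.
class PartitionDirsSketch {
  static String[] partitionDirs(Path selectionRoot, Path location, boolean isDirectory, int max) {
    String root = selectionRoot.toUri().getPath();   // e.g. "/tmp/drill"
    String full = location.toUri().getPath();        // e.g. "/tmp/drill/test_table/first_dir/second_dir/0_0_0.parquet"
    String relative = full.substring(root.length());
    if (relative.startsWith(Path.SEPARATOR)) {
      relative = relative.substring(1);
    }
    String[] segments = relative.split(Path.SEPARATOR);
    // A file contributes no partition value itself, so its name is dropped.
    int dirCount = isDirectory ? segments.length : segments.length - 1;
    String[] dirs = new String[max];
    for (int i = 0; i < Math.min(dirCount, max); i++) {
      dirs[i] = segments[i];
    }
    return dirs;   // {"test_table", "first_dir", "second_dir", null} for the values used in the test
  }
}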
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 1bd90b37e..16b25ce13 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -610,13 +610,13 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
     final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
     final FragmentContextImpl context = new FragmentContextImpl(bitContext, BitControl.PlanFragment.getDefaultInstance(), connection, registry);
 
-    final String fileName = "/tmp/parquet_test_performance.parquet";
+    final Path fileName = new Path("/tmp/parquet_test_performance.parquet");
     final HashMap<String, FieldInfo> fields = new HashMap<>();
     final ParquetTestProperties props = new ParquetTestProperties(1, 20 * 1000 * 1000, DEFAULT_BYTES_PER_PAGE, fields);
     populateFieldInfoMap(props);
 
     final Configuration dfsConfig = new Configuration();
-    final List