author    Vitalii Diravka <vitalii.diravka@gmail.com>    2019-02-18 22:30:36 +0200
committer Vitalii Diravka <vitalii.diravka@gmail.com>    2019-03-05 16:32:05 +0200
commit    3d29faf81da593035f6bd38dd56d48e719afe7d4 (patch)
tree      d77b926b0de304677bf81a883e20e9776f986ea0
parent    7e3b45967dbb97da18ba49a2fa6a67a48e33b092 (diff)
DRILL-5603: Replace String file paths with Hadoop Path

- replaced all String path representations with org.apache.hadoop.fs.Path
- added the PathSerDe.Se JSON serializer
- refactored the DFSPartitionLocation code by leveraging the existing listPartitionValues() functionality

closes #1657
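For context on the serializer mentioned above, here is a minimal sketch of a Jackson serializer for org.apache.hadoop.fs.Path in the style of PathSerDe.Se (added in this patch under exec/java-exec/.../serialization/PathSerDe.java). Writing the value via toUri().getPath() is an assumption here; it mirrors the scheme-and-authority stripping used elsewhere in the change.

import java.io.IOException;

import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
import org.apache.hadoop.fs.Path;

public class PathSerDe {
  public static class Se extends StdSerializer<Path> {
    public Se() {
      super(Path.class);
    }

    @Override
    public void serialize(Path path, JsonGenerator gen, SerializerProvider provider) throws IOException {
      // assumption: write the bare path string, without scheme/authority
      gen.writeString(path.toUri().getPath());
    }
  }
}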
-rw-r--r-- contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java | 4
-rw-r--r-- contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java | 12
-rw-r--r-- contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java | 3
-rw-r--r-- contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java | 9
-rw-r--r-- contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java | 8
-rw-r--r-- contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java | 2
-rw-r--r-- contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java | 6
-rw-r--r-- contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java | 5
-rw-r--r-- contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java | 11
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java | 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java | 9
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java | 18
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java | 25
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java | 102
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java | 18
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java | 8
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java | 44
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java | 14
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java | 53
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java | 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java | 17
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java | 28
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java | 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java | 39
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java | 46
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java | 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java | 12
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java | 161
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java | 8
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java | 7
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java | 13
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java | 33
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java | 22
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java | 8
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java | 10
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java | 13
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java | 9
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java | 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java | 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java | 17
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java | 11
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java | 42
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java | 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java | 11
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java | 24
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java | 37
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java | 5
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java | 29
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java | 13
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java | 15
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java | 15
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java | 7
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java | 2
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java | 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java | 4
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java | 22
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java | 42
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java | 11
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java | 3
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java | 65
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java | 2
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java | 13
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java | 5
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java | 8
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java | 8
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java | 54
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java | 7
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java | 4
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java | 3
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java | 5
-rw-r--r-- exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java | 2
84 files changed, 765 insertions, 567 deletions
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
index 3011f4efb..6501f8c10 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/db/MapRDBFormatPlugin.java
@@ -180,9 +180,9 @@ public class MapRDBFormatPlugin extends TableFormatPlugin {
*/
@JsonIgnore
public String getTableName(FileSelection selection) {
- List<String> files = selection.getFiles();
+ List<Path> files = selection.getFiles();
assert (files.size() == 1);
- return files.get(0);
+ return files.get(0).toUri().getPath();
}
}
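getTableName() converts the Path back to a String with toUri().getPath(), which yields the path component only. A small illustration with a hypothetical location:

    import org.apache.hadoop.fs.Path;

    Path p = new Path("hdfs://namenode:8020/tables/users");
    p.toString();          // "hdfs://namenode:8020/tables/users"
    p.toUri().getPath();   // "/tables/users" (scheme and authority dropped)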
diff --git a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
index 92f134fb7..2ddf75296 100644
--- a/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
+++ b/contrib/format-maprdb/src/main/java/org/apache/drill/exec/store/mapr/streams/StreamsFormatPlugin.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class StreamsFormatPlugin extends TableFormatPlugin {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamsFormatPlugin.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StreamsFormatPlugin.class);
private StreamsFormatMatcher matcher;
public StreamsFormatPlugin(String name, DrillbitContext context, Configuration fsConf,
@@ -72,9 +72,8 @@ public class StreamsFormatPlugin extends TableFormatPlugin {
}
@Override
- public AbstractGroupScan getGroupScan(String userName, FileSelection selection,
- List<SchemaPath> columns) throws IOException {
- List<String> files = selection.getFiles();
+ public AbstractGroupScan getGroupScan(String userName, FileSelection selection, List<SchemaPath> columns) {
+ List<Path> files = selection.getFiles();
assert (files.size() == 1);
//TableProperties props = getMaprFS().getTableProperties(new Path(files.get(0)));
throw UserException.unsupportedError().message("MapR streams can not be queried at this time.").build(logger);
@@ -86,13 +85,12 @@ public class StreamsFormatPlugin extends TableFormatPlugin {
}
@Override
- public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) throws IOException {
+ public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
throw new UnsupportedOperationException("unimplemented");
}
@Override
- public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) throws IOException {
+ public void writeStatistics(DrillStatsTable.TableStatistics statistics, FileSystem fs, Path statsTablePath) {
throw new UnsupportedOperationException("unimplemented");
}
-
}
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
index a198e3444..4b2831c2d 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
@@ -32,7 +32,6 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.apache.hadoop.fs.Path;
import org.realityforge.jsyslog.message.StructuredDataParameter;
import org.realityforge.jsyslog.message.SyslogMessage;
@@ -95,7 +94,7 @@ public class SyslogRecordReader extends AbstractRecordReader {
private void openFile() {
InputStream in;
try {
- in = fileSystem.open(new Path(fileWork.getPath()));
+ in = fileSystem.open(fileWork.getPath());
} catch (Exception e) {
throw UserException
.dataReadError(e)
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
index 25a0c080e..a52b48ded 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionDescriptor.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.store.hive.HiveUtilities;
import org.apache.drill.exec.store.hive.HiveReadEntry;
import org.apache.drill.exec.store.hive.HiveScan;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.Partition;
import java.util.BitSet;
@@ -87,9 +88,9 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
}
@Override
- public String getBaseTableLocation() {
+ public Path getBaseTableLocation() {
HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry();
- return origEntry.table.getTable().getSd().getLocation();
+ return new Path(origEntry.table.getTable().getSd().getLocation());
}
@Override
@@ -145,7 +146,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
List<PartitionLocation> locations = new LinkedList<>();
HiveReadEntry origEntry = ((HiveScan) scanRel.getGroupScan()).getHiveReadEntry();
for (Partition partition: origEntry.getPartitions()) {
- locations.add(new HivePartitionLocation(partition.getValues(), partition.getSd().getLocation()));
+ locations.add(new HivePartitionLocation(partition.getValues(), new Path(partition.getSd().getLocation())));
}
locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
sublistsCreated = true;
@@ -170,7 +171,7 @@ public class HivePartitionDescriptor extends AbstractPartitionDescriptor {
List<HiveTableWrapper.HivePartitionWrapper> newPartitions = Lists.newLinkedList();
for (HiveTableWrapper.HivePartitionWrapper part: oldPartitions) {
- String partitionLocation = part.getPartition().getSd().getLocation();
+ Path partitionLocation = new Path(part.getPartition().getSd().getLocation());
for (PartitionLocation newPartitionLocation: newPartitionLocations) {
if (partitionLocation.equals(newPartitionLocation.getEntirePartitionLocation())) {
newPartitions.add(part);
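The comparison above now goes through Path.equals() rather than String.equals(). Hadoop's Path defines equals()/hashCode() over its whole URI, so both operands must be constructed consistently (here both are built from getSd().getLocation()). An illustration with hypothetical locations:

    Path a = new Path("/warehouse/t/dt=2019-01-01");
    Path b = new Path("/warehouse/t/dt=2019-01-01");
    a.equals(b);   // true: same URI
    a.equals(new Path("hdfs://nn:8020/warehouse/t/dt=2019-01-01"));   // false: scheme/authority differ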
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
index bb0efe841..25821f58a 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/HivePartitionLocation.java
@@ -19,17 +19,19 @@ package org.apache.drill.exec.planner.sql;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.exec.planner.SimplePartitionLocation;
+import org.apache.hadoop.fs.Path;
import java.util.List;
public class HivePartitionLocation extends SimplePartitionLocation {
- private final String partitionLocation;
+ private final Path partitionLocation;
private final List<String> partitionValues;
- public HivePartitionLocation(final List<String> partitionValues, final String partitionLocation) {
+ public HivePartitionLocation(List<String> partitionValues, Path partitionLocation) {
this.partitionValues = ImmutableList.copyOf(partitionValues);
this.partitionLocation = partitionLocation;
}
+
@Override
public String getPartitionValue(int index) {
assert index < partitionValues.size();
@@ -37,7 +39,7 @@ public class HivePartitionLocation extends SimplePartitionLocation {
}
@Override
- public String getEntirePartitionLocation() {
+ public Path getEntirePartitionLocation() {
return partitionLocation;
}
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
index bea06e073..78e107a85 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetRowGroupScan.java
@@ -124,7 +124,7 @@ public class HiveDrillNativeParquetRowGroupScan extends AbstractParquetRowGroupS
@Override
public Configuration getFsConf(RowGroupReadEntry rowGroupReadEntry) throws IOException {
- Path path = new Path(rowGroupReadEntry.getPath()).getParent();
+ Path path = rowGroupReadEntry.getPath().getParent();
return new ProjectionPusher().pushProjectionsAndFilters(
new JobConf(HiveUtilities.generateHiveConf(hiveStoragePlugin.getHiveConf(), confProperties)),
path.getParent());
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
index 617f6a59f..79a07f17b 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScan.java
@@ -114,7 +114,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
assert split instanceof FileSplit;
FileSplit fileSplit = (FileSplit) split;
Path finalPath = fileSplit.getPath();
- String pathString = Path.getPathWithoutSchemeAndAuthority(finalPath).toString();
+ Path pathString = Path.getPathWithoutSchemeAndAuthority(finalPath);
entries.add(new ReadEntryWithPath(pathString));
// store partition values per path
@@ -205,7 +205,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
protected void initInternal() throws IOException {
Map<FileStatus, FileSystem> fileStatusConfMap = new LinkedHashMap<>();
for (ReadEntryWithPath entry : entries) {
- Path path = new Path(entry.getPath());
+ Path path = entry.getPath();
Configuration conf = new ProjectionPusher().pushProjectionsAndFilters(
new JobConf(hiveStoragePlugin.getHiveConf()),
path.getParent());
@@ -221,7 +221,7 @@ public class HiveDrillNativeParquetScan extends AbstractParquetGroupScan {
}
@Override
- protected AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException {
+ protected AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException {
FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), null, null, false);
return clone(newSelection);
}
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
index 339e1bda4..be906febe 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveDrillNativeParquetScanBatchCreator.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.AbstractParquetScanBatchCreator;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.HashMap;
@@ -54,7 +55,7 @@ public class HiveDrillNativeParquetScanBatchCreator extends AbstractParquetScanB
*/
private class HiveDrillNativeParquetDrillFileSystemManager extends AbstractDrillFileSystemManager {
- private final Map<String, DrillFileSystem> fileSystems;
+ private final Map<Path, DrillFileSystem> fileSystems;
HiveDrillNativeParquetDrillFileSystemManager(OperatorContext operatorContext) {
super(operatorContext);
@@ -62,7 +63,7 @@ public class HiveDrillNativeParquetScanBatchCreator extends AbstractParquetScanB
}
@Override
- protected DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException {
+ protected DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException {
DrillFileSystem fs = fileSystems.get(path);
if (fs == null) {
try {
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
index 803144e0b..3e16acba4 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HivePartitionHolder.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.store.hive;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
import java.util.ArrayList;
import java.util.Collections;
@@ -34,11 +35,11 @@ import java.util.Map;
*/
public class HivePartitionHolder {
- private final Map<String, Integer> keyToIndexMapper;
+ private final Map<Path, Integer> keyToIndexMapper;
private final List<List<String>> partitionValues;
@JsonCreator
- public HivePartitionHolder(@JsonProperty("keyToIndexMapper") Map<String, Integer> keyToIndexMapper,
+ public HivePartitionHolder(@JsonProperty("keyToIndexMapper") Map<Path, Integer> keyToIndexMapper,
@JsonProperty("partitionValues") List<List<String>> partitionValues) {
this.keyToIndexMapper = keyToIndexMapper;
this.partitionValues = partitionValues;
@@ -50,7 +51,7 @@ public class HivePartitionHolder {
}
@JsonProperty
- public Map<String, Integer> getKeyToIndexMapper() {
+ public Map<Path, Integer> getKeyToIndexMapper() {
return keyToIndexMapper;
}
@@ -67,7 +68,7 @@ public class HivePartitionHolder {
* @param key mapper key
* @param values partition values
*/
- public void add(String key, List<String> values) {
+ public void add(Path key, List<String> values) {
int index = partitionValues.indexOf(values);
if (index == -1) {
index = partitionValues.size();
@@ -84,7 +85,7 @@ public class HivePartitionHolder {
* @param key mapper key
* @return list of partition values
*/
- public List<String> get(String key) {
+ public List<String> get(Path key) {
Integer index = keyToIndexMapper.get(key);
if (index == null) {
return Collections.emptyList();
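Keying the mapper by Path works because Path implements hashCode() consistently with equals(), so a structurally equal Path retrieves the same entry. A hypothetical sketch:

    Map<Path, Integer> keyToIndexMapper = new HashMap<>();
    keyToIndexMapper.put(new Path("/warehouse/t/dt=2019-01-01"), 0);
    keyToIndexMapper.get(new Path("/warehouse/t/dt=2019-01-01"));   // 0: structurally equal key matches

One hedged caveat: when Jackson serializes this field, map keys go through a key serializer (by default the key's toString()) rather than the Path value serializer registered in PhysicalPlanReader, so keys and values may serialize in slightly different forms.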
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
index f73afb117..6eba65833 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
@@ -45,7 +45,8 @@ public class PhysicalPlan {
Graph<PhysicalOperator, Root, Leaf> graph;
@JsonCreator
- public PhysicalPlan(@JsonProperty("head") PlanProperties properties, @JsonProperty("graph") List<PhysicalOperator> operators){
+ public PhysicalPlan(@JsonProperty("head") PlanProperties properties,
+ @JsonProperty("graph") List<PhysicalOperator> operators) {
this.properties = properties;
this.graph = Graph.newGraph(operators, Root.class, Leaf.class);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
index 7e2623a19..03b53a4f7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractGroupScan.java
@@ -34,6 +34,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.hadoop.fs.Path;
public abstract class AbstractGroupScan extends AbstractBase implements GroupScan {
@@ -171,7 +172,7 @@ public abstract class AbstractGroupScan extends AbstractBase implements GroupSca
}
@Override
- public Collection<String> getFiles() {
+ public Collection<Path> getFiles() {
return null;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
index e42ae2dc8..6dbad224c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/GroupScan.java
@@ -30,6 +30,7 @@ import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.Path;
/**
* A GroupScan operator represents all data which will be scanned by a given physical
@@ -142,8 +143,10 @@ public interface GroupScan extends Scan, HasAffinity{
/**
* Returns a collection of file names associated with this GroupScan. This should be called after checking
* hasFiles(). If this GroupScan cannot provide file names, it returns null.
+ *
+ * @return collection of file paths
*/
- Collection<String> getFiles();
+ Collection<Path> getFiles();
@JsonIgnore
LogicalExpression getFilter();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java
index 999c417fc..fb60ddddb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/SchemalessScan.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
import java.util.List;
@@ -32,17 +33,17 @@ import java.util.List;
@JsonTypeName("schemaless-scan")
public class SchemalessScan extends AbstractFileGroupScan implements SubScan {
- private final String selectionRoot;
+ private final Path selectionRoot;
@JsonCreator
public SchemalessScan(@JsonProperty("userName") String userName,
- @JsonProperty("selectionRoot") String selectionRoot,
+ @JsonProperty("selectionRoot") Path selectionRoot,
@JsonProperty("columns") List<SchemaPath> columns) {
this(userName, selectionRoot);
}
public SchemalessScan(@JsonProperty("userName") String userName,
- @JsonProperty("selectionRoot") String selectionRoot) {
+ @JsonProperty("selectionRoot") Path selectionRoot) {
super(userName);
this.selectionRoot = selectionRoot;
}
@@ -53,7 +54,7 @@ public class SchemalessScan extends AbstractFileGroupScan implements SubScan {
}
@JsonProperty
- public String getSelectionRoot() {
+ public Path getSelectionRoot() {
return selectionRoot;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
index 5a8c526b5..8352dfa07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/BaseFileScanFramework.java
@@ -110,7 +110,7 @@ public abstract class BaseFileScanFramework<T extends BaseFileScanFramework.File
List<Path> paths = new ArrayList<>();
for (FileWork work : files) {
- Path path = dfs.makeQualified(new Path(work.getPath()));
+ Path path = dfs.makeQualified(work.getPath());
paths.add(path);
FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""});
spilts.add(split);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java
index ff449d44e..2eb8af932 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/scan/file/FileMetadata.java
@@ -52,7 +52,7 @@ public class FileMetadata {
return;
}
- dirPath = ColumnExplorer.parsePartitions(filePath.toString(), rootPath.toString());
+ dirPath = ColumnExplorer.parsePartitions(filePath, rootPath, false);
if (dirPath == null) {
throw new IllegalArgumentException(
String.format("Selection root of \"%s\" is not a leading path of \"%s\"",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
index cf0125692..d4148278d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/AbstractPartitionDescriptor.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.calcite.rel.core.TableScan;
import org.apache.drill.exec.store.dfs.MetadataContext;
+import org.apache.hadoop.fs.Path;
/**
* Abstract base class for file system based partition descriptors and Hive partition descriptors.
@@ -65,7 +66,7 @@ public abstract class AbstractPartitionDescriptor implements PartitionDescriptor
@Override
- public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
+ public TableScan createTableScan(List<PartitionLocation> newPartitions, Path cacheFileRoot,
boolean isAllPruned, MetadataContext metaContext) throws Exception {
throw new UnsupportedOperationException();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
index 46afb30f5..440178b47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSDirPartitionLocation.java
@@ -21,7 +21,9 @@
package org.apache.drill.exec.planner;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
import java.util.Collection;
import java.util.List;
@@ -46,7 +48,7 @@ public class DFSDirPartitionLocation implements PartitionLocation {
}
@Override
- public String getEntirePartitionLocation() {
+ public Path getEntirePartitionLocation() {
throw new UnsupportedOperationException("Should not call getEntirePartitionLocation for composite partition location!");
}
@@ -67,15 +69,17 @@ public class DFSDirPartitionLocation implements PartitionLocation {
}
@Override
- public String getCompositePartitionPath() {
- String path = "";
- for (int i=0; i < dirs.length; i++) {
- if (dirs[i] == null) { // get the prefix
+ public Path getCompositePartitionPath() {
+ StringBuilder path = new StringBuilder();
+ for (String dir : dirs) {
+ if (dir == null) { // get the prefix
break;
}
- path += "/" + dirs[i];
+ path.append("/")
+ .append(dir);
}
- return path;
+
+ return DrillFileSystemUtil.createPathSafe(path.toString());
}
}
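The rewritten getCompositePartitionPath() accumulates the directory prefix in a StringBuilder and defers Path construction to DrillFileSystemUtil.createPathSafe(), presumably because new Path("") throws IllegalArgumentException when every dirs entry is null. What the loop produces, for hypothetical values:

    String[] dirs = {"1997", "q1", null};   // trailing nulls mark unused nesting levels
    StringBuilder path = new StringBuilder();
    for (String dir : dirs) {
      if (dir == null) {   // stop at the first unset level
        break;
      }
      path.append("/").append(dir);
    }
    path.toString();   // "/1997/q1"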
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
index ecfa6220d..0dfe31634 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/DFSFilePartitionLocation.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.planner;
+import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.hadoop.fs.Path;
/**
@@ -24,28 +25,14 @@ import org.apache.hadoop.fs.Path;
*/
public class DFSFilePartitionLocation extends SimplePartitionLocation {
private final String[] dirs;
- private final String file;
+ private final Path file;
- public DFSFilePartitionLocation(int max, String selectionRoot, String file, boolean hasDirsOnly) {
+ public DFSFilePartitionLocation(int max, Path selectionRoot, Path file, boolean hasDirsOnly) {
this.file = file;
this.dirs = new String[max];
- // strip the scheme and authority if they exist
- selectionRoot = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString();
-
- int start = file.indexOf(selectionRoot) + selectionRoot.length();
- String postPath = file.substring(start);
- if (postPath.length() == 0) {
- return;
- }
- if(postPath.charAt(0) == '/'){
- postPath = postPath.substring(1);
- }
- String[] mostDirs = postPath.split("/");
- int maxLoop = Math.min(max, hasDirsOnly ? mostDirs.length : mostDirs.length - 1);
- for(int i =0; i < maxLoop; i++) {
- this.dirs[i] = mostDirs[i];
- }
+ String[] dirs = ColumnExplorer.parsePartitions(this.file, selectionRoot, hasDirsOnly);
+ System.arraycopy(dirs, 0, this.dirs, 0, Math.min(max, dirs.length));
}
/**
@@ -64,7 +51,7 @@ public class DFSFilePartitionLocation extends SimplePartitionLocation {
* @return The partition location.
*/
@Override
- public String getEntirePartitionLocation() {
+ public Path getEntirePartitionLocation() {
return file;
}
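The hand-rolled substring/split logic removed above is now delegated to ColumnExplorer.parsePartitions(), whose shape can be read off this patch: it takes the file path, the selection root, and the hasDirsOnly flag, and returns the directory names between the two. A hypothetical call matching the old behavior:

    Path file = new Path("/data/sales/1997/q1/0_0_0.parquet");
    Path selectionRoot = new Path("/data/sales");

    String[] dirs = ColumnExplorer.parsePartitions(file, selectionRoot, false);
    // dirs == {"1997", "q1"}: with hasDirsOnly == false the trailing file name is dropped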
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 65268923d..188049003 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -26,6 +26,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.util.GuavaUtils;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -37,7 +39,6 @@ import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.prepare.RelOptTableImpl;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.util.BitSets;
-import org.apache.calcite.util.Pair;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
@@ -54,6 +55,7 @@ import org.apache.drill.exec.store.dfs.FormatSelection;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.Path;
// partition descriptor for file system based tables
@@ -145,18 +147,18 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
}
@Override
- public String getBaseTableLocation() {
+ public Path getBaseTableLocation() {
final FormatSelection origSelection = (FormatSelection) table.getSelection();
- return origSelection.getSelection().selectionRoot;
+ return origSelection.getSelection().getSelectionRoot();
}
@Override
protected void createPartitionSublists() {
- final Pair<Collection<String>, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus();
+ final Pair<Collection<Path>, Boolean> fileLocationsAndStatus = getFileLocationsAndStatus();
List<PartitionLocation> locations = new LinkedList<>();
- boolean hasDirsOnly = fileLocationsAndStatus.right;
+ boolean hasDirsOnly = fileLocationsAndStatus.getRight();
- final String selectionRoot = getBaseTableLocation();
+ final Path selectionRoot = getBaseTableLocation();
// map used to map the partition keys (dir0, dir1, ..), to the list of partitions that share the same partition keys.
// For example,
@@ -166,35 +168,31 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
// Figure out the list of leaf subdirectories. For each leaf subdirectory, find the list of files (DFSFilePartitionLocation)
// it contains.
- for (String file: fileLocationsAndStatus.left) {
- DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS, selectionRoot, file, hasDirsOnly);
+ for (Path file: fileLocationsAndStatus.getLeft()) {
+ DFSFilePartitionLocation dfsFilePartitionLocation = new DFSFilePartitionLocation(MAX_NESTED_SUBDIRS,
+ selectionRoot, file, hasDirsOnly);
+ List<String> dirList = Arrays.asList(dfsFilePartitionLocation.getDirs());
- final String[] dirs = dfsFilePartitionLocation.getDirs();
- final List<String> dirList = Arrays.asList(dirs);
-
- if (!dirToFileMap.containsKey(dirList)) {
- dirToFileMap.put(dirList, new ArrayList<PartitionLocation>());
- }
+ dirToFileMap.putIfAbsent(dirList, new ArrayList<>());
dirToFileMap.get(dirList).add(dfsFilePartitionLocation);
}
// build a list of DFSDirPartitionLocation.
- for (final List<String> dirs : dirToFileMap.keySet()) {
- locations.add( new DFSDirPartitionLocation(dirs.toArray(new String[dirs.size()]), dirToFileMap.get(dirs)));
- }
+ dirToFileMap.keySet().stream()
+ .map(dirs -> new DFSDirPartitionLocation(dirs.toArray(new String[dirs.size()]), dirToFileMap.get(dirs)))
+ .forEach(locations::add);
locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
sublistsCreated = true;
}
- protected Pair<Collection<String>, Boolean> getFileLocationsAndStatus() {
- Collection<String> fileLocations = null;
- Pair<Collection<String>, Boolean> fileLocationsAndStatus = null;
+ protected Pair<Collection<Path>, Boolean> getFileLocationsAndStatus() {
+ Collection<Path> fileLocations = null;
boolean isExpandedPartial = false;
if (scanRel instanceof DrillScanRel) {
// If a particular GroupScan provides files, get the list of files from there rather than
// DrillTable because GroupScan would have the updated version of the selection
- final DrillScanRel drillScan = (DrillScanRel) scanRel;
+ DrillScanRel drillScan = (DrillScanRel) scanRel;
if (drillScan.getGroupScan().hasFiles()) {
fileLocations = drillScan.getGroupScan().getFiles();
isExpandedPartial = false;
@@ -208,67 +206,59 @@ public class FileSystemPartitionDescriptor extends AbstractPartitionDescriptor {
fileLocations = selection.getFiles();
isExpandedPartial = selection.isExpandedPartial();
}
- fileLocationsAndStatus = Pair.of(fileLocations, isExpandedPartial);
- return fileLocationsAndStatus;
+ return Pair.of(fileLocations, isExpandedPartial);
}
@Override
- public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, String cacheFileRoot,
+ public TableScan createTableScan(List<PartitionLocation> newPartitionLocation, Path cacheFileRoot,
boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception {
- List<String> newFiles = Lists.newArrayList();
- for (final PartitionLocation location : newPartitionLocation) {
+ List<Path> newFiles = new ArrayList<>();
+ for (PartitionLocation location : newPartitionLocation) {
if (!location.isCompositePartition()) {
newFiles.add(location.getEntirePartitionLocation());
} else {
final Collection<SimplePartitionLocation> subPartitions = location.getPartitionLocationRecursive();
- for (final PartitionLocation subPart : subPartitions) {
+ for (PartitionLocation subPart : subPartitions) {
newFiles.add(subPart.getEntirePartitionLocation());
}
}
}
+ FormatSelection formatSelection = (FormatSelection) table.getSelection();
+ FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
+ cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
+ newFileSelection.setMetaContext(metaContext);
+ RelOptTable relOptTable = scanRel.getTable();
+
if (scanRel instanceof DrillScanRel) {
- final FormatSelection formatSelection = (FormatSelection)table.getSelection();
- final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
- cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
- newFileSelection.setMetaContext(metaContext);
- final FileGroupScan newGroupScan =
- ((FileGroupScan)((DrillScanRel)scanRel).getGroupScan()).clone(newFileSelection);
+ FileGroupScan newGroupScan =
+ ((FileGroupScan) ((DrillScanRel) scanRel).getGroupScan()).clone(newFileSelection);
return new DrillScanRel(scanRel.getCluster(),
scanRel.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
- scanRel.getTable(),
+ relOptTable,
newGroupScan,
scanRel.getRowType(),
((DrillScanRel) scanRel).getColumns(),
true /*filter pushdown*/);
} else if (scanRel instanceof EnumerableTableScan) {
- return createNewTableScanFromSelection((EnumerableTableScan)scanRel, newFiles, cacheFileRoot,
- wasAllPartitionsPruned, metaContext);
+ FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
+
+ DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
+ table.getUserName(), newFormatSelection);
+ /* Copy statistics from the original relOptTable */
+ dynamicDrillTable.setStatsTable(table.getStatsTable());
+ DrillTranslatableTable newTable = new DrillTranslatableTable(dynamicDrillTable);
+
+ RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(relOptTable.getRelOptSchema(), relOptTable.getRowType(),
+ newTable, GuavaUtils.convertToUnshadedImmutableList(ImmutableList.of()));
+
+ // return an EnumerableTableScan with fileSelection being part of digest of TableScan node.
+ return DirPrunedEnumerableTableScan.create(scanRel.getCluster(), newOptTableImpl, newFileSelection.toString());
} else {
throw new UnsupportedOperationException("Only DrillScanRel and EnumerableTableScan is allowed!");
}
}
- private TableScan createNewTableScanFromSelection(EnumerableTableScan oldScan, List<String> newFiles, String cacheFileRoot,
- boolean wasAllPartitionsPruned, MetadataContext metaContext) {
- final RelOptTableImpl t = (RelOptTableImpl) oldScan.getTable();
- final FormatSelection formatSelection = (FormatSelection) table.getSelection();
- final FileSelection newFileSelection = new FileSelection(null, newFiles, getBaseTableLocation(),
- cacheFileRoot, wasAllPartitionsPruned, formatSelection.getSelection().getDirStatus());
- newFileSelection.setMetaContext(metaContext);
- final FormatSelection newFormatSelection = new FormatSelection(formatSelection.getFormat(), newFileSelection);
- final DynamicDrillTable dynamicDrillTable = new DynamicDrillTable(table.getPlugin(), table.getStorageEngineName(),
- table.getUserName(), newFormatSelection);
- /* Copy statistics from the original table */
- dynamicDrillTable.setStatsTable(table.getStatsTable());
- final DrillTranslatableTable newTable = new DrillTranslatableTable(dynamicDrillTable);
- final RelOptTableImpl newOptTableImpl = RelOptTableImpl.create(t.getRelOptSchema(), t.getRowType(), newTable,
- GuavaUtils.convertToUnshadedImmutableList(ImmutableList.of()));
-
- // return an EnumerableTableScan with fileSelection being part of digest of TableScan node.
- return DirPrunedEnumerableTableScan.create(oldScan.getCluster(), newOptTableImpl, newFileSelection.toString());
- }
-
@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
boolean wasAllPartitionsPruned) throws Exception {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
index df6deccb0..23675d1db 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionDescriptor.java
@@ -134,17 +134,17 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
}
@Override
- public String getBaseTableLocation() {
+ public Path getBaseTableLocation() {
final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
return origSelection.getSelection().selectionRoot;
}
@Override
public TableScan createTableScan(List<PartitionLocation> newPartitionLocation,
- String cacheFileRoot,
+ Path cacheFileRoot,
boolean wasAllPartitionsPruned,
MetadataContext metaContext) throws Exception {
- List<String> newFiles = new ArrayList<>();
+ List<Path> newFiles = new ArrayList<>();
for (final PartitionLocation location : newPartitionLocation) {
newFiles.add(location.getEntirePartitionLocation());
}
@@ -172,17 +172,17 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
@Override
protected void createPartitionSublists() {
- Set<String> fileLocations = groupScan.getFileSet();
+ Set<Path> fileLocations = groupScan.getFileSet();
List<PartitionLocation> locations = new LinkedList<>();
- for (String file : fileLocations) {
+ for (Path file : fileLocations) {
locations.add(new ParquetPartitionLocation(file));
}
locationSuperList = Lists.partition(locations, PartitionDescriptor.PARTITION_BATCH_SIZE);
sublistsCreated = true;
}
- private GroupScan createNewGroupScan(List<String> newFiles,
- String cacheFileRoot,
+ private GroupScan createNewGroupScan(List<Path> newFiles,
+ Path cacheFileRoot,
boolean wasAllPartitionsPruned,
MetadataContext metaContext) throws IOException {
@@ -194,8 +194,8 @@ public class ParquetPartitionDescriptor extends AbstractPartitionDescriptor {
return groupScan.clone(newSelection);
}
- private void populatePruningVector(ValueVector v, int index, SchemaPath column, String file) {
- String path = Path.getPathWithoutSchemeAndAuthority(new Path(file)).toString();
+ private void populatePruningVector(ValueVector v, int index, SchemaPath column, Path file) {
+ Path path = Path.getPathWithoutSchemeAndAuthority(file);
TypeProtos.MajorType majorType = getVectorType(column, null);
TypeProtos.MinorType type = majorType.getMinorType();
switch (type) {
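populatePruningVector() now keeps the value typed as Path, using Hadoop's static helper instead of a String round-trip:

    Path bare = Path.getPathWithoutSchemeAndAuthority(new Path("hdfs://nn:8020/warehouse/part-0.parquet"));
    bare.toString();   // "/warehouse/part-0.parquet", still a Path rather than a String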
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
index 49e51094d..eec0d5ebe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/ParquetPartitionLocation.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.planner;
+import org.apache.hadoop.fs.Path;
+
/*
* PartitionLocation for the parquet auto partitioning scheme. We just store
* the location of each partition within this class. Since the partition value
@@ -25,9 +27,9 @@ package org.apache.drill.exec.planner;
* invoked.
*/
public class ParquetPartitionLocation extends SimplePartitionLocation {
- private final String file;
+ private final Path file;
- public ParquetPartitionLocation(String file) {
+ public ParquetPartitionLocation(Path file) {
this.file = file;
}
@@ -48,7 +50,7 @@ public class ParquetPartitionLocation extends SimplePartitionLocation {
* @return String location of the partition
*/
@Override
- public String getEntirePartitionLocation() {
+ public Path getEntirePartitionLocation() {
return file;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
index 220bf291e..8759078ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionDescriptor.java
@@ -23,36 +23,52 @@ import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.store.dfs.MetadataContext;
import org.apache.drill.exec.vector.ValueVector;
+import org.apache.hadoop.fs.Path;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
-// Interface used to describe partitions. Currently used by file system based partitions and hive partitions
+/**
+ * Interface used to describe partitions. Currently used by file system based partitions and hive partitions
+ */
public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
- public static final int PARTITION_BATCH_SIZE = Character.MAX_VALUE;
+ int PARTITION_BATCH_SIZE = Character.MAX_VALUE;
- /* Get the hierarchy index of the given partition
+ /**
+ * Get the hierarchy index of the given partition
* For eg: if we have the partition laid out as follows
* 1997/q1/jan
- *
* then getPartitionHierarchyIndex("jan") => 2
+ *
+ * @param partitionName Partition name
+ * @return the index of the specified partition name in the hierarchy
*/
- public int getPartitionHierarchyIndex(String partitionName);
+ int getPartitionHierarchyIndex(String partitionName);
- // Given a column name return boolean to indicate if its a partition column or not
- public boolean isPartitionName(String name);
+ /**
+ * Given a column name, return a boolean indicating whether it is a partition column or not
+ *
+ * @param name the column name to check
+ * @return true if the name is a partition name, false otherwise
+ */
+ boolean isPartitionName(String name);
/**
* Check to see if the name is a partition name.
+ *
* @param name The field name you want to compare to partition names.
* @return Return index if valid, otherwise return null;
*/
- public Integer getIdIfValid(String name);
+ Integer getIdIfValid(String name);
- // Maximum level of partition nesting/ hierarchy supported
- public int getMaxHierarchyLevel();
+ /**
+ * Maximum level of partition nesting/hierarchy supported
+ *
+ * @return the maximum supported level of the partition hierarchy
+ */
+ int getMaxHierarchyLevel();
/**
* Method creates an in memory representation of all the partitions. For each level of partitioning we
@@ -79,7 +95,7 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
* @param wasAllPartitionsPruned
* @throws Exception
*/
- public TableScan createTableScan(List<PartitionLocation> newPartitions,
+ TableScan createTableScan(List<PartitionLocation> newPartitions,
boolean wasAllPartitionsPruned) throws Exception;
/**
@@ -91,11 +107,11 @@ public interface PartitionDescriptor extends Iterable<List<PartitionLocation>> {
* @param metaContext
* @throws Exception
*/
- public TableScan createTableScan(List<PartitionLocation> newPartitions, String cacheFileRoot,
+ TableScan createTableScan(List<PartitionLocation> newPartitions, Path cacheFileRoot,
boolean wasAllPartitionsPruned, MetadataContext metaContext) throws Exception;
- public boolean supportsMetadataCachePruning();
+ boolean supportsMetadataCachePruning();
- public String getBaseTableLocation();
+ Path getBaseTableLocation();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
index 22088f7ee..2ef4e173d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PartitionLocation.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.planner;
+import org.apache.hadoop.fs.Path;
+
import java.util.List;
/**
@@ -37,27 +39,27 @@ public interface PartitionLocation {
/**
* Returns the value of the 'index' partition column
*/
- public String getPartitionValue(int index);
+ String getPartitionValue(int index);
/**
- * Returns the string representation of this partition.
+ * Returns the path of this partition.
* Only a non-composite partition supports this.
*/
- public String getEntirePartitionLocation();
+ Path getEntirePartitionLocation();
/**
* Returns the list of the non-composite partitions that this partition consists of.
*/
- public List<SimplePartitionLocation> getPartitionLocationRecursive();
+ List<SimplePartitionLocation> getPartitionLocationRecursive();
/**
* Returns if this is a simple or composite partition.
*/
- public boolean isCompositePartition();
+ boolean isCompositePartition();
/**
* Returns the path string of directory names only for composite partition
*/
- public String getCompositePartitionPath();
+ Path getCompositePartitionPath();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
index 20b06c430..920f9b2bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PhysicalPlanReader.java
@@ -33,6 +33,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalOperatorUtil;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.record.MajorTypeSerDe;
+import org.apache.drill.exec.serialization.PathSerDe;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.pojo.DynamicPojoRecordReader;
@@ -44,43 +45,43 @@ import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.deser.std.StdDelegatingDeserializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.Path;
public class PhysicalPlanReader {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PhysicalPlanReader.class);
private final ObjectReader physicalPlanReader;
private final ObjectMapper mapper;
private final ObjectReader operatorReader;
private final ObjectReader logicalPlanReader;
- public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance, final DrillbitEndpoint endpoint,
- final StoragePluginRegistry pluginRegistry) {
+ public PhysicalPlanReader(DrillConfig config, ScanResult scanResult, LogicalPlanPersistence lpPersistance,
+ final DrillbitEndpoint endpoint, final StoragePluginRegistry pluginRegistry) {
ObjectMapper lpMapper = lpPersistance.getMapper();
// Endpoint serializer/deserializer.
- SimpleModule deserModule = new SimpleModule("PhysicalOperatorModule") //
- .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se()) //
- .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De()) //
+ SimpleModule serDeModule = new SimpleModule("PhysicalOperatorModule")
+ .addSerializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.Se())
+ .addDeserializer(DrillbitEndpoint.class, new DrillbitEndpointSerDe.De())
.addSerializer(MajorType.class, new MajorTypeSerDe.Se())
.addDeserializer(MajorType.class, new MajorTypeSerDe.De())
.addDeserializer(DynamicPojoRecordReader.class,
- new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper)));
+ new StdDelegatingDeserializer<>(new DynamicPojoRecordReader.Converter(lpMapper)))
+ .addSerializer(Path.class, new PathSerDe.Se());
- lpMapper.registerModule(deserModule);
+ lpMapper.registerModule(serDeModule);
Set<Class<? extends PhysicalOperator>> subTypes = PhysicalOperatorUtil.getSubTypes(scanResult);
- for (Class<? extends PhysicalOperator> subType : subTypes) {
- lpMapper.registerSubtypes(subType);
- }
+ subTypes.forEach(lpMapper::registerSubtypes);
lpMapper.registerSubtypes(DynamicPojoRecordReader.class);
- InjectableValues injectables = new InjectableValues.Std() //
- .addValue(StoragePluginRegistry.class, pluginRegistry) //
- .addValue(DrillbitEndpoint.class, endpoint); //
+ InjectableValues injectables = new InjectableValues.Std()
+ .addValue(StoragePluginRegistry.class, pluginRegistry)
+ .addValue(DrillbitEndpoint.class, endpoint);
this.mapper = lpMapper;
- this.physicalPlanReader = mapper.reader(PhysicalPlan.class).with(injectables);
- this.operatorReader = mapper.reader(PhysicalOperator.class).with(injectables);
- this.logicalPlanReader = mapper.reader(LogicalPlan.class).with(injectables);
+ this.physicalPlanReader = mapper.readerFor(PhysicalPlan.class).with(injectables);
+ this.operatorReader = mapper.readerFor(PhysicalOperator.class).with(injectables);
+ this.logicalPlanReader = mapper.readerFor(LogicalPlan.class).with(injectables);
}
public String writeJson(OptionList list) throws JsonProcessingException{
@@ -91,33 +92,35 @@ public class PhysicalPlanReader {
return mapper.writeValueAsString(op);
}
- public PhysicalPlan readPhysicalPlan(String json) throws JsonProcessingException, IOException {
+ public PhysicalPlan readPhysicalPlan(String json) throws IOException {
logger.debug("Reading physical plan {}", json);
return physicalPlanReader.readValue(json);
}
- public FragmentRoot readFragmentRoot(String json) throws JsonProcessingException, IOException {
+ public FragmentRoot readFragmentRoot(String json) throws IOException {
logger.debug("Attempting to read {}", json);
PhysicalOperator op = operatorReader.readValue(json);
- if(op instanceof FragmentRoot){
+ if (op instanceof FragmentRoot) {
return (FragmentRoot) op;
- }else{
- throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot as its root operator. The operator was %s.", op.getClass().getCanonicalName()));
+ } else {
+ throw new UnsupportedOperationException(String.format("The provided json fragment doesn't have a FragmentRoot " +
+ "as its root operator. The operator was %s.", op.getClass().getCanonicalName()));
}
}
@VisibleForTesting
- public FragmentLeaf readFragmentLeaf(String json) throws JsonProcessingException, IOException {
+ public FragmentLeaf readFragmentLeaf(String json) throws IOException {
logger.debug("Attempting to read {}", json);
PhysicalOperator op = operatorReader.readValue(json);
if (op instanceof FragmentLeaf){
return (FragmentLeaf) op;
} else {
- throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. The operator was %s.", op.getClass().getCanonicalName()));
+ throw new UnsupportedOperationException(String.format("The provided json fragment is not a FragmentLeaf. " +
+ "The operator was %s.", op.getClass().getCanonicalName()));
}
}
- public LogicalPlan readLogicalPlan(String json) throws JsonProcessingException, IOException{
+ public LogicalPlan readLogicalPlan(String json) throws IOException {
logger.debug("Reading logical plan {}", json);
return logicalPlanReader.readValue(json);
}
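For context, a minimal self-contained sketch of the readerFor(...).with(injectables) pattern used above, assuming Jackson 2.x on the classpath; the Endpoint and Scan types here are hypothetical stand-ins, not Drill classes. Values registered through InjectableValues are handed to @JacksonInject members during deserialization, while ordinary properties still come from the JSON plan.

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.InjectableValues;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;

public class InjectablesSketch {
  static class Endpoint { String host = "localhost"; }

  static class Scan {
    @JsonProperty String path;        // read from the JSON document
    @JacksonInject Endpoint endpoint; // supplied by the reader, never part of the JSON
  }

  public static void main(String[] args) throws Exception {
    InjectableValues injectables = new InjectableValues.Std()
        .addValue(Endpoint.class, new Endpoint());
    ObjectReader reader = new ObjectMapper().readerFor(Scan.class).with(injectables);
    Scan scan = reader.readValue("{\"path\":\"/tmp/data\"}");
    System.out.println(scan.path + " @ " + scan.endpoint.host); // /tmp/data @ localhost
  }
}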
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
index 7c9afb088..ca00446fc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/SimplePartitionLocation.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.planner;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.apache.hadoop.fs.Path;
import java.util.List;
@@ -26,14 +27,14 @@ import java.util.List;
* location of the entire partition and also stores the
* value of the individual partition keys for this partition.
*/
-public abstract class SimplePartitionLocation implements PartitionLocation{
+public abstract class SimplePartitionLocation implements PartitionLocation {
@Override
public boolean isCompositePartition() {
return false;
}
@Override
- public String getCompositePartitionPath() {
+ public Path getCompositePartitionPath() {
throw new UnsupportedOperationException();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
index 6f48b8508..8d3a8cec9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/partition/PruneScanRule.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.calcite.adapter.enumerable.EnumerableTableScan;
import org.apache.calcite.rel.core.Filter;
@@ -74,7 +75,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.drill.exec.vector.ValueVector;
-
+import org.apache.hadoop.fs.Path;
public abstract class PruneScanRule extends StoragePluginOptimizerRule {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PruneScanRule.class);
@@ -372,7 +373,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
// handle the case all partitions are filtered out.
boolean canDropFilter = true;
boolean wasAllPartitionsPruned = false;
- String cacheFileRoot = null;
+ Path cacheFileRoot = null;
if (newPartitions.isEmpty()) {
assert firstLocation != null;
@@ -388,7 +389,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
// set the cacheFileRoot appropriately
if (firstLocation.isCompositePartition()) {
- cacheFileRoot = descriptor.getBaseTableLocation() + firstLocation.getCompositePartitionPath();
+ cacheFileRoot = Path.mergePaths(descriptor.getBaseTableLocation(), firstLocation.getCompositePartitionPath());
}
}
@@ -408,7 +409,7 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
// if metadata cache file could potentially be used, then assign a proper cacheFileRoot
int index = -1;
if (!matchBitSet.isEmpty()) {
- String path = "";
+ StringBuilder path = new StringBuilder();
index = matchBitSet.length() - 1;
for (int j = 0; j < matchBitSet.length(); j++) {
@@ -419,10 +420,12 @@ public abstract class PruneScanRule extends StoragePluginOptimizerRule {
break;
}
}
- for (int j=0; j <= index; j++) {
- path += "/" + spInfo[j];
+ for (int j = 0; j <= index; j++) {
+ path.append("/")
+ .append(spInfo[j]);
}
- cacheFileRoot = descriptor.getBaseTableLocation() + path;
+ cacheFileRoot = Path.mergePaths(descriptor.getBaseTableLocation(),
+ DrillFileSystemUtil.createPathSafe(path.toString()));
}
if (index != maxIndex) {
// if multiple partitions are being selected, we should not drop the filter
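The switch from string concatenation to Path.mergePaths above delegates separator handling to Hadoop; a small sketch of the call with illustrative paths:

import org.apache.hadoop.fs.Path;

public class MergePathsSketch {
  public static void main(String[] args) {
    Path base = new Path("/data/orders");   // stands in for descriptor.getBaseTableLocation()
    Path composite = new Path("/1994/Q1");  // stands in for firstLocation.getCompositePartitionPath()
    // mergePaths appends the second path to the first as a relative suffix
    System.out.println(Path.mergePaths(base, composite)); // /data/orders/1994/Q1
  }
}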
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
index f1acc71ba..8f62713c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/AnalyzeTableHandler.java
@@ -133,15 +133,13 @@ public class AnalyzeTableHandler extends DefaultSqlHandler {
DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(
formatSelection.getFormat()).getFsConf());
- String selectionRoot = formatSelection.getSelection().getSelectionRoot();
- if (!selectionRoot.contains(tableName)
- || !fs.getFileStatus(new Path(selectionRoot)).isDirectory()) {
+ Path selectionRoot = formatSelection.getSelection().getSelectionRoot();
+ if (!selectionRoot.getName().equals(tableName) || !fs.getFileStatus(selectionRoot).isDirectory()) {
return DrillStatsTable.notSupported(context, tableName);
}
// Do not recompute statistics, if stale
- Path statsFilePath = new Path(new Path(selectionRoot), DotDrillType.STATS.getEnding());
- if (fs.exists(statsFilePath)
- && !isStatsStale(fs, statsFilePath)) {
+ Path statsFilePath = new Path(selectionRoot, DotDrillType.STATS.getEnding());
+ if (fs.exists(statsFilePath) && !isStatsStale(fs, statsFilePath)) {
return DrillStatsTable.notRequired(context, tableName);
}
}
@@ -165,13 +163,8 @@ public class AnalyzeTableHandler extends DefaultSqlHandler {
Path parentPath = statsFilePath.getParent();
FileStatus directoryStatus = fs.getFileStatus(parentPath);
// Parent directory modified after stats collection?
- if (directoryStatus.getModificationTime() > statsFileModifyTime) {
- return true;
- }
- if (tableModified(fs, parentPath, statsFileModifyTime)) {
- return true;
- }
- return false;
+ return directoryStatus.getModificationTime() > statsFileModifyTime ||
+ tableModified(fs, parentPath, statsFileModifyTime);
}
/* Determines if the table was modified after computing statistics based on
@@ -185,10 +178,8 @@ public class AnalyzeTableHandler extends DefaultSqlHandler {
return true;
}
// For a directory, we should recursively check sub-directories
- if (file.isDirectory()) {
- if (tableModified(fs, file.getPath(), statsModificationTime)) {
- return true;
- }
+ if (file.isDirectory() && tableModified(fs, file.getPath(), statsModificationTime)) {
+ return true;
}
}
return false;
@@ -215,8 +206,7 @@ public class AnalyzeTableHandler extends DefaultSqlHandler {
/* Converts to Drill logical plan */
protected DrillRel convertToDrel(RelNode relNode, AbstractSchema schema, String analyzeTableName,
- double samplePercent)
- throws RelConversionException, SqlUnsupportedException {
+ double samplePercent) throws SqlUnsupportedException {
DrillRel convertedRelNode = convertToRawDrel(relNode);
if (convertedRelNode instanceof DrillStoreRel) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
index 4684251e9..3ca778706 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/RefreshMetadataHandler.java
@@ -108,8 +108,8 @@ public class RefreshMetadataHandler extends DefaultSqlHandler {
FileSystemPlugin plugin = (FileSystemPlugin) drillTable.getPlugin();
DrillFileSystem fs = new DrillFileSystem(plugin.getFormatPlugin(formatSelection.getFormat()).getFsConf());
- String selectionRoot = formatSelection.getSelection().selectionRoot;
- if (!fs.getFileStatus(new Path(selectionRoot)).isDirectory()) {
+ Path selectionRoot = formatSelection.getSelection().getSelectionRoot();
+ if (!fs.getFileStatus(selectionRoot).isDirectory()) {
return notSupported(tableName);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java
new file mode 100644
index 000000000..03bb37e22
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/serialization/PathSerDe.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.serialization;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Serializes a Hadoop Path as a simple String path. Without it, default Jackson serialization of a Path produces a large nested JSON object.
+ */
+public class PathSerDe {
+
+ public static class Se extends JsonSerializer<Path> {
+
+ @Override
+ public void serialize(Path value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+ gen.writeString(value.toUri().getPath());
+ }
+ }
+}
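A short usage sketch for the new serializer, assuming hadoop-common and this class on the classpath; note that toUri().getPath() deliberately drops the scheme and authority:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import org.apache.drill.exec.serialization.PathSerDe;
import org.apache.hadoop.fs.Path;

public class PathSerDeSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    mapper.registerModule(new SimpleModule("PathModule")
        .addSerializer(Path.class, new PathSerDe.Se()));
    // Emits a plain JSON string instead of a nested bean graph
    System.out.println(mapper.writeValueAsString(new Path("hdfs://nn:8020/data/t/0_0_0.parquet")));
    // prints "/data/t/0_0_0.parquet"
  }
}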
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
index d5d9784b6..33b500018 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/ColumnExplorer.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.store;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
@@ -133,20 +134,20 @@ public class ColumnExplorer {
public static List<String> getPartitionColumnNames(FileSelection selection, SchemaConfig schemaConfig) {
int partitionsCount = 0;
// a depth of table root path
- int rootDepth = new Path(selection.getSelectionRoot()).depth();
+ int rootDepth = selection.getSelectionRoot().depth();
- for (String file : selection.getFiles()) {
+ for (Path file : selection.getFiles()) {
// Calculates partitions count for the concrete file:
// depth of file path - depth of table root path - 1.
// The depth of file path includes file itself,
// so we should subtract 1 to consider only directories.
- int currentPartitionsCount = new Path(file).depth() - rootDepth - 1;
+ int currentPartitionsCount = file.depth() - rootDepth - 1;
// max depth of files path should be used to handle all partitions
partitionsCount = Math.max(partitionsCount, currentPartitionsCount);
}
String partitionColumnLabel = schemaConfig.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
- List<String> partitions = Lists.newArrayList();
+ List<String> partitions = new ArrayList<>();
// generates partition column names: dir0, dir1 etc.
for (int i = 0; i < partitionsCount; i++) {
@@ -165,7 +166,7 @@ public class ColumnExplorer {
* @param includeFileImplicitColumns if file implicit columns should be included into the result
* @return implicit columns map
*/
- public Map<String, String> populateImplicitColumns(String filePath,
+ public Map<String, String> populateImplicitColumns(Path filePath,
List<String> partitionValues,
boolean includeFileImplicitColumns) {
Map<String, String> implicitValues = new LinkedHashMap<>();
@@ -177,7 +178,7 @@ public class ColumnExplorer {
}
if (includeFileImplicitColumns) {
- Path path = Path.getPathWithoutSchemeAndAuthority(new Path(filePath));
+ Path path = Path.getPathWithoutSchemeAndAuthority(filePath);
for (Map.Entry<String, ImplicitFileColumns> entry : selectedImplicitColumns.entrySet()) {
implicitValues.put(entry.getKey(), entry.getValue().getValue(path));
}
@@ -189,15 +190,16 @@ public class ColumnExplorer {
/**
* Compares root and file path to determine directories
* that are present in the file path but absent in root.
- * Example: root - a/b/c, filePath - a/b/c/d/e/0_0_0.parquet, result - d/e.
+ * Example: root - a/b/c, file - a/b/c/d/e/0_0_0.parquet, result - d/e.
* Stores different directory names in the list in successive order.
*
- * @param filePath file path
+ * @param file file path
* @param root root directory
+ * @param hasDirsOnly true if the path is a directory; false if it ends with a file name
* @return list of directory names
*/
- public static List<String> listPartitionValues(String filePath, String root) {
- String[] dirs = parsePartitions(filePath, root);
+ public static List<String> listPartitionValues(Path file, Path root, boolean hasDirsOnly) {
+ String[] dirs = parsePartitions(file, root, hasDirsOnly);
if (dirs == null) {
return Collections.emptyList();
}
@@ -208,21 +210,23 @@ public class ColumnExplorer {
* Low-level parse of partitions, returned as a string array. Returns a
* null array for invalid values.
*
- * @param filePath file path
+ * @param file file path
* @param root root directory
+ * @param hasDirsOnly true if the path is a directory; false if it ends with a file name
* @return array of directory names, or null if the arguments are invalid
*/
- public static String[] parsePartitions(String filePath, String root) {
- if (filePath == null || root == null) {
+ public static String[] parsePartitions(Path file, Path root, boolean hasDirsOnly) {
+ if (file == null || root == null) {
return null;
}
- int rootDepth = new Path(root).depth();
- Path path = new Path(filePath);
- int parentDepth = path.getParent().depth();
-
- int diffCount = parentDepth - rootDepth;
+ if (!hasDirsOnly) {
+ file = file.getParent();
+ }
+ int rootDepth = root.depth();
+ int fileDepth = file.depth();
+ int diffCount = fileDepth - rootDepth;
if (diffCount < 0) {
return null;
}
@@ -230,10 +234,10 @@ public class ColumnExplorer {
String[] diffDirectoryNames = new String[diffCount];
// start filling in array from the end
- for (int i = rootDepth; parentDepth > i; i++) {
- path = path.getParent();
+ for (int i = rootDepth; fileDepth > i; i++) {
// place in the end of array
- diffDirectoryNames[parentDepth - i - 1] = path.getName();
+ diffDirectoryNames[fileDepth - i - 1] = file.getName();
+ file = file.getParent();
}
return diffDirectoryNames;
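A worked example of the rewritten loop, using hypothetical paths; with hasDirsOnly = false the file name is stripped before the depth difference is computed:

import java.util.Arrays;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.hadoop.fs.Path;

public class ParsePartitionsSketch {
  public static void main(String[] args) {
    Path root = new Path("/data/orders");                        // depth 2
    Path file = new Path("/data/orders/2018/01/0_0_0.parquet");  // depth 5
    // parent of the file is /data/orders/2018/01 (depth 4), so diffCount = 4 - 2 = 2
    // and the array is filled from the end while walking up with getParent()
    System.out.println(Arrays.toString(ColumnExplorer.parsePartitions(file, root, false))); // [2018, 01]
  }
}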
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
index 50d7ee8f9..10d90d4b7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroDrillTable.java
@@ -54,8 +54,8 @@ public class AvroDrillTable extends DrillTable {
SchemaConfig schemaConfig,
FormatSelection selection) {
super(storageEngineName, plugin, schemaConfig.getUserName(), selection);
- List<String> asFiles = selection.getAsFiles();
- Path path = new Path(asFiles.get(0));
+ List<Path> asFiles = selection.getAsFiles();
+ Path path = asFiles.get(0);
this.schemaConfig = schemaConfig;
try {
reader = new DataFileReader<>(new FsInput(path, plugin.getFsConf()), new GenericDatumReader<GenericContainer>());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
index 1d7226ad1..07444e972 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/avro/AvroRecordReader.java
@@ -90,13 +90,13 @@ public class AvroRecordReader extends AbstractRecordReader {
public AvroRecordReader(final FragmentContext fragmentContext,
- final String inputPath,
+ final Path inputPath,
final long start,
final long length,
final FileSystem fileSystem,
final List<SchemaPath> projectedColumns,
final String userName) {
- hadoop = new Path(inputPath);
+ hadoop = inputPath;
this.start = start;
this.end = start + length;
buffer = fragmentContext.getManagedBuffer();
@@ -111,12 +111,8 @@ public class AvroRecordReader extends AbstractRecordReader {
private DataFileReader<GenericContainer> getReader(final Path hadoop, final FileSystem fs) throws ExecutionSetupException {
try {
final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
- return ugi.doAs(new PrivilegedExceptionAction<DataFileReader<GenericContainer>>() {
- @Override
- public DataFileReader<GenericContainer> run() throws Exception {
- return new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>());
- }
- });
+ return ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericContainer>>) () ->
+ new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>()));
} catch (IOException | InterruptedException e) {
throw new ExecutionSetupException(
String.format("Error in creating avro reader for file: %s", hadoop), e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
index 2fa9558b0..902abdab0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FileSelection.java
@@ -19,12 +19,15 @@ package org.apache.drill.exec.store.dfs;
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.DrillRuntimeException;
@@ -41,15 +44,15 @@ public class FileSelection {
private List<FileStatus> statuses;
- public List<String> files;
+ public List<Path> files;
/**
* root path for the selections
*/
- public final String selectionRoot;
+ public final Path selectionRoot;
/**
* root path for the metadata cache file (if any)
*/
- public final String cacheFileRoot;
+ public final Path cacheFileRoot;
/**
* metadata context useful for metadata operations (if any)
@@ -82,17 +85,17 @@ public class FileSelection {
* @param files list of files
* @param selectionRoot root path for selections
*/
- public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot) {
+ public FileSelection(List<FileStatus> statuses, List<Path> files, Path selectionRoot) {
this(statuses, files, selectionRoot, null, false, StatusType.NOT_CHECKED);
}
- public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot,
- final String cacheFileRoot, final boolean wasAllPartitionsPruned) {
+ public FileSelection(List<FileStatus> statuses, List<Path> files, Path selectionRoot, Path cacheFileRoot,
+ boolean wasAllPartitionsPruned) {
this(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned, StatusType.NOT_CHECKED);
}
- public FileSelection(final List<FileStatus> statuses, final List<String> files, final String selectionRoot,
- final String cacheFileRoot, final boolean wasAllPartitionsPruned, final StatusType dirStatus) {
+ public FileSelection(List<FileStatus> statuses, List<Path> files, Path selectionRoot, Path cacheFileRoot,
+ boolean wasAllPartitionsPruned, StatusType dirStatus) {
this.statuses = statuses;
this.files = files;
this.selectionRoot = selectionRoot;
@@ -104,7 +107,7 @@ public class FileSelection {
/**
* Copy constructor for convenience.
*/
- protected FileSelection(final FileSelection selection) {
+ protected FileSelection(FileSelection selection) {
Preconditions.checkNotNull(selection, "selection cannot be null");
this.statuses = selection.statuses;
this.files = selection.files;
@@ -116,17 +119,17 @@ public class FileSelection {
this.wasAllPartitionsPruned = selection.wasAllPartitionsPruned;
}
- public String getSelectionRoot() {
+ public Path getSelectionRoot() {
return selectionRoot;
}
- public List<FileStatus> getStatuses(final DrillFileSystem fs) throws IOException {
+ public List<FileStatus> getStatuses(DrillFileSystem fs) throws IOException {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
if (statuses == null) {
- final List<FileStatus> newStatuses = Lists.newArrayList();
- for (final String pathStr:files) {
- newStatuses.add(fs.getFileStatus(new Path(pathStr)));
+ List<FileStatus> newStatuses = Lists.newArrayList();
+ for (Path path : files) {
+ newStatuses.add(fs.getFileStatus(path));
}
statuses = newStatuses;
}
@@ -139,11 +142,11 @@ public class FileSelection {
return statuses;
}
- public List<String> getFiles() {
+ public List<Path> getFiles() {
if (files == null) {
- final List<String> newFiles = Lists.newArrayList();
- for (final FileStatus status:statuses) {
- newFiles.add(status.getPath().toString());
+ List<Path> newFiles = Lists.newArrayList();
+ for (FileStatus status : statuses) {
+ newFiles.add(status.getPath());
}
files = newFiles;
}
@@ -153,7 +156,7 @@ public class FileSelection {
public boolean containsDirectories(DrillFileSystem fs) throws IOException {
if (dirStatus == StatusType.NOT_CHECKED) {
dirStatus = StatusType.NO_DIRS;
- for (final FileStatus status : getStatuses(fs)) {
+ for (FileStatus status : getStatuses(fs)) {
if (status.isDirectory()) {
dirStatus = StatusType.HAS_DIRS;
break;
@@ -175,7 +178,7 @@ public class FileSelection {
nonDirectories.addAll(DrillFileSystemUtil.listFiles(fs, status.getPath(), true));
}
- final FileSelection fileSel = create(nonDirectories, null, selectionRoot);
+ FileSelection fileSel = create(nonDirectories, null, selectionRoot);
if (timer != null) {
logger.debug("FileSelection.minusDirectories() took {} ms, numFiles: {}", timer.elapsed(TimeUnit.MILLISECONDS), statuses.size());
timer.stop();
@@ -223,38 +226,36 @@ public class FileSelection {
* @param files list of files.
* @return longest common path
*/
- private static String commonPathForFiles(final List<String> files) {
+ private static Path commonPathForFiles(List<Path> files) {
if (files == null || files.isEmpty()) {
- return "";
+ return new Path("/");
}
- final int total = files.size();
- final String[][] folders = new String[total][];
+ int total = files.size();
+ String[][] folders = new String[total][];
int shortest = Integer.MAX_VALUE;
for (int i = 0; i < total; i++) {
- final Path path = new Path(files.get(i));
- folders[i] = Path.getPathWithoutSchemeAndAuthority(path).toString().split(Path.SEPARATOR);
+ folders[i] = files.get(i).toUri().getPath().split(Path.SEPARATOR);
shortest = Math.min(shortest, folders[i].length);
}
int latest;
out:
for (latest = 0; latest < shortest; latest++) {
- final String current = folders[0][latest];
+ String current = folders[0][latest];
for (int i = 1; i < folders.length; i++) {
if (!current.equals(folders[i][latest])) {
break out;
}
}
}
- final Path path = new Path(files.get(0));
- final URI uri = path.toUri();
- final String pathString = buildPath(folders[0], latest);
- return new Path(uri.getScheme(), uri.getAuthority(), pathString).toString();
+ URI uri = files.get(0).toUri();
+ String pathString = buildPath(folders[0], latest);
+ return new Path(uri.getScheme(), uri.getAuthority(), pathString);
}
- private static String buildPath(final String[] path, final int folderIndex) {
- final StringBuilder builder = new StringBuilder();
+ private static String buildPath(String[] path, int folderIndex) {
+ StringBuilder builder = new StringBuilder();
for (int i=0; i<folderIndex; i++) {
builder.append(path[i]).append(Path.SEPARATOR);
}
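commonPathForFiles now splits each path's URI form on Path.SEPARATOR and keeps the longest shared prefix of components. A standalone sketch of the idea for two files (it does not call the private method):

import org.apache.hadoop.fs.Path;

public class CommonPathSketch {
  public static void main(String[] args) {
    String[] a = new Path("/data/t/2018/01/a.parquet").toUri().getPath().split(Path.SEPARATOR);
    String[] b = new Path("/data/t/2018/02/b.parquet").toUri().getPath().split(Path.SEPARATOR);
    StringBuilder common = new StringBuilder();
    // keep components while both paths agree
    for (int i = 0; i < Math.min(a.length, b.length) && a[i].equals(b[i]); i++) {
      common.append(a[i]).append(Path.SEPARATOR);
    }
    System.out.println(new Path(common.toString())); // /data/t/2018
  }
}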
@@ -262,20 +263,20 @@ public class FileSelection {
return builder.toString();
}
- public static FileSelection create(final DrillFileSystem fs, final String parent, final String path,
- final boolean allowAccessOutsideWorkspace) throws IOException {
+ public static FileSelection create(DrillFileSystem fs, String parent, String path,
+ boolean allowAccessOutsideWorkspace) throws IOException {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
boolean hasWildcard = path.contains(WILD_CARD);
- final Path combined = new Path(parent, removeLeadingSlash(path));
+ Path combined = new Path(parent, removeLeadingSlash(path));
if (!allowAccessOutsideWorkspace) {
checkBackPaths(new Path(parent).toUri().getPath(), combined.toUri().getPath(), path);
}
- final FileStatus[] statuses = fs.globStatus(combined); // note: this would expand wildcards
+ FileStatus[] statuses = fs.globStatus(combined); // note: this would expand wildcards
if (statuses == null) {
return null;
}
- final FileSelection fileSel = create(Lists.newArrayList(statuses), null, combined.toUri().getPath());
+ FileSelection fileSel = create(Arrays.asList(statuses), null, combined);
if (timer != null) {
logger.debug("FileSelection.create() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
timer.stop();
@@ -298,62 +299,51 @@ public class FileSelection {
* @return null if creation of {@link FileSelection} fails with an {@link IllegalArgumentException}
* otherwise a new selection.
*
- * @see FileSelection#FileSelection(List, List, String)
+ * @see FileSelection#FileSelection(List, List, Path)
*/
- public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root,
- final String cacheFileRoot, final boolean wasAllPartitionsPruned) {
- final boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0);
- final boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0);
+ public static FileSelection create(List<FileStatus> statuses, List<Path> files, Path root,
+ Path cacheFileRoot, boolean wasAllPartitionsPruned) {
+ boolean bothNonEmptySelection = (statuses != null && statuses.size() > 0) && (files != null && files.size() > 0);
+ boolean bothEmptySelection = (statuses == null || statuses.size() == 0) && (files == null || files.size() == 0);
if (bothNonEmptySelection || bothEmptySelection) {
return null;
}
- final String selectionRoot;
+ Path selectionRoot;
if (statuses == null || statuses.isEmpty()) {
selectionRoot = commonPathForFiles(files);
} else {
- if (Strings.isNullOrEmpty(root)) {
- throw new DrillRuntimeException("Selection root is null or empty" + root);
- }
- final Path rootPath = handleWildCard(root);
- final URI uri = statuses.get(0).getPath().toUri();
- final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
- selectionRoot = path.toString();
+ Objects.requireNonNull(root, "Selection root is null");
+ Path rootPath = handleWildCard(root);
+ URI uri = statuses.get(0).getPath().toUri();
+ selectionRoot = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
}
return new FileSelection(statuses, files, selectionRoot, cacheFileRoot, wasAllPartitionsPruned);
}
- public static FileSelection create(final List<FileStatus> statuses, final List<String> files, final String root) {
+ public static FileSelection create(List<FileStatus> statuses, List<Path> files, Path root) {
return FileSelection.create(statuses, files, root, null, false);
}
- public static FileSelection createFromDirectories(final List<String> dirPaths, final FileSelection selection,
- final String cacheFileRoot) {
+ public static FileSelection createFromDirectories(List<Path> dirPaths, FileSelection selection,
+ Path cacheFileRoot) {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
- final String root = selection.getSelectionRoot();
- if (Strings.isNullOrEmpty(root)) {
- throw new DrillRuntimeException("Selection root is null or empty" + root);
- }
+ Path root = selection.getSelectionRoot();
+ Objects.requireNonNull(root, "Selection root is null");
if (dirPaths == null || dirPaths.isEmpty()) {
throw new DrillRuntimeException("List of directories is null or empty");
}
- List<String> dirs = Lists.newArrayList();
+ // for wildcard the directory list should have already been expanded
+ List<Path> dirs = selection.hadWildcard() ? selection.getFileStatuses().stream()
+ .map(FileStatus::getPath)
+ .collect(Collectors.toList()) : new ArrayList<>(dirPaths);
- if (selection.hadWildcard()) { // for wildcard the directory list should have already been expanded
- for (FileStatus status : selection.getFileStatuses()) {
- dirs.add(status.getPath().toString());
- }
- } else {
- dirs.addAll(dirPaths);
- }
-
- final Path rootPath = handleWildCard(root);
- // final URI uri = dirPaths.get(0).toUri();
- final URI uri = selection.getFileStatuses().get(0).getPath().toUri();
- final Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
- FileSelection fileSel = new FileSelection(null, dirs, path.toString(), cacheFileRoot, false);
+ Path rootPath = handleWildCard(root);
+ URI uri = selection.getFileStatuses().get(0).getPath().toUri();
+ Path path = new Path(uri.getScheme(), uri.getAuthority(), rootPath.toUri().getPath());
+ FileSelection fileSel = new FileSelection(null, dirs, path, cacheFileRoot, false);
fileSel.setHadWildcard(selection.hadWildcard());
if (timer != null) {
logger.debug("FileSelection.createFromDirectories() took {} ms ", timer.elapsed(TimeUnit.MILLISECONDS));
@@ -362,18 +352,15 @@ public class FileSelection {
return fileSel;
}
- private static Path handleWildCard(final String root) {
- if (root.contains(WILD_CARD)) {
- int idx = root.indexOf(WILD_CARD); // first wild card in the path
- idx = root.lastIndexOf('/', idx); // file separator right before the first wild card
- final String newRoot = root.substring(0, idx);
- if (newRoot.length() == 0) {
- // Ensure that we always return a valid root.
- return new Path("/");
- }
- return new Path(newRoot);
+ private static Path handleWildCard(Path root) {
+ String stringRoot = root.toUri().getPath();
+ if (stringRoot.contains(WILD_CARD)) {
+ int idx = stringRoot.indexOf(WILD_CARD); // first wild card in the path
+ idx = stringRoot.lastIndexOf('/', idx); // file separator right before the first wild card
+ String newRoot = stringRoot.substring(0, idx);
+ return DrillFileSystemUtil.createPathSafe(newRoot);
} else {
- return new Path(root);
+ return new Path(stringRoot);
}
}
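handleWildCard now truncates the URI path string at the directory containing the first wildcard; a worked example of the string arithmetic (the path is illustrative):

public class WildCardRootSketch {
  public static void main(String[] args) {
    String stringRoot = "/data/t/2018/*/part*.parquet";
    int idx = stringRoot.indexOf('*');      // first wildcard in the path
    idx = stringRoot.lastIndexOf('/', idx); // separator right before it
    System.out.println(stringRoot.substring(0, idx)); // /data/t/2018
  }
}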
@@ -426,7 +413,7 @@ public class FileSelection {
return this.hadWildcard;
}
- public String getCacheFileRoot() {
+ public Path getCacheFileRoot() {
return cacheFileRoot;
}
@@ -456,12 +443,12 @@ public class FileSelection {
@Override
public String toString() {
- final StringBuilder sb = new StringBuilder();
+ StringBuilder sb = new StringBuilder();
sb.append("root=").append(this.selectionRoot);
sb.append("files=[");
boolean isFirst = true;
- for (final String file : this.files) {
+ for (Path file : this.files) {
if (isFirst) {
isFirst = false;
sb.append(file);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
index 40549cc92..7d7bcfaa0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/FormatSelection.java
@@ -24,6 +24,7 @@ import org.apache.drill.common.logical.FormatPluginConfig;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
public class FormatSelection {
@@ -32,16 +33,15 @@ public class FormatSelection {
private FormatPluginConfig format;
private FileSelection selection;
- public FormatSelection(){}
+ public FormatSelection() {}
@JsonCreator
- public FormatSelection(@JsonProperty("format") FormatPluginConfig format, @JsonProperty("files") List<String> files){
+ public FormatSelection(@JsonProperty("format") FormatPluginConfig format, @JsonProperty("files") List<Path> files) {
this.format = format;
this.selection = FileSelection.create(null, files, null);
}
public FormatSelection(FormatPluginConfig format, FileSelection selection) {
- super();
this.format = format;
this.selection = selection;
}
@@ -52,7 +52,7 @@ public class FormatSelection {
}
@JsonProperty("files")
- public List<String> getAsFiles(){
+ public List<Path> getAsFiles() {
return selection.getFiles();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
index 877ceb64c..073847812 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/MetadataContext.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.dfs;
import java.util.Map;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.hadoop.fs.Path;
/**
* A metadata context that holds state across multiple invocations of
@@ -32,17 +33,17 @@ public class MetadataContext {
* Note: the #directories is typically a small percentage of the #files, so the memory footprint
* is expected to be relatively small.
*/
- private Map<String, Boolean> dirModifCheckMap = Maps.newHashMap();
+ private Map<Path, Boolean> dirModifCheckMap = Maps.newHashMap();
private PruneStatus pruneStatus = PruneStatus.NOT_STARTED;
private boolean metadataCacheCorrupted;
- public void setStatus(String dir) {
+ public void setStatus(Path dir) {
dirModifCheckMap.put(dir, true);
}
- public void clearStatus(String dir) {
+ public void clearStatus(Path dir) {
dirModifCheckMap.put(dir, false);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
index 15107ac18..ec925e37a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryFromHDFS.java
@@ -21,6 +21,7 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork {
@@ -28,7 +29,7 @@ public class ReadEntryFromHDFS extends ReadEntryWithPath implements FileWork {
private long length;
@JsonCreator
- public ReadEntryFromHDFS(@JsonProperty("path") String path,@JsonProperty("start") long start, @JsonProperty("length") long length) {
+ public ReadEntryFromHDFS(@JsonProperty("path") Path path, @JsonProperty("start") long start, @JsonProperty("length") long length) {
super(path);
this.start = start;
this.length = length;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
index 4564e159c..88bd9fbad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/ReadEntryWithPath.java
@@ -18,19 +18,20 @@
package org.apache.drill.exec.store.dfs;
+import org.apache.hadoop.fs.Path;
+
public class ReadEntryWithPath {
- protected String path;
+ protected Path path;
+ // Default constructor is needed for deserialization
+ public ReadEntryWithPath() {}
- public ReadEntryWithPath(String path) {
- super();
+ public ReadEntryWithPath(Path path) {
this.path = path;
}
- public ReadEntryWithPath(){}
-
- public String getPath(){
+ public Path getPath() {
return path;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d3bed8feb..d76c6489e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -156,7 +156,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
for (FileWork work : scan.getWorkUnits()){
RecordReader recordReader = getRecordReader(context, dfs, work, scan.getColumns(), scan.getUserName());
readers.add(recordReader);
- List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot());
+ List<String> partitionValues = ColumnExplorer.listPartitionValues(work.getPath(), scan.getSelectionRoot(), false);
Map<String, String> implicitValues = columnExplorer.populateImplicitColumns(work.getPath(), partitionValues, supportsFileImplicitColumns);
implicitColumns.add(implicitValues);
if (implicitValues.size() > mapWithMaxColumns.size()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
index 759d07ff9..4449ec054 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyGroupScan.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.store.dfs.easy;
import java.io.IOException;
-import java.util.Iterator;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -52,6 +51,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
@JsonTypeName("fs-scan")
public class EasyGroupScan extends AbstractFileGroupScan {
@@ -65,17 +65,17 @@ public class EasyGroupScan extends AbstractFileGroupScan {
private ListMultimap<Integer, CompleteFileWork> mappings;
private List<CompleteFileWork> chunks;
private List<EndpointAffinity> endpointAffinities;
- private String selectionRoot;
+ private Path selectionRoot;
@JsonCreator
public EasyGroupScan(
@JsonProperty("userName") String userName,
- @JsonProperty("files") List<String> files, //
- @JsonProperty("storage") StoragePluginConfig storageConfig, //
- @JsonProperty("format") FormatPluginConfig formatConfig, //
- @JacksonInject StoragePluginRegistry engineRegistry, //
+ @JsonProperty("files") List<Path> files,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("format") FormatPluginConfig formatConfig,
+ @JacksonInject StoragePluginRegistry engineRegistry,
@JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("selectionRoot") String selectionRoot
+ @JsonProperty("selectionRoot") Path selectionRoot
) throws IOException, ExecutionSetupException {
this(ImpersonationUtil.resolveUserName(userName),
FileSelection.create(null, files, selectionRoot),
@@ -84,17 +84,17 @@ public class EasyGroupScan extends AbstractFileGroupScan {
selectionRoot);
}
- public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, String selectionRoot)
+ public EasyGroupScan(String userName, FileSelection selection, EasyFormatPlugin<?> formatPlugin, Path selectionRoot)
throws IOException {
this(userName, selection, formatPlugin, ALL_COLUMNS, selectionRoot);
}
public EasyGroupScan(
String userName,
- FileSelection selection, //
- EasyFormatPlugin<?> formatPlugin, //
+ FileSelection selection,
+ EasyFormatPlugin<?> formatPlugin,
List<SchemaPath> columns,
- String selectionRoot
+ Path selectionRoot
) throws IOException{
super(userName);
this.selection = Preconditions.checkNotNull(selection);
@@ -106,12 +106,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
@JsonIgnore
public Iterable<CompleteFileWork> getWorkIterable() {
- return new Iterable<CompleteFileWork>() {
- @Override
- public Iterator<CompleteFileWork> iterator() {
- return Iterators.unmodifiableIterator(chunks.iterator());
- }
- };
+ return () -> Iterators.unmodifiableIterator(chunks.iterator());
}
private EasyGroupScan(final EasyGroupScan that) {
@@ -136,7 +131,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
this.endpointAffinities = AffinityCreator.getAffinityMap(chunks);
}
- public String getSelectionRoot() {
+ public Path getSelectionRoot() {
return selectionRoot;
}
@@ -158,7 +153,7 @@ public class EasyGroupScan extends AbstractFileGroupScan {
@JsonProperty("files")
@Override
- public List<String> getFiles() {
+ public List<Path> getFiles() {
return selection.getFiles();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
index 0dbae1e40..fbb3f475c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasySubScan.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.dfs.easy;
-import java.io.IOException;
import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -34,6 +33,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Path;
@JsonTypeName("fs-sub-scan")
public class EasySubScan extends AbstractSubScan{
@@ -42,18 +42,18 @@ public class EasySubScan extends AbstractSubScan{
private final List<FileWorkImpl> files;
private final EasyFormatPlugin<?> formatPlugin;
private final List<SchemaPath> columns;
- private String selectionRoot;
+ private Path selectionRoot;
@JsonCreator
public EasySubScan(
@JsonProperty("userName") String userName,
- @JsonProperty("files") List<FileWorkImpl> files, //
- @JsonProperty("storage") StoragePluginConfig storageConfig, //
- @JsonProperty("format") FormatPluginConfig formatConfig, //
- @JacksonInject StoragePluginRegistry engineRegistry, //
- @JsonProperty("columns") List<SchemaPath> columns, //
- @JsonProperty("selectionRoot") String selectionRoot
- ) throws IOException, ExecutionSetupException {
+ @JsonProperty("files") List<FileWorkImpl> files,
+ @JsonProperty("storage") StoragePluginConfig storageConfig,
+ @JsonProperty("format") FormatPluginConfig formatConfig,
+ @JacksonInject StoragePluginRegistry engineRegistry,
+ @JsonProperty("columns") List<SchemaPath> columns,
+ @JsonProperty("selectionRoot") Path selectionRoot
+ ) throws ExecutionSetupException {
super(userName);
this.formatPlugin = (EasyFormatPlugin<?>) engineRegistry.getFormatPlugin(storageConfig, formatConfig);
Preconditions.checkNotNull(this.formatPlugin);
@@ -63,7 +63,7 @@ public class EasySubScan extends AbstractSubScan{
}
public EasySubScan(String userName, List<FileWorkImpl> files, EasyFormatPlugin<?> plugin, List<SchemaPath> columns,
- String selectionRoot){
+ Path selectionRoot) {
super(userName);
this.formatPlugin = plugin;
this.files = files;
@@ -72,7 +72,7 @@ public class EasySubScan extends AbstractSubScan{
}
@JsonProperty
- public String getSelectionRoot() {
+ public Path getSelectionRoot() {
return selectionRoot;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
index 587201ea9..3aeb2c26d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/FileWork.java
@@ -17,8 +17,14 @@
*/
package org.apache.drill.exec.store.dfs.easy;
+
+import org.apache.hadoop.fs.Path;
+
public interface FileWork {
- String getPath();
+
+ Path getPath();
+
long getStart();
+
long getLength();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
index 505d68e78..6de8842cd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/direct/MetadataDirectGroupScan.java
@@ -18,12 +18,12 @@
package org.apache.drill.exec.store.direct;
import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.store.RecordReader;
+import org.apache.hadoop.fs.Path;
import java.util.Collection;
import java.util.List;
@@ -37,20 +37,20 @@ import java.util.List;
@JsonTypeName("metadata-direct-scan")
public class MetadataDirectGroupScan extends DirectGroupScan {
- private final Collection<String> files;
+ private final Collection<Path> files;
- public MetadataDirectGroupScan(RecordReader reader, Collection<String> files) {
+ public MetadataDirectGroupScan(RecordReader reader, Collection<Path> files) {
super(reader);
this.files = files;
}
- public MetadataDirectGroupScan(RecordReader reader, Collection<String> files, ScanStats stats) {
+ public MetadataDirectGroupScan(RecordReader reader, Collection<Path> files, ScanStats stats) {
super(reader, stats);
this.files = files;
}
@Override
- public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) throws ExecutionSetupException {
+ public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
assert children == null || children.isEmpty();
return new MetadataDirectGroupScan(reader, files, stats);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
index 428a4e1bd..d3fcc5aab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader.java
@@ -77,7 +77,7 @@ public class JSONRecordReader extends AbstractRecordReader {
* @param columns pathnames of columns/subfields to read
* @throws OutOfMemoryException
*/
- public JSONRecordReader(final FragmentContext fragmentContext, final String inputPath, final DrillFileSystem fileSystem,
+ public JSONRecordReader(final FragmentContext fragmentContext, final Path inputPath, final DrillFileSystem fileSystem,
final List<SchemaPath> columns) throws OutOfMemoryException {
this(fragmentContext, inputPath, null, fileSystem, columns);
}
@@ -90,14 +90,13 @@ public class JSONRecordReader extends AbstractRecordReader {
* @param columns pathnames of columns/subfields to read
* @throws OutOfMemoryException
*/
- public JSONRecordReader(final FragmentContext fragmentContext, final JsonNode embeddedContent,
- final DrillFileSystem fileSystem, final List<SchemaPath> columns) throws OutOfMemoryException {
+ public JSONRecordReader(FragmentContext fragmentContext, JsonNode embeddedContent, DrillFileSystem fileSystem,
+ List<SchemaPath> columns) throws OutOfMemoryException {
this(fragmentContext, null, embeddedContent, fileSystem, columns);
}
- private JSONRecordReader(final FragmentContext fragmentContext, final String inputPath,
- final JsonNode embeddedContent, final DrillFileSystem fileSystem,
- final List<SchemaPath> columns) {
+ private JSONRecordReader(FragmentContext fragmentContext, Path inputPath, JsonNode embeddedContent,
+ DrillFileSystem fileSystem, List<SchemaPath> columns) {
Preconditions.checkArgument(
(inputPath == null && embeddedContent != null) ||
@@ -106,7 +105,7 @@ public class JSONRecordReader extends AbstractRecordReader {
);
if (inputPath != null) {
- this.hadoopPath = new Path(inputPath);
+ this.hadoopPath = inputPath;
} else {
this.embeddedContent = embeddedContent;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
index 9dbe715b1..ec4bb1255 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/sequencefile/SequenceFileFormatPlugin.java
@@ -87,7 +87,7 @@ public class SequenceFileFormatPlugin extends EasyFormatPlugin<SequenceFileForma
FileWork fileWork,
List<SchemaPath> columns,
String userName) throws ExecutionSetupException {
- final Path path = dfs.makeQualified(new Path(fileWork.getPath()));
+ final Path path = dfs.makeQualified(fileWork.getPath());
final FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
return new SequenceFileRecordReader(split, dfs, context.getQueryUserName(), userName);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
index 1c53a3774..03ae6f554 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/text/TextFormatPlugin.java
@@ -82,7 +82,7 @@ public class TextFormatPlugin extends EasyFormatPlugin<TextFormatPlugin.TextForm
FileWork fileWork,
List<SchemaPath> columns,
String userName) {
- Path path = dfs.makeQualified(new Path(fileWork.getPath()));
+ Path path = dfs.makeQualified(fileWork.getPath());
FileSplit split = new FileSplit(path, fileWork.getStart(), fileWork.getLength(), new String[]{""});
if (context.getOptions().getBoolean(ExecConstants.ENABLE_NEW_TEXT_READER_KEY)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
index 3958a3270..5a78732af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/httpd/HttpdLogFormatPlugin.java
@@ -192,7 +192,7 @@ public class HttpdLogFormatPlugin extends EasyFormatPlugin<HttpdLogFormatPlugin.
HttpdLogFormatPlugin.this.getConfig().getTimestampFormat(),
fieldMapping);
- final Path path = fs.makeQualified(new Path(work.getPath()));
+ final Path path = fs.makeQualified(work.getPath());
FileSplit split = new FileSplit(path, work.getStart(), work.getLength(), new String[]{""});
TextInputFormat inputFormat = new TextInputFormat();
JobConf job = new JobConf(fs.getConf());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
index 15ea1b462..048aa8230 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageFormatPlugin.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
import org.apache.drill.exec.ops.FragmentContext;
@@ -57,11 +56,9 @@ public class ImageFormatPlugin extends EasyFormatPlugin<ImageFormatConfig> {
@Override
public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
- List<SchemaPath> columns, String userName) throws ExecutionSetupException {
- return new ImageRecordReader(context, dfs, fileWork.getPath(),
- ((ImageFormatConfig)formatConfig).hasFileSystemMetadata(),
- ((ImageFormatConfig)formatConfig).isDescriptive(),
- ((ImageFormatConfig)formatConfig).getTimeZone());
+ List<SchemaPath> columns, String userName) {
+ return new ImageRecordReader(context, dfs, fileWork.getPath(), formatConfig.hasFileSystemMetadata(),
+ formatConfig.isDescriptive(), formatConfig.getTimeZone());
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
index 2a4b4fb48..08ed4fd09 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/image/ImageRecordReader.java
@@ -99,10 +99,10 @@ public class ImageRecordReader extends AbstractRecordReader {
private DrillBuf managedBuffer;
private boolean finish;
- public ImageRecordReader(FragmentContext context, DrillFileSystem fs, String inputPath,
+ public ImageRecordReader(FragmentContext context, DrillFileSystem fs, Path inputPath,
boolean fileSystemMetadata, boolean descriptive, String timeZone) {
this.fs = fs;
- hadoopPath = fs.makeQualified(new Path(inputPath));
+ hadoopPath = fs.makeQualified(inputPath);
this.fileSystemMetadata = fileSystemMetadata;
this.descriptive = descriptive;
this.timeZone = (timeZone != null) ? TimeZone.getTimeZone(timeZone) : TimeZone.getDefault();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
index e5d1dc438..ae0b733ff 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/log/LogRecordReader.java
@@ -41,9 +41,6 @@ import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.NullableTimeStampVector;
import org.apache.drill.exec.vector.NullableTimeVector;
-
-import org.apache.hadoop.fs.Path;
-
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -522,7 +519,7 @@ public class LogRecordReader extends AbstractRecordReader {
private void openFile() {
InputStream in;
try {
- in = dfs.open(new Path(fileWork.getPath()));
+ in = dfs.open(fileWork.getPath());
} catch (Exception e) {
throw UserException
.dataReadError(e)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index e2d356953..5528d028c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -52,6 +52,7 @@ import org.apache.drill.exec.store.schedule.AffinityCreator;
import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.store.schedule.EndpointByteMap;
import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
@@ -80,7 +81,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
protected ParquetTableMetadataBase parquetTableMetadata;
protected List<RowGroupInfo> rowGroupInfos;
protected ListMultimap<Integer, RowGroupInfo> mappings;
- protected Set<String> fileSet;
+ protected Set<Path> fileSet;
protected ParquetReaderConfig readerConfig;
private List<EndpointAffinity> endpointAffinities;
@@ -146,7 +147,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
@JsonIgnore
@Override
- public Collection<String> getFiles() {
+ public Collection<Path> getFiles() {
return fileSet;
}
@@ -428,12 +429,12 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
}
@JsonIgnore
- public <T> T getPartitionValue(String path, SchemaPath column, Class<T> clazz) {
+ public <T> T getPartitionValue(Path path, SchemaPath column, Class<T> clazz) {
return clazz.cast(parquetGroupScanStatistics.getPartitionValue(path, column));
}
@JsonIgnore
- public Set<String> getFileSet() {
+ public Set<Path> getFileSet() {
return fileSet;
}
// partition pruning methods end
@@ -441,7 +442,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
// helper method used for partition pruning and filter push down
@Override
public void modifyFileSelection(FileSelection selection) {
- List<String> files = selection.getFiles();
+ List<Path> files = selection.getFiles();
fileSet = new HashSet<>(files);
entries = new ArrayList<>(files.size());
@@ -464,7 +465,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
if (fileSet == null) {
fileSet = new HashSet<>();
fileSet.addAll(parquetTableMetadata.getFiles().stream()
- .map((Function<ParquetFileMetadata, String>) ParquetFileMetadata::getPath)
+ .map((Function<ParquetFileMetadata, Path>) ParquetFileMetadata::getPath)
.collect(Collectors.toSet()));
}
@@ -505,7 +506,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
// abstract methods block start
protected abstract void initInternal() throws IOException;
protected abstract Collection<CoordinationProtos.DrillbitEndpoint> getDrillbits();
- protected abstract AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException;
+ protected abstract AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException;
protected abstract boolean supportsFileImplicitColumns();
protected abstract List<String> getPartitionValues(RowGroupInfo rowGroupInfo);
// abstract methods block end
@@ -520,7 +521,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
* @return new parquet group scan
*/
private AbstractParquetGroupScan cloneWithRowGroupInfos(List<RowGroupInfo> rowGroupInfos) throws IOException {
- Set<String> filePaths = rowGroupInfos.stream()
+ Set<Path> filePaths = rowGroupInfos.stream()
.map(ReadEntryWithPath::getPath)
.collect(Collectors.toSet()); // set keeps file names unique
AbstractParquetGroupScan scan = cloneWithFileSelection(filePaths);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
index 99161fd59..b1819e639 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetScanBatchCreator.java
@@ -52,7 +52,8 @@ public abstract class AbstractParquetScanBatchCreator {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractParquetScanBatchCreator.class);
- protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan, OperatorContext oContext) throws ExecutionSetupException {
+ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRowGroupScan rowGroupScan,
+ OperatorContext oContext) throws ExecutionSetupException {
final ColumnExplorer columnExplorer = new ColumnExplorer(context.getOptions(), rowGroupScan.getColumns());
if (!columnExplorer.isStarQuery()) {
@@ -63,7 +64,7 @@ public abstract class AbstractParquetScanBatchCreator {
AbstractDrillFileSystemManager fsManager = getDrillFileSystemCreator(oContext, context.getOptions());
// keep footers in a map to avoid re-reading them
- Map<String, ParquetMetadata> footers = new HashMap<>();
+ Map<Path, ParquetMetadata> footers = new HashMap<>();
List<RecordReader> readers = new LinkedList<>();
List<Map<String, String>> implicitColumns = new ArrayList<>();
Map<String, String> mapWithMaxColumns = new LinkedHashMap<>();
@@ -150,8 +151,8 @@ public abstract class AbstractParquetScanBatchCreator {
protected abstract AbstractDrillFileSystemManager getDrillFileSystemCreator(OperatorContext operatorContext, OptionManager optionManager);
- private ParquetMetadata readFooter(Configuration conf, String path, ParquetReaderConfig readerConfig) throws IOException {
- try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(path),
+ private ParquetMetadata readFooter(Configuration conf, Path path, ParquetReaderConfig readerConfig) throws IOException {
+ try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path,
readerConfig.addCountersToConf(conf)), readerConfig.toReadOptions())) {
return reader.getFooter();
}
@@ -168,6 +169,6 @@ public abstract class AbstractParquetScanBatchCreator {
this.operatorContext = operatorContext;
}
- protected abstract DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException;
+ protected abstract DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException;
}
}
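Keying the footer cache by Path instead of String is safe because org.apache.hadoop.fs.Path defines equals() and hashCode() over its normalized URI, so it behaves as a well-defined HashMap key. A small illustration (demo values only, not part of the patch):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.fs.Path;

    public class FooterCacheKeyDemo {
      public static void main(String[] args) {
        Map<Path, String> footers = new HashMap<>();
        footers.put(new Path("/data/t/0_0_0.parquet"), "footer-A");
        // Path normalizes on construction, so an equal path hits the cache entry:
        System.out.println(footers.get(new Path("/data/t/0_0_0.parquet"))); // footer-A
      }
    }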
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
index a1d9f518d..cb9e9b96d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScan.java
@@ -68,8 +68,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
private final MetadataContext metaContext;
private boolean usedMetadataCache; // false by default
// may change when filter push down / partition pruning is applied
- private String selectionRoot;
- private String cacheFileRoot;
+ private Path selectionRoot;
+ private Path cacheFileRoot;
@JsonCreator
public ParquetGroupScan(@JacksonInject StoragePluginRegistry engineRegistry,
@@ -78,8 +78,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
@JsonProperty("storage") StoragePluginConfig storageConfig,
@JsonProperty("format") FormatPluginConfig formatConfig,
@JsonProperty("columns") List<SchemaPath> columns,
- @JsonProperty("selectionRoot") String selectionRoot,
- @JsonProperty("cacheFileRoot") String cacheFileRoot,
+ @JsonProperty("selectionRoot") Path selectionRoot,
+ @JsonProperty("cacheFileRoot") Path cacheFileRoot,
@JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
@JsonProperty("filter") LogicalExpression filter) throws IOException, ExecutionSetupException {
super(ImpersonationUtil.resolveUserName(userName), columns, entries, readerConfig, filter);
@@ -127,7 +127,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
// The fully expanded list is already stored as part of the fileSet
entries.add(new ReadEntryWithPath(fileSelection.getSelectionRoot()));
} else {
- for (String fileName : fileSelection.getFiles()) {
+ for (Path fileName : fileSelection.getFiles()) {
entries.add(new ReadEntryWithPath(fileName));
}
}
@@ -169,12 +169,12 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
}
@JsonProperty
- public String getSelectionRoot() {
+ public Path getSelectionRoot() {
return selectionRoot;
}
@JsonProperty
- public String getCacheFileRoot() {
+ public Path getCacheFileRoot() {
return cacheFileRoot;
}
// getters for serialization / deserialization end
@@ -224,8 +224,8 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
// For EXPLAIN, remove the URI prefix from cacheFileRoot. If cacheFileRoot is null, we
// would have read the cache file from selectionRoot
String cacheFileRootString = (cacheFileRoot == null) ?
- Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot)).toString() :
- Path.getPathWithoutSchemeAndAuthority(new Path(cacheFileRoot)).toString();
+ Path.getPathWithoutSchemeAndAuthority(selectionRoot).toString() :
+ Path.getPathWithoutSchemeAndAuthority(cacheFileRoot).toString();
builder.append(", cacheFileRoot=").append(cacheFileRootString);
}
@@ -241,7 +241,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
FileSystem processUserFileSystem = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf());
Path metaPath = null;
if (entries.size() == 1 && parquetTableMetadata == null) {
- Path p = Path.getPathWithoutSchemeAndAuthority(new Path(entries.get(0).getPath()));
+ Path p = Path.getPathWithoutSchemeAndAuthority(entries.get(0).getPath());
if (fs.isDirectory(p)) {
// Using the metadata file makes sense when querying a directory; otherwise
// if querying a single file we can look up the metadata directly from the file
@@ -257,9 +257,9 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
parquetTableMetadata = Metadata.getParquetTableMetadata(processUserFileSystem, p.toString(), readerConfig);
}
} else {
- Path p = Path.getPathWithoutSchemeAndAuthority(new Path(selectionRoot));
+ Path p = Path.getPathWithoutSchemeAndAuthority(selectionRoot);
metaPath = new Path(p, Metadata.METADATA_FILENAME);
- if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(new Path(selectionRoot))
+ if (!metaContext.isMetadataCacheCorrupted() && fs.isDirectory(selectionRoot)
&& fs.exists(metaPath)) {
if (parquetTableMetadata == null) {
parquetTableMetadata = Metadata.readBlockMeta(processUserFileSystem, metaPath, metaContext, readerConfig);
@@ -275,7 +275,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
final List<FileStatus> fileStatuses = new ArrayList<>();
for (ReadEntryWithPath entry : entries) {
fileStatuses.addAll(
- DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(new Path(entry.getPath())), true));
+ DrillFileSystemUtil.listFiles(fs, Path.getPathWithoutSchemeAndAuthority(entry.getPath()), true));
}
Map<FileStatus, FileSystem> statusMap = fileStatuses.stream()
@@ -292,7 +292,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
}
@Override
- protected AbstractParquetGroupScan cloneWithFileSelection(Collection<String> filePaths) throws IOException {
+ protected AbstractParquetGroupScan cloneWithFileSelection(Collection<Path> filePaths) throws IOException {
FileSelection newSelection = new FileSelection(null, new ArrayList<>(filePaths), getSelectionRoot(), cacheFileRoot, false);
return clone(newSelection);
}
@@ -309,7 +309,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
@Override
protected List<String> getPartitionValues(RowGroupInfo rowGroupInfo) {
- return ColumnExplorer.listPartitionValues(rowGroupInfo.getPath(), selectionRoot);
+ return ColumnExplorer.listPartitionValues(rowGroupInfo.getPath(), selectionRoot, false);
}
// overridden protected methods block end
@@ -425,7 +425,7 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
}
} else {
final Path path = Path.getPathWithoutSchemeAndAuthority(cacheFileRoot);
- fileSet.add(path.toString());
+ fileSet.add(path);
}
}
}
@@ -436,21 +436,21 @@ public class ParquetGroupScan extends AbstractParquetGroupScan {
return null;
}
- List<String> fileNames = new ArrayList<>(fileSet);
+ List<Path> fileNames = new ArrayList<>(fileSet);
// when creating the file selection, set the selection root without the URI prefix
// The reason is that the file names above have been created in the form
// /a/b/c.parquet and the format of the selection root must match that of the file names
// otherwise downstream operations such as partition pruning can break.
- final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(new Path(selection.getSelectionRoot()));
- this.selectionRoot = metaRootPath.toString();
+ final Path metaRootPath = Path.getPathWithoutSchemeAndAuthority(selection.getSelectionRoot());
+ this.selectionRoot = metaRootPath;
// Use the FileSelection constructor directly here instead of the FileSelection.create() method
// because create() changes the root to include the scheme and authority; In future, if create()
// is the preferred way to instantiate a file selection, we may need to do something different...
// WARNING: file statuses and file names are inconsistent
- FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath.toString(),
- cacheFileRoot, selection.wasAllPartitionsPruned());
+ FileSelection newSelection = new FileSelection(selection.getStatuses(fs), fileNames, metaRootPath, cacheFileRoot,
+ selection.wasAllPartitionsPruned());
newSelection.setExpandedFully();
newSelection.setMetaContext(metaContext);
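The hunks above rely on Path.getPathWithoutSchemeAndAuthority() so that the selection root stays in the same /a/b/c.parquet form as the file names, per the comment about partition pruning. For reference, its effect in isolation (demo values only):

    import org.apache.hadoop.fs.Path;

    public class SchemeStripDemo {
      public static void main(String[] args) {
        Path full = new Path("hdfs://namenode:8020/user/drill/t/1.parquet");
        // Drops scheme and authority: prints /user/drill/t/1.parquet
        System.out.println(Path.getPathWithoutSchemeAndAuthority(full));
      }
    }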
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
index 9381043b5..915652413 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetGroupScanStatistics.java
@@ -21,6 +21,7 @@ import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.hadoop.fs.Path;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
@@ -41,7 +42,7 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTa
public class ParquetGroupScanStatistics {
// map from file names to maps of column name to partition value mappings
- private Map<String, Map<SchemaPath, Object>> partitionValueMap;
+ private Map<Path, Map<SchemaPath, Object>> partitionValueMap;
// only for partition columns : value is unique for each partition
private Map<SchemaPath, TypeProtos.MajorType> partitionColTypeMap;
// total number of non-null value for each column in parquet files
@@ -78,7 +79,7 @@ public class ParquetGroupScanStatistics {
return rowCount;
}
- public Object getPartitionValue(String path, SchemaPath column) {
+ public Object getPartitionValue(Path path, SchemaPath column) {
return partitionValueMap.get(path).get(column);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
index eabe2df14..2513772c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRowGroupScan.java
@@ -37,6 +37,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
// Class containing information for reading a single parquet row group from HDFS
@JsonTypeName("parquet-row-group-scan")
@@ -45,7 +46,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
private final ParquetFormatPlugin formatPlugin;
private final ParquetFormatConfig formatConfig;
- private final String selectionRoot;
+ private final Path selectionRoot;
@JsonCreator
public ParquetRowGroupScan(@JacksonInject StoragePluginRegistry registry,
@@ -55,7 +56,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
@JsonProperty("rowGroupReadEntries") LinkedList<RowGroupReadEntry> rowGroupReadEntries,
@JsonProperty("columns") List<SchemaPath> columns,
@JsonProperty("readerConfig") ParquetReaderConfig readerConfig,
- @JsonProperty("selectionRoot") String selectionRoot,
+ @JsonProperty("selectionRoot") Path selectionRoot,
@JsonProperty("filter") LogicalExpression filter) throws ExecutionSetupException {
this(userName,
(ParquetFormatPlugin) registry.getFormatPlugin(Preconditions.checkNotNull(storageConfig), Preconditions.checkNotNull(formatConfig)),
@@ -71,7 +72,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
List<RowGroupReadEntry> rowGroupReadEntries,
List<SchemaPath> columns,
ParquetReaderConfig readerConfig,
- String selectionRoot,
+ Path selectionRoot,
LogicalExpression filter) {
super(userName, rowGroupReadEntries, columns, readerConfig, filter);
this.formatPlugin = Preconditions.checkNotNull(formatPlugin, "Could not find format config for the given configuration");
@@ -90,7 +91,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
}
@JsonProperty
- public String getSelectionRoot() {
+ public Path getSelectionRoot() {
return selectionRoot;
}
@@ -127,7 +128,7 @@ public class ParquetRowGroupScan extends AbstractParquetRowGroupScan {
@Override
public List<String> getPartitionValues(RowGroupReadEntry rowGroupReadEntry) {
- return ColumnExplorer.listPartitionValues(rowGroupReadEntry.getPath(), selectionRoot);
+ return ColumnExplorer.listPartitionValues(rowGroupReadEntry.getPath(), selectionRoot, false);
}
}
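Both group scans now pass Path values plus a boolean flag to ColumnExplorer.listPartitionValues(). Conceptually, the partition values are the directory names sitting between the selection root and the file. The sketch below is a hypothetical helper illustrating that idea only; it is not Drill's ColumnExplorer implementation:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.Path;

    public class PartitionValuesSketch {
      // Hypothetical: collects directory names between root and file, root-to-leaf.
      static List<String> listPartitionValues(Path file, Path root) {
        List<String> values = new ArrayList<>();
        for (Path dir = file.getParent(); dir != null && !dir.equals(root); dir = dir.getParent()) {
          values.add(0, dir.getName()); // prepend to keep root-to-leaf order
        }
        return values;
      }

      public static void main(String[] args) {
        // Prints [1994, Q1]
        System.out.println(listPartitionValues(new Path("/t/1994/Q1/f.parquet"), new Path("/t")));
      }
    }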
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index f0ef63989..8c91200d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.util.List;
@@ -61,7 +62,7 @@ public class ParquetScanBatchCreator extends AbstractParquetScanBatchCreator imp
}
@Override
- protected DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException {
+ protected DrillFileSystem get(Configuration config, Path path) throws ExecutionSetupException {
if (fs == null) {
try {
fs = useAsyncPageReader ? operatorContext.newNonTrackingFileSystem(config) : operatorContext.newFileSystem(config);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
index 1c9ce107c..5fbadd6ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
@@ -23,6 +23,7 @@ import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.schedule.CompleteWork;
import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.hadoop.fs.Path;
import java.util.List;
@@ -37,7 +38,7 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil
private long numRecordsToRead;
@JsonCreator
- public RowGroupInfo(@JsonProperty("path") String path,
+ public RowGroupInfo(@JsonProperty("path") Path path,
@JsonProperty("start") long start,
@JsonProperty("length") long length,
@JsonProperty("rowGroupIndex") int rowGroupIndex,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
index 665179f6b..be3f50c30 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupReadEntry.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
public class RowGroupReadEntry extends ReadEntryFromHDFS {
@@ -29,7 +30,7 @@ public class RowGroupReadEntry extends ReadEntryFromHDFS {
private long numRecordsToRead;
@JsonCreator
- public RowGroupReadEntry(@JsonProperty("path") String path, @JsonProperty("start") long start,
+ public RowGroupReadEntry(@JsonProperty("path") Path path, @JsonProperty("start") long start,
@JsonProperty("length") long length, @JsonProperty("rowGroupIndex") int rowGroupIndex,
@JsonProperty("numRecordsToRead") long numRecordsToRead) {
super(path, start, length);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 17cf8c44d..ba4c49330 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -90,7 +90,7 @@ public class ParquetRecordReader extends AbstractRecordReader {
public boolean useBulkReader;
@SuppressWarnings("unused")
- private String name;
+ private Path name;
public ParquetReaderStats parquetReaderStats = new ParquetReaderStats();
private BatchReader batchReader;
@@ -123,44 +123,42 @@ public class ParquetRecordReader extends AbstractRecordReader {
}
public ParquetRecordReader(FragmentContext fragmentContext,
- String path,
+ Path path,
int rowGroupIndex,
long numRecordsToRead,
FileSystem fs,
CodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
- ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException {
- this(fragmentContext, numRecordsToRead,
- path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
+ ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
+ this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
}
public ParquetRecordReader(FragmentContext fragmentContext,
- String path,
+ Path path,
int rowGroupIndex,
FileSystem fs,
CodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
- ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus)
- throws ExecutionSetupException {
- this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(),
- path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
+ ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
+ this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(), path, rowGroupIndex, fs, codecFactory,
+ footer, columns, dateCorruptionStatus);
}
public ParquetRecordReader(
FragmentContext fragmentContext,
long numRecordsToRead,
- String path,
+ Path path,
int rowGroupIndex,
FileSystem fs,
CodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
- ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException {
+ ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
this.name = path;
- this.hadoopPath = new Path(path);
+ this.hadoopPath = path;
this.fileSystem = fs;
this.codecFactory = codecFactory;
this.rowGroupIndex = rowGroupIndex;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
index 0db007ab6..d0e2734d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
+import org.apache.drill.exec.serialization.PathSerDe;
import org.apache.drill.exec.store.parquet.ParquetReaderConfig;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -82,6 +83,9 @@ import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetFi
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.ParquetTableMetadata_v3;
import static org.apache.drill.exec.store.parquet.metadata.Metadata_V3.RowGroupMetadata_v3;
+/**
+ * A utility class that holds the Parquet table metadata and the {@link ParquetReaderConfig}
+ */
public class Metadata {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Metadata.class);
@@ -106,7 +110,7 @@ public class Metadata {
* @param path path
* @param readerConfig parquet reader configuration
*/
- public static void createMeta(FileSystem fs, String path, ParquetReaderConfig readerConfig) throws IOException {
+ public static void createMeta(FileSystem fs, Path path, ParquetReaderConfig readerConfig) throws IOException {
Metadata metadata = new Metadata(readerConfig);
metadata.createMetaFilesRecursively(path, fs);
}
@@ -208,13 +212,13 @@ public class Metadata {
* {@code path} directory).
* @throws IOException if parquet metadata can't be serialized and written to the json file
*/
- private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final String path, FileSystem fs) throws IOException {
+ private Pair<ParquetTableMetadata_v3, ParquetTableMetadataDirs> createMetaFilesRecursively(final Path path, FileSystem fs) throws IOException {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
List<ParquetFileMetadata_v3> metaDataList = Lists.newArrayList();
- List<String> directoryList = Lists.newArrayList();
+ List<Path> directoryList = Lists.newArrayList();
ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfoSet =
new ConcurrentHashMap<>();
- Path p = new Path(path);
+ Path p = path;
FileStatus fileStatus = fs.getFileStatus(p);
assert fileStatus.isDirectory() : "Expected directory";
@@ -222,10 +226,10 @@ public class Metadata {
for (final FileStatus file : DrillFileSystemUtil.listAll(fs, p, false)) {
if (file.isDirectory()) {
- ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath().toString(), fs)).getLeft();
+ ParquetTableMetadata_v3 subTableMetadata = (createMetaFilesRecursively(file.getPath(), fs)).getLeft();
metaDataList.addAll(subTableMetadata.files);
directoryList.addAll(subTableMetadata.directories);
- directoryList.add(file.getPath().toString());
+ directoryList.add(file.getPath());
// Merge the schema from the child level into the current level
//TODO: We need a merge method that merges two columns with the same name but different types
columnTypeInfoSet.putAll(subTableMetadata.columnTypeInfo);
@@ -268,7 +272,7 @@ public class Metadata {
ParquetTableMetadataDirs parquetTableMetadataDirs = new ParquetTableMetadataDirs(directoryList);
return Pair.of(parquetTableMetadata, parquetTableMetadataDirs);
}
- List<String> emptyDirList = Lists.newArrayList();
+ List<Path> emptyDirList = new ArrayList<>();
if (timer != null) {
logger.debug("Creating metadata files recursively took {} ms", timer.elapsed(TimeUnit.MILLISECONDS));
timer.stop();
@@ -495,7 +499,7 @@ public class Metadata {
rowGroupMetadataList.add(rowGroupMeta);
}
- String path = Path.getPathWithoutSchemeAndAuthority(file.getPath()).toString();
+ Path path = Path.getPathWithoutSchemeAndAuthority(file.getPath());
return new ParquetFileMetadata_v3(path, file.getLen(), rowGroupMetadataList);
}
@@ -535,6 +539,8 @@ public class Metadata {
*
* @param parquetTableMetadata parquet table metadata
* @param p file path
+ * @param fs Drill file system
+ * @throws IOException if metadata can't be serialized
*/
private void writeFile(ParquetTableMetadata_v3 parquetTableMetadata, Path p, FileSystem fs) throws IOException {
JsonFactory jsonFactory = new JsonFactory();
@@ -542,6 +548,7 @@ public class Metadata {
jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
ObjectMapper mapper = new ObjectMapper(jsonFactory);
SimpleModule module = new SimpleModule();
+ module.addSerializer(Path.class, new PathSerDe.Se());
module.addSerializer(ColumnMetadata_v3.class, new ColumnMetadata_v3.Serializer());
mapper.registerModule(module);
OutputStream os = fs.create(p);
@@ -556,6 +563,7 @@ public class Metadata {
jsonFactory.configure(JsonParser.Feature.AUTO_CLOSE_SOURCE, false);
ObjectMapper mapper = new ObjectMapper(jsonFactory);
SimpleModule module = new SimpleModule();
+ module.addSerializer(Path.class, new PathSerDe.Se());
mapper.registerModule(module);
OutputStream os = fs.create(p);
mapper.writerWithDefaultPrettyPrinter().writeValue(os, parquetTableMetadataDirs);
@@ -602,7 +610,7 @@ public class Metadata {
parquetTableMetadataDirs.updateRelativePaths(metadataParentDirPath);
if (!alreadyCheckedModification && tableModified(parquetTableMetadataDirs.getDirectories(), path, metadataParentDir, metaContext, fs)) {
parquetTableMetadataDirs =
- (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getRight();
+ (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getRight();
newMetadata = true;
}
} else {
@@ -616,7 +624,7 @@ public class Metadata {
}
if (!alreadyCheckedModification && tableModified(parquetTableMetadata.getDirectories(), path, metadataParentDir, metaContext, fs)) {
parquetTableMetadata =
- (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()).toString(), fs)).getLeft();
+ (createMetaFilesRecursively(Path.getPathWithoutSchemeAndAuthority(path.getParent()), fs)).getLeft();
newMetadata = true;
}
@@ -647,9 +655,10 @@ public class Metadata {
* @return true if metadata needs to be updated, false otherwise
* @throws IOException if some resources are not accessible
*/
- private boolean tableModified(List<String> directories, Path metaFilePath, Path parentDir, MetadataContext metaContext, FileSystem fs) throws IOException {
+ private boolean tableModified(List<Path> directories, Path metaFilePath, Path parentDir,
+ MetadataContext metaContext, FileSystem fs) throws IOException {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
- metaContext.setStatus(parentDir.toUri().getPath());
+ metaContext.setStatus(parentDir);
long metaFileModifyTime = fs.getFileStatus(metaFilePath).getModificationTime();
FileStatus directoryStatus = fs.getFileStatus(parentDir);
int numDirs = 1;
@@ -661,10 +670,10 @@ public class Metadata {
}
return true;
}
- for (String directory : directories) {
+ for (Path directory : directories) {
numDirs++;
metaContext.setStatus(directory);
- directoryStatus = fs.getFileStatus(new Path(directory));
+ directoryStatus = fs.getFileStatus(directory);
if (directoryStatus.getModificationTime() > metaFileModifyTime) {
if (timer != null) {
logger.debug("Directory {} was modified. Took {} ms to check modification time of {} directories",
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
index bed8be67a..ee07470d2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataBase.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.store.parquet.metadata;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.hadoop.fs.Path;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
@@ -57,7 +58,7 @@ public class MetadataBase {
public static abstract class ParquetTableMetadataBase {
@JsonIgnore
- public abstract List<String> getDirectories();
+ public abstract List<Path> getDirectories();
@JsonIgnore public abstract List<? extends ParquetFileMetadata> getFiles();
@@ -83,7 +84,7 @@ public class MetadataBase {
}
public static abstract class ParquetFileMetadata {
- @JsonIgnore public abstract String getPath();
+ @JsonIgnore public abstract Path getPath();
@JsonIgnore public abstract Long getLength();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
index 3e7c2ffcc..2794e2b14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/MetadataPathUtils.java
@@ -21,6 +21,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.common.util.DrillVersionInfo;
import org.apache.hadoop.fs.Path;
+import java.util.ArrayList;
import java.util.List;
import static org.apache.drill.exec.store.parquet.metadata.MetadataVersion.Constants.SUPPORTED_VERSIONS;
@@ -39,12 +40,12 @@ public class MetadataPathUtils {
* @param baseDir base parent directory
* @return list of absolute paths
*/
- public static List<String> convertToAbsolutePaths(List<String> paths, String baseDir) {
+ public static List<Path> convertToAbsolutePaths(List<Path> paths, String baseDir) {
if (!paths.isEmpty()) {
- List<String> absolutePaths = Lists.newArrayList();
- for (String relativePath : paths) {
- String absolutePath = (new Path(relativePath).isAbsolute()) ? relativePath
- : new Path(baseDir, relativePath).toUri().getPath();
+ List<Path> absolutePaths = Lists.newArrayList();
+ for (Path relativePath : paths) {
+ Path absolutePath = (relativePath.isAbsolute()) ? relativePath
+ : new Path(baseDir, relativePath);
absolutePaths.add(absolutePath);
}
return absolutePaths;
@@ -64,10 +65,10 @@ public class MetadataPathUtils {
if (!files.isEmpty()) {
List<ParquetFileMetadata_v3> filesWithAbsolutePaths = Lists.newArrayList();
for (ParquetFileMetadata_v3 file : files) {
- Path relativePath = new Path(file.getPath());
+ Path relativePath = file.getPath();
// create a new file if old one contains a relative path, otherwise use an old file
ParquetFileMetadata_v3 fileWithAbsolutePath = (relativePath.isAbsolute()) ? file
- : new ParquetFileMetadata_v3(new Path(baseDir, relativePath).toUri().getPath(), file.length, file.rowGroups);
+ : new ParquetFileMetadata_v3(new Path(baseDir, relativePath), file.length, file.rowGroups);
filesWithAbsolutePaths.add(fileWithAbsolutePath);
}
return filesWithAbsolutePaths;
@@ -84,9 +85,9 @@ public class MetadataPathUtils {
* @return parquet table metadata with relative paths for the files and directories
*/
public static ParquetTableMetadata_v3 createMetadataWithRelativePaths(
- ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, String baseDir) {
- List<String> directoriesWithRelativePaths = Lists.newArrayList();
- for (String directory : tableMetadataWithAbsolutePaths.getDirectories()) {
+ ParquetTableMetadata_v3 tableMetadataWithAbsolutePaths, Path baseDir) {
+ List<Path> directoriesWithRelativePaths = new ArrayList<>();
+ for (Path directory : tableMetadataWithAbsolutePaths.getDirectories()) {
directoriesWithRelativePaths.add(relativize(baseDir, directory));
}
List<ParquetFileMetadata_v3> filesWithRelativePaths = Lists.newArrayList();
@@ -105,9 +106,9 @@ public class MetadataPathUtils {
* @param baseDir base path (the part of the Path, which should be cut off from child path)
* @return relative path
*/
- public static String relativize(String baseDir, String childPath) {
- Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(childPath));
- Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(new Path(baseDir));
+ public static Path relativize(Path baseDir, Path childPath) {
+ Path fullPathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(childPath);
+ Path basePathWithoutSchemeAndAuthority = Path.getPathWithoutSchemeAndAuthority(baseDir);
// Since Hadoop Path has no relativize(), we use uri.relativize() to get the relative path
Path relativeFilePath = new Path(basePathWithoutSchemeAndAuthority.toUri()
@@ -116,7 +117,7 @@ public class MetadataPathUtils {
throw new IllegalStateException(String.format("Path %s is not a subpath of %s.",
basePathWithoutSchemeAndAuthority.toUri().getPath(), fullPathWithoutSchemeAndAuthority.toUri().getPath()));
}
- return relativeFilePath.toUri().getPath();
+ return relativeFilePath;
}
}
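Since Hadoop's Path offers no relativize() of its own, the rewrite keeps the existing detour through java.net.URI. Its behavior in isolation (demo values only):

    import org.apache.hadoop.fs.Path;

    public class RelativizeDemo {
      public static void main(String[] args) {
        Path base = new Path("/user/drill/t");
        Path child = new Path("/user/drill/t/1994/Q1/f.parquet");
        // URI.relativize strips the base prefix: prints 1994/Q1/f.parquet
        Path relative = new Path(base.toUri().relativize(child.toUri()).getPath());
        System.out.println(relative);
      }
    }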
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java
index 92feb5f8a..4b0dca803 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V1.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
@@ -43,19 +44,19 @@ public class Metadata_V1 {
@JsonProperty(value = "metadata_version", access = JsonProperty.Access.WRITE_ONLY) private String metadataVersion;
@JsonProperty
List<ParquetFileMetadata_v1> files;
- @JsonProperty List<String> directories;
+ @JsonProperty List<Path> directories;
public ParquetTableMetadata_v1() {
}
- public ParquetTableMetadata_v1(String metadataVersion, List<ParquetFileMetadata_v1> files, List<String> directories) {
+ public ParquetTableMetadata_v1(String metadataVersion, List<ParquetFileMetadata_v1> files, List<Path> directories) {
this.metadataVersion = metadataVersion;
this.files = files;
this.directories = directories;
}
@JsonIgnore
- @Override public List<String> getDirectories() {
+ @Override public List<Path> getDirectories() {
return directories;
}
@@ -114,7 +115,7 @@ public class Metadata_V1 {
*/
public static class ParquetFileMetadata_v1 extends ParquetFileMetadata {
@JsonProperty
- public String path;
+ public Path path;
@JsonProperty
public Long length;
@JsonProperty
@@ -123,7 +124,7 @@ public class Metadata_V1 {
public ParquetFileMetadata_v1() {
}
- public ParquetFileMetadata_v1(String path, Long length, List<RowGroupMetadata_v1> rowGroups) {
+ public ParquetFileMetadata_v1(Path path, Long length, List<RowGroupMetadata_v1> rowGroups) {
this.path = path;
this.length = length;
this.rowGroups = rowGroups;
@@ -134,7 +135,7 @@ public class Metadata_V1 {
return String.format("path: %s rowGroups: %s", path, rowGroups);
}
- @JsonIgnore @Override public String getPath() {
+ @JsonIgnore @Override public Path getPath() {
return path;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java
index 7eddc1279..a78fca4bf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V2.java
@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.KeyDeserializer;
import com.fasterxml.jackson.databind.SerializerProvider;
+import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
@@ -59,7 +60,7 @@ public class Metadata_V2 {
@JsonProperty public ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo;
@JsonProperty
List<ParquetFileMetadata_v2> files;
- @JsonProperty List<String> directories;
+ @JsonProperty List<Path> directories;
@JsonProperty String drillVersion;
public ParquetTableMetadata_v2() {
@@ -71,7 +72,7 @@ public class Metadata_V2 {
}
public ParquetTableMetadata_v2(String metadataVersion, ParquetTableMetadataBase parquetTable,
- List<ParquetFileMetadata_v2> files, List<String> directories, String drillVersion) {
+ List<ParquetFileMetadata_v2> files, List<Path> directories, String drillVersion) {
this.metadataVersion = metadataVersion;
this.files = files;
this.directories = directories;
@@ -79,7 +80,7 @@ public class Metadata_V2 {
this.drillVersion = drillVersion;
}
- public ParquetTableMetadata_v2(String metadataVersion, List<ParquetFileMetadata_v2> files, List<String> directories,
+ public ParquetTableMetadata_v2(String metadataVersion, List<ParquetFileMetadata_v2> files, List<Path> directories,
ConcurrentHashMap<ColumnTypeMetadata_v2.Key, ColumnTypeMetadata_v2> columnTypeInfo, String drillVersion) {
this.metadataVersion = metadataVersion;
this.files = files;
@@ -93,7 +94,7 @@ public class Metadata_V2 {
}
@JsonIgnore
- @Override public List<String> getDirectories() {
+ @Override public List<Path> getDirectories() {
return directories;
}
@@ -152,14 +153,14 @@ public class Metadata_V2 {
* Struct which contains the metadata for a single parquet file
*/
public static class ParquetFileMetadata_v2 extends ParquetFileMetadata {
- @JsonProperty public String path;
+ @JsonProperty public Path path;
@JsonProperty public Long length;
@JsonProperty public List<RowGroupMetadata_v2> rowGroups;
public ParquetFileMetadata_v2() {
}
- public ParquetFileMetadata_v2(String path, Long length, List<RowGroupMetadata_v2> rowGroups) {
+ public ParquetFileMetadata_v2(Path path, Long length, List<RowGroupMetadata_v2> rowGroups) {
this.path = path;
this.length = length;
this.rowGroups = rowGroups;
@@ -169,7 +170,7 @@ public class Metadata_V2 {
return String.format("path: %s rowGroups: %s", path, rowGroups);
}
- @JsonIgnore @Override public String getPath() {
+ @JsonIgnore @Override public Path getPath() {
return path;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java
index 4bb07f78f..a5ff89795 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/Metadata_V3.java
@@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.KeyDeserializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.hadoop.fs.Path;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
@@ -54,7 +55,7 @@ public class Metadata_V3 {
@JsonProperty public ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo;
@JsonProperty
List<ParquetFileMetadata_v3> files;
- @JsonProperty List<String> directories;
+ @JsonProperty List<Path> directories;
@JsonProperty String drillVersion;
/**
@@ -74,7 +75,7 @@ public class Metadata_V3 {
}
public ParquetTableMetadata_v3(String metadataVersion, ParquetTableMetadataBase parquetTable,
- List<ParquetFileMetadata_v3> files, List<String> directories, String drillVersion) {
+ List<ParquetFileMetadata_v3> files, List<Path> directories, String drillVersion) {
this.metadataVersion = metadataVersion;
this.files = files;
this.directories = directories;
@@ -82,7 +83,7 @@ public class Metadata_V3 {
this.drillVersion = drillVersion;
}
- public ParquetTableMetadata_v3(String metadataVersion, List<ParquetFileMetadata_v3> files, List<String> directories,
+ public ParquetTableMetadata_v3(String metadataVersion, List<ParquetFileMetadata_v3> files, List<Path> directories,
ConcurrentHashMap<ColumnTypeMetadata_v3.Key, ColumnTypeMetadata_v3> columnTypeInfo,
String drillVersion) {
this.metadataVersion = metadataVersion;
@@ -97,7 +98,7 @@ public class Metadata_V3 {
}
@JsonIgnore
- @Override public List<String> getDirectories() {
+ @Override public List<Path> getDirectories() {
return directories;
}
@@ -168,14 +169,14 @@ public class Metadata_V3 {
* Struct which contains the metadata for a single parquet file
*/
public static class ParquetFileMetadata_v3 extends ParquetFileMetadata {
- @JsonProperty public String path;
+ @JsonProperty public Path path;
@JsonProperty public Long length;
@JsonProperty public List<RowGroupMetadata_v3> rowGroups;
public ParquetFileMetadata_v3() {
}
- public ParquetFileMetadata_v3(String path, Long length, List<RowGroupMetadata_v3> rowGroups) {
+ public ParquetFileMetadata_v3(Path path, Long length, List<RowGroupMetadata_v3> rowGroups) {
this.path = path;
this.length = length;
this.rowGroups = rowGroups;
@@ -185,7 +186,7 @@ public class Metadata_V3 {
return String.format("path: %s rowGroups: %s", path, rowGroups);
}
- @JsonIgnore @Override public String getPath() {
+ @JsonIgnore @Override public Path getPath() {
return path;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java
index 186f53415..b1fd7f23e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/metadata/ParquetTableMetadataDirs.java
@@ -19,24 +19,25 @@ package org.apache.drill.exec.store.parquet.metadata;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
import java.util.List;
public class ParquetTableMetadataDirs {
@JsonProperty
- List<String> directories;
+ List<Path> directories;
public ParquetTableMetadataDirs() {
// default constructor needed for deserialization
}
- public ParquetTableMetadataDirs(List<String> directories) {
+ public ParquetTableMetadataDirs(List<Path> directories) {
this.directories = directories;
}
@JsonIgnore
- public List<String> getDirectories() {
+ public List<Path> getDirectories() {
return directories;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 09c016a5f..5c49e040a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -235,7 +235,7 @@ public class DrillParquetReader extends AbstractRecordReader {
paths.put(md.getPath(), md);
}
- Path filePath = new Path(entry.getPath());
+ Path filePath = entry.getPath();
BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
index d688f3b07..aef56a307 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -92,11 +92,11 @@ public class PcapRecordReader extends AbstractRecordReader {
.build();
}
- public PcapRecordReader(final String pathToFile,
+ public PcapRecordReader(final Path pathToFile,
final FileSystem fileSystem,
final List<SchemaPath> projectedColumns) {
this.fs = fileSystem;
- this.pathToFile = fs.makeQualified(new Path(pathToFile));
+ this.pathToFile = fs.makeQualified(pathToFile);
this.projectedColumns = projectedColumns;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
index b1c5f2427..0ad234de7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
@@ -65,11 +65,11 @@ public class PcapngRecordReader extends AbstractRecordReader {
private Iterator<IPcapngType> it;
- public PcapngRecordReader(final String pathToFile,
+ public PcapngRecordReader(final Path pathToFile,
final FileSystem fileSystem,
final List<SchemaPath> columns) {
this.fs = fileSystem;
- this.pathToFile = fs.makeQualified(new Path(pathToFile));
+ this.pathToFile = fs.makeQualified(pathToFile);
this.columns = columns;
setColumns(columns);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
index 7de31a063..6bc7bb02a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/BlockMapBuilder.java
@@ -116,8 +116,8 @@ public class BlockMapBuilder {
try {
ImmutableRangeMap<Long, BlockLocation> rangeMap = getBlockMap(status);
for (Entry<Range<Long>, BlockLocation> l : rangeMap.asMapOfRanges().entrySet()) {
- work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), l.getValue().getOffset(), l.getValue().getLength(), status.getPath()
- .toString()));
+ work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)),
+ l.getValue().getOffset(), l.getValue().getLength(), status.getPath()));
}
} catch (IOException e) {
logger.warn("failure while generating file work.", e);
@@ -127,7 +127,8 @@ public class BlockMapBuilder {
if (!blockify || error || compressed(status)) {
- work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, status.getLen(), status.getPath().toString()));
+ work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0,
+ status.getLen(), status.getPath()));
}
// This if-condition is specific for empty CSV file
@@ -135,7 +136,8 @@ public class BlockMapBuilder {
// And if this CSV file is empty, rangeMap would be empty also
// Therefore, at the point before this if-condition, work would not be populated
if(work.isEmpty()) {
- work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, 0, status.getPath().toString()));
+ work.add(new CompleteFileWork(getEndpointByteMap(noDrillbitHosts, new FileStatusWork(status)), 0, 0,
+ status.getPath()));
}
if (noDrillbitHosts != null) {
@@ -162,8 +164,8 @@ public class BlockMapBuilder {
}
@Override
- public String getPath() {
- return status.getPath().toString();
+ public Path getPath() {
+ return status.getPath();
}
@Override
@@ -231,20 +233,20 @@ public class BlockMapBuilder {
*/
public EndpointByteMap getEndpointByteMap(Set<String> noDrillbitHosts, FileWork work) throws IOException {
Stopwatch watch = Stopwatch.createStarted();
- Path fileName = new Path(work.getPath());
+ Path fileName = work.getPath();
- ImmutableRangeMap<Long,BlockLocation> blockMap = getBlockMap(fileName);
+ ImmutableRangeMap<Long, BlockLocation> blockMap = getBlockMap(fileName);
EndpointByteMapImpl endpointByteMap = new EndpointByteMapImpl();
long start = work.getStart();
long end = start + work.getLength();
Range<Long> rowGroupRange = Range.closedOpen(start, end);
// Find submap of ranges that intersect with the rowGroup
- ImmutableRangeMap<Long,BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
+ ImmutableRangeMap<Long, BlockLocation> subRangeMap = blockMap.subRangeMap(rowGroupRange);
// Iterate through each block in this submap and get the host for the block location
- for (Map.Entry<Range<Long>,BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
+ for (Map.Entry<Range<Long>, BlockLocation> block : subRangeMap.asMapOfRanges().entrySet()) {
String[] hosts;
Range<Long> blockRange = block.getKey();
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
index 04c4eb0db..4b0402bd8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/CompleteFileWork.java
@@ -21,16 +21,17 @@ import org.apache.drill.exec.store.dfs.easy.FileWork;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.fs.Path;
public class CompleteFileWork implements FileWork, CompleteWork {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CompleteFileWork.class);
private long start;
private long length;
- private String path;
+ private Path path;
private EndpointByteMap byteMap;
- public CompleteFileWork(EndpointByteMap byteMap, long start, long length, String path) {
+ public CompleteFileWork(EndpointByteMap byteMap, long start, long length, Path path) {
super();
this.start = start;
this.length = length;
@@ -69,7 +70,7 @@ public class CompleteFileWork implements FileWork, CompleteWork {
}
@Override
- public String getPath() {
+ public Path getPath() {
return path;
}
@@ -87,22 +88,28 @@ public class CompleteFileWork implements FileWork, CompleteWork {
return new FileWorkImpl(start, length, path);
}
- public static class FileWorkImpl implements FileWork{
+ @Override
+ public String toString() {
+ return String.format("File: %s start: %d length: %d", path, start, length);
+ }
+
+ public static class FileWorkImpl implements FileWork {
+
+ private long start;
+ private long length;
+ private Path path;
@JsonCreator
- public FileWorkImpl(@JsonProperty("start") long start, @JsonProperty("length") long length, @JsonProperty("path") String path) {
- super();
+ public FileWorkImpl(@JsonProperty("start") long start,
+ @JsonProperty("length") long length,
+ @JsonProperty("path") Path path) {
this.start = start;
this.length = length;
this.path = path;
}
- public long start;
- public long length;
- public String path;
-
@Override
- public String getPath() {
+ public Path getPath() {
return path;
}
@@ -116,10 +123,13 @@ public class CompleteFileWork implements FileWork, CompleteWork {
return length;
}
- }
-
- @Override
- public String toString() {
- return String.format("File: %s start: %d length: %d", path, start, length);
+ @Override
+ public String toString() {
+ return "FileWorkImpl{" +
+ "start=" + start +
+ ", length=" + length +
+ ", path=" + path +
+ '}';
+ }
}
}
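With the path field now typed as Path, JSON round-tripping of FileWorkImpl depends on the Path serializer registered elsewhere in this patch (exercised by the TestPathSerialization hunk below); on the wire the field stays a plain string. For example:

    // Serializes to {"start":5,"length":6,"path":"/tmp"} once PathSerDe.Se is registered.
    CompleteFileWork.FileWorkImpl work =
        new CompleteFileWork.FileWorkImpl(5, 6, new Path("/tmp"));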
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
index fcee3b0e0..bfb83e07c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/DrillFileSystemUtil.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.util;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.shaded.guava.com.google.common.base.Strings;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -133,4 +134,14 @@ public class DrillFileSystemUtil {
return FileSystemUtil.listAllSafe(fs, path, recursive, FileSystemUtil.mergeFilters(DRILL_SYSTEM_FILTER, filters));
}
+ /**
+ * Safely creates a Hadoop Path from a possibly null or empty String path.
+ *
+ * @param path String path, which may be null or empty
+ * @return Hadoop Path; the root path ("/") for a null or empty input
+ */
+ public static Path createPathSafe(String path) {
+ return Strings.isNullOrEmpty(path) ? new Path("/") : new Path(path);
+ }
+
}
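A short usage sketch for the new helper; the guard matters because Hadoop's Path constructor rejects null and empty strings with an IllegalArgumentException:

    DrillFileSystemUtil.createPathSafe(null);        // -> new Path("/")
    DrillFileSystemUtil.createPathSafe("");          // -> new Path("/")
    DrillFileSystemUtil.createPathSafe("/tmp/data"); // -> new Path("/tmp/data")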
diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
index 9e0d95c85..247d7842e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java
@@ -143,8 +143,7 @@ public class PlanTestBase extends BaseTestQuery {
* planning process throws an exception
*/
public static void testPlanWithAttributesMatchingPatterns(String query, String[] expectedPatterns,
- String[] excludedPatterns)
- throws Exception {
+ String[] excludedPatterns) throws Exception {
final String plan = getPlanInString("EXPLAIN PLAN INCLUDING ALL ATTRIBUTES for " +
QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java
new file mode 100644
index 000000000..cb9356568
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestPathSerialization.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec;
+
+
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.drill.exec.serialization.PathSerDe;
+import org.apache.drill.exec.store.schedule.CompleteFileWork;
+import org.apache.drill.test.DrillTest;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+public class TestPathSerialization extends DrillTest {
+
+ @Test
+ public void testDeSerializingWithJsonCreator() throws IOException {
+
+ String jsonString = "{\"start\": 1, \"length\": 2, \"path\": \"/tmp/drill/test\"}";
+
+ SimpleModule module = new SimpleModule();
+ module.addSerializer(Path.class, new PathSerDe.Se());
+ objectMapper.registerModule(module);
+
+ CompleteFileWork.FileWorkImpl bean = objectMapper.readValue(jsonString, CompleteFileWork.FileWorkImpl.class);
+
+ assertThat(bean.getStart() == 1, equalTo(true));
+ assertThat(bean.getLength() == 2, equalTo(true));
+ assertThat(bean.getPath().equals(new Path("/tmp/drill/test")), equalTo(true));
+ }
+
+ @Test
+ public void testHadoopPathSerDe() throws IOException {
+ CompleteFileWork.FileWorkImpl fileWork = new CompleteFileWork.FileWorkImpl(5, 6, new Path("/tmp"));
+ SimpleModule module = new SimpleModule();
+ module.addSerializer(Path.class, new PathSerDe.Se());
+ objectMapper.registerModule(module);
+
+ CompleteFileWork.FileWorkImpl bean =
+ objectMapper.readValue(objectMapper.writeValueAsString(fileWork), CompleteFileWork.FileWorkImpl.class);
+
+ assertThat(bean.getStart() == 5, equalTo(true));
+ assertThat(bean.getLength() == 6, equalTo(true));
+ assertThat(bean.getPath().equals(new Path("/tmp")), equalTo(true));
+ }
+}
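PathSerDe itself is outside this hunk; a minimal Jackson serializer for Path along these lines would satisfy the tests above (a sketch, not the actual class). Deserialization needs no custom module here: Jackson falls back to Path's single-String constructor when the JSON value is a string.

    import java.io.IOException;
    import com.fasterxml.jackson.core.JsonGenerator;
    import com.fasterxml.jackson.databind.JsonSerializer;
    import com.fasterxml.jackson.databind.SerializerProvider;
    import org.apache.hadoop.fs.Path;

    // Sketch of a Path-to-JSON-string serializer in the spirit of PathSerDe.Se.
    public class PathSerializerSketch extends JsonSerializer<Path> {
      @Override
      public void serialize(Path path, JsonGenerator gen, SerializerProvider provider)
          throws IOException {
        gen.writeString(path.toString()); // write the path as a plain JSON string
      }
    }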
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
index 2fdf3e637..b05bb28f3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/scan/TestFileScanFramework.java
@@ -85,7 +85,7 @@ public class TestFileScanFramework extends SubOperatorTest {
}
@Override
- public String getPath() { return path.toString(); }
+ public Path getPath() { return path; }
@Override
public long getStart() { return 0; }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
index 9a2bb1f85..911a09744 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/MiniPlanUnitTestBase.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.test.LegacyOperatorTestBuilder;
import org.apache.drill.test.PhysicalOpUnitTestBase;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.CodecFactory;
@@ -358,7 +359,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
*/
public class JsonScanBuilder extends ScanPopBuider<JsonScanBuilder> {
List<String> jsonBatches = null;
- List<String> inputPaths = Collections.emptyList();
+ List<Path> inputPaths = Collections.emptyList();
public JsonScanBuilder(PopBuilder parent) {
super(parent);
@@ -373,7 +374,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
return this;
}
- public JsonScanBuilder inputPaths(List<String> inputPaths) {
+ public JsonScanBuilder inputPaths(List<Path> inputPaths) {
this.inputPaths = inputPaths;
return this;
}
@@ -412,7 +413,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
* Builder for parquet Scan RecordBatch.
*/
public class ParquetScanBuilder extends ScanPopBuider<ParquetScanBuilder> {
- List<String> inputPaths = Collections.emptyList();
+ List<Path> inputPaths = Collections.emptyList();
public ParquetScanBuilder() {
super();
@@ -422,7 +423,7 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
super(parent);
}
- public ParquetScanBuilder inputPaths(List<String> inputPaths) {
+ public ParquetScanBuilder inputPaths(List<Path> inputPaths) {
this.inputPaths = inputPaths;
return this;
}
@@ -443,8 +444,8 @@ public class MiniPlanUnitTestBase extends PhysicalOpUnitTestBase {
private RecordBatch getScanBatch() throws Exception {
List<RecordReader> readers = new LinkedList<>();
- for (String path : inputPaths) {
- ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), new Path(path));
+ for (Path path : inputPaths) {
+ ParquetMetadata footer = ParquetFileReader.readFooter(fs.getConf(), path);
for (int i = 0; i < footer.getBlocks().size(); i++) {
readers.add(new ParquetRecordReader(fragContext,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
index cc259cc1b..a1b700153 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestMiniPlan.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -59,11 +60,11 @@ public class TestMiniPlan extends MiniPlanUnitTestBase {
@Test
public void testSimpleParquetScan() throws Exception {
String file = DrillFileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString();
-
+ List<Path> filePath = Collections.singletonList(new Path(file));
RecordBatch scanBatch = new ParquetScanBuilder()
.fileSystem(fs)
.columnsToRead("R_REGIONKEY")
- .inputPaths(Lists.newArrayList(file))
+ .inputPaths(filePath)
.build();
BatchSchema expectedSchema = new SchemaBuilder()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
index bdff80b14..69a421481 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/unit/TestNullInputMiniPlan.java
@@ -41,6 +41,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
@@ -272,11 +273,12 @@ public class TestNullInputMiniPlan extends MiniPlanUnitTestBase{
RecordBatch left = createScanBatchFromJson(SINGLE_EMPTY_JSON);
String file = DrillFileUtils.getResourceAsFile("/tpchmulti/region/01.parquet").toURI().toString();
+ List<Path> filePath = Collections.singletonList(new Path(file));
RecordBatch scanBatch = new ParquetScanBuilder()
.fileSystem(fs)
.columnsToRead("R_REGIONKEY")
- .inputPaths(Lists.newArrayList(file))
+ .inputPaths(filePath)
.build();
RecordBatch projectBatch = new PopBuilder()
@@ -554,10 +556,10 @@ public class TestNullInputMiniPlan extends MiniPlanUnitTestBase{
}
private RecordBatch createScanBatchFromJson(String... resourcePaths) throws Exception {
- List<String> inputPaths = new ArrayList<>();
+ List<Path> inputPaths = new ArrayList<>();
for (String resource : resourcePaths) {
- inputPaths.add(DrillFileUtils.getResourceAsFile(resource).toURI().toString());
+ inputPaths.add(new Path(DrillFileUtils.getResourceAsFile(resource).toURI()));
}
RecordBatch scanBatch = new JsonScanBuilder()
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java
index 6bd7e4de7..3db92564d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/CachedSingleFileSystem.java
@@ -41,16 +41,16 @@ import org.apache.hadoop.util.Progressable;
public class CachedSingleFileSystem extends FileSystem {
private ByteBuf file;
- private String path;
+ private Path path;
- public CachedSingleFileSystem(String path) throws IOException {
+ public CachedSingleFileSystem(Path path) throws IOException {
this.path = path;
- File f = new File(path);
+ File f = new File(path.toUri().getPath());
long length = f.length();
if (length > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Cached file system only supports files of less than 2GB.");
}
- try (InputStream is = new BufferedInputStream(new FileInputStream(path))) {
+ try (InputStream is = new BufferedInputStream(new FileInputStream(path.toUri().getPath()))) {
byte[] buffer = new byte[64*1024];
this.file = UnpooledByteBufAllocator.DEFAULT.directBuffer((int) length);
int read;
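Note the path.toUri().getPath() conversions above: a Hadoop Path may carry a scheme, and java.io streams cannot open a scheme-qualified name. Illustration:

    Path p = new Path("file:/tmp/cached.dat");
    p.toString();        // "file:/tmp/cached.dat" - not a valid java.io file name
    p.toUri().getPath(); // "/tmp/cached.dat"      - safe for FileInputStream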
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java
new file mode 100644
index 000000000..067abece3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDFSPartitionLocation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.drill.exec.planner.DFSFilePartitionLocation;
+import org.apache.drill.test.DrillTest;
+import org.apache.hadoop.fs.Path;
+import org.junit.Test;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+public class TestDFSPartitionLocation extends DrillTest {
+
+ private static final Path SELECTION_ROOT = new Path("/tmp/drill");
+ private static final Path PARTITION = new Path("/tmp/drill/test_table/first_dir/second_dir/");
+
+ @Test
+ public void testDFSFilePartitionLocation() {
+ Path file = new Path(PARTITION, "0_0_0.parquet");
+ DFSFilePartitionLocation dfsPartition = new DFSFilePartitionLocation(4, SELECTION_ROOT, file, false);
+ checkSubdirectories(dfsPartition, file);
+ }
+
+ @Test
+ public void testDFSDirectoryPartitionLocation() {
+ DFSFilePartitionLocation dfsPartition = new DFSFilePartitionLocation(4, SELECTION_ROOT, PARTITION, true);
+ checkSubdirectories(dfsPartition, PARTITION);
+ }
+
+ private void checkSubdirectories(DFSFilePartitionLocation dfsPartition, Path partition) {
+ assertArrayEquals("Wrong partition dirs", new String[]{"test_table", "first_dir", "second_dir", null}, dfsPartition.getDirs());
+ assertEquals("Wrong partition value","test_table", dfsPartition.getPartitionValue(0));
+ assertEquals("Wrong partition value", "first_dir", dfsPartition.getPartitionValue(1));
+ assertEquals("Wrong partition value", "second_dir", dfsPartition.getPartitionValue(2));
+ assertEquals("Wrong partition location", partition, dfsPartition.getEntirePartitionLocation());
+ }
+
+}
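The assertions encode the contract under test: the path components below the selection root fill the directory slots in order, and because only three levels exist under /tmp/drill while four were requested, the last slot stays null.

    // /tmp/drill / test_table / first_dir / second_dir
    //    root       dirs[0]      dirs[1]     dirs[2]    (dirs[3] == null)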
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
index b1f233e94..b853b9b8a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFileSelection.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull;
import java.util.List;
+import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.test.BaseTestQuery;
import org.apache.hadoop.fs.FileStatus;
@@ -30,7 +31,7 @@ import org.junit.Test;
public class TestFileSelection extends BaseTestQuery {
private static final List<FileStatus> EMPTY_STATUSES = ImmutableList.of();
- private static final List<String> EMPTY_FILES = ImmutableList.of();
+ private static final List<Path> EMPTY_FILES = ImmutableList.of();
private static final String EMPTY_ROOT = "";
@Test
@@ -38,8 +39,8 @@ public class TestFileSelection extends BaseTestQuery {
for (final Object statuses : new Object[] { null, EMPTY_STATUSES}) {
for (final Object files : new Object[]{null, EMPTY_FILES}) {
for (final Object root : new Object[]{null, EMPTY_ROOT}) {
- final FileSelection selection = FileSelection.create((List<FileStatus>) statuses, (List<String>) files,
- (String)root);
+ FileSelection selection = FileSelection.create((List<FileStatus>) statuses, (List<Path>) files,
+ DrillFileSystemUtil.createPathSafe((String) root));
assertNull(selection);
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
index 1bd90b37e..16b25ce13 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetRecordReaderTest.java
@@ -610,13 +610,13 @@ public class ParquetRecordReaderTest extends BaseTestQuery {
final FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c);
final FragmentContextImpl context = new FragmentContextImpl(bitContext, BitControl.PlanFragment.getDefaultInstance(), connection, registry);
- final String fileName = "/tmp/parquet_test_performance.parquet";
+ final Path fileName = new Path("/tmp/parquet_test_performance.parquet");
final HashMap<String, FieldInfo> fields = new HashMap<>();
final ParquetTestProperties props = new ParquetTestProperties(1, 20 * 1000 * 1000, DEFAULT_BYTES_PER_PAGE, fields);
populateFieldInfoMap(props);
final Configuration dfsConfig = new Configuration();
- final List<Footer> footers = ParquetFileReader.readFooters(dfsConfig, new Path(fileName));
+ final List<Footer> footers = ParquetFileReader.readFooters(dfsConfig, fileName);
final Footer f = footers.iterator().next();
final List<SchemaPath> columns = Lists.newArrayList();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
index 51e8c1b0a..129796788 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.store.schedule.AssignmentCreator;
import org.apache.drill.exec.store.schedule.CompleteFileWork;
import org.apache.drill.exec.store.schedule.EndpointByteMap;
import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -125,7 +126,7 @@ public class TestAssignment {
private List<CompleteFileWork> generateChunks(int chunks) {
List<CompleteFileWork> chunkList = Lists.newArrayList();
for (int i = 0; i < chunks; i++) {
- CompleteFileWork chunk = new CompleteFileWork(createByteMap(), 0, FILE_SIZE, "file" + i);
+ CompleteFileWork chunk = new CompleteFileWork(createByteMap(), 0, FILE_SIZE, new Path("file", Integer.toString(i)));
chunkList.add(chunk);
}
return chunkList;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index b0820e926..27a4ad668 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -56,6 +56,7 @@ import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -399,9 +400,9 @@ public class PhysicalOpUnitTestBase extends ExecTest {
* @param columnsToRead
* @return The {@link org.apache.drill.exec.store.easy.json.JSONRecordReader} corresponding to each given input path.
*/
- public static Iterator<RecordReader> getJsonReadersFromInputFiles(DrillFileSystem fs, List<String> inputPaths, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
+ public static Iterator<RecordReader> getJsonReadersFromInputFiles(DrillFileSystem fs, List<Path> inputPaths, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
List<RecordReader> readers = new ArrayList<>();
- for (String inputPath : inputPaths) {
+ for (Path inputPath : inputPaths) {
readers.add(new JSONRecordReader(fragContext, inputPath, fs, columnsToRead));
}
return readers.iterator();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
index b4fcedf0f..629714b36 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/QueryBuilder.java
@@ -643,7 +643,7 @@ public class QueryBuilder {
*/
protected String queryPlan(String columnName) throws Exception {
- Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explan an SQL query.");
+ Preconditions.checkArgument(queryType == QueryType.SQL, "Can only explain an SQL query.");
final List<QueryDataBatch> results = results();
final RecordBatchLoader loader = new RecordBatchLoader(client.allocator());
final StringBuilder builder = new StringBuilder();