aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
diff options
context:
space:
mode:
authorSteven Phillips <sphillips@maprtech.com>2014-03-26 11:50:04 -0700
committerJacques Nadeau <jacques@apache.org>2014-04-22 20:06:03 -0700
commit69c571ccd841b7bcda1c38979716862690cba696 (patch)
tree7a4baf1f20b8eadf6bc1555c183767ebec016875 /exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
parent54287d0761f97f337035aa8988faf380178aba08 (diff)
DRILL-468 Support for FileSystem partitions
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java60
1 files changed, 49 insertions, 11 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index d36dbc02f..6278a7974 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -18,33 +18,29 @@
package org.apache.drill.exec.store.parquet;
import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.base.Stopwatch;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.mock.MockScanBatchCreator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import parquet.hadoop.CodecFactoryExposer;
import parquet.hadoop.ParquetFileReader;
import parquet.hadoop.metadata.ParquetMetadata;
@@ -54,12 +50,34 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
@Override
public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
Preconditions.checkArgument(children.isEmpty());
+ String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL);
+ List<SchemaPath> columns = rowGroupScan.getColumns();
+
List<RecordReader> readers = Lists.newArrayList();
-
+
+ List<String[]> partitionColumns = Lists.newArrayList();
+ List<Integer> selectedPartitionColumns = Lists.newArrayList();
+ boolean selectAllColumns = false;
+
+ if (columns == null || columns.size() == 0) {
+ selectAllColumns = true;
+ } else {
+ Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
+ for (SchemaPath column : columns) {
+ Matcher m = pattern.matcher(column.getAsUnescapedPath());
+ if (m.matches()) {
+ columns.remove(column);
+ selectedPartitionColumns.add(Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length())));
+ }
+ }
+ }
+
+
FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying();
// keep footers in a map to avoid re-reading them
Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>();
+ int numParts = 0;
for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){
/*
Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file
@@ -81,10 +99,30 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
rowGroupScan.getColumns()
)
);
+ if (rowGroupScan.getSelectionRoot() != null) {
+ String[] r = rowGroupScan.getSelectionRoot().split("/");
+ String[] p = e.getPath().split("/");
+ if (p.length > r.length) {
+ String[] q = ArrayUtils.subarray(p, r.length, p.length - 1);
+ partitionColumns.add(q);
+ numParts = Math.max(numParts, q.length);
+ } else {
+ partitionColumns.add(new String[] {});
+ }
+ } else {
+ partitionColumns.add(new String[] {});
+ }
} catch (IOException e1) {
throw new ExecutionSetupException(e1);
}
}
- return new ScanBatch(context, readers.iterator());
+
+ if (selectAllColumns) {
+ for (int i = 0; i < numParts; i++) {
+ selectedPartitionColumns.add(i);
+ }
+ }
+
+ return new ScanBatch(context, readers.iterator(), partitionColumns, selectedPartitionColumns);
}
}