diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-03-26 11:50:04 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-04-22 20:06:03 -0700 |
commit | 69c571ccd841b7bcda1c38979716862690cba696 (patch) | |
tree | 7a4baf1f20b8eadf6bc1555c183767ebec016875 /exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java | |
parent | 54287d0761f97f337035aa8988faf380178aba08 (diff) |
DRILL-468 Support for FileSystem partitions
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java | 60 |
1 file changed, 49 insertions, 11 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index d36dbc02f..6278a7974 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -18,33 +18,29 @@ package org.apache.drill.exec.store.parquet; import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; - -import com.google.common.base.Stopwatch; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.commons.lang3.ArrayUtils; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.BatchCreator; import org.apache.drill.exec.physical.impl.ScanBatch; import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.store.RecordReader; -import org.apache.drill.exec.store.mock.MockScanBatchCreator; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import parquet.hadoop.CodecFactoryExposer; import parquet.hadoop.ParquetFileReader; import parquet.hadoop.metadata.ParquetMetadata; @@ -54,12 +50,34 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan @Override public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) 
throws ExecutionSetupException { Preconditions.checkArgument(children.isEmpty()); + String partitionDesignator = context.getConfig().getString(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL); + List<SchemaPath> columns = rowGroupScan.getColumns(); + List<RecordReader> readers = Lists.newArrayList(); - + + List<String[]> partitionColumns = Lists.newArrayList(); + List<Integer> selectedPartitionColumns = Lists.newArrayList(); + boolean selectAllColumns = false; + + if (columns == null || columns.size() == 0) { + selectAllColumns = true; + } else { + Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator)); + for (SchemaPath column : columns) { + Matcher m = pattern.matcher(column.getAsUnescapedPath()); + if (m.matches()) { + columns.remove(column); + selectedPartitionColumns.add(Integer.parseInt(column.getAsUnescapedPath().toString().substring(partitionDesignator.length()))); + } + } + } + + FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying(); // keep footers in a map to avoid re-reading them Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>(); + int numParts = 0; for(RowGroupReadEntry e : rowGroupScan.getRowGroupReadEntries()){ /* Here we could store a map from file names to footers, to prevent re-reading the footer for each row group in a file @@ -81,10 +99,30 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan rowGroupScan.getColumns() ) ); + if (rowGroupScan.getSelectionRoot() != null) { + String[] r = rowGroupScan.getSelectionRoot().split("/"); + String[] p = e.getPath().split("/"); + if (p.length > r.length) { + String[] q = ArrayUtils.subarray(p, r.length, p.length - 1); + partitionColumns.add(q); + numParts = Math.max(numParts, q.length); + } else { + partitionColumns.add(new String[] {}); + } + } else { + partitionColumns.add(new String[] {}); + } } catch (IOException e1) { throw new ExecutionSetupException(e1); } } - return new ScanBatch(context, 
readers.iterator()); + + if (selectAllColumns) { + for (int i = 0; i < numParts; i++) { + selectedPartitionColumns.add(i); + } + } + + return new ScanBatch(context, readers.iterator(), partitionColumns, selectedPartitionColumns); } } |