aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
diff options
context:
space:
mode:
authorMehant Baid <mehantr@gmail.com>2015-07-20 15:44:33 -0700
committerMehant Baid <mehantr@gmail.com>2015-07-30 20:21:14 -0700
commit95d576da572d1dcb96c42f662504d7340cc718cd (patch)
tree55ef4c3ee522c558f7035763ec9499aefcc0e107 /exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
parenta65c0a6d43264f6aac23bf3c95e35d1b60a73666 (diff)
DRILL-3503: Make PruneScanRule pluggable.
Extend PartitionDescriptor to provide functionality needed by PruneScanRule. Removed redundant logic in PruneScanRule.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java76
1 files changed, 72 insertions, 4 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
index 9ad14b131..9816f140f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java
@@ -17,9 +17,26 @@
*/
package org.apache.drill.exec.planner;
+import java.io.IOException;
+import java.util.BitSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
+import org.apache.calcite.util.BitSets;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.base.FileGroupScan;
+import org.apache.drill.exec.physical.base.GroupScan;
+import org.apache.drill.exec.planner.logical.DrillScanRel;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.exec.store.dfs.FileSelection;
+import org.apache.drill.exec.store.dfs.FormatSelection;
+import org.apache.drill.exec.vector.NullableVarCharVector;
+import org.apache.drill.exec.vector.ValueVector;
// partition descriptor for file system based tables
@@ -30,10 +47,12 @@ public class FileSystemPartitionDescriptor implements PartitionDescriptor {
private final String partitionLabel;
private final int partitionLabelLength;
private final Map<String, Integer> partitions = Maps.newHashMap();
+ private final DrillScanRel scanRel;
- public FileSystemPartitionDescriptor(String partitionLabel) {
- this.partitionLabel = partitionLabel;
+ public FileSystemPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) {
+ this.partitionLabel = settings.getFsPartitionColumnLabel();
this.partitionLabelLength = partitionLabel.length();
+ this.scanRel = scanRel;
for(int i =0; i < 10; i++){
partitions.put(partitionLabel + i, i);
}
@@ -60,9 +79,58 @@ public class FileSystemPartitionDescriptor implements PartitionDescriptor {
return MAX_NESTED_SUBDIRS;
}
- public String getName(int index){
- return partitionLabel + index;
+ @Override
+ public GroupScan createNewGroupScan(List<String> newFiles) throws IOException {
+ final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true);
+ final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
+ return newScan;
+ }
+
+ @Override
+ public List<PartitionLocation> getPartitions() {
+ List<String> fileLocations = ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles();
+ List<PartitionLocation> partitions = new LinkedList<>();
+ for (String file: fileLocations) {
+ partitions.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file));
+ }
+ return partitions;
+ }
+
+ @Override
+ public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions,
+ BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) {
+ int record = 0;
+ for (PartitionLocation partitionLocation: partitions) {
+ for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) {
+ if (partitionLocation.getPartitionValue(partitionColumnIndex) == null) {
+ ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setNull(record);
+ } else {
+ byte[] bytes = (partitionLocation.getPartitionValue(partitionColumnIndex)).getBytes(Charsets.UTF_8);
+ ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setSafe(record, bytes, 0, bytes.length);
+ }
+ }
+ record++;
+ }
+
+ for (ValueVector v : vectors) {
+ if (v == null) {
+ continue;
+ }
+ v.getMutator().setValueCount(partitions.size());
+ }
}
+ @Override
+ public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) {
+ return Types.optional(TypeProtos.MinorType.VARCHAR);
+ }
+ public String getName(int index) {
+ return partitionLabel + index;
+ }
+
+ private String getBaseTableLocation() {
+ final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection();
+ return origSelection.getSelection().selectionRoot;
+ }
}