diff options
author | Mehant Baid <mehantr@gmail.com> | 2015-07-20 15:44:33 -0700 |
---|---|---|
committer | Mehant Baid <mehantr@gmail.com> | 2015-07-30 20:21:14 -0700 |
commit | 95d576da572d1dcb96c42f662504d7340cc718cd (patch) | |
tree | 55ef4c3ee522c558f7035763ec9499aefcc0e107 /exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java | |
parent | a65c0a6d43264f6aac23bf3c95e35d1b60a73666 (diff) |
DRILL-3503: Make PruneScanRule pluggable.
Extend PartitionDescriptor to provide functionality needed by PruneScanRule.
Remove redundant logic from PruneScanRule.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java | 76 |
1 file changed, 72 insertions, 4 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java index 9ad14b131..9816f140f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/FileSystemPartitionDescriptor.java @@ -17,9 +17,26 @@ */ package org.apache.drill.exec.planner; +import java.io.IOException; +import java.util.BitSet; +import java.util.LinkedList; +import java.util.List; import java.util.Map; +import com.google.common.base.Charsets; import com.google.common.collect.Maps; +import org.apache.calcite.util.BitSets; +import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.common.types.TypeProtos; +import org.apache.drill.common.types.Types; +import org.apache.drill.exec.physical.base.FileGroupScan; +import org.apache.drill.exec.physical.base.GroupScan; +import org.apache.drill.exec.planner.logical.DrillScanRel; +import org.apache.drill.exec.planner.physical.PlannerSettings; +import org.apache.drill.exec.store.dfs.FileSelection; +import org.apache.drill.exec.store.dfs.FormatSelection; +import org.apache.drill.exec.vector.NullableVarCharVector; +import org.apache.drill.exec.vector.ValueVector; // partition descriptor for file system based tables @@ -30,10 +47,12 @@ public class FileSystemPartitionDescriptor implements PartitionDescriptor { private final String partitionLabel; private final int partitionLabelLength; private final Map<String, Integer> partitions = Maps.newHashMap(); + private final DrillScanRel scanRel; - public FileSystemPartitionDescriptor(String partitionLabel) { - this.partitionLabel = partitionLabel; + public FileSystemPartitionDescriptor(PlannerSettings settings, DrillScanRel scanRel) { + this.partitionLabel = settings.getFsPartitionColumnLabel(); this.partitionLabelLength = partitionLabel.length(); 
+ this.scanRel = scanRel; for(int i =0; i < 10; i++){ partitions.put(partitionLabel + i, i); } @@ -60,9 +79,58 @@ public class FileSystemPartitionDescriptor implements PartitionDescriptor { return MAX_NESTED_SUBDIRS; } - public String getName(int index){ - return partitionLabel + index; + @Override + public GroupScan createNewGroupScan(List<String> newFiles) throws IOException { + final FileSelection newFileSelection = new FileSelection(newFiles, getBaseTableLocation(), true); + final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection); + return newScan; + } + + @Override + public List<PartitionLocation> getPartitions() { + List<String> fileLocations = ((FormatSelection) scanRel.getDrillTable().getSelection()).getAsFiles(); + List<PartitionLocation> partitions = new LinkedList<>(); + for (String file: fileLocations) { + partitions.add(new DFSPartitionLocation(MAX_NESTED_SUBDIRS, getBaseTableLocation(), file)); + } + return partitions; + } + + @Override + public void populatePartitionVectors(ValueVector[] vectors, List<PartitionLocation> partitions, + BitSet partitionColumnBitSet, Map<Integer, String> fieldNameMap) { + int record = 0; + for (PartitionLocation partitionLocation: partitions) { + for (int partitionColumnIndex : BitSets.toIter(partitionColumnBitSet)) { + if (partitionLocation.getPartitionValue(partitionColumnIndex) == null) { + ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setNull(record); + } else { + byte[] bytes = (partitionLocation.getPartitionValue(partitionColumnIndex)).getBytes(Charsets.UTF_8); + ((NullableVarCharVector) vectors[partitionColumnIndex]).getMutator().setSafe(record, bytes, 0, bytes.length); + } + } + record++; + } + + for (ValueVector v : vectors) { + if (v == null) { + continue; + } + v.getMutator().setValueCount(partitions.size()); + } } + @Override + public TypeProtos.MajorType getVectorType(SchemaPath column, PlannerSettings plannerSettings) { + return 
Types.optional(TypeProtos.MinorType.VARCHAR); + } + public String getName(int index) { + return partitionLabel + index; + } + + private String getBaseTableLocation() { + final FormatSelection origSelection = (FormatSelection) scanRel.getDrillTable().getSelection(); + return origSelection.getSelection().selectionRoot; + } } |