aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main
diff options
context:
space:
mode:
author Aman Sinha <asinha@maprtech.com> 2014-07-27 16:22:53 -0700
committer Jacques Nadeau <jacques@apache.org> 2014-07-29 16:49:28 -0700
commit 204aa5d255fda7d98a4dc9a01756070167846f63 (patch)
tree 223315ee36bdb93d14671a97ccafceec9434cf7b /exec/java-exec/src/main
parent 0a87076b51f54b8fa5bbc42d613279b09c4dc786 (diff)
DRILL-1220: Process the star columns in ProjectRecordBatch by classifying the expressions appropriately based on the annotated expressions created by planner.
Diffstat (limited to 'exec/java-exec/src/main')
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java 387
1 files changed, 329 insertions, 58 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 985d96e54..1ba010347 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -18,8 +18,10 @@
package org.apache.drill.exec.physical.impl.project;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
+import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.drill.common.expression.ConvertExpression;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -50,6 +52,7 @@ import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.Project;
+import org.apache.drill.exec.planner.StarColumnHelper;
import org.apache.drill.exec.record.AbstractSingleRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -65,6 +68,7 @@ import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
import com.carrotsearch.hppc.IntOpenHashSet;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.sun.codemodel.JExpr;
public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
@@ -76,6 +80,27 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
private boolean hasRemainder = false;
private int remainderIndex = 0;
private int recordCount;
+
+ private static final String EMPTY_STRING = "";
+
+ /**
+  * Mutable scratch object that classifyExpr() fills in for one project expression at a time.
+  * clear() resets only the per-expression fields; the three maps keep accumulating across
+  * every expression classified with the same instance.
+  *
+  * Declared static because it never touches the enclosing ProjectRecordBatch instance.
+  */
+ private static class ClassifierResult {
+   // set when the expression just classified contains the star column
+   public boolean isStar = false;
+   // output column names produced by the expression just classified; an entry that is
+   // EMPTY_STRING marks a position the caller skips
+   public List<String> outputNames;
+   // prefix of the expression just classified, or EMPTY_STRING when it has none
+   public String prefix = EMPTY_STRING;
+   // cumulative: number of star expressions seen so far, keyed by expression prefix
+   public HashMap<String, Integer> prefixMap = Maps.newHashMap();
+   // cumulative: output names already handed out (case-insensitive keys), used to suppress duplicates
+   public CaseInsensitiveMap outputMap = new CaseInsensitiveMap();
+   // cumulative: last sequence number used per name (case-insensitive keys) by getUniqueName()
+   private CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap();
+
+   /** Resets the per-expression state before the next expression is classified. */
+   private void clear() {
+     isStar = false;
+     // use the shared constant: callers compare names against EMPTY_STRING by identity
+     prefix = EMPTY_STRING;
+     if (outputNames != null) {
+       outputNames.clear();
+     }
+
+     // note: don't clear the internal maps since they have cumulative data..
+   }
+ }
public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException {
super(pop, context, incoming);
@@ -222,71 +247,118 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
IntOpenHashSet transferFieldIds = new IntOpenHashSet();
- boolean isAnyWildcard = isAnyWildcard(exprs);
-
- if(isAnyWildcard){
+ boolean isAnyWildcard = false;
+
+ ClassifierResult result = new ClassifierResult();
+ boolean classify = isClassificationNeeded(exprs);
+
+ for(int i = 0; i < exprs.size(); i++){
+ final NamedExpression namedExpression = exprs.get(i);
+ result.clear();
+
+ if (classify && namedExpression.getExpr() instanceof SchemaPath) {
+ classifyExpr(namedExpression, incoming, result);
+
+ if (result.isStar) {
+ isAnyWildcard = true;
+ Integer value = result.prefixMap.get(result.prefix);
+ if (value != null && value.intValue() == 1) {
+ int k = 0;
+ for(VectorWrapper<?> wrapper : incoming) {
+ ValueVector vvIn = wrapper.getValueVector();
+ SchemaPath originalPath = vvIn.getField().getPath();
+ if (k > result.outputNames.size()-1) {
+ assert false;
+ }
+ String name = result.outputNames.get(k++); // get the renamed column names
+ if (name == EMPTY_STRING) continue;
+ FieldReference ref = new FieldReference(name);
+ TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
+ transfers.add(tp);
+ container.add(tp.getTo());
+ }
+ } else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors
+ int k = 0;
+ for(VectorWrapper<?> wrapper : incoming) {
+ ValueVector vvIn = wrapper.getValueVector();
+ SchemaPath originalPath = vvIn.getField().getPath();
+ if (k > result.outputNames.size()-1) {
+ assert false;
+ }
+ String name = result.outputNames.get(k++); // get the renamed column names
+ if (name == EMPTY_STRING) continue;
+
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry() );
+ if(collector.hasErrors()){
+ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
+ }
+
+ MaterializedField outputField = MaterializedField.create(name, expr.getMajorType());
+ ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ allocationVectors.add(vv);
+ TypedFieldId fid = container.add(vv);
+ ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
+ HoldingContainer hc = cg.addExpr(write);
+
+ cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ }
+ }
+ continue;
+ }
+ }
- // add this until we have sv2 project on wildcard working correctly.
- if(incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.NONE){
- throw new UnsupportedOperationException("Drill doesn't yet wildcard projects where there is a sv2, patch coming shortly.");
+ String outputName = getRef(namedExpression).getRootSegment().getPath();
+ if (result != null && result.outputNames != null && result.outputNames.size() > 0) {
+ if (result.outputNames.get(0) == EMPTY_STRING) continue;
+ outputName = result.outputNames.get(0);
+ }
+
+ final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
+ final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType());
+ if(collector.hasErrors()){
+ throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
}
- for(VectorWrapper<?> wrapper : incoming){
- ValueVector vvIn = wrapper.getValueVector();
- String name = vvIn.getField().getPath().getRootSegment().getPath();
- FieldReference ref = new FieldReference(name);
- TransferPair tp = wrapper.getValueVector().getTransferPair(ref);
+ // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
+ if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
+ && !((ValueVectorReadExpression) expr).hasReadPath()
+ && !isAnyWildcard
+ && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])
+ ) {
+
+ ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
+ TypedFieldId id = vectorRead.getFieldId();
+ ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
+ Preconditions.checkNotNull(incoming);
+
+ TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
transfers.add(tp);
container.add(tp.getTo());
- }
- }else{
- for(int i = 0; i < exprs.size(); i++){
- final NamedExpression namedExpression = exprs.get(i);
- final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true);
- final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType());
- if(collector.hasErrors()){
- throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString()));
- }
-
- // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack.
- if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE
- && !((ValueVectorReadExpression) expr).hasReadPath()
- && !isAnyWildcard
- && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0])
- && !((ValueVectorReadExpression) expr).hasReadPath()) {
- ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr;
- TypedFieldId id = vectorRead.getFieldId();
- ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector();
- Preconditions.checkNotNull(incoming);
-
- TransferPair tp = vvIn.getTransferPair(getRef(namedExpression));
- transfers.add(tp);
- container.add(tp.getTo());
- transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
-// logger.debug("Added transfer.");
- } else if (expr instanceof DrillFuncHolderExpr &&
- ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) {
- // Need to process ComplexWriter function evaluation.
- // Lazy initialization of the list of complex writers, if not done yet.
- if (complexWriters == null)
- complexWriters = Lists.newArrayList();
-
- // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
- ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
- cg.addExpr(expr);
- } else{
- // need to do evaluation.
- ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
- allocationVectors.add(vector);
- TypedFieldId fid = container.add(vector);
- ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
- HoldingContainer hc = cg.addExpr(write);
-
- cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
- logger.debug("Added eval.");
- }
+ transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]);
+ logger.debug("Added transfer for project expression.");
+ } else if (expr instanceof DrillFuncHolderExpr &&
+ ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) {
+ // Need to process ComplexWriter function evaluation.
+ // Lazy initialization of the list of complex writers, if not done yet.
+ if (complexWriters == null)
+ complexWriters = Lists.newArrayList();
+
+ // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer.
+ ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef());
+ cg.addExpr(expr);
+ } else{
+ // need to do evaluation.
+ ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator());
+ allocationVectors.add(vector);
+ TypedFieldId fid = container.add(vector);
+ ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true);
+ HoldingContainer hc = cg.addExpr(write);
+
+ cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE);
+ logger.debug("Added eval for project expression.");
}
}
+
cg.rotateBlock();
cg.getEvalBlock()._return(JExpr.TRUE);
@@ -327,4 +399,203 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
return exprs;
}
+ /**
+  * Tells whether any of the project expressions has to go through classifyExpr().
+  * Only expressions that are a SchemaPath are inspected. Classification is needed as soon as
+  * one of them has the prefix delimiter in its expression or reference root name, or has the
+  * star column in its expression root name.
+  *
+  * @param exprs the project expressions to inspect
+  * @return true if at least one expression needs classification, false otherwise
+  */
+ private boolean isClassificationNeeded(List<NamedExpression> exprs) {
+   for (final NamedExpression candidate : exprs) {
+     if (candidate.getExpr() instanceof SchemaPath) {
+       final NameSegment exprRoot = ((SchemaPath) candidate.getExpr()).getRootSegment();
+       final NameSegment refRoot = candidate.getRef().getRootSegment();
+       final String exprPath = exprRoot.getPath();
+       final String refPath = refRoot.getPath();
+
+       if (exprPath.contains(StarColumnHelper.PREFIX_DELIMITER)
+           || refPath.contains(StarColumnHelper.PREFIX_DELIMITER)
+           || exprPath.contains(StarColumnHelper.STAR_COLUMN)) {
+         return true;
+       }
+     }
+   }
+   return false;
+ }
+
+ /**
+  * Returns a name that has not been handed out before for this result.
+  * The first request for a name returns it unchanged and records sequence -1 for it.
+  * Every later request bumps the recorded sequence by one and appends it, so repeated
+  * requests for "a" yield "a", "a0", "a1", ...
+  * Lookups go through result.sequenceMap, a CaseInsensitiveMap.
+  *
+  * @param name   the requested column name
+  * @param result classifier state holding the per-name sequence numbers
+  * @return the original name, or the name with a numeric suffix when it was requested before
+  */
+ private String getUniqueName(String name, ClassifierResult result) {
+   final Integer lastSeq = (Integer) result.sequenceMap.get(name);
+   if (lastSeq != null) {
+     // seen before: advance the counter and derive the suffixed name from it
+     final Integer nextSeq = lastSeq + 1;
+     result.sequenceMap.put(name, nextSeq);
+     return name + nextSeq;
+   }
+
+   // first sighting: remember it with -1 so that the next duplicate gets suffix 0
+   final Integer initialSeq = -1;
+   result.sequenceMap.put(name, initialSeq);
+   return name;
+ }
+
+ /**
+  * Appends one entry to result.outputNames for the given column.
+  * When allowDupsWithRename is set, the name is first made unique through getUniqueName().
+  * If the resulting name is already a key of result.outputMap, EMPTY_STRING is appended
+  * instead; otherwise the name is appended and registered in result.outputMap.
+  *
+  * @param origName            the candidate output column name
+  * @param result              classifier state to update; outputNames must already be initialized
+  * @param allowDupsWithRename whether a duplicate may be kept under a renamed, unique name
+  */
+ private void addToResultMaps(String origName, ClassifierResult result, boolean allowDupsWithRename) {
+   final String candidate = allowDupsWithRename ? getUniqueName(origName, result) : origName;
+
+   if (result.outputMap.containsKey(candidate)) {
+     // already produced by an earlier expression: keep the position but mark it as skipped
+     result.outputNames.add(EMPTY_STRING);
+     return;
+   }
+
+   result.outputNames.add(candidate);
+   result.outputMap.put(candidate, candidate);
+ }
+
+ /**
+  * Classifies one project expression and stores the output column names it produces in
+  * result.outputNames. The expression of {@code ex} must be a SchemaPath (it is cast without a check).
+  *
+  * Side effects on {@code result}:
+  *  - prefix is set to the part of the expression root name before the prefix delimiter, if any;
+  *  - isStar is set, and the star counter for that prefix in prefixMap is incremented, when the
+  *    expression root name contains the star column;
+  *  - outputNames is replaced by a fresh list; outputMap and sequenceMap are updated through
+  *    addToResultMaps() / getUniqueName().
+  *
+  * In the two branches that pre-size outputNames, the list has exactly one slot per incoming
+  * column, in the incoming iteration order, and EMPTY_STRING marks a column that this expression
+  * does not produce.
+  *
+  * NOTE(review): PREFIX_DELIMITER is handed to String.split(), which treats it as a regular
+  * expression, while the has-prefix checks use the literal String.contains(). This assumes the
+  * delimiter contains no regex metacharacters - confirm against StarColumnHelper.
+  *
+  * @param ex       the project expression to classify
+  * @param incoming the incoming batch whose columns and schema are consulted
+  * @param result   classifier state to fill in; expected to have been clear()-ed by the caller
+  */
+ private void classifyExpr(NamedExpression ex, RecordBatch incoming, ClassifierResult result) {
+   NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment();
+   NameSegment ref = ex.getRef().getRootSegment();
+   final String exprPath = expr.getPath();
+   final String refPath = ref.getPath();
+
+   boolean exprHasPrefix = exprPath.contains(StarColumnHelper.PREFIX_DELIMITER);
+   boolean refHasPrefix = refPath.contains(StarColumnHelper.PREFIX_DELIMITER);
+   boolean exprIsStar = exprPath.equals(StarColumnHelper.STAR_COLUMN);
+   boolean refIsStar = refPath.equals(StarColumnHelper.STAR_COLUMN);
+   boolean exprContainsStar = exprPath.contains(StarColumnHelper.STAR_COLUMN);
+   boolean refEndsWithStar = refPath.endsWith(StarColumnHelper.STAR_COLUMN);
+
+   String exprPrefix = EMPTY_STRING;
+
+   if (exprHasPrefix) {
+     // get the prefix of the expr
+     String[] exprComponents = exprPath.split(StarColumnHelper.PREFIX_DELIMITER, 2);
+     assert(exprComponents.length == 2);
+     exprPrefix = exprComponents[0];
+     result.prefix = exprPrefix;
+   }
+
+   if (exprContainsStar) {
+     result.isStar = true;
+     // count how many star expressions have been seen for this prefix so far
+     Integer value = result.prefixMap.get(exprPrefix);
+     result.prefixMap.put(exprPrefix, value == null ? 1 : value + 1);
+   }
+
+   int incomingSchemaSize = incoming.getSchema().getFieldCount();
+
+   // input is '*' and output is 'prefix_*'
+   if (exprIsStar && refHasPrefix && refEndsWithStar) {
+     String[] components = refPath.split(StarColumnHelper.PREFIX_DELIMITER, 2);
+     assert(components.length == 2);
+     String prefix = components[0];
+     result.outputNames = Lists.newArrayList();
+     for (VectorWrapper<?> wrapper : incoming) {
+       // add the prefix to the incoming column name
+       String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + incomingRootName(wrapper);
+       addToResultMaps(newName, result, false);
+     }
+   }
+   // input and output are the same
+   else if (exprPath.equals(refPath)) {
+     if (exprContainsStar && exprHasPrefix) {
+       result.outputNames = newSkippedOutputNames(incomingSchemaSize);
+
+       int k = 0;
+       for (VectorWrapper<?> wrapper : incoming) {
+         String incomingName = incomingRootName(wrapper);
+         // get the prefix of the name
+         String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2);
+         // an incoming column without a prefix is not referenced by this expression; a column
+         // with the same prefix is kept under its own name unless that name was already produced
+         if (nameComponents.length > 1
+             && exprPrefix.equals(nameComponents[0])
+             && !result.outputMap.containsKey(incomingName)) {
+           result.outputNames.set(k, incomingName);
+           result.outputMap.put(incomingName, incomingName);
+         }
+         k++;
+       }
+     } else {
+       result.outputNames = Lists.newArrayList();
+       if (exprIsStar) {
+         for (VectorWrapper<?> wrapper : incoming) {
+           // when the reference is the star as well, allow dups since this is likely top-level project
+           addToResultMaps(incomingRootName(wrapper), result, refIsStar);
+         }
+       } else {
+         // with no prefix on either side, allow dups since this is likely top-level project
+         addToResultMaps(exprPath, result, !refHasPrefix && !exprHasPrefix);
+       }
+     }
+   }
+   // only the output has prefix
+   else if (!exprHasPrefix && refHasPrefix) {
+     result.outputNames = Lists.newArrayList();
+     addToResultMaps(refPath, result, false);
+   }
+   // input has prefix but output does not; this would happen for a top-level project
+   else if (exprHasPrefix && !refHasPrefix) {
+     result.outputNames = newSkippedOutputNames(incomingSchemaSize);
+
+     int k = 0;
+     // remove the prefix from the incoming column names
+     for (VectorWrapper<?> wrapper : incoming) {
+       String[] components = incomingRootName(wrapper).split(StarColumnHelper.PREFIX_DELIMITER, 2);
+       if (components.length > 1 && exprPrefix.equals(components[0])) {
+         // for top level we need to make names unique
+         result.outputNames.set(k, getUniqueName(components[1], result));
+       }
+       // every other column keeps the EMPTY_STRING its slot was initialized with; nothing is
+       // appended, so the list stays exactly one entry per incoming column
+       k++;
+     }
+   }
+   // input and output have prefixes although they could be different...
+   else if (exprHasPrefix && refHasPrefix) {
+     assert false : "Unexpected project expression or reference";  // not handled yet
+   }
+   else {
+     assert false : "Unexpected project expression or reference." ;
+   }
+ }
+
+ /** Returns the root segment name of the field backing the given incoming column. */
+ private static String incomingRootName(VectorWrapper<?> wrapper) {
+   ValueVector vvIn = wrapper.getValueVector();
+   return vvIn.getField().getPath().getRootSegment().getPath();
+ }
+
+ /** Creates an output-name list of the given size in which every slot is EMPTY_STRING. */
+ private static List<String> newSkippedOutputNames(int size) {
+   List<String> names = Lists.newArrayListWithCapacity(size);
+   for (int j = 0; j < size; j++) {
+     names.add(EMPTY_STRING);  // initialize
+   }
+   return names;
+ }
+
}