diff options
author | Aman Sinha <asinha@maprtech.com> | 2014-07-27 16:22:53 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-07-29 16:49:28 -0700 |
commit | 204aa5d255fda7d98a4dc9a01756070167846f63 (patch) | |
tree | 223315ee36bdb93d14671a97ccafceec9434cf7b /exec/java-exec/src/main | |
parent | 0a87076b51f54b8fa5bbc42d613279b09c4dc786 (diff) |
DRILL-1220: Process the star columns in ProjectRecordBatch by classifying the expressions appropriately based on the annotated expressions created by planner.
Diffstat (limited to 'exec/java-exec/src/main')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java | 387 |
1 files changed, 329 insertions, 58 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 985d96e54..1ba010347 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -18,8 +18,10 @@ package org.apache.drill.exec.physical.impl.project; import java.io.IOException; +import java.util.HashMap; import java.util.List; +import org.apache.commons.collections.map.CaseInsensitiveMap; import org.apache.drill.common.expression.ConvertExpression; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; @@ -50,6 +52,7 @@ import org.apache.drill.exec.expr.fn.DrillComplexWriterFuncHolder; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.Project; +import org.apache.drill.exec.planner.StarColumnHelper; import org.apache.drill.exec.record.AbstractSingleRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -65,6 +68,7 @@ import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; import com.carrotsearch.hppc.IntOpenHashSet; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.sun.codemodel.JExpr; public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ @@ -76,6 +80,27 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ private boolean hasRemainder = false; private int remainderIndex = 0; private int recordCount; + + private static final String EMPTY_STRING = ""; + + private class ClassifierResult { + public boolean isStar = false; + public List<String> outputNames; + public String prefix = ""; + public HashMap<String, Integer> prefixMap = Maps.newHashMap(); + public CaseInsensitiveMap outputMap = new CaseInsensitiveMap(); + private CaseInsensitiveMap sequenceMap = new CaseInsensitiveMap(); + + private void clear() { + isStar = false; + prefix = ""; + if (outputNames != null) { + outputNames.clear(); + } + + // note: don't clear the internal maps since they have cumulative data.. + } + } public ProjectRecordBatch(Project pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); @@ -222,71 +247,118 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ IntOpenHashSet transferFieldIds = new IntOpenHashSet(); - boolean isAnyWildcard = isAnyWildcard(exprs); - - if(isAnyWildcard){ + boolean isAnyWildcard = false; + + ClassifierResult result = new ClassifierResult(); + boolean classify = isClassificationNeeded(exprs); + + for(int i = 0; i < exprs.size(); i++){ + final NamedExpression namedExpression = exprs.get(i); + result.clear(); + + if (classify && namedExpression.getExpr() instanceof SchemaPath) { + classifyExpr(namedExpression, incoming, result); + + if (result.isStar) { + isAnyWildcard = true; + Integer value = result.prefixMap.get(result.prefix); + if (value != null && value.intValue() == 1) { + int k = 0; + for(VectorWrapper<?> wrapper : incoming) { + ValueVector vvIn = wrapper.getValueVector(); + SchemaPath originalPath = vvIn.getField().getPath(); + if (k > result.outputNames.size()-1) { + assert false; + } + String name = result.outputNames.get(k++); // get the renamed column names + if (name == EMPTY_STRING) continue; + FieldReference ref = new FieldReference(name); + TransferPair tp = wrapper.getValueVector().getTransferPair(ref); + transfers.add(tp); + container.add(tp.getTo()); + } + } else if (value != null && value.intValue() > 1) { // subsequent wildcards should do a copy of incoming valuevectors + int k = 0; + for(VectorWrapper<?> wrapper : incoming) { + ValueVector vvIn = wrapper.getValueVector(); + SchemaPath originalPath = vvIn.getField().getPath(); + if (k > result.outputNames.size()-1) { + assert false; + } + String name = result.outputNames.get(k++); // get the renamed column names + if (name == EMPTY_STRING) continue; + + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(originalPath, incoming, collector, context.getFunctionRegistry() ); + if(collector.hasErrors()){ + throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); + } + + MaterializedField outputField = MaterializedField.create(name, expr.getMajorType()); + ValueVector vv = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + allocationVectors.add(vv); + TypedFieldId fid = container.add(vv); + ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); + HoldingContainer hc = cg.addExpr(write); + + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + } + } + continue; + } + } - // add this until we have sv2 project on wildcard working correctly. - if(incoming.getSchema().getSelectionVectorMode() != SelectionVectorMode.NONE){ - throw new UnsupportedOperationException("Drill doesn't yet wildcard projects where there is a sv2, patch coming shortly."); + String outputName = getRef(namedExpression).getRootSegment().getPath(); + if (result != null && result.outputNames != null && result.outputNames.size() > 0) { + if (result.outputNames.get(0) == EMPTY_STRING) continue; + outputName = result.outputNames.get(0); + } + + final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); + final MaterializedField outputField = MaterializedField.create(outputName, expr.getMajorType()); + if(collector.hasErrors()){ + throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); } - for(VectorWrapper<?> wrapper : incoming){ - ValueVector vvIn = wrapper.getValueVector(); - String name = vvIn.getField().getPath().getRootSegment().getPath(); - FieldReference ref = new FieldReference(name); - TransferPair tp = wrapper.getValueVector().getTransferPair(ref); + // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack. + if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE + && !((ValueVectorReadExpression) expr).hasReadPath() + && !isAnyWildcard + && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0]) + ) { + + ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; + TypedFieldId id = vectorRead.getFieldId(); + ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector(); + Preconditions.checkNotNull(incoming); + + TransferPair tp = vvIn.getTransferPair(getRef(namedExpression)); transfers.add(tp); container.add(tp.getTo()); - } - }else{ - for(int i = 0; i < exprs.size(); i++){ - final NamedExpression namedExpression = exprs.get(i); - final LogicalExpression expr = ExpressionTreeMaterializer.materialize(namedExpression.getExpr(), incoming, collector, context.getFunctionRegistry(), true); - final MaterializedField outputField = MaterializedField.create(getRef(namedExpression), expr.getMajorType()); - if(collector.hasErrors()){ - throw new SchemaChangeException(String.format("Failure while trying to materialize incoming schema. Errors:\n %s.", collector.toErrorString())); - } - - // add value vector to transfer if direct reference and this is allowed, otherwise, add to evaluation stack. - if(expr instanceof ValueVectorReadExpression && incoming.getSchema().getSelectionVectorMode() == SelectionVectorMode.NONE - && !((ValueVectorReadExpression) expr).hasReadPath() - && !isAnyWildcard - && !transferFieldIds.contains(((ValueVectorReadExpression) expr).getFieldId().getFieldIds()[0]) - && !((ValueVectorReadExpression) expr).hasReadPath()) { - ValueVectorReadExpression vectorRead = (ValueVectorReadExpression) expr; - TypedFieldId id = vectorRead.getFieldId(); - ValueVector vvIn = incoming.getValueAccessorById(id.getIntermediateClass(), id.getFieldIds()).getValueVector(); - Preconditions.checkNotNull(incoming); - - TransferPair tp = vvIn.getTransferPair(getRef(namedExpression)); - transfers.add(tp); - container.add(tp.getTo()); - transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]); -// logger.debug("Added transfer."); - } else if (expr instanceof DrillFuncHolderExpr && - ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) { - // Need to process ComplexWriter function evaluation. - // Lazy initialization of the list of complex writers, if not done yet. - if (complexWriters == null) - complexWriters = Lists.newArrayList(); - - // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. - ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef()); - cg.addExpr(expr); - } else{ - // need to do evaluation. - ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); - allocationVectors.add(vector); - TypedFieldId fid = container.add(vector); - ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); - HoldingContainer hc = cg.addExpr(write); - - cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); - logger.debug("Added eval."); - } + transferFieldIds.add(vectorRead.getFieldId().getFieldIds()[0]); + logger.debug("Added transfer for project expression."); + } else if (expr instanceof DrillFuncHolderExpr && + ((DrillFuncHolderExpr) expr).isComplexWriterFuncHolder()) { + // Need to process ComplexWriter function evaluation. + // Lazy initialization of the list of complex writers, if not done yet. + if (complexWriters == null) + complexWriters = Lists.newArrayList(); + + // The reference name will be passed to ComplexWriter, used as the name of the output vector from the writer. + ((DrillComplexWriterFuncHolder) ((DrillFuncHolderExpr) expr).getHolder()).setReference(namedExpression.getRef()); + cg.addExpr(expr); + } else{ + // need to do evaluation. + ValueVector vector = TypeHelper.getNewVector(outputField, oContext.getAllocator()); + allocationVectors.add(vector); + TypedFieldId fid = container.add(vector); + ValueVectorWriteExpression write = new ValueVectorWriteExpression(fid, expr, true); + HoldingContainer hc = cg.addExpr(write); + + cg.getEvalBlock()._if(hc.getValue().eq(JExpr.lit(0)))._then()._return(JExpr.FALSE); + logger.debug("Added eval for project expression."); } } + cg.rotateBlock(); cg.getEvalBlock()._return(JExpr.TRUE); @@ -327,4 +399,203 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ return exprs; } + private boolean isClassificationNeeded(List<NamedExpression> exprs) { + boolean needed = false; + for(int i = 0; i < exprs.size(); i++){ + final NamedExpression ex = exprs.get(i); + if (!(ex.getExpr() instanceof SchemaPath)) continue; + NameSegment expr = ((SchemaPath) ex.getExpr()).getRootSegment(); + NameSegment ref = ex.getRef().getRootSegment(); + boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); + boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); + boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN); + + if (exprHasPrefix || refHasPrefix || exprContainsStar) { + needed = true; + break; + } + } + return needed; + } + + private String getUniqueName(String name, ClassifierResult result) { + Integer currentSeq = (Integer) result.sequenceMap.get(name); + if (currentSeq == null) { // name is unique, so return the original name + Integer n = -1; + result.sequenceMap.put(name, n); + return name; + } + // create a new name + Integer newSeq = currentSeq + 1; + result.sequenceMap.put(name, newSeq); + + String newName = name + newSeq; + return newName; + } + + private void addToResultMaps(String origName, ClassifierResult result, boolean allowDupsWithRename) { + String name = origName; + if (allowDupsWithRename) { + name = getUniqueName(origName, result); + } + if (!result.outputMap.containsKey(name)) { + result.outputNames.add(name); + result.outputMap.put(name, name); + } else { + result.outputNames.add(EMPTY_STRING); + } + } + + private void classifyExpr(NamedExpression ex, RecordBatch incoming, ClassifierResult result) { + NameSegment expr = ((SchemaPath)ex.getExpr()).getRootSegment(); + NameSegment ref = ex.getRef().getRootSegment(); + boolean exprHasPrefix = expr.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); + boolean refHasPrefix = ref.getPath().contains(StarColumnHelper.PREFIX_DELIMITER); + boolean exprIsStar = expr.getPath().equals(StarColumnHelper.STAR_COLUMN); + boolean refIsStar = ref.getPath().equals(StarColumnHelper.STAR_COLUMN); + boolean exprContainsStar = expr.getPath().contains(StarColumnHelper.STAR_COLUMN); + boolean refEndsWithStar = ref.getPath().endsWith(StarColumnHelper.STAR_COLUMN); + + String exprPrefix = EMPTY_STRING; + + if (exprHasPrefix) { + // get the prefix of the expr + String[] exprComponents = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); + assert(exprComponents.length == 2); + exprPrefix = exprComponents[0]; + result.prefix = exprPrefix; + } + + if (exprContainsStar) { + result.isStar = true; + Integer value = (Integer) result.prefixMap.get(exprPrefix); + if (value == null) { + Integer n = 1; + result.prefixMap.put(exprPrefix, n); + } else { + Integer n = value + 1; + result.prefixMap.put(exprPrefix, n); + } + } + + int incomingSchemaSize = incoming.getSchema().getFieldCount(); + + // for debugging.. + // if (incomingSchemaSize > 9) { + // assert false; + // } + + // input is '*' and output is 'prefix_*' + if (exprIsStar && refHasPrefix && refEndsWithStar) { + String[] components = ref.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); + assert(components.length == 2); + String prefix = components[0]; + result.outputNames = Lists.newArrayList(); + for(VectorWrapper<?> wrapper : incoming) { + ValueVector vvIn = wrapper.getValueVector(); + String name = vvIn.getField().getPath().getRootSegment().getPath(); + + // add the prefix to the incoming column name + String newName = prefix + StarColumnHelper.PREFIX_DELIMITER + name; + addToResultMaps(newName, result, false); + } + } + // input and output are the same + else if (expr.getPath().equals(ref.getPath())) { + if (exprContainsStar && exprHasPrefix) { + assert exprPrefix != null; + + int k = 0; + result.outputNames = Lists.newArrayListWithCapacity(incomingSchemaSize); + for (int j=0; j < incomingSchemaSize; j++) { + result.outputNames.add(EMPTY_STRING); // initialize + } + + for(VectorWrapper<?> wrapper : incoming) { + ValueVector vvIn = wrapper.getValueVector(); + String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); + // get the prefix of the name + String[] nameComponents = incomingName.split(StarColumnHelper.PREFIX_DELIMITER, 2); + // if incoming valuevector does not have a prefix, ignore it since this expression is not referencing it + if (nameComponents.length <= 1) { + k++; + continue; + } + String namePrefix = nameComponents[0]; + if (exprPrefix.equals(namePrefix)) { + String newName = incomingName; + if (!result.outputMap.containsKey(newName)) { + result.outputNames.set(k, newName); + result.outputMap.put(newName, newName); + } + } + k++; + } + } else { + result.outputNames = Lists.newArrayList(); + if (exprIsStar) { + for (VectorWrapper<?> wrapper : incoming) { + ValueVector vvIn = wrapper.getValueVector(); + String incomingName = vvIn.getField().getPath().getRootSegment().getPath(); + if (refIsStar) { + addToResultMaps(incomingName, result, true); // allow dups since this is likely top-level project + } else { + addToResultMaps(incomingName, result, false); + } + } + } else { + String newName = expr.getPath(); + if (!refHasPrefix && !exprHasPrefix) { + addToResultMaps(newName, result, true); // allow dups since this is likely top-level project + } else { + addToResultMaps(newName, result, false); + } + } + } + } + // only the output has prefix + else if (!exprHasPrefix && refHasPrefix) { + result.outputNames = Lists.newArrayList(); + String newName = ref.getPath(); + addToResultMaps(newName, result, false); + } + // input has prefix but output does not; this would happen for a top-level project + else if (exprHasPrefix && !refHasPrefix) { + int k = 0; + result.outputNames = Lists.newArrayListWithCapacity(incomingSchemaSize); + for (int j=0; j < incomingSchemaSize; j++) { + result.outputNames.add(EMPTY_STRING); // initialize + } + // remove the prefix from the incoming column names + for(VectorWrapper<?> wrapper : incoming) { + ValueVector vvIn = wrapper.getValueVector(); + String name = vvIn.getField().getPath().getRootSegment().getPath(); + String[] components = name.split(StarColumnHelper.PREFIX_DELIMITER, 2); + if (components.length <= 1) { + k++; + continue; + } + String namePrefix = components[0]; + String nameSuffix = components[1]; + if (exprPrefix.equals(namePrefix)) { + String newName = getUniqueName(nameSuffix, result); // for top level we need to make names unique + result.outputNames.set(k, newName); + } else { + result.outputNames.add(EMPTY_STRING); + } + k++; + } + } + // input and output have prefixes although they could be different... + else if (exprHasPrefix && refHasPrefix) { + String[] input = expr.getPath().split(StarColumnHelper.PREFIX_DELIMITER, 2); + assert(input.length == 2); + String inputSuffix = input[1]; + assert false : "Unexpected project expression or reference"; // not handled yet + } + else { + assert false : "Unexpected project expression or reference." ; + } + } + } |