diff options
author | HanumathRao <hanu.ncr@gmail.com> | 2018-06-21 18:42:24 -0700 |
---|---|---|
committer | Volodymyr Vysotskyi <vvovyk@gmail.com> | 2018-07-01 19:06:29 +0300 |
commit | 8ec2dc64175648103a5ec51f8ad98387496692a9 (patch) | |
tree | 7ec2cf94373d1a8165f954efb28cbb017a3b172f /exec/java-exec/src/main | |
parent | 7c22e35ef2a9ecc41cc15c5deefac9b306ea87a1 (diff) |
DRILL-6545: Projection Push down into Lateral Join operator.
closes #1347
Diffstat (limited to 'exec/java-exec/src/main')
11 files changed, 351 insertions, 20 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java index a12fed126..55ede9628 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; import com.google.common.base.Preconditions; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.AbstractJoinPop; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.PhysicalVisitor; @@ -34,6 +35,9 @@ import java.util.List; public class LateralJoinPOP extends AbstractJoinPop { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class); + @JsonProperty("excludedColumns") + private List<SchemaPath> excludedColumns; + @JsonProperty("unnestForLateralJoin") private UnnestPOP unnestForLateralJoin; @@ -41,19 +45,21 @@ public class LateralJoinPOP extends AbstractJoinPop { public LateralJoinPOP( @JsonProperty("left") PhysicalOperator left, @JsonProperty("right") PhysicalOperator right, - @JsonProperty("joinType") JoinRelType joinType) { + @JsonProperty("joinType") JoinRelType joinType, + @JsonProperty("excludedColumns") List<SchemaPath> excludedColumns) { super(left, right, joinType, null, null); Preconditions.checkArgument(joinType != JoinRelType.FULL, "Full outer join is currently not supported with Lateral Join"); Preconditions.checkArgument(joinType != JoinRelType.RIGHT, "Right join is currently not supported with Lateral Join"); + this.excludedColumns = excludedColumns; } @Override public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) { Preconditions.checkArgument(children.size() == 2, "Lateral join should have two physical operators"); - LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType); + LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType, this.excludedColumns); newPOP.unnestForLateralJoin = this.unnestForLateralJoin; return newPOP; } @@ -63,6 +69,11 @@ public class LateralJoinPOP extends AbstractJoinPop { return this.unnestForLateralJoin; } + @JsonProperty("excludedColumns") + public List<SchemaPath> getExcludedColumns() { + return this.excludedColumns; + } + public void setUnnestForLateralJoin(UnnestPOP unnest) { this.unnestForLateralJoin = unnest; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java index c8bb2a4f5..519d5036e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java @@ -37,6 +37,8 @@ import org.apache.drill.exec.planner.logical.DrillJoinRel; import org.apache.drill.exec.planner.logical.DrillJoinRule; import org.apache.drill.exec.planner.logical.DrillLimitRule; import org.apache.drill.exec.planner.logical.DrillMergeProjectRule; +import org.apache.drill.exec.planner.logical.DrillProjectLateralJoinTransposeRule; +import org.apache.drill.exec.planner.logical.DrillProjectPushIntoLateralJoinRule; import org.apache.drill.exec.planner.logical.DrillProjectRule; import org.apache.drill.exec.planner.logical.DrillPushFilterPastProjectRule; import org.apache.drill.exec.planner.logical.DrillPushLimitToScanRule; @@ -287,7 +289,8 @@ public enum PlannerPhase { // Due to infinite loop in planning (DRILL-3257/CALCITE-1271), temporarily use this rule in Hep planner // RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE, DrillFilterAggregateTransposeRule.INSTANCE, - + DrillProjectLateralJoinTransposeRule.INSTANCE, + DrillProjectPushIntoLateralJoinRule.INSTANCE, RuleInstance.FILTER_MERGE_RULE, RuleInstance.FILTER_CORRELATE_RULE, RuleInstance.AGGREGATE_REMOVE_RULE, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java index a7bbbca92..28e5246b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.planner.common; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelOptCost; import org.apache.calcite.plan.RelOptPlanner; @@ -25,17 +27,27 @@ import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.Correlate; import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.metadata.RelMetadataQuery; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.sql.SemiJoinType; +import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.util.ImmutableBitSet; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.planner.cost.DrillCostBase; import org.apache.drill.exec.planner.physical.PrelUtil; +import java.util.ArrayList; +import java.util.List; + public abstract class DrillLateralJoinRelBase extends Correlate implements DrillRelNode { - public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, - CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) { + + final private static double CORRELATE_MEM_COPY_COST = DrillCostBase.MEMORY_TO_CPU_RATIO * DrillCostBase.BASE_CPU_COST; + final public boolean excludeCorrelateColumn; + public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol, + CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) { super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType); + this.excludeCorrelateColumn = excludeCorrelateCol; } @Override public RelOptCost computeSelfCost(RelOptPlanner planner, @@ -49,7 +61,53 @@ public abstract class DrillLateralJoinRelBase extends Correlate implements Drill double rowSize = (this.getLeft().getRowType().getFieldList().size()) * fieldWidth; double cpuCost = rowCount * rowSize * DrillCostBase.BASE_CPU_COST; - double memCost = 0; + double memCost = !excludeCorrelateColumn ? CORRELATE_MEM_COPY_COST : 0.0; return costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost); } + + @Override + protected RelDataType deriveRowType() { + switch (joinType) { + case LEFT: + case INNER: + return constructRowType(SqlValidatorUtil.deriveJoinRowType(left.getRowType(), + right.getRowType(), joinType.toJoinType(), + getCluster().getTypeFactory(), null, + ImmutableList.<RelDataTypeField>of())); + case ANTI: + case SEMI: + return constructRowType(left.getRowType()); + default: + throw new IllegalStateException("Unknown join type " + joinType); + } + } + + public int getInputSize(int offset, RelNode input) { + if (this.excludeCorrelateColumn && + offset == 0) { + return input.getRowType().getFieldList().size() - 1; + } + return input.getRowType().getFieldList().size(); + } + + public RelDataType constructRowType(RelDataType inputRowType) { + Preconditions.checkArgument(this.requiredColumns.cardinality() == 1); + + List<RelDataType> fields = new ArrayList<>(); + List<String> fieldNames = new ArrayList<>(); + if (excludeCorrelateColumn) { + int corrVariable = this.requiredColumns.nextSetBit(0); + + for (RelDataTypeField field : inputRowType.getFieldList()) { + if (field.getIndex() == corrVariable) { + continue; + } + fieldNames.add(field.getName()); + fields.add(field.getType()); + } + + return getCluster().getTypeFactory().createStructType(fields, fieldNames); + } + return inputRowType; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java index 36d7db296..9dd5032b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java @@ -18,9 +18,12 @@ package org.apache.drill.exec.planner.common; import java.util.AbstractList; +import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; @@ -29,6 +32,7 @@ import org.apache.calcite.rel.rules.ProjectRemoveRule; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; @@ -282,4 +286,70 @@ public abstract class DrillRelOptUtil { } return false; } + + /** + * InputRefVisitor is a utility class used to collect all the RexInputRef nodes in a + * RexNode. + * + */ + public static class InputRefVisitor extends RexVisitorImpl<Void> { + private final List<RexInputRef> inputRefList; + + public InputRefVisitor() { + super(true); + inputRefList = new ArrayList<>(); + } + + public Void visitInputRef(RexInputRef ref) { + inputRefList.add(ref); + return null; + } + + public Void visitCall(RexCall call) { + for (RexNode operand : call.operands) { + operand.accept(this); + } + return null; + } + + public List<RexInputRef> getInputRefs() { + return inputRefList; + } + } + + + /** + * RexFieldsTransformer is a utility class used to convert column refs in a RexNode + * based on inputRefMap (input to output ref map). + * + * This transformer can be used to find and replace the existing inputRef in a RexNode with a new inputRef. + */ + public static class RexFieldsTransformer { + private final RexBuilder rexBuilder; + private final Map<Integer, Integer> inputRefMap; + + public RexFieldsTransformer( + RexBuilder rexBuilder, + Map<Integer, Integer> inputRefMap) { + this.rexBuilder = rexBuilder; + this.inputRefMap = inputRefMap; + } + + public RexNode go(RexNode rex) { + if (rex instanceof RexCall) { + ImmutableList.Builder<RexNode> builder = ImmutableList.builder(); + final RexCall call = (RexCall) rex; + for (RexNode operand : call.operands) { + builder.add(go(operand)); + } + return call.clone(call.getType(), builder.build()); + } else if (rex instanceof RexInputRef) { + RexInputRef var = (RexInputRef) rex; + int index = var.getIndex(); + return rexBuilder.makeInputRef(var.getType(), inputRefMap.get(index)); + } else { + return rex; + } + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java index 52e603f3f..9f91818be 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java @@ -46,7 +46,7 @@ public class DrillCorrelateRule extends RelOptRule { final RelTraitSet traits = correlate.getTraitSet().plus(DrillRel.DRILL_LOGICAL); DrillLateralJoinRel lateralJoinRel = new DrillLateralJoinRel(correlate.getCluster(), - traits, convertedLeft, convertedRight, correlate.getCorrelationId(), + traits, convertedLeft, convertedRight, false, correlate.getCorrelationId(), correlate.getRequiredColumns(), correlate.getJoinType()); call.transformTo(lateralJoinRel); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java index 035dae9bb..aa6ccb051 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java @@ -33,16 +33,16 @@ import java.util.List; public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements DrillRel { - protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean includeCorrelateVar, CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) { - super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType); + super(cluster, traits, left, right, includeCorrelateVar, correlationId, requiredColumns, semiJoinType); } @Override public Correlate copy(RelTraitSet traitSet, RelNode left, RelNode right, CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType joinType) { - return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns, + return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, this.excludeCorrelateColumn, correlationId, requiredColumns, this.getJoinType()); } @@ -50,7 +50,7 @@ public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements Dril public LogicalOperator implement(DrillImplementor implementor) { final List<String> fields = getRowType().getFieldNames(); assert DrillJoinRel.isUnique(fields); - final int leftCount = left.getRowType().getFieldCount(); + final int leftCount = getInputSize(0,left); final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this); final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java new file mode 100644 index 000000000..5cb984a4c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + + +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.rel.core.Correlate; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rel.rules.ProjectCorrelateTransposeRule; +import org.apache.calcite.rel.rules.PushProjector; +import org.apache.calcite.tools.RelBuilderFactory; + +public class DrillProjectLateralJoinTransposeRule extends ProjectCorrelateTransposeRule { + + public static final DrillProjectLateralJoinTransposeRule INSTANCE = new DrillProjectLateralJoinTransposeRule(PushProjector.ExprCondition.TRUE, RelFactories.LOGICAL_BUILDER); + + public DrillProjectLateralJoinTransposeRule(PushProjector.ExprCondition preserveExprCondition, RelBuilderFactory relFactory) { + super(preserveExprCondition, relFactory); + } + + @Override + public boolean matches(RelOptRuleCall call) { + Correlate correlate = call.rel(1); + + + // No need to call ProjectCorrelateTransposeRule if the current lateralJoin contains excludeCorrelationColumn set to true. + // This is needed as the project push into Lateral join rule changes the output row type which will fail assertions in ProjectCorrelateTransposeRule. + if (correlate instanceof DrillLateralJoinRel && + ((DrillLateralJoinRel)correlate).excludeCorrelateColumn) { + return false; + } + + return true; + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java new file mode 100644 index 000000000..6a57c89fb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.calcite.plan.RelOptRule; +import org.apache.calcite.plan.RelOptRuleCall; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.RelFactories; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexInputRef; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.tools.RelBuilderFactory; +import org.apache.drill.exec.planner.StarColumnHelper; +import org.apache.drill.exec.planner.common.DrillRelOptUtil; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +public class DrillProjectPushIntoLateralJoinRule extends RelOptRule { + + public static final DrillProjectPushIntoLateralJoinRule INSTANCE = + new DrillProjectPushIntoLateralJoinRule(RelFactories.LOGICAL_BUILDER); + + + public DrillProjectPushIntoLateralJoinRule(RelBuilderFactory relFactory) { + super(operand(DrillProjectRel.class, + operand(DrillLateralJoinRel.class, any())), + relFactory, null); + } + + public void onMatch(RelOptRuleCall call) { + DrillProjectRel origProj = call.rel(0); + final DrillLateralJoinRel corr = call.rel(1); + + if (StarColumnHelper.containsStarColumn(origProj.getRowType()) || + StarColumnHelper.containsStarColumn(corr.getRowType()) || + corr.excludeCorrelateColumn) { + return; + } + DrillRelOptUtil.InputRefVisitor collectRefs = new DrillRelOptUtil.InputRefVisitor(); + for (RexNode exp: origProj.getChildExps()) { + exp.accept(collectRefs); + } + + int correlationIndex = corr.getRequiredColumns().nextSetBit(0); + for (RexInputRef inputRef : collectRefs.getInputRefs()) { + if (inputRef.getIndex() == correlationIndex) { + return; + } + } + + final RelNode left = corr.getLeft(); + final RelNode right = corr.getRight(); + final RelNode convertedLeft = convert(left, left.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify()); + final RelNode convertedRight = convert(right, right.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify()); + + final RelTraitSet traits = corr.getTraitSet().plus(DrillRel.DRILL_LOGICAL); + RelNode relNode = new DrillLateralJoinRel(corr.getCluster(), + traits, convertedLeft, convertedRight, true, corr.getCorrelationId(), + corr.getRequiredColumns(), corr.getJoinType()); + + if (!DrillRelOptUtil.isTrivialProject(origProj, true)) { + Map<Integer, Integer> mapWithoutCorr = buildMapWithoutCorrColumn(corr, correlationIndex); + List<RexNode> outputExprs = transformExprs(origProj.getCluster().getRexBuilder(), origProj.getChildExps(), mapWithoutCorr); + + relNode = new DrillProjectRel(origProj.getCluster(), + left.getTraitSet().plus(DrillRel.DRILL_LOGICAL), + relNode, outputExprs, origProj.getRowType()); + } + call.transformTo(relNode); + } + + private List<RexNode> transformExprs(RexBuilder builder, List<RexNode> exprs, Map<Integer, Integer> corrMap) { + List<RexNode> outputExprs = new ArrayList<>(); + DrillRelOptUtil.RexFieldsTransformer transformer = new DrillRelOptUtil.RexFieldsTransformer(builder, corrMap); + for (RexNode expr : exprs) { + outputExprs.add(transformer.go(expr)); + } + return outputExprs; + } + + private Map<Integer, Integer> buildMapWithoutCorrColumn(RelNode corr, int correlationIndex) { + int index = 0; + Map<Integer, Integer> result = new HashMap(); + for (int i=0;i<corr.getRowType().getFieldList().size();i++) { + if (i == correlationIndex) { + continue; + } else { + result.put(i, index++); + } + } + return result; + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java index 565871bb2..b55076bd0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.plan.RelTraitSet; import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.Correlate; import org.apache.calcite.rel.core.CorrelationId; import org.apache.calcite.rel.type.RelDataType; @@ -30,6 +31,8 @@ import org.apache.calcite.rex.RexNode; import org.apache.calcite.rex.RexUtil; import org.apache.calcite.sql.SemiJoinType; import org.apache.calcite.util.ImmutableBitSet; +import org.apache.commons.collections.ListUtils; +import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase; @@ -38,21 +41,23 @@ import org.apache.drill.exec.planner.physical.visitor.PrelVisitor; import org.apache.drill.exec.record.BatchSchema; import java.io.IOException; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel { - protected LateralJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, + protected LateralJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol, CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) { - super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType); + super(cluster, traits, left, right, excludeCorrelateCol, correlationId, requiredColumns, semiJoinType); } + @Override public Correlate copy(RelTraitSet traitSet, RelNode left, RelNode right, CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType joinType) { - return new LateralJoinPrel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns, + return new LateralJoinPrel(this.getCluster(), this.getTraitSet(), left, right, this.excludeCorrelateColumn, correlationId, requiredColumns, this.getJoinType()); } @@ -63,11 +68,22 @@ public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel { PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator); SemiJoinType jtype = this.getJoinType(); - - LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType()); + List<SchemaPath> excludedColumns = new ArrayList<>(); + if (getColumn() != null) { + excludedColumns.add(getColumn()); + } + LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType(), excludedColumns); return creator.addMetadata(this, ljoin); } + private SchemaPath getColumn() { + if (this.excludeCorrelateColumn) { + int index = this.getRequiredColumns().asList().get(0); + return SchemaPath.getSimplePath(this.getInput(0).getRowType().getFieldNames().get(index)); + } + return null; + } + /** * Check to make sure that the fields of the inputs are the same as the output field names. * If not, insert a project renaming them. @@ -76,8 +92,8 @@ public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel { Preconditions.checkArgument(DrillJoinRelBase.uniqueFieldNames(input.getRowType())); final List<String> fields = getRowType().getFieldNames(); final List<String> inputFields = input.getRowType().getFieldNames(); - final List<String> outputFields = fields.subList(offset, offset + inputFields.size()); - if (!outputFields.equals(inputFields)) { + final List<String> outputFields = fields.subList(offset, offset + getInputSize(offset, input)); + if (ListUtils.subtract(outputFields, inputFields).size() != 0) { // Ensure that input field names are the same as output field names. // If there are duplicate field names on left and right, fields will get // lost. @@ -105,6 +121,16 @@ public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel { } @Override + public RelWriter explainTerms(RelWriter pw) { + if (this.excludeCorrelateColumn) { + return super.explainTerms(pw).item("column excluded from output: ", this.getColumn()); + } else { + return super.explainTerms(pw); + } + } + + + @Override public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E { return visitor.visitLateral(this, value); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java index e531dca4a..10e247b01 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java @@ -48,7 +48,7 @@ public class LateralJoinPrule extends Prule { final LateralJoinPrel lateralJoinPrel = new LateralJoinPrel(lateralJoinRel.getCluster(), corrTraits, - convertedLeft, convertedRight, lateralJoinRel.getCorrelationId(), + convertedLeft, convertedRight, lateralJoinRel.excludeCorrelateColumn, lateralJoinRel.getCorrelationId(), lateralJoinRel.getRequiredColumns(),lateralJoinRel.getJoinType()); call.transformTo(lateralJoinPrel); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java index d450c5616..850f0bdf0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java @@ -76,7 +76,7 @@ public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeEx List<RelNode> children = getChildren(prel); - final int leftCount = children.get(0).getRowType().getFieldCount(); + final int leftCount = prel.getInputSize(0,children.get(0)); List<RelNode> reNamedChildren = Lists.newArrayList(); |