aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main
diff options
context:
space:
mode:
authorHanumathRao <hanu.ncr@gmail.com>2018-06-21 18:42:24 -0700
committerVolodymyr Vysotskyi <vvovyk@gmail.com>2018-07-01 19:06:29 +0300
commit8ec2dc64175648103a5ec51f8ad98387496692a9 (patch)
tree7ec2cf94373d1a8165f954efb28cbb017a3b172f /exec/java-exec/src/main
parent7c22e35ef2a9ecc41cc15c5deefac9b306ea87a1 (diff)
DRILL-6545: Projection Push down into Lateral Join operator.
closes #1347
Diffstat (limited to 'exec/java-exec/src/main')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java64
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java70
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java50
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java113
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java40
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java2
11 files changed, 351 insertions, 20 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
index a12fed126..55ede9628 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/LateralJoinPOP.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.AbstractJoinPop;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
@@ -34,6 +35,9 @@ import java.util.List;
public class LateralJoinPOP extends AbstractJoinPop {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LateralJoinPOP.class);
+ @JsonProperty("excludedColumns")
+ private List<SchemaPath> excludedColumns;
+
@JsonProperty("unnestForLateralJoin")
private UnnestPOP unnestForLateralJoin;
@@ -41,19 +45,21 @@ public class LateralJoinPOP extends AbstractJoinPop {
public LateralJoinPOP(
@JsonProperty("left") PhysicalOperator left,
@JsonProperty("right") PhysicalOperator right,
- @JsonProperty("joinType") JoinRelType joinType) {
+ @JsonProperty("joinType") JoinRelType joinType,
+ @JsonProperty("excludedColumns") List<SchemaPath> excludedColumns) {
super(left, right, joinType, null, null);
Preconditions.checkArgument(joinType != JoinRelType.FULL,
"Full outer join is currently not supported with Lateral Join");
Preconditions.checkArgument(joinType != JoinRelType.RIGHT,
"Right join is currently not supported with Lateral Join");
+ this.excludedColumns = excludedColumns;
}
@Override
public PhysicalOperator getNewWithChildren(List<PhysicalOperator> children) {
Preconditions.checkArgument(children.size() == 2,
"Lateral join should have two physical operators");
- LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType);
+ LateralJoinPOP newPOP = new LateralJoinPOP(children.get(0), children.get(1), joinType, this.excludedColumns);
newPOP.unnestForLateralJoin = this.unnestForLateralJoin;
return newPOP;
}
@@ -63,6 +69,11 @@ public class LateralJoinPOP extends AbstractJoinPop {
return this.unnestForLateralJoin;
}
+ @JsonProperty("excludedColumns")
+ public List<SchemaPath> getExcludedColumns() {
+ return this.excludedColumns;
+ }
+
public void setUnnestForLateralJoin(UnnestPOP unnest) {
this.unnestForLateralJoin = unnest;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index c8bb2a4f5..519d5036e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -37,6 +37,8 @@ import org.apache.drill.exec.planner.logical.DrillJoinRel;
import org.apache.drill.exec.planner.logical.DrillJoinRule;
import org.apache.drill.exec.planner.logical.DrillLimitRule;
import org.apache.drill.exec.planner.logical.DrillMergeProjectRule;
+import org.apache.drill.exec.planner.logical.DrillProjectLateralJoinTransposeRule;
+import org.apache.drill.exec.planner.logical.DrillProjectPushIntoLateralJoinRule;
import org.apache.drill.exec.planner.logical.DrillProjectRule;
import org.apache.drill.exec.planner.logical.DrillPushFilterPastProjectRule;
import org.apache.drill.exec.planner.logical.DrillPushLimitToScanRule;
@@ -287,7 +289,8 @@ public enum PlannerPhase {
// Due to infinite loop in planning (DRILL-3257/CALCITE-1271), temporarily use this rule in Hep planner
// RuleInstance.FILTER_SET_OP_TRANSPOSE_RULE,
DrillFilterAggregateTransposeRule.INSTANCE,
-
+ DrillProjectLateralJoinTransposeRule.INSTANCE,
+ DrillProjectPushIntoLateralJoinRule.INSTANCE,
RuleInstance.FILTER_MERGE_RULE,
RuleInstance.FILTER_CORRELATE_RULE,
RuleInstance.AGGREGATE_REMOVE_RULE,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
index a7bbbca92..28e5246b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillLateralJoinRelBase.java
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.planner.common;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
@@ -25,17 +27,27 @@ import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Correlate;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.sql.SemiJoinType;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.planner.physical.PrelUtil;
+import java.util.ArrayList;
+import java.util.List;
+
public abstract class DrillLateralJoinRelBase extends Correlate implements DrillRelNode {
- public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
- CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
+
+ final private static double CORRELATE_MEM_COPY_COST = DrillCostBase.MEMORY_TO_CPU_RATIO * DrillCostBase.BASE_CPU_COST;
+ final public boolean excludeCorrelateColumn;
+ public DrillLateralJoinRelBase(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol,
+ CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+ this.excludeCorrelateColumn = excludeCorrelateCol;
}
@Override public RelOptCost computeSelfCost(RelOptPlanner planner,
@@ -49,7 +61,53 @@ public abstract class DrillLateralJoinRelBase extends Correlate implements Drill
double rowSize = (this.getLeft().getRowType().getFieldList().size()) * fieldWidth;
double cpuCost = rowCount * rowSize * DrillCostBase.BASE_CPU_COST;
- double memCost = 0;
+ double memCost = !excludeCorrelateColumn ? CORRELATE_MEM_COPY_COST : 0.0;
return costFactory.makeCost(rowCount, cpuCost, 0, 0, memCost);
}
+
+ @Override
+ protected RelDataType deriveRowType() {
+ switch (joinType) {
+ case LEFT:
+ case INNER:
+ return constructRowType(SqlValidatorUtil.deriveJoinRowType(left.getRowType(),
+ right.getRowType(), joinType.toJoinType(),
+ getCluster().getTypeFactory(), null,
+ ImmutableList.<RelDataTypeField>of()));
+ case ANTI:
+ case SEMI:
+ return constructRowType(left.getRowType());
+ default:
+ throw new IllegalStateException("Unknown join type " + joinType);
+ }
+ }
+
+ public int getInputSize(int offset, RelNode input) {
+ if (this.excludeCorrelateColumn &&
+ offset == 0) {
+ return input.getRowType().getFieldList().size() - 1;
+ }
+ return input.getRowType().getFieldList().size();
+ }
+
+ public RelDataType constructRowType(RelDataType inputRowType) {
+ Preconditions.checkArgument(this.requiredColumns.cardinality() == 1);
+
+ List<RelDataType> fields = new ArrayList<>();
+ List<String> fieldNames = new ArrayList<>();
+ if (excludeCorrelateColumn) {
+ int corrVariable = this.requiredColumns.nextSetBit(0);
+
+ for (RelDataTypeField field : inputRowType.getFieldList()) {
+ if (field.getIndex() == corrVariable) {
+ continue;
+ }
+ fieldNames.add(field.getName());
+ fields.add(field.getType());
+ }
+
+ return getCluster().getTypeFactory().createStructType(fields, fieldNames);
+ }
+ return inputRowType;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
index 36d7db296..9dd5032b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillRelOptUtil.java
@@ -18,9 +18,12 @@
package org.apache.drill.exec.planner.common;
import java.util.AbstractList;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelNode;
@@ -29,6 +32,7 @@ import org.apache.calcite.rel.rules.ProjectRemoveRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.rex.RexInputRef;
import org.apache.calcite.rex.RexLiteral;
@@ -282,4 +286,70 @@ public abstract class DrillRelOptUtil {
}
return false;
}
+
+ /**
+ * InputRefVisitor is a utility class used to collect all the RexInputRef nodes in a
+ * RexNode.
+ *
+ */
+ public static class InputRefVisitor extends RexVisitorImpl<Void> {
+ private final List<RexInputRef> inputRefList;
+
+ public InputRefVisitor() {
+ super(true);
+ inputRefList = new ArrayList<>();
+ }
+
+ public Void visitInputRef(RexInputRef ref) {
+ inputRefList.add(ref);
+ return null;
+ }
+
+ public Void visitCall(RexCall call) {
+ for (RexNode operand : call.operands) {
+ operand.accept(this);
+ }
+ return null;
+ }
+
+ public List<RexInputRef> getInputRefs() {
+ return inputRefList;
+ }
+ }
+
+
+ /**
+ * RexFieldsTransformer is a utility class used to convert column refs in a RexNode
+ * based on inputRefMap (input to output ref map).
+ *
+ * This transformer can be used to find and replace the existing inputRef in a RexNode with a new inputRef.
+ */
+ public static class RexFieldsTransformer {
+ private final RexBuilder rexBuilder;
+ private final Map<Integer, Integer> inputRefMap;
+
+ public RexFieldsTransformer(
+ RexBuilder rexBuilder,
+ Map<Integer, Integer> inputRefMap) {
+ this.rexBuilder = rexBuilder;
+ this.inputRefMap = inputRefMap;
+ }
+
+ public RexNode go(RexNode rex) {
+ if (rex instanceof RexCall) {
+ ImmutableList.Builder<RexNode> builder = ImmutableList.builder();
+ final RexCall call = (RexCall) rex;
+ for (RexNode operand : call.operands) {
+ builder.add(go(operand));
+ }
+ return call.clone(call.getType(), builder.build());
+ } else if (rex instanceof RexInputRef) {
+ RexInputRef var = (RexInputRef) rex;
+ int index = var.getIndex();
+ return rexBuilder.makeInputRef(var.getType(), inputRefMap.get(index));
+ } else {
+ return rex;
+ }
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
index 52e603f3f..9f91818be 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillCorrelateRule.java
@@ -46,7 +46,7 @@ public class DrillCorrelateRule extends RelOptRule {
final RelTraitSet traits = correlate.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
DrillLateralJoinRel lateralJoinRel = new DrillLateralJoinRel(correlate.getCluster(),
- traits, convertedLeft, convertedRight, correlate.getCorrelationId(),
+ traits, convertedLeft, convertedRight, false, correlate.getCorrelationId(),
correlate.getRequiredColumns(), correlate.getJoinType());
call.transformTo(lateralJoinRel);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
index 035dae9bb..aa6ccb051 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
@@ -33,16 +33,16 @@ import java.util.List;
public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements DrillRel {
- protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+ protected DrillLateralJoinRel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean includeCorrelateVar,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
- super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+ super(cluster, traits, left, right, includeCorrelateVar, correlationId, requiredColumns, semiJoinType);
}
@Override
public Correlate copy(RelTraitSet traitSet,
RelNode left, RelNode right, CorrelationId correlationId,
ImmutableBitSet requiredColumns, SemiJoinType joinType) {
- return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
+ return new DrillLateralJoinRel(this.getCluster(), this.getTraitSet(), left, right, this.excludeCorrelateColumn, correlationId, requiredColumns,
this.getJoinType());
}
@@ -50,7 +50,7 @@ public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements Dril
public LogicalOperator implement(DrillImplementor implementor) {
final List<String> fields = getRowType().getFieldNames();
assert DrillJoinRel.isUnique(fields);
- final int leftCount = left.getRowType().getFieldCount();
+ final int leftCount = getInputSize(0,left);
final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this);
final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java
new file mode 100644
index 000000000..5cb984a4c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectLateralJoinTransposeRule.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.rel.core.Correlate;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.rules.ProjectCorrelateTransposeRule;
+import org.apache.calcite.rel.rules.PushProjector;
+import org.apache.calcite.tools.RelBuilderFactory;
+
+public class DrillProjectLateralJoinTransposeRule extends ProjectCorrelateTransposeRule {
+
+ public static final DrillProjectLateralJoinTransposeRule INSTANCE = new DrillProjectLateralJoinTransposeRule(PushProjector.ExprCondition.TRUE, RelFactories.LOGICAL_BUILDER);
+
+ public DrillProjectLateralJoinTransposeRule(PushProjector.ExprCondition preserveExprCondition, RelBuilderFactory relFactory) {
+ super(preserveExprCondition, relFactory);
+ }
+
+ @Override
+ public boolean matches(RelOptRuleCall call) {
+ Correlate correlate = call.rel(1);
+
+
+ // No need to call ProjectCorrelateTransposeRule if the current lateralJoin contains excludeCorrelationColumn set to true.
+ // This is needed as the project push into Lateral join rule changes the output row type which will fail assertions in ProjectCorrelateTransposeRule.
+ if (correlate instanceof DrillLateralJoinRel &&
+ ((DrillLateralJoinRel)correlate).excludeCorrelateColumn) {
+ return false;
+ }
+
+ return true;
+ }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java
new file mode 100644
index 000000000..6a57c89fb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillProjectPushIntoLateralJoinRule.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.drill.exec.planner.StarColumnHelper;
+import org.apache.drill.exec.planner.common.DrillRelOptUtil;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class DrillProjectPushIntoLateralJoinRule extends RelOptRule {
+
+ public static final DrillProjectPushIntoLateralJoinRule INSTANCE =
+ new DrillProjectPushIntoLateralJoinRule(RelFactories.LOGICAL_BUILDER);
+
+
+ public DrillProjectPushIntoLateralJoinRule(RelBuilderFactory relFactory) {
+ super(operand(DrillProjectRel.class,
+ operand(DrillLateralJoinRel.class, any())),
+ relFactory, null);
+ }
+
+ public void onMatch(RelOptRuleCall call) {
+ DrillProjectRel origProj = call.rel(0);
+ final DrillLateralJoinRel corr = call.rel(1);
+
+ if (StarColumnHelper.containsStarColumn(origProj.getRowType()) ||
+ StarColumnHelper.containsStarColumn(corr.getRowType()) ||
+ corr.excludeCorrelateColumn) {
+ return;
+ }
+ DrillRelOptUtil.InputRefVisitor collectRefs = new DrillRelOptUtil.InputRefVisitor();
+ for (RexNode exp: origProj.getChildExps()) {
+ exp.accept(collectRefs);
+ }
+
+ int correlationIndex = corr.getRequiredColumns().nextSetBit(0);
+ for (RexInputRef inputRef : collectRefs.getInputRefs()) {
+ if (inputRef.getIndex() == correlationIndex) {
+ return;
+ }
+ }
+
+ final RelNode left = corr.getLeft();
+ final RelNode right = corr.getRight();
+ final RelNode convertedLeft = convert(left, left.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+ final RelNode convertedRight = convert(right, right.getTraitSet().plus(DrillRel.DRILL_LOGICAL).simplify());
+
+ final RelTraitSet traits = corr.getTraitSet().plus(DrillRel.DRILL_LOGICAL);
+ RelNode relNode = new DrillLateralJoinRel(corr.getCluster(),
+ traits, convertedLeft, convertedRight, true, corr.getCorrelationId(),
+ corr.getRequiredColumns(), corr.getJoinType());
+
+ if (!DrillRelOptUtil.isTrivialProject(origProj, true)) {
+ Map<Integer, Integer> mapWithoutCorr = buildMapWithoutCorrColumn(corr, correlationIndex);
+ List<RexNode> outputExprs = transformExprs(origProj.getCluster().getRexBuilder(), origProj.getChildExps(), mapWithoutCorr);
+
+ relNode = new DrillProjectRel(origProj.getCluster(),
+ left.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+ relNode, outputExprs, origProj.getRowType());
+ }
+ call.transformTo(relNode);
+ }
+
+ private List<RexNode> transformExprs(RexBuilder builder, List<RexNode> exprs, Map<Integer, Integer> corrMap) {
+ List<RexNode> outputExprs = new ArrayList<>();
+ DrillRelOptUtil.RexFieldsTransformer transformer = new DrillRelOptUtil.RexFieldsTransformer(builder, corrMap);
+ for (RexNode expr : exprs) {
+ outputExprs.add(transformer.go(expr));
+ }
+ return outputExprs;
+ }
+
+ private Map<Integer, Integer> buildMapWithoutCorrColumn(RelNode corr, int correlationIndex) {
+ int index = 0;
+ Map<Integer, Integer> result = new HashMap();
+ for (int i=0;i<corr.getRowType().getFieldList().size();i++) {
+ if (i == correlationIndex) {
+ continue;
+ } else {
+ result.put(i, index++);
+ }
+ }
+ return result;
+ }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
index 565871bb2..b55076bd0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrel.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Correlate;
import org.apache.calcite.rel.core.CorrelationId;
import org.apache.calcite.rel.type.RelDataType;
@@ -30,6 +31,8 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SemiJoinType;
import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.commons.collections.ListUtils;
+import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
@@ -38,21 +41,23 @@ import org.apache.drill.exec.planner.physical.visitor.PrelVisitor;
import org.apache.drill.exec.record.BatchSchema;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel {
- protected LateralJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right,
+ protected LateralJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, boolean excludeCorrelateCol,
CorrelationId correlationId, ImmutableBitSet requiredColumns, SemiJoinType semiJoinType) {
- super(cluster, traits, left, right, correlationId, requiredColumns, semiJoinType);
+ super(cluster, traits, left, right, excludeCorrelateCol, correlationId, requiredColumns, semiJoinType);
}
+
@Override
public Correlate copy(RelTraitSet traitSet,
RelNode left, RelNode right, CorrelationId correlationId,
ImmutableBitSet requiredColumns, SemiJoinType joinType) {
- return new LateralJoinPrel(this.getCluster(), this.getTraitSet(), left, right, correlationId, requiredColumns,
+ return new LateralJoinPrel(this.getCluster(), this.getTraitSet(), left, right, this.excludeCorrelateColumn, correlationId, requiredColumns,
this.getJoinType());
}
@@ -63,11 +68,22 @@ public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel {
PhysicalOperator rightPop = ((Prel)right).getPhysicalOperator(creator);
SemiJoinType jtype = this.getJoinType();
-
- LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType());
+ List<SchemaPath> excludedColumns = new ArrayList<>();
+ if (getColumn() != null) {
+ excludedColumns.add(getColumn());
+ }
+ LateralJoinPOP ljoin = new LateralJoinPOP(leftPop, rightPop, jtype.toJoinType(), excludedColumns);
return creator.addMetadata(this, ljoin);
}
+ private SchemaPath getColumn() {
+ if (this.excludeCorrelateColumn) {
+ int index = this.getRequiredColumns().asList().get(0);
+ return SchemaPath.getSimplePath(this.getInput(0).getRowType().getFieldNames().get(index));
+ }
+ return null;
+ }
+
/**
* Check to make sure that the fields of the inputs are the same as the output field names.
* If not, insert a project renaming them.
@@ -76,8 +92,8 @@ public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel {
Preconditions.checkArgument(DrillJoinRelBase.uniqueFieldNames(input.getRowType()));
final List<String> fields = getRowType().getFieldNames();
final List<String> inputFields = input.getRowType().getFieldNames();
- final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
- if (!outputFields.equals(inputFields)) {
+ final List<String> outputFields = fields.subList(offset, offset + getInputSize(offset, input));
+ if (ListUtils.subtract(outputFields, inputFields).size() != 0) {
// Ensure that input field names are the same as output field names.
// If there are duplicate field names on left and right, fields will get
// lost.
@@ -105,6 +121,16 @@ public class LateralJoinPrel extends DrillLateralJoinRelBase implements Prel {
}
@Override
+ public RelWriter explainTerms(RelWriter pw) {
+ if (this.excludeCorrelateColumn) {
+ return super.explainTerms(pw).item("column excluded from output: ", this.getColumn());
+ } else {
+ return super.explainTerms(pw);
+ }
+ }
+
+
+ @Override
public <T, X, E extends Throwable> T accept(PrelVisitor<T, X, E> visitor, X value) throws E {
return visitor.visitLateral(this, value);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java
index e531dca4a..10e247b01 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/LateralJoinPrule.java
@@ -48,7 +48,7 @@ public class LateralJoinPrule extends Prule {
final LateralJoinPrel lateralJoinPrel = new LateralJoinPrel(lateralJoinRel.getCluster(),
corrTraits,
- convertedLeft, convertedRight, lateralJoinRel.getCorrelationId(),
+ convertedLeft, convertedRight, lateralJoinRel.excludeCorrelateColumn, lateralJoinRel.getCorrelationId(),
lateralJoinRel.getRequiredColumns(),lateralJoinRel.getJoinType());
call.transformTo(lateralJoinPrel);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
index d450c5616..850f0bdf0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/JoinPrelRenameVisitor.java
@@ -76,7 +76,7 @@ public class JoinPrelRenameVisitor extends BasePrelVisitor<Prel, Void, RuntimeEx
List<RelNode> children = getChildren(prel);
- final int leftCount = children.get(0).getRowType().getFieldCount();
+ final int leftCount = prel.getInputSize(0,children.get(0));
List<RelNode> reNamedChildren = Lists.newArrayList();