diff options
author | Hanumath Rao Maduri <hanu.ncr@gmail.com> | 2018-10-09 17:33:43 -0700 |
---|---|---|
committer | Boaz Ben-Zvi <boaz@mapr.com> | 2018-11-01 16:13:51 -0700 |
commit | 71809ca6216d95540b2a41ce1ab2ebb742888671 (patch) | |
tree | b0e48affe3a1ff5764fb51e4c624e97e3dc0d294 /exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical | |
parent | 5fff1d8bff899e1af551c16f26a58b6b1d033ffb (diff) |
DRILL-6798: Planner changes to support semi-join.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical')
5 files changed, 172 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java new file mode 100644 index 000000000..30067dab9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rex.RexNode; +import org.apache.drill.exec.planner.common.DrillRelNode; +import java.util.List; + +/** + * Interface which needs to be implemented by all the join relation expressions. + */ +public interface DrillJoin extends DrillRelNode { + + /* Columns of left table that are part of join condition */ + List<Integer> getLeftKeys(); + + /* Columns of right table that are part of join condition */ + List<Integer> getRightKeys(); + + /* JoinType of the join operation*/ + JoinRelType getJoinType(); + + /* Join condition of the join relation */ + RexNode getCondition(); + + /* Left RelNode of the Join Relation */ + RelNode getLeft(); + + /* Right RelNode of the Join Relation */ + RelNode getRight(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java index 42f7e72bc..0126e745c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java @@ -104,7 +104,7 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel { * @return */ private LogicalOperator implementInput(DrillImplementor implementor, int i, int offset, RelNode input) { - return implementInput(implementor, i, offset, input, this); + return implementInput(implementor, i, offset, input, this, this.getRowType().getFieldNames()); } /** @@ -118,12 +118,12 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel { * @return */ public static LogicalOperator implementInput(DrillImplementor implementor, int i, int offset, - RelNode input, DrillRel currentNode) { + RelNode input, DrillRel currentNode, + List<String> parentFields) { final LogicalOperator inputOp = implementor.visitChild(currentNode, i, input); assert uniqueFieldNames(input.getRowType()); - final List<String> fields = currentNode.getRowType().getFieldNames(); final List<String> inputFields = input.getRowType().getFieldNames(); - final List<String> outputFields = fields.subList(offset, offset + inputFields.size()); + final List<String> outputFields = parentFields.subList(offset, offset + inputFields.size()); if (!outputFields.equals(inputFields)) { // Ensure that input field names are the same as output field names. // If there are duplicate field names on left and right, fields will get diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java index 4356d4910..ca03de14f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java @@ -28,6 +28,7 @@ import org.apache.drill.common.logical.data.LateralJoin; import org.apache.drill.common.logical.data.LogicalOperator; import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase; +import java.util.ArrayList; import java.util.List; @@ -48,12 +49,14 @@ public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements Dril @Override public LogicalOperator implement(DrillImplementor implementor) { - final List<String> fields = getRowType().getFieldNames(); + List<String> fields = new ArrayList<>(); + fields.addAll(getInput(0).getRowType().getFieldNames()); + fields.addAll(getInput(1).getRowType().getFieldNames()); assert DrillJoinRel.isUnique(fields); final int leftCount = getInputSize(0); - final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this); - final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this); + final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this, fields); + final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this, fields); return new LateralJoin(leftOp, rightOp); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java index d5ff56bc1..a0b727d3f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java @@ -22,6 +22,7 @@ import org.apache.calcite.plan.RelOptCluster; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.rel.core.CorrelationId; +import org.apache.calcite.rel.core.JoinInfo; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rel.core.RelFactories; import org.apache.calcite.rel.type.RelDataType; @@ -39,7 +40,6 @@ import static org.apache.calcite.rel.core.RelFactories.DEFAULT_FILTER_FACTORY; import static org.apache.calcite.rel.core.RelFactories.DEFAULT_JOIN_FACTORY; import static org.apache.calcite.rel.core.RelFactories.DEFAULT_MATCH_FACTORY; import static org.apache.calcite.rel.core.RelFactories.DEFAULT_PROJECT_FACTORY; -import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SEMI_JOIN_FACTORY; import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SET_OP_FACTORY; import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SORT_FACTORY; import static org.apache.calcite.rel.core.RelFactories.DEFAULT_TABLE_SCAN_FACTORY; @@ -60,6 +60,17 @@ public class DrillRelFactories { public static final RelFactories.JoinFactory DRILL_LOGICAL_JOIN_FACTORY = new DrillJoinFactoryImpl(); public static final RelFactories.AggregateFactory DRILL_LOGICAL_AGGREGATE_FACTORY = new DrillAggregateFactoryImpl(); + + public static final RelFactories.SemiJoinFactory DRILL_SEMI_JOIN_FACTORY = new SemiJoinFactoryImpl(); + + private static class SemiJoinFactoryImpl implements RelFactories.SemiJoinFactory { + public RelNode createSemiJoin(RelNode left, RelNode right, + RexNode condition) { + final JoinInfo joinInfo = JoinInfo.of(left, right, condition); + return DrillSemiJoinRel.create(left, right, + condition, joinInfo.leftKeys, joinInfo.rightKeys); + } + } /** * A {@link RelBuilderFactory} that creates a {@link DrillRelBuilder} that will * create logical relational expressions for everything. @@ -69,7 +80,7 @@ public class DrillRelFactories { Contexts.of(DEFAULT_PROJECT_FACTORY, DEFAULT_FILTER_FACTORY, DEFAULT_JOIN_FACTORY, - DEFAULT_SEMI_JOIN_FACTORY, + DRILL_SEMI_JOIN_FACTORY, DEFAULT_SORT_FACTORY, DEFAULT_AGGREGATE_FACTORY, DEFAULT_MATCH_FACTORY, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java new file mode 100644 index 000000000..09e4be9de --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.logical; + +import org.apache.calcite.plan.RelOptCluster; +import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.core.JoinInfo; +import org.apache.calcite.rel.core.JoinRelType; +import org.apache.calcite.rel.core.SemiJoin; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.util.ImmutableIntList; +import org.apache.calcite.util.Pair; +import org.apache.drill.common.expression.FieldReference; +import org.apache.drill.common.logical.data.Join; +import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.logical.data.LogicalOperator; +import org.apache.drill.common.logical.data.LogicalSemiJoin; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.List; + +public class DrillSemiJoinRel extends SemiJoin implements DrillJoin, DrillRel { + + public DrillSemiJoinRel( + RelOptCluster cluster, + RelTraitSet traitSet, + RelNode left, + RelNode right, + RexNode condition, + ImmutableIntList leftKeys, + ImmutableIntList rightKeys) { + super(cluster, + traitSet, + left, + right, + condition, + leftKeys, + rightKeys); + } + + public static SemiJoin create(RelNode left, RelNode right, RexNode condition, + ImmutableIntList leftKeys, ImmutableIntList rightKeys) { + final RelOptCluster cluster = left.getCluster(); + return new DrillSemiJoinRel(cluster, cluster.traitSetOf(DrillRel.DRILL_LOGICAL), left, + right, condition, leftKeys, rightKeys); + } + + @Override + public SemiJoin copy(RelTraitSet traitSet, RexNode condition, + RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { + Preconditions.checkArgument(joinType == JoinRelType.INNER); + final JoinInfo joinInfo = JoinInfo.of(left, right, condition); + Preconditions.checkArgument(joinInfo.isEqui()); + return new DrillSemiJoinRel(getCluster(), traitSet, left, right, condition, + joinInfo.leftKeys, joinInfo.rightKeys); + } + + @Override + public LogicalOperator implement(DrillImplementor implementor) { + List<String> fields = new ArrayList<>(); + fields.addAll(getInput(0).getRowType().getFieldNames()); + fields.addAll(getInput(1).getRowType().getFieldNames()); + Preconditions.checkArgument(DrillJoinRel.isUnique(fields)); + final int leftCount = left.getRowType().getFieldCount(); + final List<String> leftFields = fields.subList(0, leftCount); + final List<String> rightFields = fields.subList(leftCount, leftCount + right.getRowType().getFieldCount()); + + final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this, fields); + final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this, fields); + + Join.Builder builder = Join.builder(); + builder.type(joinType); + builder.left(leftOp); + builder.right(rightOp); + List<JoinCondition> conditions = Lists.newArrayList(); + for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) { + conditions.add(new JoinCondition(DrillJoinRel.EQUALITY_CONDITION, + new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right)))); + } + + return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType); + } +} |