aboutsummaryrefslogtreecommitdiff
path: root/exec
diff options
context:
space:
mode:
authorHanumath Rao Maduri <hanu.ncr@gmail.com>2018-10-09 17:33:43 -0700
committerBoaz Ben-Zvi <boaz@mapr.com>2018-11-01 16:13:51 -0700
commit71809ca6216d95540b2a41ce1ab2ebb742888671 (patch)
treeb0e48affe3a1ff5764fb51e4c624e97e3dc0d294 /exec
parent5fff1d8bff899e1af551c16f26a58b6b1d033ffb (diff)
DRILL-6798: Planner changes to support semi-join.
Diffstat (limited to 'exec')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java48
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java101
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java57
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java15
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java83
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java49
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java1
-rw-r--r--exec/java-exec/src/main/resources/drill-module.conf1
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java118
19 files changed, 501 insertions, 61 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index ae55c9f21..17f8da523 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -358,15 +358,18 @@ public enum PlannerPhase {
* We have to create another copy of the ruleset with the context dependent elements;
* this cannot be reused across queries.
*/
- final ImmutableSet<RelOptRule> basicRules = ImmutableSet.<RelOptRule>builder()
+ ImmutableSet.Builder<RelOptRule> basicRules = ImmutableSet.<RelOptRule>builder()
.addAll(staticRuleSet)
.add(
DrillMergeProjectRule.getInstance(true, RelFactories.DEFAULT_PROJECT_FACTORY,
optimizerRulesContext.getFunctionRegistry())
- )
- .build();
+ );
+ if (optimizerRulesContext.getPlannerSettings().isHashJoinEnabled() &&
+ optimizerRulesContext.getPlannerSettings().isSemiJoinEnabled()) {
+ basicRules.add(RuleInstance.SEMI_JOIN_PROJECT_RULE);
+ }
- return RuleSets.ofList(basicRules);
+ return RuleSets.ofList(basicRules.build());
}
/**
@@ -474,7 +477,6 @@ public enum PlannerPhase {
static RuleSet getPhysicalRules(OptimizerRulesContext optimizerRulesContext) {
final List<RelOptRule> ruleList = new ArrayList<>();
final PlannerSettings ps = optimizerRulesContext.getPlannerSettings();
-
ruleList.add(ConvertCountToDirectScan.AGG_ON_PROJ_ON_SCAN);
ruleList.add(ConvertCountToDirectScan.AGG_ON_SCAN);
ruleList.add(SortConvertPrule.INSTANCE);
@@ -509,9 +511,14 @@ public enum PlannerPhase {
if (ps.isHashJoinEnabled()) {
ruleList.add(HashJoinPrule.DIST_INSTANCE);
-
+ if (ps.isSemiJoinEnabled()) {
+ ruleList.add(HashJoinPrule.SEMI_DIST_INSTANCE);
+ }
if(ps.isBroadcastJoinEnabled()){
ruleList.add(HashJoinPrule.BROADCAST_INSTANCE);
+ if (ps.isSemiJoinEnabled()) {
+ ruleList.add(HashJoinPrule.SEMI_BROADCAST_INSTANCE);
+ }
}
}
@@ -521,7 +528,6 @@ public enum PlannerPhase {
if(ps.isBroadcastJoinEnabled()){
ruleList.add(MergeJoinPrule.BROADCAST_INSTANCE);
}
-
}
// NLJ plans consist of broadcasting the right child, hence we need
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
index 8aec96c94..b14488c9c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/RuleInstance.java
@@ -18,8 +18,11 @@
package org.apache.drill.exec.planner;
import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.volcano.AbstractConverter;
+import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Join;
+import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.logical.LogicalAggregate;
import org.apache.calcite.rel.logical.LogicalCalc;
import org.apache.calcite.rel.logical.LogicalJoin;
@@ -39,12 +42,13 @@ import org.apache.calcite.rel.rules.ProjectSetOpTransposeRule;
import org.apache.calcite.rel.rules.ProjectToWindowRule;
import org.apache.calcite.rel.rules.ProjectWindowTransposeRule;
import org.apache.calcite.rel.rules.ReduceExpressionsRule;
+import org.apache.calcite.rel.rules.SemiJoinRule;
import org.apache.calcite.rel.rules.SortRemoveRule;
import org.apache.calcite.rel.rules.SubQueryRemoveRule;
import org.apache.calcite.rel.rules.UnionToDistinctRule;
import org.apache.drill.exec.planner.logical.DrillConditions;
import org.apache.drill.exec.planner.logical.DrillRelFactories;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
/**
* Contains rule instances which use custom RelBuilder.
*/
@@ -58,6 +62,15 @@ public interface RuleInstance {
new UnionToDistinctRule(LogicalUnion.class,
DrillRelFactories.LOGICAL_BUILDER);
+ SemiJoinRule SEMI_JOIN_PROJECT_RULE = new SemiJoinRule.ProjectToSemiJoinRule(Project.class, Join.class, Aggregate.class,
+ DrillRelFactories.LOGICAL_BUILDER, "DrillSemiJoinRule:project") {
+ public boolean matches(RelOptRuleCall call) {
+ Preconditions.checkArgument(call.rel(1) instanceof Join);
+ Join join = call.rel(1);
+ return !(join.getCondition().isAlwaysTrue() || join.getCondition().isAlwaysFalse());
+ }
+ };
+
JoinPushExpressionsRule JOIN_PUSH_EXPRESSIONS_RULE =
new JoinPushExpressionsRule(Join.class,
DrillRelFactories.LOGICAL_BUILDER);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
index 434016ff8..cde49e4b9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/common/DrillJoinRelBase.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.planner.cost.DrillCostBase;
import org.apache.drill.exec.physical.impl.join.JoinUtils;
import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
import org.apache.drill.exec.planner.cost.DrillCostBase.DrillCostFactory;
+import org.apache.drill.exec.planner.logical.DrillJoin;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
@@ -45,7 +46,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
/**
* Base class for logical and physical Joins implemented in Drill.
*/
-public abstract class DrillJoinRelBase extends Join implements DrillRelNode {
+public abstract class DrillJoinRelBase extends Join implements DrillJoin {
protected List<Integer> leftKeys = Lists.newArrayList();
protected List<Integer> rightKeys = Lists.newArrayList();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
new file mode 100644
index 000000000..30067dab9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoin.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.drill.exec.planner.common.DrillRelNode;
+import java.util.List;
+
+/**
+ * Interface which needs to be implemented by all the join relation expressions.
+ */
+public interface DrillJoin extends DrillRelNode {
+
+ /* Columns of left table that are part of join condition */
+ List<Integer> getLeftKeys();
+
+ /* Columns of right table that are part of join condition */
+ List<Integer> getRightKeys();
+
+ /* JoinType of the join operation*/
+ JoinRelType getJoinType();
+
+ /* Join condition of the join relation */
+ RexNode getCondition();
+
+ /* Left RelNode of the Join Relation */
+ RelNode getLeft();
+
+ /* Right RelNode of the Join Relation */
+ RelNode getRight();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
index 42f7e72bc..0126e745c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java
@@ -104,7 +104,7 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
* @return
*/
private LogicalOperator implementInput(DrillImplementor implementor, int i, int offset, RelNode input) {
- return implementInput(implementor, i, offset, input, this);
+ return implementInput(implementor, i, offset, input, this, this.getRowType().getFieldNames());
}
/**
@@ -118,12 +118,12 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel {
* @return
*/
public static LogicalOperator implementInput(DrillImplementor implementor, int i, int offset,
- RelNode input, DrillRel currentNode) {
+ RelNode input, DrillRel currentNode,
+ List<String> parentFields) {
final LogicalOperator inputOp = implementor.visitChild(currentNode, i, input);
assert uniqueFieldNames(input.getRowType());
- final List<String> fields = currentNode.getRowType().getFieldNames();
final List<String> inputFields = input.getRowType().getFieldNames();
- final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
+ final List<String> outputFields = parentFields.subList(offset, offset + inputFields.size());
if (!outputFields.equals(inputFields)) {
// Ensure that input field names are the same as output field names.
// If there are duplicate field names on left and right, fields will get
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
index 4356d4910..ca03de14f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillLateralJoinRel.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.logical.data.LateralJoin;
import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.exec.planner.common.DrillLateralJoinRelBase;
+import java.util.ArrayList;
import java.util.List;
@@ -48,12 +49,14 @@ public class DrillLateralJoinRel extends DrillLateralJoinRelBase implements Dril
@Override
public LogicalOperator implement(DrillImplementor implementor) {
- final List<String> fields = getRowType().getFieldNames();
+ List<String> fields = new ArrayList<>();
+ fields.addAll(getInput(0).getRowType().getFieldNames());
+ fields.addAll(getInput(1).getRowType().getFieldNames());
assert DrillJoinRel.isUnique(fields);
final int leftCount = getInputSize(0);
- final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this);
- final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this);
+ final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this, fields);
+ final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this, fields);
return new LateralJoin(leftOp, rightOp);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
index d5ff56bc1..a0b727d3f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillRelFactories.java
@@ -22,6 +22,7 @@ import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.AggregateCall;
import org.apache.calcite.rel.core.CorrelationId;
+import org.apache.calcite.rel.core.JoinInfo;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.type.RelDataType;
@@ -39,7 +40,6 @@ import static org.apache.calcite.rel.core.RelFactories.DEFAULT_FILTER_FACTORY;
import static org.apache.calcite.rel.core.RelFactories.DEFAULT_JOIN_FACTORY;
import static org.apache.calcite.rel.core.RelFactories.DEFAULT_MATCH_FACTORY;
import static org.apache.calcite.rel.core.RelFactories.DEFAULT_PROJECT_FACTORY;
-import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SEMI_JOIN_FACTORY;
import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SET_OP_FACTORY;
import static org.apache.calcite.rel.core.RelFactories.DEFAULT_SORT_FACTORY;
import static org.apache.calcite.rel.core.RelFactories.DEFAULT_TABLE_SCAN_FACTORY;
@@ -60,6 +60,17 @@ public class DrillRelFactories {
public static final RelFactories.JoinFactory DRILL_LOGICAL_JOIN_FACTORY = new DrillJoinFactoryImpl();
public static final RelFactories.AggregateFactory DRILL_LOGICAL_AGGREGATE_FACTORY = new DrillAggregateFactoryImpl();
+
+ public static final RelFactories.SemiJoinFactory DRILL_SEMI_JOIN_FACTORY = new SemiJoinFactoryImpl();
+
+ private static class SemiJoinFactoryImpl implements RelFactories.SemiJoinFactory {
+ public RelNode createSemiJoin(RelNode left, RelNode right,
+ RexNode condition) {
+ final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
+ return DrillSemiJoinRel.create(left, right,
+ condition, joinInfo.leftKeys, joinInfo.rightKeys);
+ }
+ }
/**
* A {@link RelBuilderFactory} that creates a {@link DrillRelBuilder} that will
* create logical relational expressions for everything.
@@ -69,7 +80,7 @@ public class DrillRelFactories {
Contexts.of(DEFAULT_PROJECT_FACTORY,
DEFAULT_FILTER_FACTORY,
DEFAULT_JOIN_FACTORY,
- DEFAULT_SEMI_JOIN_FACTORY,
+ DRILL_SEMI_JOIN_FACTORY,
DEFAULT_SORT_FACTORY,
DEFAULT_AGGREGATE_FACTORY,
DEFAULT_MATCH_FACTORY,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
new file mode 100644
index 000000000..09e4be9de
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillSemiJoinRel.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.logical;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.JoinInfo;
+import org.apache.calcite.rel.core.JoinRelType;
+import org.apache.calcite.rel.core.SemiJoin;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Pair;
+import org.apache.drill.common.expression.FieldReference;
+import org.apache.drill.common.logical.data.Join;
+import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.LogicalOperator;
+import org.apache.drill.common.logical.data.LogicalSemiJoin;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class DrillSemiJoinRel extends SemiJoin implements DrillJoin, DrillRel {
+
+ public DrillSemiJoinRel(
+ RelOptCluster cluster,
+ RelTraitSet traitSet,
+ RelNode left,
+ RelNode right,
+ RexNode condition,
+ ImmutableIntList leftKeys,
+ ImmutableIntList rightKeys) {
+ super(cluster,
+ traitSet,
+ left,
+ right,
+ condition,
+ leftKeys,
+ rightKeys);
+ }
+
+ public static SemiJoin create(RelNode left, RelNode right, RexNode condition,
+ ImmutableIntList leftKeys, ImmutableIntList rightKeys) {
+ final RelOptCluster cluster = left.getCluster();
+ return new DrillSemiJoinRel(cluster, cluster.traitSetOf(DrillRel.DRILL_LOGICAL), left,
+ right, condition, leftKeys, rightKeys);
+ }
+
+ @Override
+ public SemiJoin copy(RelTraitSet traitSet, RexNode condition,
+ RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
+ Preconditions.checkArgument(joinType == JoinRelType.INNER);
+ final JoinInfo joinInfo = JoinInfo.of(left, right, condition);
+ Preconditions.checkArgument(joinInfo.isEqui());
+ return new DrillSemiJoinRel(getCluster(), traitSet, left, right, condition,
+ joinInfo.leftKeys, joinInfo.rightKeys);
+ }
+
+ @Override
+ public LogicalOperator implement(DrillImplementor implementor) {
+ List<String> fields = new ArrayList<>();
+ fields.addAll(getInput(0).getRowType().getFieldNames());
+ fields.addAll(getInput(1).getRowType().getFieldNames());
+ Preconditions.checkArgument(DrillJoinRel.isUnique(fields));
+ final int leftCount = left.getRowType().getFieldCount();
+ final List<String> leftFields = fields.subList(0, leftCount);
+ final List<String> rightFields = fields.subList(leftCount, leftCount + right.getRowType().getFieldCount());
+
+ final LogicalOperator leftOp = DrillJoinRel.implementInput(implementor, 0, 0, left, this, fields);
+ final LogicalOperator rightOp = DrillJoinRel.implementInput(implementor, 1, leftCount, right, this, fields);
+
+ Join.Builder builder = Join.builder();
+ builder.type(joinType);
+ builder.left(leftOp);
+ builder.right(rightOp);
+ List<JoinCondition> conditions = Lists.newArrayList();
+ for (Pair<Integer, Integer> pair : Pair.zip(leftKeys, rightKeys)) {
+ conditions.add(new JoinCondition(DrillJoinRel.EQUALITY_CONDITION,
+ new FieldReference(leftFields.get(pair.left)), new FieldReference(rightFields.get(pair.right))));
+ }
+
+ return new LogicalSemiJoin(leftOp, rightOp, conditions, joinType);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
index 0e1fc4e85..6480f3d35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java
@@ -20,8 +20,13 @@ package org.apache.drill.exec.planner.physical;
import java.io.IOException;
import java.util.List;
+import org.apache.calcite.plan.RelTrait;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.exec.physical.base.PhysicalOperator;
@@ -38,7 +43,7 @@ import org.apache.calcite.plan.RelOptCost;
import org.apache.calcite.plan.RelOptPlanner;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rex.RexNode;
-
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
@@ -50,14 +55,25 @@ public class HashJoinPrel extends JoinPrel {
private int joinControl;
public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
- JoinRelType joinType) throws InvalidRelException {
- this(cluster, traits, left, right, condition, joinType, false, null, false, JoinControl.DEFAULT);
+ JoinRelType joinType, boolean semiJoin) throws InvalidRelException {
+ this(cluster, traits, left, right, condition, joinType, false, null, false, JoinControl.DEFAULT, semiJoin);
+ }
+
+ public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType, boolean swapped, RuntimeFilterDef runtimeFilterDef,
+ boolean isRowKeyJoin, int joinControl) throws InvalidRelException {
+ this(cluster, traits, left, right, condition, joinType, swapped, runtimeFilterDef, isRowKeyJoin, joinControl, false);
}
public HashJoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
JoinRelType joinType, boolean swapped, RuntimeFilterDef runtimeFilterDef,
- boolean isRowKeyJoin, int joinControl) throws InvalidRelException {
- super(cluster, traits, left, right, condition, joinType);
+ boolean isRowKeyJoin, int joinControl, boolean semiJoin) throws InvalidRelException {
+ super(cluster, traits, left, right, condition, joinType, semiJoin);
+ Preconditions.checkArgument(isSemiJoin && !swapped || swapped && !isSemiJoin || (!swapped && !isSemiJoin));
+ if (isSemiJoin) {
+ Preconditions.checkArgument(!swapped, "swapping of inputs is not allowed for semi-joins");
+ Preconditions.checkArgument(validateTraits(traitSet, left, right));
+ }
this.swapped = swapped;
this.isRowKeyJoin = isRowKeyJoin;
joincategory = JoinUtils.getJoinCategory(left, right, condition, leftKeys, rightKeys, filterNulls);
@@ -65,11 +81,34 @@ public class HashJoinPrel extends JoinPrel {
this.joinControl = joinControl;
}
+ private static boolean validateTraits(RelTraitSet traitSet, RelNode left, RelNode right) {
+ ImmutableBitSet bitSet = ImmutableBitSet.range(left.getRowType().getFieldCount(),
+ left.getRowType().getFieldCount() + right.getRowType().getFieldCount());
+ for (RelTrait trait: traitSet) {
+ if (trait.getTraitDef().getTraitClass().equals(RelCollation.class)) {
+ RelCollation collationTrait = (RelCollation)trait;
+ for (RelFieldCollation field : collationTrait.getFieldCollations()) {
+ if (bitSet.indexOf(field.getFieldIndex()) > 0) {
+ return false;
+ }
+ }
+ } else if (trait.getTraitDef().getTraitClass().equals(DrillDistributionTrait.class)) {
+ DrillDistributionTrait distributionTrait = (DrillDistributionTrait) trait;
+ for (DrillDistributionTrait.DistributionField field : distributionTrait.getFields()) {
+ if (bitSet.indexOf(field.getFieldId()) > 0) {
+ return false;
+ }
+ }
+ }
+ }
+ return true;
+ }
+
@Override
public Join copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) {
try {
return new HashJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType, this.swapped, this.runtimeFilterDef,
- this.isRowKeyJoin, this.joinControl);
+ this.isRowKeyJoin, this.joinControl, this.isSemiJoin);
}catch (InvalidRelException e) {
throw new AssertionError(e);
}
@@ -87,7 +126,7 @@ public class HashJoinPrel extends JoinPrel {
}
@Override
- public org.apache.drill.exec.physical.base.PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
+ public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
// Depending on whether the left/right is swapped for hash inner join, pass in different
// combinations of parameters.
if (! swapped) {
@@ -150,4 +189,8 @@ public class HashJoinPrel extends JoinPrel {
return this.isRowKeyJoin;
}
+ @Override
+ public RelWriter explainTerms(RelWriter pw) {
+ return super.explainTerms(pw).item("semi-join: ", isSemiJoin);
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
index d07cf51d3..0d7f5caa4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrule.java
@@ -17,7 +17,9 @@
*/
package org.apache.drill.exec.planner.physical;
+import org.apache.drill.exec.planner.logical.DrillJoin;
import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.DrillSemiJoinRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.RelNode;
@@ -30,10 +32,14 @@ import org.slf4j.Logger;
public class HashJoinPrule extends JoinPruleBase {
public static final RelOptRule DIST_INSTANCE = new HashJoinPrule("Prel.HashJoinDistPrule", RelOptHelper.any(DrillJoinRel.class), true);
public static final RelOptRule BROADCAST_INSTANCE = new HashJoinPrule("Prel.HashJoinBroadcastPrule", RelOptHelper.any(DrillJoinRel.class), false);
+ public static final RelOptRule SEMI_DIST_INSTANCE = new HashJoinPrule("Prel.HashSemiJoinDistPrule", RelOptHelper.any(DrillSemiJoinRel.class), true);
+ public static final RelOptRule SEMI_BROADCAST_INSTANCE = new HashJoinPrule("Prel.HashSemiJoinBroadcastPrule", RelOptHelper.any(DrillSemiJoinRel.class), false);
+
protected static final Logger tracer = CalciteTrace.getPlannerTracer();
private final boolean isDist;
+ private boolean isSemi = false;
private HashJoinPrule(String name, RelOptRuleOperand operand, boolean isDist) {
super(operand, name);
this.isDist = isDist;
@@ -42,17 +48,18 @@ public class HashJoinPrule extends JoinPruleBase {
@Override
public boolean matches(RelOptRuleCall call) {
PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
+ isSemi = call.rel(0) instanceof DrillSemiJoinRel;
return settings.isMemoryEstimationEnabled() || settings.isHashJoinEnabled();
}
@Override
public void onMatch(RelOptRuleCall call) {
PlannerSettings settings = PrelUtil.getPlannerSettings(call.getPlanner());
- if (!settings.isHashJoinEnabled()) {
+ if (!settings.isHashJoinEnabled() || isSemi && !settings.isSemiJoinEnabled()) {
return;
}
- final DrillJoinRel join = call.rel(0);
+ final DrillJoin join = call.rel(0);
final RelNode left = join.getLeft();
final RelNode right = join.getRight();
@@ -66,11 +73,11 @@ public class HashJoinPrule extends JoinPruleBase {
if(isDist){
createDistBothPlan(call, join, PhysicalJoinType.HASH_JOIN,
- left, right, null /* left collation */, null /* right collation */, hashSingleKey);
+ left, right, null /* left collation */, null /* right collation */, hashSingleKey, isSemi);
}else{
if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.HASH_JOIN,
- left, right, null /* left collation */, null /* right collation */);
+ left, right, null /* left collation */, null /* right collation */, isSemi);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
index c40eeaa6a..2581fa667 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPrel.java
@@ -17,9 +17,14 @@
*/
package org.apache.drill.exec.planner.physical;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.calcite.rex.RexChecker;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorUtil;
+import org.apache.calcite.util.Litmus;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
@@ -37,7 +42,6 @@ import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.util.Pair;
-
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
/**
@@ -48,11 +52,18 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
public abstract class JoinPrel extends DrillJoinRelBase implements Prel {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinPrel.class);
+ protected final boolean isSemiJoin;
protected JoinUtils.JoinCategory joincategory;
public JoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
- JoinRelType joinType) {
+ JoinRelType joinType) {
+ this(cluster, traits, left, right, condition, joinType, false);
+ }
+
+ public JoinPrel(RelOptCluster cluster, RelTraitSet traits, RelNode left, RelNode right, RexNode condition,
+ JoinRelType joinType, boolean isSemiJoin) {
super(cluster, traits, left, right, condition, joinType);
+ this.isSemiJoin = isSemiJoin;
}
@Override
@@ -73,7 +84,12 @@ public abstract class JoinPrel extends DrillJoinRelBase implements Prel {
assert uniqueFieldNames(input.getRowType());
final List<String> fields = getRowType().getFieldNames();
final List<String> inputFields = input.getRowType().getFieldNames();
- final List<String> outputFields = fields.subList(offset, offset + inputFields.size());
+ final List<String> outputFields;
+ if (fields.size() > offset) {
+ outputFields = fields.subList(offset, offset + inputFields.size());
+ } else {
+ outputFields = new ArrayList<>();
+ }
if (!outputFields.equals(inputFields)) {
// Ensure that input field names are the same as output field names.
// If there are duplicate field names on left and right, fields will get
@@ -86,6 +102,9 @@ public abstract class JoinPrel extends DrillJoinRelBase implements Prel {
}
private RelNode rename(RelNode input, List<RelDataTypeField> inputFields, List<String> outputFieldNames) {
+ if (outputFieldNames.size() == 0) {
+ return input;
+ }
List<RexNode> exprs = Lists.newArrayList();
for (RelDataTypeField field : inputFields) {
@@ -139,4 +158,62 @@ public abstract class JoinPrel extends DrillJoinRelBase implements Prel {
}
}
+ public boolean isSemiJoin() {
+ return isSemiJoin;
+ }
+
+ /* A Drill physical rel which is semi join will have output row type with fields from only
+ left side of the join. Calcite's join rel expects to have the output row type from
+ left and right side of the join. This function is overloaded to not throw exceptions for
+ a Drill semi join physical rel.
+ */
+ @Override public boolean isValid(Litmus litmus, Context context) {
+ if (!this.isSemiJoin && !super.isValid(litmus, context)) {
+ return false;
+ }
+ if (getRowType().getFieldCount()
+ != getSystemFieldList().size()
+ + left.getRowType().getFieldCount()
+ + (this.isSemiJoin ? 0 : right.getRowType().getFieldCount())) {
+ return litmus.fail("field count mismatch");
+ }
+ if (condition != null) {
+ if (condition.getType().getSqlTypeName() != SqlTypeName.BOOLEAN) {
+ return litmus.fail("condition must be boolean: {}",
+ condition.getType());
+ }
+ // The input to the condition is a row type consisting of system
+ // fields, left fields, and right fields. Very similar to the
+ // output row type, except that fields have not yet been made due
+ // due to outer joins.
+ RexChecker checker =
+ new RexChecker(
+ getCluster().getTypeFactory().builder()
+ .addAll(getSystemFieldList())
+ .addAll(getLeft().getRowType().getFieldList())
+ .addAll(getRight().getRowType().getFieldList())
+ .build(),
+ context, litmus);
+ condition.accept(checker);
+ if (checker.getFailureCount() > 0) {
+ return litmus.fail(checker.getFailureCount()
+ + " failures in condition " + condition);
+ }
+ }
+ return litmus.succeed();
+ }
+
+ @Override public RelDataType deriveRowType() {
+ if (isSemiJoin) {
+ return SqlValidatorUtil.deriveJoinRowType(
+ left.getRowType(),
+ null,
+ this.joinType,
+ getCluster().getTypeFactory(),
+ null,
+ new ArrayList<>());
+ } else {
+ return super.deriveRowType();
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 7588e2c13..366540160 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.drill.exec.physical.impl.join.JoinUtils;
import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
import org.apache.drill.exec.planner.common.DrillJoinRelBase;
-import org.apache.drill.exec.planner.logical.DrillJoinRel;
+import org.apache.drill.exec.planner.logical.DrillJoin;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.calcite.rel.InvalidRelException;
import org.apache.calcite.rel.core.JoinRelType;
@@ -47,8 +47,8 @@ public abstract class JoinPruleBase extends Prule {
super(operand, description);
}
- protected boolean checkPreconditions(DrillJoinRel join, RelNode left, RelNode right,
- PlannerSettings settings) {
+ protected boolean checkPreconditions(DrillJoin join, RelNode left, RelNode right,
+ PlannerSettings settings) {
List<Integer> leftKeys = Lists.newArrayList();
List<Integer> rightKeys = Lists.newArrayList();
List<Boolean> filterNulls = Lists.newArrayList();
@@ -66,7 +66,7 @@ public abstract class JoinPruleBase extends Prule {
return distFields;
}
- protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoinRel join, RelNode left, RelNode right) {
+ protected boolean checkBroadcastConditions(RelOptPlanner planner, DrillJoin join, RelNode left, RelNode right) {
double estimatedRightRowCount = RelMetadataQuery.instance().getRowCount(right);
if (estimatedRightRowCount < PrelUtil.getSettings(join.getCluster()).getBroadcastThreshold()
@@ -78,10 +78,11 @@ public abstract class JoinPruleBase extends Prule {
return false;
}
- protected void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
+ protected void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
PhysicalJoinType physicalJoinType,
RelNode left, RelNode right,
- RelCollation collationLeft, RelCollation collationRight, boolean hashSingleKey)throws InvalidRelException {
+ RelCollation collationLeft, RelCollation collationRight,
+ boolean hashSingleKey, boolean semiJoin)throws InvalidRelException {
/* If join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan:
* 1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, r2, ..., r_k) for right side.
@@ -93,10 +94,12 @@ public abstract class JoinPruleBase extends Prule {
* Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey.
*/
- DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
- DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
+ DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
+ DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
+ ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
- createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition);
+ createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin);
assert (join.getLeftKeys().size() == join.getRightKeys().size());
@@ -110,7 +113,7 @@ public abstract class JoinPruleBase extends Prule {
hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, i+1))));
hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, i+1))));
- createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition);
+ createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition, semiJoin);
}
}
}
@@ -118,11 +121,11 @@ public abstract class JoinPruleBase extends Prule {
// Create join plan with both left and right children hash distributed. If the physical join type
// is MergeJoin, a collation must be provided for both left and right child and the plan will contain
// sort converter if necessary to provide the collation.
- private void createDistBothPlan(RelOptRuleCall call, DrillJoinRel join,
+ private void createDistBothPlan(RelOptRuleCall call, DrillJoin join,
PhysicalJoinType physicalJoinType,
RelNode left, RelNode right,
RelCollation collationLeft, RelCollation collationRight,
- DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition) throws InvalidRelException {
+ DrillDistributionTrait hashLeftPartition, DrillDistributionTrait hashRightPartition, boolean isSemiJoin) throws InvalidRelException {
RelTraitSet traitsLeft = null;
RelTraitSet traitsRight = null;
@@ -145,7 +148,7 @@ public abstract class JoinPruleBase extends Prule {
final RelTraitSet traitSet = PrelUtil.removeCollation(traitsLeft, call);
newJoin = new HashJoinPrel(join.getCluster(), traitSet,
convertedLeft, convertedRight, join.getCondition(),
- join.getJoinType());
+ join.getJoinType(), isSemiJoin);
} else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft,
@@ -158,11 +161,11 @@ public abstract class JoinPruleBase extends Prule {
// Create join plan with left child ANY distributed and right child BROADCAST distributed. If the physical join type
// is MergeJoin, a collation must be provided for both left and right child and the plan will contain sort converter
// if necessary to provide the collation.
- protected void createBroadcastPlan(final RelOptRuleCall call, final DrillJoinRel join,
+ protected void createBroadcastPlan(final RelOptRuleCall call, final DrillJoin join,
final RexNode joinCondition,
final PhysicalJoinType physicalJoinType,
final RelNode left, final RelNode right,
- final RelCollation collationLeft, final RelCollation collationRight) throws InvalidRelException {
+ final RelCollation collationLeft, final RelCollation collationRight, boolean semiJoin) throws InvalidRelException {
DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
RelTraitSet traitsRight = null;
@@ -184,10 +187,10 @@ public abstract class JoinPruleBase extends Prule {
if(traitProp){
if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
- new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+ new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
@Override
- public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
+ public RelNode convertChild(final DrillJoin join, final RelNode rel) throws InvalidRelException {
DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist);
@@ -200,24 +203,24 @@ public abstract class JoinPruleBase extends Prule {
} else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
- new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+ new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
@Override
- public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
+ public RelNode convertChild(final DrillJoin join, final RelNode rel) throws InvalidRelException {
DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
RelNode newLeft = convert(left, newTraitsLeft);
return new HashJoinPrel(join.getCluster(), newTraitsLeft, newLeft, convertedRight, joinCondition,
- join.getJoinType());
+ join.getJoinType(), semiJoin);
}
}.go(join, convertedLeft);
} else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
- new SubsetTransformer<DrillJoinRel, InvalidRelException>(call) {
+ new SubsetTransformer<DrillJoin, InvalidRelException>(call) {
@Override
- public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
+ public RelNode convertChild(final DrillJoin join, final RelNode rel) throws InvalidRelException {
DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
RelTraitSet newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
RelNode newLeft = convert(left, newTraitsLeft);
@@ -235,7 +238,7 @@ public abstract class JoinPruleBase extends Prule {
} else if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
final RelTraitSet traitSet = PrelUtil.removeCollation(convertedLeft.getTraitSet(), call);
call.transformTo(new HashJoinPrel(join.getCluster(), traitSet, convertedLeft,
- convertedRight, joinCondition, join.getJoinType()));
+ convertedRight, joinCondition, join.getJoinType(), semiJoin));
} else if (physicalJoinType == PhysicalJoinType.NESTEDLOOP_JOIN) {
call.transformTo(new NestedLoopJoinPrel(join.getCluster(), convertedLeft.getTraitSet(), convertedLeft,
convertedRight, joinCondition, join.getJoinType()));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
index f06b66d2b..0bd25685d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrule.java
@@ -70,11 +70,11 @@ public class MergeJoinPrule extends JoinPruleBase {
RelCollation collationRight = getCollation(join.getRightKeys());
if(isDist){
- createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey);
+ createDistBothPlan(call, join, PhysicalJoinType.MERGE_JOIN, left, right, collationLeft, collationRight, hashSingleKey, false);
}else{
if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.MERGE_JOIN,
- left, right, collationLeft, collationRight);
+ left, right, collationLeft, collationRight, false);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
index 848c8a16f..e7fc032af 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/NestedLoopJoinPrule.java
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.drill.exec.physical.impl.join.JoinUtils;
import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinCategory;
+import org.apache.drill.exec.planner.logical.DrillJoin;
import org.apache.drill.exec.planner.logical.DrillJoinRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.calcite.rel.InvalidRelException;
@@ -45,8 +46,8 @@ public class NestedLoopJoinPrule extends JoinPruleBase {
}
@Override
- protected boolean checkPreconditions(DrillJoinRel join, RelNode left, RelNode right,
- PlannerSettings settings) {
+ protected boolean checkPreconditions(DrillJoin join, RelNode left, RelNode right,
+ PlannerSettings settings) {
JoinRelType type = join.getJoinType();
if (!(type == JoinRelType.INNER || type == JoinRelType.LEFT)) {
@@ -93,7 +94,7 @@ public class NestedLoopJoinPrule extends JoinPruleBase {
if (checkBroadcastConditions(call.getPlanner(), join, left, right)) {
createBroadcastPlan(call, join, join.getCondition(), PhysicalJoinType.NESTEDLOOP_JOIN,
- left, right, null /* left collation */, null /* right collation */);
+ left, right, null /* left collation */, null /* right collation */, false);
}
} catch (InvalidRelException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
index 63f884cfd..7577cf918 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java
@@ -64,6 +64,8 @@ public class PlannerSettings implements Context{
new OptionDescription("Generates the topN plan for queries with the ORDER BY and LIMIT clauses."));
public static final OptionValidator HASHJOIN = new BooleanValidator("planner.enable_hashjoin",
new OptionDescription("Enable the memory hungry hash join. Drill assumes that a query will have adequate memory to complete and tries to use the fastest operations possible to complete the planned inner, left, right, or full outer joins using a hash table. Does not write to disk. Disabling hash join allows Drill to manage arbitrarily large data in a small memory footprint."));
+ public static final OptionValidator SEMIJOIN = new BooleanValidator("planner.enable_semijoin",
+ new OptionDescription("Enable the semi join optimization. Planner removes the distinct processing below the hash join and sets the semi join flag in hash join."));
public static final OptionValidator MERGEJOIN = new BooleanValidator("planner.enable_mergejoin",
new OptionDescription("Sort-based operation. A merge join is used for inner join, left and right outer joins. Inputs to the merge join must be sorted. It reads the sorted input streams from both sides and finds matching rows. Writes to disk."));
public static final OptionValidator NESTEDLOOPJOIN = new BooleanValidator("planner.enable_nestedloopjoin",
@@ -273,6 +275,10 @@ public class PlannerSettings implements Context{
return options.getOption(HASHJOIN.getOptionName()).bool_val;
}
+ public boolean isSemiJoinEnabled() {
+ return options.getOption(SEMIJOIN.getOptionName()).bool_val;
+ }
+
public boolean isMergeJoinEnabled() {
return options.getOption(MERGEJOIN.getOptionName()).bool_val;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
index b7bc4bb77..0fe0f92b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/SwapHashJoinVisitor.java
@@ -68,7 +68,7 @@ public class SwapHashJoinVisitor extends BasePrelVisitor<Prel, Double, RuntimeEx
// Mark left/right is swapped, when INNER hash join's left row count < ( 1+ margin factor) right row count.
RelMetadataQuery mq = newJoin.getCluster().getMetadataQuery();
if (newJoin.getLeft().estimateRowCount(mq) < (1 + value) * newJoin.getRight().estimateRowCount(mq) &&
- newJoin.getJoinType() == JoinRelType.INNER) {
+ newJoin.getJoinType() == JoinRelType.INNER && !newJoin.isSemiJoin()) {
((HashJoinPrel) newJoin).setSwapped(true);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 23f35b5e1..a33d83264 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -81,6 +81,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(PlannerSettings.STREAMAGG),
new OptionDefinition(PlannerSettings.TOPN, new OptionMetaData(OptionValue.AccessibleScopes.ALL, false, true)),
new OptionDefinition(PlannerSettings.HASHJOIN),
+ new OptionDefinition(PlannerSettings.SEMIJOIN),
new OptionDefinition(PlannerSettings.MERGEJOIN),
new OptionDefinition(PlannerSettings.NESTEDLOOPJOIN),
new OptionDefinition(PlannerSettings.MULTIPHASE),
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 76be050ba..f083c6603 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -514,6 +514,7 @@ drill.exec.options: {
planner.enable_hash_single_key: true,
planner.enable_hashagg: true,
planner.enable_hashjoin: true,
+ planner.enable_semijoin: false,
planner.enable_hashjoin_swap: true,
planner.enable_hep_opt: true,
planner.enable_hep_partition_pruning: true,
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
new file mode 100644
index 000000000..a660fffee
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestSemiJoin.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.categories.OperatorTest;
+import org.apache.drill.categories.SlowTest;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
+import org.apache.drill.test.BaseTestQuery;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import org.junit.experimental.categories.Category;
+
+@Category({SlowTest.class, OperatorTest.class})
+public class TestSemiJoin extends BaseTestQuery {
+ @Test
+ public void testInClauseToSemiJoin() throws Exception {
+ String sql = "select employee_id, full_name from cp.`employee.json` where employee_id in (select employee_id from cp.`employee.json` )";
+
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String queryPlan = client.queryBuilder().sql(sql).explainText();
+ assertTrue(queryPlan.contains("semi-join: =[true]"));
+ }
+ }
+
+ @Test
+ public void testInClauseWithSemiJoinDisabled() throws Exception {
+ String sql = "select employee_id, full_name from cp.`employee.json` where employee_id in (select employee_id from cp.`employee.json` )";
+
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), false);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String queryPlan = client.queryBuilder().sql(sql).explainText();
+ assertTrue(!queryPlan.contains("semi-join: =[true]"));
+ }
+ }
+
+ @Test
+ public void testSmallInClauseToSemiJoin() throws Exception {
+ String sql = "select employee_id, full_name from cp.`employee.json` " +
+ "where employee_id in (351, 352, 353, 451, 452, 453)";
+
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String queryPlan = client.queryBuilder().sql(sql).explainText();
+ assertTrue(!queryPlan.contains("semi-join: =[true]"));
+ }
+ }
+
+ @Test
+ public void testLargeInClauseToSemiJoin() throws Exception {
+ String sql = "select employee_id, full_name from cp.`employee.json` " +
+ "where employee_id in (351, 352, 353, 451, 452, 453, 551, 552, 553, 651, 652, 653, 751, 752, 753, 851, 852, 853, 951, 952, 953)";
+
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String queryPlan = client.queryBuilder().sql(sql).explainText();
+ assertTrue(queryPlan.contains("semi-join: =[true]"));
+ }
+ }
+
+ @Test
+ public void testStarWithInClauseToSemiJoin() throws Exception {
+ String sql = "select * from cp.`employee.json` where employee_id in (select employee_id from cp.`employee.json` )";
+
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String queryPlan = client.queryBuilder().sql(sql).explainText();
+ assertTrue(queryPlan.contains("semi-join: =[true]"));
+ }
+ }
+
+ @Test
+ public void testMultiColumnInClauseWithSemiJoin() throws Exception {
+ String sql = "select * from cp.`employee.json` where (employee_id, full_name) in (select employee_id, full_name from cp.`employee.json` )";
+
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(PlannerSettings.SEMIJOIN.getOptionName(), true);
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ String queryPlan = client.queryBuilder().sql(sql).explainText();
+ assertTrue(queryPlan.contains("semi-join: =[true]"));
+ }
+ }
+}