aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main
diff options
context:
space:
mode:
authorJacques Nadeau <jacques@apache.org>2014-07-26 21:36:53 -0700
committerJacques Nadeau <jacques@apache.org>2014-07-26 21:41:10 -0700
commit913fad858bbb751cde47b15a2cffda7f4797bcad (patch)
tree8b24231095be4622a424a35be1f0a5f3018a538f /exec/java-exec/src/main
parent0cbf6ad30cc7f1dd2d751ba5d4d6d4e8dad5e09d (diff)
Refactor trait pull up to common SubsetTransformer. Update Prules to use new class and update FilterPrule to use all instead of best to work with Optiq 0.9.
Diffstat (limited to 'exec/java-exec/src/main')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java31
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java64
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java82
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java39
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java106
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java69
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java34
7 files changed, 251 insertions, 174 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
index e72a7803f..c15c5e0f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/FilterPrule.java
@@ -19,13 +19,10 @@ package org.apache.drill.exec.planner.physical;
import org.apache.drill.exec.planner.logical.DrillFilterRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
-import org.eigenbase.rel.RelCollation;
-import org.eigenbase.rel.RelCollationTraitDef;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelTraitSet;
-import org.eigenbase.relopt.volcano.RelSubset;
public class FilterPrule extends Prule {
public static final RelOptRule INSTANCE = new FilterPrule();
@@ -41,19 +38,25 @@ public class FilterPrule extends Prule {
RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
RelNode convertedInput = convert(input, traits);
- boolean transform = false;
-
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- RelNode bestRel = null;
- if ((bestRel = subset.getBest()) != null) {
- call.transformTo(new FilterPrel(filter.getCluster(), bestRel.getTraitSet(), convertedInput, filter.getCondition()));
- transform = true;
- }
- }
+
+ boolean transform = new Subset(call).go(filter, convertedInput);
+
if (!transform) {
call.transformTo(new FilterPrel(filter.getCluster(), convertedInput.getTraitSet(), convertedInput, filter.getCondition()));
}
}
-
+
+
+ private class Subset extends SubsetTransformer<DrillFilterRel, RuntimeException> {
+
+ public Subset(RelOptRuleCall call) {
+ super(call);
+ }
+
+ @Override
+ public RelNode convertChild(DrillFilterRel filter, RelNode rel) {
+ return new FilterPrel(filter.getCluster(), rel.getTraitSet(), rel, filter.getCondition());
+ }
+
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
index d8b23381e..4d42f6643 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashAggPrule.java
@@ -26,8 +26,8 @@ import org.eigenbase.rel.InvalidRelException;
import org.eigenbase.rel.RelNode;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTrait;
import org.eigenbase.relopt.RelTraitSet;
-import org.eigenbase.relopt.volcano.RelSubset;
import org.eigenbase.trace.EigenbaseTrace;
import com.google.common.collect.ImmutableList;
@@ -84,34 +84,8 @@ public class HashAggPrule extends AggPruleBase {
traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
RelNode convertedInput = convert(input, traits);
+ new TwoPhaseSubset(call, distOnAllKeys).go(aggregate, convertedInput);
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
- DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
- RelNode newInput = convert(input, traits);
-
- HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
- aggregate.getGroupSet(),
- aggregate.getAggCallList(),
- OperatorPhase.PHASE_1of2);
-
- HashToRandomExchangePrel exch =
- new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
- phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
-
- HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits, exch,
- aggregate.getGroupSet(),
- phase1Agg.getPhase2AggCalls(),
- OperatorPhase.PHASE_2of2);
-
-
- call.transformTo(phase2Agg);
- }
- }
- }
}
}
} catch (InvalidRelException e) {
@@ -119,6 +93,40 @@ public class HashAggPrule extends AggPruleBase {
}
}
+
+ private class TwoPhaseSubset extends SubsetTransformer<DrillAggregateRel, InvalidRelException> {
+ final RelTrait distOnAllKeys;
+
+ public TwoPhaseSubset(RelOptRuleCall call, RelTrait distOnAllKeys) {
+ super(call);
+ this.distOnAllKeys = distOnAllKeys;
+ }
+
+ @Override
+ public RelNode convertChild(DrillAggregateRel aggregate, RelNode input) throws InvalidRelException {
+
+ RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, input.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE));
+ RelNode newInput = convert(input, traits);
+
+ HashAggPrel phase1Agg = new HashAggPrel(aggregate.getCluster(), traits, newInput,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
+
+ HashToRandomExchangePrel exch =
+ new HashToRandomExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+ phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)));
+
+ HashAggPrel phase2Agg = new HashAggPrel(aggregate.getCluster(), traits, exch,
+ aggregate.getGroupSet(),
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
+
+ return phase2Agg;
+ }
+
+ }
+
private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate,
RelNode input, RelTraitSet traits) throws InvalidRelException {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
index 336e34ccc..d6bd71141 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/JoinPruleBase.java
@@ -92,39 +92,39 @@ public abstract class JoinPruleBase extends Prule {
PhysicalJoinType physicalJoinType,
RelNode left, RelNode right,
RelCollation collationLeft, RelCollation collationRight, boolean hashSingleKey)throws InvalidRelException {
-
- /* If join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan:
+
+ /* If join keys are l1 = r1 and l2 = r2 and ... l_k = r_k, then consider the following options of plan:
* 1) Plan1: distributed by (l1, l2, ..., l_k) for left side and by (r1, r2, ..., r_k) for right side.
* 2) Plan2: distributed by l1 for left side, by r1 for right side.
* 3) Plan3: distributed by l2 for left side, by r2 for right side.
* ...
* Plan_(k+1): distributed by l_k for left side, by r_k by right side.
- *
- * Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey.
+ *
+ * Whether enumerate plan 2, .., Plan_(k+1) depends on option : hashSingleKey.
*/
-
+
DrillDistributionTrait hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys())));
DrillDistributionTrait hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys())));
-
+
createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition);
-
+
assert (join.getLeftKeys().size() == join.getRightKeys().size());
-
+
if (!hashSingleKey)
return;
-
+
int numJoinKeys = join.getLeftKeys().size();
if (numJoinKeys > 1) {
for (int i = 0; i< numJoinKeys; i++) {
hashLeftPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getLeftKeys().subList(i, i+1))));
hashRightPartition = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(join.getRightKeys().subList(i, i+1))));
-
+
createDistBothPlan(call, join, physicalJoinType, left, right, collationLeft, collationRight, hashLeftPartition, hashRightPartition);
}
}
}
-
+
// Create join plan with both left and right children hash distributed. If the physical join type
// is MergeJoin, a collation must be provided for both left and right child and the plan will contain
// sort converter if necessary to provide the collation.
@@ -170,9 +170,9 @@ public abstract class JoinPruleBase extends Prule {
// is MergeJoin, a collation must be provided for both left and right child and the plan will contain sort converter
// if necessary to provide the collation.
protected void createBroadcastPlan(RelOptRuleCall call, DrillJoinRel join,
- PhysicalJoinType physicalJoinType,
- RelNode left, RelNode right,
- RelCollation collationLeft, RelCollation collationRight) throws InvalidRelException {
+ final PhysicalJoinType physicalJoinType,
+ final RelNode left, final RelNode right,
+ final RelCollation collationLeft, final RelCollation collationRight) throws InvalidRelException {
DrillDistributionTrait distBroadcastRight = new DrillDistributionTrait(DrillDistributionTrait.DistributionType.BROADCAST_DISTRIBUTED);
RelTraitSet traitsRight = null;
@@ -183,37 +183,35 @@ public abstract class JoinPruleBase extends Prule {
traitsRight = right.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distBroadcastRight);
}
- RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
- RelNode convertedLeft = convert(left, traitsLeft);
- RelNode convertedRight = convert(right, traitsRight);
+ final RelTraitSet traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+ final RelNode convertedLeft = convert(left, traitsLeft);
+ final RelNode convertedRight = convert(right, traitsRight);
- traitsLeft = left.getTraitSet().plus(Prel.DRILL_PHYSICAL);
+ new SubsetTransformer<DrillJoinRel, InvalidRelException>(call){
- DrillJoinRelBase newJoin = null;
-
- if (convertedLeft instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedLeft;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
- DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
- traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collationLeft).plus(toDist);
- } else {
- traitsLeft = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
- }
-
- RelNode newLeft = convert(left, traitsLeft);
- if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
- newJoin = new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
- join.getJoinType());
- } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
- newJoin = new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
- join.getJoinType());
- }
- call.transformTo(newJoin) ;
+ public RelNode convertChild(final DrillJoinRel join, final RelNode rel) throws InvalidRelException {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelTraitSet newTraitsLeft;
+ if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+ newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, collationLeft, toDist);
+ } else {
+ newTraitsLeft = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+ }
+ Character.digit(1, 1);
+ RelNode newLeft = convert(left, newTraitsLeft);
+ if (physicalJoinType == PhysicalJoinType.HASH_JOIN) {
+ return new HashJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
+ join.getJoinType());
+ } else if (physicalJoinType == PhysicalJoinType.MERGE_JOIN) {
+ return new MergeJoinPrel(join.getCluster(), traitsLeft, newLeft, convertedRight, join.getCondition(),
+ join.getJoinType());
+ } else{
+ return null;
}
+
}
- }
- }
+ }.go(join, convertedLeft);
+
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
index 02e6d44e2..833aaae4e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/ProjectPrule.java
@@ -23,19 +23,15 @@ import java.util.Map;
import net.hydromatic.linq4j.Ord;
-import org.apache.drill.exec.planner.common.DrillProjectRelBase;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
-import org.apache.drill.exec.planner.logical.DrillRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionField;
import org.apache.drill.exec.planner.physical.DrillDistributionTrait.DistributionType;
-import org.eigenbase.rel.ProjectRel;
import org.eigenbase.rel.RelCollation;
import org.eigenbase.rel.RelCollationImpl;
import org.eigenbase.rel.RelCollationTraitDef;
import org.eigenbase.rel.RelFieldCollation;
import org.eigenbase.rel.RelNode;
-import org.eigenbase.relopt.Convention;
import org.eigenbase.relopt.RelOptRule;
import org.eigenbase.relopt.RelOptRuleCall;
import org.eigenbase.relopt.RelTraitSet;
@@ -64,26 +60,33 @@ public class ProjectPrule extends Prule {
RelNode convertedInput = convert(input, traits);
Map<Integer, Integer> inToOut = getProjectMap(project);
+ boolean traitPull = new ProjectTraitPull(call, inToOut).go(project, convertedInput);
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
- DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+ if(!traitPull){
+ call.transformTo(new ProjectPrel(project.getCluster(), convertedInput.getTraitSet(), convertedInput, project.getProjects(), project.getRowType()));
+ }
+ }
+ private class ProjectTraitPull extends SubsetTransformer<DrillProjectRel, RuntimeException> {
+ final Map<Integer, Integer> inToOut;
- DrillDistributionTrait newDist = convertDist(childDist, inToOut);
- RelCollation newCollation = convertRelCollation(childCollation, inToOut);
+ public ProjectTraitPull(RelOptRuleCall call, Map<Integer, Integer> inToOut) {
+ super(call);
+ this.inToOut = inToOut;
+ }
- call.transformTo(new ProjectPrel(project.getCluster(), project.getTraitSet().plus(newDist).plus(newCollation).plus(Prel.DRILL_PHYSICAL),
- rel, project.getProjects(), project.getRowType()));
- }
- }
+ @Override
+ public RelNode convertChild(DrillProjectRel project, RelNode rel) throws RuntimeException {
+ DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
- } else{
- call.transformTo(new ProjectPrel(project.getCluster(), convertedInput.getTraitSet(), convertedInput, project.getProjects(), project.getRowType()));
+
+ DrillDistributionTrait newDist = convertDist(childDist, inToOut);
+ RelCollation newCollation = convertRelCollation(childCollation, inToOut);
+ RelTraitSet newProjectTraits = rel.getTraitSet().plus(newDist).plus(newCollation);
+ return new ProjectPrel(project.getCluster(), newProjectTraits, rel, project.getProjects(), project.getRowType());
}
+
}
private DrillDistributionTrait convertDist(DrillDistributionTrait srcDist, Map<Integer, Integer> inToOut) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
index 037516120..4191184f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java
@@ -23,6 +23,7 @@ import java.util.logging.Logger;
import net.hydromatic.optiq.util.BitSets;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
+import org.apache.drill.exec.planner.logical.DrillJoinRel;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.AggPrelBase.OperatorPhase;
import org.eigenbase.rel.InvalidRelException;
@@ -56,7 +57,7 @@ public class StreamAggPrule extends AggPruleBase {
public void onMatch(RelOptRuleCall call) {
final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0);
final RelNode input = aggregate.getChild();
- RelCollation collation = getCollation(aggregate);
+ final RelCollation collation = getCollation(aggregate);
RelTraitSet traits = null;
if (aggregate.containsDistinctCall()) {
@@ -67,44 +68,40 @@ public class StreamAggPrule extends AggPruleBase {
try {
if (aggregate.getGroupSet().isEmpty()) {
DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON;
- RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
+ final RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist);
if (create2PhasePlan(call, aggregate)) {
traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
RelNode convertedInput = convert(input, traits);
+ new SubsetTransformer<DrillAggregateRel, InvalidRelException>(call){
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
- DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist);
- RelNode newInput = convert(input, traits);
-
- StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
- aggregate.getGroupSet(),
- aggregate.getAggCallList(),
- OperatorPhase.PHASE_1of2);
-
- UnionExchangePrel exch =
- new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
-
- StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
- aggregate.getGroupSet(),
- phase1Agg.getPhase2AggCalls(),
- OperatorPhase.PHASE_2of2);
-
- call.transformTo(phase2Agg);
- }
+ public RelNode convertChild(final DrillAggregateRel join, final RelNode rel) throws InvalidRelException {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, toDist);
+ RelNode newInput = convert(input, traits);
+
+ StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
+
+ UnionExchangePrel exch =
+ new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg);
+
+ return new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch,
+ aggregate.getGroupSet(),
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
}
- }
+ }.go(aggregate, convertedInput);
+
} else {
createTransformRequest(call, aggregate, input, singleDistTrait);
}
} else {
// hash distribute on all grouping keys
- DrillDistributionTrait distOnAllKeys =
+ final DrillDistributionTrait distOnAllKeys =
new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED,
ImmutableList.copyOf(getDistributionField(aggregate, true)));
@@ -126,39 +123,34 @@ public class StreamAggPrule extends AggPruleBase {
if (create2PhasePlan(call, aggregate)) {
traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ;
-
RelNode convertedInput = convert(input, traits);
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
- DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(toDist);
- RelNode newInput = convert(input, traits);
-
- StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
- aggregate.getGroupSet(),
- aggregate.getAggCallList(),
- OperatorPhase.PHASE_1of2);
-
- int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
-
- HashToMergeExchangePrel exch =
- new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
- phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
- collation,
- numEndPoints);
-
- StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits, exch,
- aggregate.getGroupSet(),
- phase1Agg.getPhase2AggCalls(),
- OperatorPhase.PHASE_2of2);
-
- call.transformTo(phase2Agg);
- }
+ new SubsetTransformer<DrillAggregateRel, InvalidRelException>(call){
+
+ public RelNode convertChild(final DrillAggregateRel aggregate, final RelNode rel) throws InvalidRelException {
+ DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelTraitSet traits = newTraitSet(Prel.DRILL_PHYSICAL, collation, toDist);
+ RelNode newInput = convert(input, traits);
+
+ StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput,
+ aggregate.getGroupSet(),
+ aggregate.getAggCallList(),
+ OperatorPhase.PHASE_1of2);
+
+ int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints();
+
+ HashToMergeExchangePrel exch =
+ new HashToMergeExchangePrel(phase1Agg.getCluster(), phase1Agg.getTraitSet().plus(Prel.DRILL_PHYSICAL).plus(distOnAllKeys),
+ phase1Agg, ImmutableList.copyOf(getDistributionField(aggregate, true)),
+ collation,
+ numEndPoints);
+
+ return new StreamAggPrel(aggregate.getCluster(), traits, exch,
+ aggregate.getGroupSet(),
+ phase1Agg.getPhase2AggCalls(),
+ OperatorPhase.PHASE_2of2);
}
- }
+ }.go(aggregate, convertedInput);
}
}
} catch (InvalidRelException e) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
new file mode 100644
index 000000000..450b19768
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/SubsetTransformer.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.physical;
+
+import org.eigenbase.rel.RelNode;
+import org.eigenbase.relopt.RelOptRuleCall;
+import org.eigenbase.relopt.RelTrait;
+import org.eigenbase.relopt.RelTraitSet;
+import org.eigenbase.relopt.volcano.RelSubset;
+
+public abstract class SubsetTransformer<T extends RelNode, E extends Exception> {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SubsetTransformer.class);
+
+ public abstract RelNode convertChild(T current, RelNode child) throws E;
+
+ private final RelOptRuleCall call;
+
+ public SubsetTransformer(RelOptRuleCall call){
+ this.call = call;
+ }
+
+ public RelTraitSet newTraitSet(RelTrait... traits){
+ RelTraitSet set = call.getPlanner().emptyTraitSet();
+ for(RelTrait t : traits){
+ set = set.plus(t);
+ }
+ return set;
+
+ }
+
+ boolean go(T n, RelNode candidateSet) throws E {
+ if( !(candidateSet instanceof RelSubset) ) return false;
+
+ boolean transform = false;
+
+ for (RelNode rel : ((RelSubset)candidateSet).getRelList()) {
+ if (!isDefaultDist(rel)) {
+ RelNode out = convertChild(n, rel);
+ if(out != null){
+ call.transformTo(out);
+ transform = true;
+
+ }
+ }
+ }
+
+ return transform;
+ }
+
+ private boolean isDefaultDist(RelNode n){
+ return n.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT);
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
index 42a9984c9..15d94fb03 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/WriterPrule.java
@@ -45,25 +45,29 @@ public class WriterPrule extends Prule{
final RelTraitSet traits = input.getTraitSet().plus(Prel.DRILL_PHYSICAL);
final RelNode convertedInput = convert(input, traits);
- if (convertedInput instanceof RelSubset) {
- RelSubset subset = (RelSubset) convertedInput;
- for (RelNode rel : subset.getRelList()) {
- if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) {
- DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
- RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
-
- DrillWriterRelBase newWriter = new WriterPrel(writer.getCluster(),
- writer.getTraitSet().plus(childDist).plus(childCollation).plus(Prel.DRILL_PHYSICAL),
- rel, writer.getCreateTableEntry());
-
- call.transformTo(newWriter);
- }
- }
- } else {
+ if (!new WriteTraitPull(call).go(writer, convertedInput)) {
DrillWriterRelBase newWriter = new WriterPrel(writer.getCluster(), convertedInput.getTraitSet(),
convertedInput, writer.getCreateTableEntry());
call.transformTo(newWriter);
}
}
+
+ private class WriteTraitPull extends SubsetTransformer<DrillWriterRelBase, RuntimeException> {
+
+ public WriteTraitPull(RelOptRuleCall call) {
+ super(call);
+ }
+
+ @Override
+ public RelNode convertChild(DrillWriterRelBase writer, RelNode rel) throws RuntimeException {
+ DrillDistributionTrait childDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE);
+ RelCollation childCollation = rel.getTraitSet().getTrait(RelCollationTraitDef.INSTANCE);
+
+ return new WriterPrel(writer.getCluster(),
+ writer.getTraitSet().plus(childDist).plus(childCollation).plus(Prel.DRILL_PHYSICAL),
+ rel, writer.getCreateTableEntry());
+ }
+
+ }
}