diff options
author | Jacques Nadeau <jacques@apache.org> | 2014-07-05 08:57:59 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-07-26 21:40:33 -0700 |
commit | e52d2b66bdd2750dc2bb3c98fa937e8984b93c12 (patch) | |
tree | 141871e3ba8513137cb04f204e1164925382ea46 /exec/java-exec/src/main | |
parent | caa8b78c5c31c44e59b5ff1bdf6f1900d14b1a1a (diff) |
move to optiq 0.9
Diffstat (limited to 'exec/java-exec/src/main')
12 files changed, 63 insertions, 51 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 3401bc767..0b8b88a14 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map; import net.hydromatic.optiq.SchemaPlus; -import net.hydromatic.optiq.tools.Frameworks; +import net.hydromatic.optiq.jdbc.SimpleOptiqSchema; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -132,7 +132,7 @@ public class FragmentContext implements Closeable { "This is a non-root fragment.")); return null; } else { - SchemaPlus root = Frameworks.createRootSchema(false); + SchemaPlus root = SimpleOptiqSchema.createRootSchema(false); context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root); return root; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index e6c6fa7a9..78e98fb85 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.ops; import java.util.Collection; import net.hydromatic.optiq.SchemaPlus; -import net.hydromatic.optiq.tools.Frameworks; +import net.hydromatic.optiq.jdbc.SimpleOptiqSchema; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.cache.DistributedCache; @@ -84,7 +84,7 @@ public class QueryContext{ } public SchemaPlus getRootSchema(){ - SchemaPlus rootSchema = Frameworks.createRootSchema(false); + SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false); drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema); return rootSchema; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java index e687f5af8..151318e2a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillJoinRel.java @@ -30,6 +30,7 @@ import org.apache.drill.common.logical.data.Project; import org.apache.drill.exec.planner.common.DrillJoinRelBase; import org.apache.drill.exec.planner.torel.ConversionContext; import org.eigenbase.rel.InvalidRelException; +import org.eigenbase.rel.JoinRelBase; import org.eigenbase.rel.JoinRelType; import org.eigenbase.rel.RelNode; import org.eigenbase.relopt.RelOptCluster; @@ -78,8 +79,9 @@ public class DrillJoinRel extends DrillJoinRelBase implements DrillRel { this.rightKeys = rightKeys; } + @Override - public DrillJoinRel copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType) { + public DrillJoinRel copy(RelTraitSet traitSet, RexNode condition, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { try { return new DrillJoinRel(getCluster(), traitSet, left, right, condition, joinType); } catch (InvalidRelException e) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java index 633084f03..aa88310af 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillOptiq.java @@ -102,13 +102,13 @@ public class DrillOptiq { if (FunctionCallFactory.isBooleanOperator(funcName)) { LogicalExpression func = FunctionCallFactory.createBooleanOperator(funcName, args); return func; - } else { + } else { args = Lists.reverse(args); LogicalExpression lastArg = args.get(0); for(int i = 1; i < args.size(); i++){ lastArg = FunctionCallFactory.createExpression(funcName, Lists.newArrayList(args.get(i), lastArg)); } - + return lastArg; } case FUNCTION: @@ -118,6 +118,9 @@ public class DrillOptiq { case POSTFIX: logger.debug("Postfix"); switch(call.getKind()){ + case IS_NOT_NULL: + case IS_NOT_TRUE: + case IS_NOT_FALSE: case IS_NULL: case IS_TRUE: case IS_FALSE: diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java index 3af1bf4cf..136eb4c2d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/HashJoinPrel.java @@ -53,7 +53,7 @@ public class HashJoinPrel extends JoinPrel { @Override - public JoinRelBase copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType) { + public JoinRelBase copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { try { return new HashJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType); }catch (InvalidRelException e) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java index ee7d76c60..f6b7ef6da 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/MergeJoinPrel.java @@ -61,7 +61,7 @@ public class MergeJoinPrel extends JoinPrel { @Override - public JoinRelBase copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType) { + public JoinRelBase copy(RelTraitSet traitSet, RexNode conditionExpr, RelNode left, RelNode right, JoinRelType joinType, boolean semiJoinDone) { try { return new MergeJoinPrel(this.getCluster(), traitSet, left, right, conditionExpr, joinType); }catch (InvalidRelException e) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java index 931301870..998ecc065 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PlannerSettings.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.planner.physical; -import net.hydromatic.optiq.tools.FrameworkContext; - import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.OptionValidator; @@ -26,8 +24,9 @@ import org.apache.drill.exec.server.options.TypeValidators.BooleanValidator; import org.apache.drill.exec.server.options.TypeValidators.LongValidator; import org.apache.drill.exec.server.options.TypeValidators.PositiveLongValidator; import org.apache.drill.exec.server.options.TypeValidators.RangeDoubleValidator; +import org.eigenbase.relopt.Context; -public class PlannerSettings implements FrameworkContext{ +public class PlannerSettings implements Context{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PlannerSettings.class); private int numEndPoints = 0; @@ -101,7 +100,7 @@ public class PlannerSettings implements FrameworkContext{ public boolean isBroadcastJoinEnabled() { return options.getOption(BROADCAST.getOptionName()).bool_val; } - + public boolean isHashSingleKey() { return options.getOption(HASH_SINGLE_KEY.getOptionName()).bool_val; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java index c66ff5dcc..3d4476091 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PrelUtil.java @@ -95,17 +95,17 @@ public class PrelUtil { public static Iterator<Prel> iter(RelNode... nodes){ return (Iterator<Prel>) (Object) Arrays.asList(nodes).iterator(); } - + public static Iterator<Prel> iter(List<RelNode> nodes) { return (Iterator<Prel>) (Object) nodes.iterator(); } public static PlannerSettings getSettings(RelOptCluster cluster){ - return cluster.getPlanner().getFrameworkContext().unwrap(PlannerSettings.class); + return cluster.getPlanner().getContext().unwrap(PlannerSettings.class); } public static PlannerSettings getPlannerSettings(RelOptPlanner planner) { - return planner.getFrameworkContext().unwrap(PlannerSettings.class); + return planner.getContext().unwrap(PlannerSettings.class); } public static Prel removeSvIfRequired(Prel prel, SelectionVectorMode... allowed){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java index 47a56b8ed..037516120 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/StreamAggPrule.java @@ -51,14 +51,14 @@ public class StreamAggPrule extends AggPruleBase { public boolean matches(RelOptRuleCall call) { return PrelUtil.getPlannerSettings(call.getPlanner()).isStreamAggEnabled(); } - + @Override public void onMatch(RelOptRuleCall call) { final DrillAggregateRel aggregate = (DrillAggregateRel) call.rel(0); final RelNode input = aggregate.getChild(); RelCollation collation = getCollation(aggregate); RelTraitSet traits = null; - + if (aggregate.containsDistinctCall()) { // currently, don't use StreamingAggregate if any of the logical aggrs contains DISTINCT return; @@ -68,78 +68,78 @@ public class StreamAggPrule extends AggPruleBase { if (aggregate.getGroupSet().isEmpty()) { DrillDistributionTrait singleDist = DrillDistributionTrait.SINGLETON; RelTraitSet singleDistTrait = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(singleDist); - + if (create2PhasePlan(call, aggregate)) { traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ; - RelNode convertedInput = convert(input, traits); + RelNode convertedInput = convert(input, traits); if (convertedInput instanceof RelSubset) { RelSubset subset = (RelSubset) convertedInput; for (RelNode rel : subset.getRelList()) { if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) { - DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); + DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(toDist); RelNode newInput = convert(input, traits); StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput, aggregate.getGroupSet(), - aggregate.getAggCallList(), + aggregate.getAggCallList(), OperatorPhase.PHASE_1of2); - UnionExchangePrel exch = + UnionExchangePrel exch = new UnionExchangePrel(phase1Agg.getCluster(), singleDistTrait, phase1Agg); - + StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), singleDistTrait, exch, aggregate.getGroupSet(), phase1Agg.getPhase2AggCalls(), OperatorPhase.PHASE_2of2); - call.transformTo(phase2Agg); + call.transformTo(phase2Agg); } } } - } else { + } else { createTransformRequest(call, aggregate, input, singleDistTrait); } } else { // hash distribute on all grouping keys - DrillDistributionTrait distOnAllKeys = - new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, + DrillDistributionTrait distOnAllKeys = + new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(aggregate, true))); - + traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(distOnAllKeys); createTransformRequest(call, aggregate, input, traits); // hash distribute on one grouping key - DrillDistributionTrait distOnOneKey = - new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, + DrillDistributionTrait distOnOneKey = + new DrillDistributionTrait(DrillDistributionTrait.DistributionType.HASH_DISTRIBUTED, ImmutableList.copyOf(getDistributionField(aggregate, false))); - + traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(distOnOneKey); // Temporarily commenting out the single distkey plan since a few tpch queries (e.g 01.sql) get stuck // in VolcanoPlanner.canonize() method. Note that the corresponding single distkey plan for HashAggr works - // ok. One possibility is that here we have dist on single key but collation on all keys, so that - // might be causing some problem. - /// TODO: re-enable this plan after resolving the issue. + // ok. One possibility is that here we have dist on single key but collation on all keys, so that + // might be causing some problem. + /// TODO: re-enable this plan after resolving the issue. // createTransformRequest(call, aggregate, input, traits); - + if (create2PhasePlan(call, aggregate)) { traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL) ; - RelNode convertedInput = convert(input, traits); + RelNode convertedInput = convert(input, traits); if (convertedInput instanceof RelSubset) { RelSubset subset = (RelSubset) convertedInput; for (RelNode rel : subset.getRelList()) { if (!rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE).equals(DrillDistributionTrait.DEFAULT)) { - DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); + DrillDistributionTrait toDist = rel.getTraitSet().getTrait(DrillDistributionTraitDef.INSTANCE); traits = call.getPlanner().emptyTraitSet().plus(Prel.DRILL_PHYSICAL).plus(collation).plus(toDist); RelNode newInput = convert(input, traits); StreamAggPrel phase1Agg = new StreamAggPrel(aggregate.getCluster(), traits, newInput, aggregate.getGroupSet(), - aggregate.getAggCallList(), + aggregate.getAggCallList(), OperatorPhase.PHASE_1of2); int numEndPoints = PrelUtil.getSettings(phase1Agg.getCluster()).numEndPoints(); @@ -152,34 +152,34 @@ public class StreamAggPrule extends AggPruleBase { StreamAggPrel phase2Agg = new StreamAggPrel(aggregate.getCluster(), traits, exch, aggregate.getGroupSet(), - phase1Agg.getPhase2AggCalls(), + phase1Agg.getPhase2AggCalls(), OperatorPhase.PHASE_2of2); - call.transformTo(phase2Agg); + call.transformTo(phase2Agg); } } - } + } } - } + } } catch (InvalidRelException e) { tracer.warning(e.toString()); } } - private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate, + private void createTransformRequest(RelOptRuleCall call, DrillAggregateRel aggregate, RelNode input, RelTraitSet traits) throws InvalidRelException { final RelNode convertedInput = convert(input, traits); - + StreamAggPrel newAgg = new StreamAggPrel(aggregate.getCluster(), traits, convertedInput, aggregate.getGroupSet(), aggregate.getAggCallList(), OperatorPhase.PHASE_1of1); - + call.transformTo(newAgg); } - - + + private RelCollation getCollation(DrillAggregateRel rel){ - + List<RelFieldCollation> fields = Lists.newArrayList(); for (int group : BitSets.toIter(rel.getGroupSet())) { fields.add(new RelFieldCollation(group)); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index a48070f6a..321d79d4e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -22,11 +22,11 @@ import java.util.ArrayList; import java.util.List; import net.hydromatic.optiq.config.Lex; +import net.hydromatic.optiq.tools.FrameworkConfig; import net.hydromatic.optiq.tools.Frameworks; import net.hydromatic.optiq.tools.Planner; import net.hydromatic.optiq.tools.RelConversionException; import net.hydromatic.optiq.tools.RuleSet; -import net.hydromatic.optiq.tools.StdFrameworkConfig; import net.hydromatic.optiq.tools.ValidationException; import org.apache.drill.exec.ops.QueryContext; @@ -68,7 +68,7 @@ public class DrillSqlWorker { this.context = context; RelOptCostFactory costFactory = (context.getPlannerSettings().useDefaultCosting()) ? null : new DrillCostBase.DrillCostFactory() ; - StdFrameworkConfig config = StdFrameworkConfig.newBuilder() // + FrameworkConfig config = Frameworks.newConfigBuilder() // .lex(Lex.MYSQL) // .parserFactory(DrillParserWithCompoundIdConverter.FACTORY) // .defaultSchema(context.getNewDefaultSchema()) // diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java index f98d66140..f34b1e3a1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/AbstractSchema.java @@ -125,4 +125,11 @@ public abstract class AbstractSchema implements Schema{ public Expression getExpression(SchemaPlus parentSchema, String name) { return EXPRESSION; } + + @Override + public boolean contentsHaveChangedSince(long lastCheck, long now) { + return true; + } + + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java index dbfe93f98..2d8dee8e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/SubSchemaWrapper.java @@ -89,4 +89,5 @@ public class SubSchemaWrapper extends AbstractSchema { public String getTypeName() { return innerSchema.getTypeName(); } + } |