diff options
author | Roman Kulyk <rom.kulyk@gmail.com> | 2016-10-28 13:26:53 +0000 |
---|---|---|
committer | Sudheesh Katkam <skatkam@maprtech.com> | 2016-11-02 10:56:01 -0700 |
commit | 5f34c960e80a0938a4ce2666654a60df17fe402b (patch) | |
tree | aebf39aba49f2eda330c624be5513f907a42bef9 /exec/java-exec/src/main/java/org/apache/drill/exec/physical | |
parent | 83513daf0903e0d94fcaad7b1ae4e8ad6272b494 (diff) |
DRILL-4927 (part 2): Add support for Null Equality Joins (mixed comparators)
This changes are a subset of the original pull request from DRILL-4539 (PR-462).
- Added changes to support mixed comparators;
- Added tests for it.
closes #635
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical')
8 files changed, 92 insertions, 61 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 606a542b5..d2b42d010 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -18,7 +18,9 @@ package org.apache.drill.exec.physical.impl.aggregate; import java.io.IOException; +import java.util.List; +import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; @@ -40,6 +42,7 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.HashAggregate; import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome; +import org.apache.drill.exec.physical.impl.common.Comparator; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.record.AbstractRecordBatch; @@ -64,6 +67,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { private LogicalExpression[] aggrExprs; private TypedFieldId[] groupByOutFieldIds; private TypedFieldId[] aggrOutFieldIds; // field ids for the outgoing batch + private final List<Comparator> comparators; private final GeneratorMapping UPDATE_AGGR_INSIDE = GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */, @@ -82,6 +86,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException { super(popConfig, context); this.incoming = incoming; + + final int numGrpByExprs = popConfig.getGroupByExprs().size(); + comparators = Lists.newArrayListWithExpectedSize(numGrpByExprs); + for (int i=0; i<numGrpByExprs; i++) { + // nulls are equal in group by case + comparators.add(Comparator.IS_NOT_DISTINCT_FROM); + } } @Override @@ -241,7 +252,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { HashTableConfig htConfig = // TODO - fix the validator on this option new HashTableConfig((int)context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE), - HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */); + HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators); agg.setup(popConfig, htConfig, context, this.stats, oContext.getAllocator(), incoming, this, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 5e081638c..c31264a81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -268,8 +268,7 @@ public abstract class HashAggTemplate implements HashAggregator { } ChainedHashTable ht = - new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing, - true /* nulls are equal */); + new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing); this.htable = ht.createAndSetupHashTable(groupByOutFieldIds); numGroupByOutFields = groupByOutFieldIds.length; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java index 260808c40..972e8c78d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.common; import java.io.IOException; import java.util.Arrays; +import java.util.List; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; @@ -116,11 +117,9 @@ public class ChainedHashTable { private final RecordBatch incomingBuild; private final RecordBatch incomingProbe; private final RecordBatch outgoing; - private final boolean areNullsEqual; public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, - RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, - boolean areNullsEqual) { + RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing) { this.htConfig = htConfig; this.context = context; @@ -128,7 +127,6 @@ public class ChainedHashTable { this.incomingBuild = incomingBuild; this.incomingProbe = incomingProbe; this.outgoing = outgoing; - this.areNullsEqual = areNullsEqual; } public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException, @@ -197,9 +195,10 @@ public class ChainedHashTable { // generate code for isKeyMatch(), setValue(), getHash() and outputRecordKeys() - setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild, htKeyFieldIds); + setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild, + htConfig.getComparators(), htKeyFieldIds); setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableProbeMapping, keyExprsProbe, - htKeyFieldIds); + htConfig.getComparators(), htKeyFieldIds); setupSetValue(cgInner, keyExprsBuild, htKeyFieldIds); if (outgoing != null) { @@ -221,7 +220,7 @@ public class ChainedHashTable { private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping, - LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds) + LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds) throws SchemaChangeException { cg.setMappingSet(incomingMapping); @@ -230,19 +229,20 @@ public class ChainedHashTable { return; } - int i = 0; - for (LogicalExpression expr : keyExprs) { + for (int i=0; i<keyExprs.length; i++) { + final LogicalExpression expr = keyExprs[i]; cg.setMappingSet(incomingMapping); HoldingContainer left = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE); cg.setMappingSet(htableMapping); - ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i++]); + ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]); HoldingContainer right = cg.addExpr(vvrExpr, ClassGenerator.BlkCreateMode.FALSE); JConditional jc; // codegen for nullable columns if nulls are not equal - if (!areNullsEqual && left.isOptional() && right.isOptional()) { + if (comparators.get(i) == Comparator.EQUALS + && left.isOptional() && right.isOptional()) { jc = cg.getEvalBlock()._if(left.getIsSet().eq(JExpr.lit(0)). cand(right.getIsSet().eq(JExpr.lit(0)))); jc._then()._return(JExpr.FALSE); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/Comparator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/Comparator.java new file mode 100644 index 000000000..fa9b45c2f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/Comparator.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.common; + +/** + * Comparator type. Used in Join and Aggregation operators. + */ +public enum Comparator { + NONE, // No comparator + EQUALS, // Equality comparator + IS_NOT_DISTINCT_FROM // 'IS NOT DISTINCT FROM' comparator +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java index a6b2587c3..1e3d7e91a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java @@ -28,21 +28,22 @@ import java.util.List; @JsonTypeName("hashtable-config") public class HashTableConfig { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTableConfig.class); - private final int initialCapacity; private final float loadFactor; private final List<NamedExpression> keyExprsBuild; private final List<NamedExpression> keyExprsProbe; + private final List<Comparator> comparators; @JsonCreator public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity, @JsonProperty("loadFactor") float loadFactor, @JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild, - @JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe) { + @JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe, + @JsonProperty("comparators") List<Comparator> comparators) { this.initialCapacity = initialCapacity; this.loadFactor = loadFactor; this.keyExprsBuild = keyExprsBuild; this.keyExprsProbe = keyExprsProbe; + this.comparators = comparators; } public int getInitialCapacity() { @@ -61,4 +62,8 @@ public class HashTableConfig { return keyExprsProbe; } + public List<Comparator> getComparators() { + return comparators; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 431ced3a7..18cfc78ab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import com.google.common.collect.Lists; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.logical.data.JoinCondition; import org.apache.drill.common.logical.data.NamedExpression; @@ -44,7 +45,7 @@ import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.physical.impl.common.HashTableStats; import org.apache.drill.exec.physical.impl.common.IndexPointer; -import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinComparator; +import org.apache.drill.exec.physical.impl.common.Comparator; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; @@ -79,6 +80,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { // Join conditions private final List<JoinCondition> conditions; + private final List<Comparator> comparators; + // Runtime generated class implementing HashJoinProbe interface private HashJoinProbe hashJoinProbe = null; @@ -285,19 +288,12 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { final List<NamedExpression> rightExpr = new ArrayList<>(conditionsSize); List<NamedExpression> leftExpr = new ArrayList<>(conditionsSize); - JoinComparator comparator = JoinComparator.NONE; // Create named expressions from the conditions for (int i = 0; i < conditionsSize; i++) { rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i))); leftExpr.add(new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i))); - - // Hash join only supports certain types of comparisons - comparator = JoinUtils.checkAndSetComparison(conditions.get(i), comparator); } - assert comparator != JoinComparator.NONE; - final boolean areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false; - // Set the left named expression to be null if the probe batch is empty. if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) { leftExpr = null; @@ -309,12 +305,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { final HashTableConfig htConfig = new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE), - HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr); + HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators); // Create the chained hash table final ChainedHashTable ht = - new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null, - areNullsEqual); + new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null); hashTable = ht.createAndSetupHashTable(null); } @@ -500,6 +495,12 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { this.right = right; joinType = popConfig.getJoinType(); conditions = popConfig.getConditions(); + + comparators = Lists.newArrayListWithExpectedSize(conditions.size()); + for (int i=0; i<conditions.size(); i++) { + JoinCondition cond = conditions.get(i); + comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)); + } } private void updateStats(HashTable htable) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java index be363d2d3..caa18becd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java @@ -21,10 +21,12 @@ package org.apache.drill.exec.physical.impl.join; import org.apache.calcite.rel.core.Join; import org.apache.calcite.rel.core.JoinRelType; import org.apache.calcite.rex.RexNode; +import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.logical.data.JoinCondition; import org.apache.calcite.rel.RelNode; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.plan.volcano.RelSubset; +import org.apache.drill.exec.physical.impl.common.Comparator; import org.apache.drill.exec.planner.logical.DrillAggregateRel; import org.apache.drill.exec.planner.logical.DrillFilterRel; import org.apache.drill.exec.planner.logical.DrillProjectRel; @@ -46,40 +48,28 @@ import java.util.List; import com.google.common.collect.Lists; public class JoinUtils { - public static enum JoinComparator { - NONE, // No comparator - EQUALS, // Equality comparator - IS_NOT_DISTINCT_FROM // 'IS NOT DISTINCT FROM' comparator - } - public static enum JoinCategory { + public enum JoinCategory { EQUALITY, // equality join INEQUALITY, // inequality join: <>, <, > CARTESIAN // no join condition } + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinUtils.class); - // Check the comparator for the join condition. Note that a similar check is also + // Check the comparator is supported in join condition. Note that a similar check is also // done in JoinPrel; however we have to repeat it here because a physical plan // may be submitted directly to Drill. - public static JoinComparator checkAndSetComparison(JoinCondition condition, - JoinComparator comparator) { - if (condition.getRelationship().equalsIgnoreCase("EQUALS") || - condition.getRelationship().equals("==") /* older json plans still have '==' */) { - if (comparator == JoinComparator.NONE || - comparator == JoinComparator.EQUALS) { - return JoinComparator.EQUALS; - } else { - throw new IllegalArgumentException("This type of join does not support mixed comparators."); - } - } else if (condition.getRelationship().equalsIgnoreCase("IS_NOT_DISTINCT_FROM")) { - if (comparator == JoinComparator.NONE || - comparator == JoinComparator.IS_NOT_DISTINCT_FROM) { - return JoinComparator.IS_NOT_DISTINCT_FROM; - } else { - throw new IllegalArgumentException("This type of join does not support mixed comparators."); - } + public static Comparator checkAndReturnSupportedJoinComparator(JoinCondition condition) { + switch(condition.getRelationship().toUpperCase()) { + case "EQUALS": + case "==": /* older json plans still have '==' */ + return Comparator.EQUALS; + case "IS_NOT_DISTINCT_FROM": + return Comparator.IS_NOT_DISTINCT_FROM; } - throw new IllegalArgumentException("Invalid comparator supplied to this join."); + throw UserException.unsupportedError() + .message("Invalid comparator supplied to this join: ", condition.getRelationship()) + .build(logger); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index 234fd3a43..90f3f5f8c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -22,6 +22,7 @@ import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM; import java.io.IOException; import java.util.List; +import com.google.common.collect.Lists; import org.apache.calcite.rel.core.JoinRelType; import org.apache.drill.common.expression.ErrorCollector; import org.apache.drill.common.expression.ErrorCollectorImpl; @@ -43,7 +44,7 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionGenerationHelper; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.config.MergeJoinPOP; -import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinComparator; +import org.apache.drill.exec.physical.impl.common.Comparator; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; @@ -98,10 +99,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private final RecordIterator rightIterator; private final JoinStatus status; private final List<JoinCondition> conditions; + private final List<Comparator> comparators; private final JoinRelType joinType; private JoinWorker worker; - private boolean areNullsEqual = false; // whether nulls compare equal - private static final String LEFT_INPUT = "LEFT INPUT"; private static final String RIGHT_INPUT = "RIGHT INPUT"; @@ -120,12 +120,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { this.status = new JoinStatus(leftIterator, rightIterator, this); this.conditions = popConfig.getConditions(); - JoinComparator comparator = JoinComparator.NONE; + this.comparators = Lists.newArrayListWithExpectedSize(conditions.size()); for (JoinCondition condition : conditions) { - comparator = JoinUtils.checkAndSetComparison(condition, comparator); + this.comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(condition)); } - assert comparator != JoinComparator.NONE; - areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM); } public JoinRelType getJoinType() { @@ -461,7 +459,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { // If not 0, it means not equal. // Null compares to Null should returns null (unknown). In such case, we return 1 to indicate they are not equal. if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional() - && ! areNullsEqual) { + && comparators.get(i) == Comparator.EQUALS) { JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)). cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0)))); jc._then()._return(JExpr.lit(1)); |