aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical
diff options
context:
space:
mode:
authorRoman Kulyk <rom.kulyk@gmail.com>2016-10-28 13:26:53 +0000
committerSudheesh Katkam <skatkam@maprtech.com>2016-11-02 10:56:01 -0700
commit5f34c960e80a0938a4ce2666654a60df17fe402b (patch)
treeaebf39aba49f2eda330c624be5513f907a42bef9 /exec/java-exec/src/main/java/org/apache/drill/exec/physical
parent83513daf0903e0d94fcaad7b1ae4e8ad6272b494 (diff)
DRILL-4927 (part 2): Add support for Null Equality Joins (mixed comparators)
This changes are a subset of the original pull request from DRILL-4539 (PR-462). - Added changes to support mixed comparators; - Added tests for it. closes #635
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java22
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/Comparator.java27
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java23
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java40
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java14
8 files changed, 92 insertions, 61 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 606a542b5..d2b42d010 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -18,7 +18,9 @@
package org.apache.drill.exec.physical.impl.aggregate;
import java.io.IOException;
+import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -40,6 +42,7 @@ import org.apache.drill.exec.expr.ValueVectorWriteExpression;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome;
+import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -64,6 +67,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
private LogicalExpression[] aggrExprs;
private TypedFieldId[] groupByOutFieldIds;
private TypedFieldId[] aggrOutFieldIds; // field ids for the outgoing batch
+ private final List<Comparator> comparators;
private final GeneratorMapping UPDATE_AGGR_INSIDE =
GeneratorMapping.create("setupInterior" /* setup method */, "updateAggrValuesInternal" /* eval method */,
@@ -82,6 +86,13 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
public HashAggBatch(HashAggregate popConfig, RecordBatch incoming, FragmentContext context) throws ExecutionSetupException {
super(popConfig, context);
this.incoming = incoming;
+
+ final int numGrpByExprs = popConfig.getGroupByExprs().size();
+ comparators = Lists.newArrayListWithExpectedSize(numGrpByExprs);
+ for (int i=0; i<numGrpByExprs; i++) {
+ // nulls are equal in group by case
+ comparators.add(Comparator.IS_NOT_DISTINCT_FROM);
+ }
}
@Override
@@ -241,7 +252,7 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
HashTableConfig htConfig =
// TODO - fix the validator on this option
new HashTableConfig((int)context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
- HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */);
+ HashTable.DEFAULT_LOAD_FACTOR, popConfig.getGroupByExprs(), null /* no probe exprs */, comparators);
agg.setup(popConfig, htConfig, context, this.stats,
oContext.getAllocator(), incoming, this,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 5e081638c..c31264a81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -268,8 +268,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
ChainedHashTable ht =
- new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing,
- true /* nulls are equal */);
+ new ChainedHashTable(htConfig, context, allocator, incoming, null /* no incoming probe */, outgoing);
this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
numGroupByOutFields = groupByOutFieldIds.length;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
index 260808c40..972e8c78d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/ChainedHashTable.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.common;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -116,11 +117,9 @@ public class ChainedHashTable {
private final RecordBatch incomingBuild;
private final RecordBatch incomingProbe;
private final RecordBatch outgoing;
- private final boolean areNullsEqual;
public ChainedHashTable(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator,
- RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing,
- boolean areNullsEqual) {
+ RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing) {
this.htConfig = htConfig;
this.context = context;
@@ -128,7 +127,6 @@ public class ChainedHashTable {
this.incomingBuild = incomingBuild;
this.incomingProbe = incomingProbe;
this.outgoing = outgoing;
- this.areNullsEqual = areNullsEqual;
}
public HashTable createAndSetupHashTable(TypedFieldId[] outKeyFieldIds) throws ClassTransformationException,
@@ -197,9 +195,10 @@ public class ChainedHashTable {
// generate code for isKeyMatch(), setValue(), getHash() and outputRecordKeys()
- setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild, htKeyFieldIds);
+ setupIsKeyMatchInternal(cgInner, KeyMatchIncomingBuildMapping, KeyMatchHtableMapping, keyExprsBuild,
+ htConfig.getComparators(), htKeyFieldIds);
setupIsKeyMatchInternal(cgInner, KeyMatchIncomingProbeMapping, KeyMatchHtableProbeMapping, keyExprsProbe,
- htKeyFieldIds);
+ htConfig.getComparators(), htKeyFieldIds);
setupSetValue(cgInner, keyExprsBuild, htKeyFieldIds);
if (outgoing != null) {
@@ -221,7 +220,7 @@ public class ChainedHashTable {
private void setupIsKeyMatchInternal(ClassGenerator<HashTable> cg, MappingSet incomingMapping, MappingSet htableMapping,
- LogicalExpression[] keyExprs, TypedFieldId[] htKeyFieldIds)
+ LogicalExpression[] keyExprs, List<Comparator> comparators, TypedFieldId[] htKeyFieldIds)
throws SchemaChangeException {
cg.setMappingSet(incomingMapping);
@@ -230,19 +229,20 @@ public class ChainedHashTable {
return;
}
- int i = 0;
- for (LogicalExpression expr : keyExprs) {
+ for (int i=0; i<keyExprs.length; i++) {
+ final LogicalExpression expr = keyExprs[i];
cg.setMappingSet(incomingMapping);
HoldingContainer left = cg.addExpr(expr, ClassGenerator.BlkCreateMode.FALSE);
cg.setMappingSet(htableMapping);
- ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i++]);
+ ValueVectorReadExpression vvrExpr = new ValueVectorReadExpression(htKeyFieldIds[i]);
HoldingContainer right = cg.addExpr(vvrExpr, ClassGenerator.BlkCreateMode.FALSE);
JConditional jc;
// codegen for nullable columns if nulls are not equal
- if (!areNullsEqual && left.isOptional() && right.isOptional()) {
+ if (comparators.get(i) == Comparator.EQUALS
+ && left.isOptional() && right.isOptional()) {
jc = cg.getEvalBlock()._if(left.getIsSet().eq(JExpr.lit(0)).
cand(right.getIsSet().eq(JExpr.lit(0))));
jc._then()._return(JExpr.FALSE);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/Comparator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/Comparator.java
new file mode 100644
index 000000000..fa9b45c2f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/Comparator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+/**
+ * Comparator type. Used in Join and Aggregation operators.
+ */
+public enum Comparator {
+ NONE, // No comparator
+ EQUALS, // Equality comparator
+ IS_NOT_DISTINCT_FROM // 'IS NOT DISTINCT FROM' comparator
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
index a6b2587c3..1e3d7e91a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableConfig.java
@@ -28,21 +28,22 @@ import java.util.List;
@JsonTypeName("hashtable-config")
public class HashTableConfig {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTableConfig.class);
-
private final int initialCapacity;
private final float loadFactor;
private final List<NamedExpression> keyExprsBuild;
private final List<NamedExpression> keyExprsProbe;
+ private final List<Comparator> comparators;
@JsonCreator
public HashTableConfig(@JsonProperty("initialCapacity") int initialCapacity, @JsonProperty("loadFactor") float loadFactor,
@JsonProperty("keyExprsBuild") List<NamedExpression> keyExprsBuild,
- @JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe) {
+ @JsonProperty("keyExprsProbe") List<NamedExpression> keyExprsProbe,
+ @JsonProperty("comparators") List<Comparator> comparators) {
this.initialCapacity = initialCapacity;
this.loadFactor = loadFactor;
this.keyExprsBuild = keyExprsBuild;
this.keyExprsProbe = keyExprsProbe;
+ this.comparators = comparators;
}
public int getInitialCapacity() {
@@ -61,4 +62,8 @@ public class HashTableConfig {
return keyExprsProbe;
}
+ public List<Comparator> getComparators() {
+ return comparators;
+ }
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 431ced3a7..18cfc78ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
@@ -44,7 +45,7 @@ import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.common.IndexPointer;
-import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinComparator;
+import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
@@ -79,6 +80,8 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
// Join conditions
private final List<JoinCondition> conditions;
+ private final List<Comparator> comparators;
+
// Runtime generated class implementing HashJoinProbe interface
private HashJoinProbe hashJoinProbe = null;
@@ -285,19 +288,12 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
final List<NamedExpression> rightExpr = new ArrayList<>(conditionsSize);
List<NamedExpression> leftExpr = new ArrayList<>(conditionsSize);
- JoinComparator comparator = JoinComparator.NONE;
// Create named expressions from the conditions
for (int i = 0; i < conditionsSize; i++) {
rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i)));
leftExpr.add(new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i)));
-
- // Hash join only supports certain types of comparisons
- comparator = JoinUtils.checkAndSetComparison(conditions.get(i), comparator);
}
- assert comparator != JoinComparator.NONE;
- final boolean areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM) ? true : false;
-
// Set the left named expression to be null if the probe batch is empty.
if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
leftExpr = null;
@@ -309,12 +305,11 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
final HashTableConfig htConfig =
new HashTableConfig((int) context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE),
- HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
+ HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr, comparators);
// Create the chained hash table
final ChainedHashTable ht =
- new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null,
- areNullsEqual);
+ new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
hashTable = ht.createAndSetupHashTable(null);
}
@@ -500,6 +495,12 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
this.right = right;
joinType = popConfig.getJoinType();
conditions = popConfig.getConditions();
+
+ comparators = Lists.newArrayListWithExpectedSize(conditions.size());
+ for (int i=0; i<conditions.size(); i++) {
+ JoinCondition cond = conditions.get(i);
+ comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
+ }
}
private void updateStats(HashTable htable) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
index be363d2d3..caa18becd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/JoinUtils.java
@@ -21,10 +21,12 @@ package org.apache.drill.exec.physical.impl.join;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rex.RexNode;
+import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.plan.volcano.RelSubset;
+import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.DrillFilterRel;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
@@ -46,40 +48,28 @@ import java.util.List;
import com.google.common.collect.Lists;
public class JoinUtils {
- public static enum JoinComparator {
- NONE, // No comparator
- EQUALS, // Equality comparator
- IS_NOT_DISTINCT_FROM // 'IS NOT DISTINCT FROM' comparator
- }
- public static enum JoinCategory {
+ public enum JoinCategory {
EQUALITY, // equality join
INEQUALITY, // inequality join: <>, <, >
CARTESIAN // no join condition
}
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JoinUtils.class);
- // Check the comparator for the join condition. Note that a similar check is also
+ // Check the comparator is supported in join condition. Note that a similar check is also
// done in JoinPrel; however we have to repeat it here because a physical plan
// may be submitted directly to Drill.
- public static JoinComparator checkAndSetComparison(JoinCondition condition,
- JoinComparator comparator) {
- if (condition.getRelationship().equalsIgnoreCase("EQUALS") ||
- condition.getRelationship().equals("==") /* older json plans still have '==' */) {
- if (comparator == JoinComparator.NONE ||
- comparator == JoinComparator.EQUALS) {
- return JoinComparator.EQUALS;
- } else {
- throw new IllegalArgumentException("This type of join does not support mixed comparators.");
- }
- } else if (condition.getRelationship().equalsIgnoreCase("IS_NOT_DISTINCT_FROM")) {
- if (comparator == JoinComparator.NONE ||
- comparator == JoinComparator.IS_NOT_DISTINCT_FROM) {
- return JoinComparator.IS_NOT_DISTINCT_FROM;
- } else {
- throw new IllegalArgumentException("This type of join does not support mixed comparators.");
- }
+ public static Comparator checkAndReturnSupportedJoinComparator(JoinCondition condition) {
+ switch(condition.getRelationship().toUpperCase()) {
+ case "EQUALS":
+ case "==": /* older json plans still have '==' */
+ return Comparator.EQUALS;
+ case "IS_NOT_DISTINCT_FROM":
+ return Comparator.IS_NOT_DISTINCT_FROM;
}
- throw new IllegalArgumentException("Invalid comparator supplied to this join.");
+ throw UserException.unsupportedError()
+ .message("Invalid comparator supplied to this join: ", condition.getRelationship())
+ .build(logger);
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index 234fd3a43..90f3f5f8c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -22,6 +22,7 @@ import static org.apache.drill.exec.compile.sig.GeneratorMapping.GM;
import java.io.IOException;
import java.util.List;
+import com.google.common.collect.Lists;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.drill.common.expression.ErrorCollector;
import org.apache.drill.common.expression.ErrorCollectorImpl;
@@ -43,7 +44,7 @@ import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionGenerationHelper;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.MergeJoinPOP;
-import org.apache.drill.exec.physical.impl.join.JoinUtils.JoinComparator;
+import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
@@ -98,10 +99,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
private final RecordIterator rightIterator;
private final JoinStatus status;
private final List<JoinCondition> conditions;
+ private final List<Comparator> comparators;
private final JoinRelType joinType;
private JoinWorker worker;
- private boolean areNullsEqual = false; // whether nulls compare equal
-
private static final String LEFT_INPUT = "LEFT INPUT";
private static final String RIGHT_INPUT = "RIGHT INPUT";
@@ -120,12 +120,10 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
this.status = new JoinStatus(leftIterator, rightIterator, this);
this.conditions = popConfig.getConditions();
- JoinComparator comparator = JoinComparator.NONE;
+ this.comparators = Lists.newArrayListWithExpectedSize(conditions.size());
for (JoinCondition condition : conditions) {
- comparator = JoinUtils.checkAndSetComparison(condition, comparator);
+ this.comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(condition));
}
- assert comparator != JoinComparator.NONE;
- areNullsEqual = (comparator == JoinComparator.IS_NOT_DISTINCT_FROM);
}
public JoinRelType getJoinType() {
@@ -461,7 +459,7 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> {
// If not 0, it means not equal.
// Null compares to Null should returns null (unknown). In such case, we return 1 to indicate they are not equal.
if (compareLeftExprHolder.isOptional() && compareRightExprHolder.isOptional()
- && ! areNullsEqual) {
+ && comparators.get(i) == Comparator.EQUALS) {
JConditional jc = cg.getEvalBlock()._if(compareLeftExprHolder.getIsSet().eq(JExpr.lit(0)).
cand(compareRightExprHolder.getIsSet().eq(JExpr.lit(0))));
jc._then()._return(JExpr.lit(1));