aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
diff options
context:
space:
mode:
authorBen-Zvi <bben-zvi@mapr.com>2018-09-25 19:07:10 -0700
committerVitalii Diravka <vitalii.diravka@gmail.com>2018-10-01 13:51:31 +0300
commit3b1ae159b94ef7c1d67ddde474c75d5558d3e50a (patch)
tree692b8d26555a0d8ba59dcdf968962806ba6cd4d2 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
parent98e5de3b5af862779244bac8329852b3c9a901df (diff)
DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty
- Preparations and cleanup for DRILL-6755 clsoes #1480
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java60
1 files changed, 39 insertions, 21 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 89ab8d4a4..658f03a33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -81,7 +81,6 @@ import org.apache.drill.exec.work.filter.BloomFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
-
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
@@ -101,7 +100,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA
* processed individually (that Build partition should be smaller than the original, hence likely fit whole into
* memory to allow probing; if not -- see below).
* Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact,
- * the {@Link #innerNext() innerNext} method calls itself recursively !!). Thus the spilled build partition is
+ * the {@link #innerNext()} method calls itself recursively !!). Thus the spilled build partition is
* read and divided into new partitions, which in turn may spill again (and again...).
* The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste,
* indicating that the number of partitions chosen was too small.
@@ -116,6 +115,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Join type, INNER, LEFT, RIGHT or OUTER
private final JoinRelType joinType;
+ private boolean joinIsLeftOrFull;
+ private boolean joinIsRightOrFull;
+ private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
// Join conditions
private final List<JoinCondition> conditions;
@@ -131,8 +133,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private final Set<String> buildJoinColumns;
// Fields used for partitioning
-
- private long maxIncomingBatchSize;
/**
* The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
*/
@@ -264,6 +264,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
buildSchema = right.getSchema();
// position of the new "column" for keeping the hash values (after the real columns)
rightHVColPosition = right.getContainer().getNumberOfColumns();
+ // In special cases, when the probe side is empty, and inner/left join - no need for Hash Table
+ skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull;
// We only need the hash tables if we have data on the build side.
setupHashTable();
}
@@ -447,12 +449,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Try to probe and project, or recursively handle a spilled partition
if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
- joinType != JoinRelType.INNER) { // or if this is a left/full outer join
+ joinIsLeftOrFull) { // or if this is a left/full outer join
prefetchFirstProbeBatch();
if (leftUpstream.isError() ||
- ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType != JoinRelType.RIGHT )) {
+ ( leftUpstream == NONE && ! joinIsRightOrFull )) {
// A termination condition was reached while prefetching the first probe side data holding batch.
// We need to terminate.
return leftUpstream;
@@ -568,19 +570,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
} else {
// Our build side is empty, we won't have any matches, clear the probe side
- if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (final VectorWrapper<?> wrapper : probeBatch) {
- wrapper.getValueVector().clear();
- }
- probeBatch.kill(true);
- leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
- while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (final VectorWrapper<?> wrapper : probeBatch) {
- wrapper.getValueVector().clear();
- }
- leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
- }
- }
+ killAndDrainLeftUpstream();
}
// No more output records, clean up and return
@@ -596,10 +586,31 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
}
}
+ /**
+ * In case an upstream data is no longer needed, send a kill and flush any remaining batch
+ *
+ * @param batch probe or build batch
+ * @param upstream which upstream
+ * @param isLeft is it the left or right
+ */
+ private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
+ batch.kill(true);
+ while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
+ for (final VectorWrapper<?> wrapper : batch) {
+ wrapper.getValueVector().clear();
+ }
+ upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
+ }
+ }
+ private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); }
+ private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); }
+
private void setupHashTable() throws SchemaChangeException {
final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
+ if ( skipHashTableBuild ) { return; }
+
// Setup the hash table configuration object
List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
@@ -819,6 +830,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
return null;
}
+ if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream
+ killAndDrainRightUpstream();
+ return null;
+ }
+
HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
boolean firstCycle = cycleNum == 0;
@@ -1013,7 +1029,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
final MajorType outputType;
// If left or full outer join, then the output type must be nullable. However, map types are
// not nullable so we must exclude them from the check below (see DRILL-2197).
- if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+ if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
&& inputType.getMinorType() != TypeProtos.MinorType.MAP) {
outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
} else {
@@ -1034,7 +1050,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// If right or full outer join then the output type should be optional. However, map types are
// not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
- if ((joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+ if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
&& inputType.getMinorType() != TypeProtos.MinorType.MAP) {
outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
} else {
@@ -1074,6 +1090,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
this.buildBatch = right;
this.probeBatch = left;
joinType = popConfig.getJoinType();
+ joinIsLeftOrFull = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL;
+ joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
conditions = popConfig.getConditions();
this.popConfig = popConfig;
rightExpr = new ArrayList<>(conditions.size());