diff options
author | Ben-Zvi <bben-zvi@mapr.com> | 2018-09-25 19:07:10 -0700 |
---|---|---|
committer | Vitalii Diravka <vitalii.diravka@gmail.com> | 2018-10-01 13:51:31 +0300 |
commit | 3b1ae159b94ef7c1d67ddde474c75d5558d3e50a (patch) | |
tree | 692b8d26555a0d8ba59dcdf968962806ba6cd4d2 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | |
parent | 98e5de3b5af862779244bac8329852b3c9a901df (diff) |
DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty
- Preparations and cleanup for DRILL-6755
closes #1480
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | 60 |
1 files changed, 39 insertions, 21 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 89ab8d4a4..658f03a33 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -81,7 +81,6 @@ import org.apache.drill.exec.work.filter.BloomFilterDef; import org.apache.drill.exec.work.filter.RuntimeFilterDef; import org.apache.drill.exec.work.filter.RuntimeFilterReporter; - import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; @@ -101,7 +100,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA * processed individually (that Build partition should be smaller than the original, hence likely fit whole into * memory to allow probing; if not -- see below). * Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact, - * the {@Link #innerNext() innerNext} method calls itself recursively !!). Thus the spilled build partition is + * the {@link #innerNext()} method calls itself recursively !!). Thus the spilled build partition is * read and divided into new partitions, which in turn may spill again (and again...). * The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste, * indicating that the number of partitions chosen was too small. 
@@ -116,6 +115,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Join type, INNER, LEFT, RIGHT or OUTER private final JoinRelType joinType; + private boolean joinIsLeftOrFull; + private boolean joinIsRightOrFull; + private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755) // Join conditions private final List<JoinCondition> conditions; @@ -131,8 +133,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { private final Set<String> buildJoinColumns; // Fields used for partitioning - - private long maxIncomingBatchSize; /** * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}. */ @@ -264,6 +264,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { buildSchema = right.getSchema(); // position of the new "column" for keeping the hash values (after the real columns) rightHVColPosition = right.getContainer().getNumberOfColumns(); + // In special cases, when the probe side is empty, and inner/left join - no need for Hash Table + skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull; // We only need the hash tables if we have data on the build side. setupHashTable(); } @@ -447,12 +449,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Try to probe and project, or recursively handle a spilled partition if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows - joinType != JoinRelType.INNER) { // or if this is a left/full outer join + joinIsLeftOrFull) { // or if this is a left/full outer join prefetchFirstProbeBatch(); if (leftUpstream.isError() || - ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType != JoinRelType.RIGHT )) { + ( leftUpstream == NONE && ! 
joinIsRightOrFull )) { // A termination condition was reached while prefetching the first probe side data holding batch. // We need to terminate. return leftUpstream; @@ -568,19 +570,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { } else { // Our build side is empty, we won't have any matches, clear the probe side - if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { - for (final VectorWrapper<?> wrapper : probeBatch) { - wrapper.getValueVector().clear(); - } - probeBatch.kill(true); - leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch); - while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { - for (final VectorWrapper<?> wrapper : probeBatch) { - wrapper.getValueVector().clear(); - } - leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch); - } - } + killAndDrainLeftUpstream(); } // No more output records, clean up and return @@ -596,10 +586,31 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { } } + /** + * In case an upstream data is no longer needed, send a kill and flush any remaining batch + * + * @param batch probe or build batch + * @param upstream which upstream + * @param isLeft is it the left or right + */ + private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) { + batch.kill(true); + while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) { + for (final VectorWrapper<?> wrapper : batch) { + wrapper.getValueVector().clear(); + } + upstream = next( isLeft ? 
HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch); + } + } + private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); } + private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); } + private void setupHashTable() throws SchemaChangeException { final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size()); conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond))); + if ( skipHashTableBuild ) { return; } + // Setup the hash table configuration object List<NamedExpression> leftExpr = new ArrayList<>(conditions.size()); @@ -819,6 +830,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { return null; } + if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream + killAndDrainRightUpstream(); + return null; + } + HashJoinMemoryCalculator.BuildSidePartitioning buildCalc; boolean firstCycle = cycleNum == 0; @@ -1013,7 +1029,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { final MajorType outputType; // If left or full outer join, then the output type must be nullable. However, map types are // not nullable so we must exclude them from the check below (see DRILL-2197). - if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED + if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED && inputType.getMinorType() != TypeProtos.MinorType.MAP) { outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); } else { @@ -1034,7 +1050,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // If right or full outer join then the output type should be optional. However, map types are // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197). 
- if ((joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED + if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED && inputType.getMinorType() != TypeProtos.MinorType.MAP) { outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); } else { @@ -1074,6 +1090,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { this.buildBatch = right; this.probeBatch = left; joinType = popConfig.getJoinType(); + joinIsLeftOrFull = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL; + joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL; conditions = popConfig.getConditions(); this.popConfig = popConfig; rightExpr = new ArrayList<>(conditions.size()); |