From 3b1ae159b94ef7c1d67ddde474c75d5558d3e50a Mon Sep 17 00:00:00 2001 From: Ben-Zvi Date: Tue, 25 Sep 2018 19:07:10 -0700 Subject: DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty - Preparations and cleanup for DRILL-6755 closes #1480 --- .../exec/physical/impl/join/HashJoinBatch.java | 60 ++++++++++++++-------- .../exec/record/AbstractBinaryRecordBatch.java | 2 +- .../drill/exec/work/batch/BaseRawBatchBuffer.java | 2 +- .../physical/impl/join/TestHashJoinOutcome.java | 43 ++++++++++++++++ 4 files changed, 84 insertions(+), 23 deletions(-) (limited to 'exec') diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 89ab8d4a4..658f03a33 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -81,7 +81,6 @@ import org.apache.drill.exec.work.filter.BloomFilterDef; import org.apache.drill.exec.work.filter.RuntimeFilterDef; import org.apache.drill.exec.work.filter.RuntimeFilterReporter; - import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE; import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; @@ -101,7 +100,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA * processed individually (that Build partition should be smaller than the original, hence likely fit whole into * memory to allow probing; if not -- see below). * Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact, - * the {@Link #innerNext() innerNext} method calls itself recursively !!). Thus the spilled build partition is + * the {@link #innerNext()} method calls itself recursively !!). 
Thus the spilled build partition is * read and divided into new partitions, which in turn may spill again (and again...). * The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste, * indicating that the number of partitions chosen was too small. @@ -116,6 +115,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch { // Join type, INNER, LEFT, RIGHT or OUTER private final JoinRelType joinType; + private boolean joinIsLeftOrFull; + private boolean joinIsRightOrFull; + private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755) // Join conditions private final List conditions; @@ -131,8 +133,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch { private final Set buildJoinColumns; // Fields used for partitioning - - private long maxIncomingBatchSize; /** * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}. */ @@ -264,6 +264,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch { buildSchema = right.getSchema(); // position of the new "column" for keeping the hash values (after the real columns) rightHVColPosition = right.getContainer().getNumberOfColumns(); + // In special cases, when the probe side is empty, and inner/left join - no need for Hash Table + skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull; // We only need the hash tables if we have data on the build side. 
setupHashTable(); } @@ -447,12 +449,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch { // Try to probe and project, or recursively handle a spilled partition if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows - joinType != JoinRelType.INNER) { // or if this is a left/full outer join + joinIsLeftOrFull) { // or if this is a left/full outer join prefetchFirstProbeBatch(); if (leftUpstream.isError() || - ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType != JoinRelType.RIGHT )) { + ( leftUpstream == NONE && ! joinIsRightOrFull )) { // A termination condition was reached while prefetching the first probe side data holding batch. // We need to terminate. return leftUpstream; @@ -568,19 +570,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch { } else { // Our build side is empty, we won't have any matches, clear the probe side - if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { - for (final VectorWrapper wrapper : probeBatch) { - wrapper.getValueVector().clear(); - } - probeBatch.kill(true); - leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch); - while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) { - for (final VectorWrapper wrapper : probeBatch) { - wrapper.getValueVector().clear(); - } - leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch); - } - } + killAndDrainLeftUpstream(); } // No more output records, clean up and return @@ -596,10 +586,31 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch { } } + /** + * In case an upstream data is no longer needed, send a kill and flush any remaining batch + * + * @param batch probe or build batch + * @param upstream which upstream + * @param isLeft is it the left or right + */ + private void killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) { + batch.kill(true); + while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) { 
+ for (final VectorWrapper wrapper : batch) { + wrapper.getValueVector().clear(); + } + upstream = next( isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch); + } + } + private void killAndDrainLeftUpstream() { killAndDrainUpstream(probeBatch, leftUpstream, true); } + private void killAndDrainRightUpstream() { killAndDrainUpstream(buildBatch, rightUpstream, false); } + private void setupHashTable() throws SchemaChangeException { final List comparators = Lists.newArrayListWithExpectedSize(conditions.size()); conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond))); + if ( skipHashTableBuild ) { return; } + // Setup the hash table configuration object List leftExpr = new ArrayList<>(conditions.size()); @@ -819,6 +830,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch { return null; } + if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream + killAndDrainRightUpstream(); + return null; + } + HashJoinMemoryCalculator.BuildSidePartitioning buildCalc; boolean firstCycle = cycleNum == 0; @@ -1013,7 +1029,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch { final MajorType outputType; // If left or full outer join, then the output type must be nullable. However, map types are // not nullable so we must exclude them from the check below (see DRILL-2197). - if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED + if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED && inputType.getMinorType() != TypeProtos.MinorType.MAP) { outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); } else { @@ -1034,7 +1050,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch { // If right or full outer join then the output type should be optional. However, map types are // not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197). 
- if ((joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED + if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED && inputType.getMinorType() != TypeProtos.MinorType.MAP) { outputType = Types.overrideMode(inputType, DataMode.OPTIONAL); } else { @@ -1074,6 +1090,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch { this.buildBatch = right; this.probeBatch = left; joinType = popConfig.getJoinType(); + joinIsLeftOrFull = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL; + joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL; conditions = popConfig.getConditions(); this.popConfig = popConfig; rightExpr = new ArrayList<>(conditions.size()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java index e7fa4e6b5..486fb1e1a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java @@ -113,7 +113,7 @@ public abstract class AbstractBinaryRecordBatch exte return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream); } - /* + /** * Checks for the operator specific early terminal condition. * @return true if the further processing can stop. * false if the further processing is needed. 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java index 6d77d639f..5487d9508 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java @@ -105,7 +105,7 @@ public abstract class BaseRawBatchBuffer implements RawBatchBuffer { @Override public void close() { if (!isTerminated() && context.getExecutorState().shouldContinue()) { - final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount); + final String msg = String.format("Cleanup before finished. %d out of %d streams have finished", completedStreams(), fragmentCount); throw new IllegalStateException(msg); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java index 349a29511..5beb7cbdd 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java @@ -209,4 +209,47 @@ public class TestHashJoinOutcome extends PhysicalOpUnitTestBase { public void testHashJoinNoneOutcomeUninitLeftSide() { testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE); } + + /** + * Testing for DRILL-6755: No Hash Table is built when the first probe batch is NONE + */ + @Test + public void testHashJoinWhenProbeIsNONE() { + + inputOutcomesLeft.add(RecordBatch.IterOutcome.NONE); + + inputOutcomesRight.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA); + inputOutcomesRight.add(RecordBatch.IterOutcome.OK); + inputOutcomesRight.add(RecordBatch.IterOutcome.NONE); + + // for the 
build side input - use multiple batches (to check that they are all cleared/drained) + final List buildSideinputContainer = new ArrayList<>(5); + buildSideinputContainer.add(emptyInputRowSetRight.container()); + buildSideinputContainer.add(nonEmptyInputRowSetRight.container()); + RowSet.SingleRowSet secondInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(456).build(); + RowSet.SingleRowSet thirdInputRowSetRight = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(789).build(); + buildSideinputContainer.add(secondInputRowSetRight.container()); + buildSideinputContainer.add(thirdInputRowSetRight.container()); + + final MockRecordBatch mockInputBatchRight = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, buildSideinputContainer, inputOutcomesRight, batchSchemaRight); + final MockRecordBatch mockInputBatchLeft = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext, inputContainerLeft, inputOutcomesLeft, batchSchemaLeft); + + List conditions = Lists.newArrayList(); + + conditions.add(new JoinCondition(SqlKind.EQUALS.toString(), FieldReference.getWithQuotedRef("leftcol"), FieldReference.getWithQuotedRef("rightcol"))); + + HashJoinPOP hjConf = new HashJoinPOP(null, null, conditions, JoinRelType.INNER); + + HashJoinBatch hjBatch = new HashJoinBatch(hjConf, operatorFixture.getFragmentContext(), mockInputBatchLeft, mockInputBatchRight); + + RecordBatch.IterOutcome gotOutcome = hjBatch.next(); + assertTrue(gotOutcome == RecordBatch.IterOutcome.OK_NEW_SCHEMA); + + gotOutcome = hjBatch.next(); + assertTrue(gotOutcome == RecordBatch.IterOutcome.NONE); + + secondInputRowSetRight.clear(); + thirdInputRowSetRight.clear(); + buildSideinputContainer.clear(); + } } -- cgit v1.2.3