aboutsummaryrefslogtreecommitdiff
path: root/exec
diff options
context:
space:
mode:
authorBen-Zvi <bben-zvi@mapr.com>2018-09-25 19:07:10 -0700
committerVitalii Diravka <vitalii.diravka@gmail.com>2018-10-01 13:51:31 +0300
commit3b1ae159b94ef7c1d67ddde474c75d5558d3e50a (patch)
tree692b8d26555a0d8ba59dcdf968962806ba6cd4d2 /exec
parent98e5de3b5af862779244bac8329852b3c9a901df (diff)
DRILL-6755: Avoid building Hash Table for inner/left join when probe side is empty
- Preparations and cleanup for DRILL-6755 closes #1480
Diffstat (limited to 'exec')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java60
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java2
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java43
4 files changed, 84 insertions, 23 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 89ab8d4a4..658f03a33 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -81,7 +81,6 @@ import org.apache.drill.exec.work.filter.BloomFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterReporter;
-
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.NONE;
import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
@@ -101,7 +100,7 @@ import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA
* processed individually (that Build partition should be smaller than the original, hence likely fit whole into
* memory to allow probing; if not -- see below).
* Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact,
- * the {@Link #innerNext() innerNext} method calls itself recursively !!). Thus the spilled build partition is
+ * the {@link #innerNext()} method calls itself recursively !!). Thus the spilled build partition is
* read and divided into new partitions, which in turn may spill again (and again...).
* The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste,
* indicating that the number of partitions chosen was too small.
@@ -116,6 +115,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Join type, INNER, LEFT, RIGHT or OUTER
private final JoinRelType joinType;
+ private boolean joinIsLeftOrFull;
+ private boolean joinIsRightOrFull;
+ private boolean skipHashTableBuild; // when outer side is empty, and the join is inner or left (see DRILL-6755)
// Join conditions
private final List<JoinCondition> conditions;
@@ -131,8 +133,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private final Set<String> buildJoinColumns;
// Fields used for partitioning
-
- private long maxIncomingBatchSize;
/**
* The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
*/
@@ -264,6 +264,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
buildSchema = right.getSchema();
// position of the new "column" for keeping the hash values (after the real columns)
rightHVColPosition = right.getContainer().getNumberOfColumns();
+ // In special cases, when the probe side is empty, and inner/left join - no need for Hash Table
+ skipHashTableBuild = leftUpstream == IterOutcome.NONE && ! joinIsRightOrFull;
// We only need the hash tables if we have data on the build side.
setupHashTable();
}
@@ -447,12 +449,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Try to probe and project, or recursively handle a spilled partition
if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
- joinType != JoinRelType.INNER) { // or if this is a left/full outer join
+ joinIsLeftOrFull) { // or if this is a left/full outer join
prefetchFirstProbeBatch();
if (leftUpstream.isError() ||
- ( leftUpstream == NONE && joinType != JoinRelType.FULL && joinType != JoinRelType.RIGHT )) {
+ ( leftUpstream == NONE && ! joinIsRightOrFull )) {
// A termination condition was reached while prefetching the first probe side data holding batch.
// We need to terminate.
return leftUpstream;
@@ -568,19 +570,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
} else {
// Our build side is empty, we won't have any matches, clear the probe side
- if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (final VectorWrapper<?> wrapper : probeBatch) {
- wrapper.getValueVector().clear();
- }
- probeBatch.kill(true);
- leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
- while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
- for (final VectorWrapper<?> wrapper : probeBatch) {
- wrapper.getValueVector().clear();
- }
- leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
- }
- }
+ killAndDrainLeftUpstream();
}
// No more output records, clean up and return
@@ -596,10 +586,31 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
}
}
+ /**
+  * Kills an upstream whose data is no longer needed and drains whatever batches it still delivers.
+  * <p>
+  * The kill is sent first. Then, for as long as the last outcome seen is {@code OK} or
+  * {@code OK_NEW_SCHEMA}, the value vectors of the current batch are cleared and the next
+  * batch is requested.
+  *
+  * @param batch probe or build batch
+  * @param upstream the last outcome received from that upstream
+  * @param isLeft true for the left input, false for the right input
+  * @return the outcome that ended the drain (the given outcome, unchanged, if nothing was drained)
+  */
+ private IterOutcome killAndDrainUpstream(RecordBatch batch, IterOutcome upstream, boolean isLeft) {
+   batch.kill(true);
+   while (upstream == IterOutcome.OK_NEW_SCHEMA || upstream == IterOutcome.OK) {
+     for (final VectorWrapper<?> wrapper : batch) {
+       wrapper.getValueVector().clear();
+     }
+     upstream = next(isLeft ? HashJoinHelper.LEFT_INPUT : HashJoinHelper.RIGHT_INPUT, batch);
+   }
+   // 'upstream' is only a local copy of the caller's value (Java passes arguments by value),
+   // so the final outcome has to be handed back explicitly for the caller to record it.
+   return upstream;
+ }
+
+ /** Kills and drains the probe (left) side, recording the final outcome in {@code leftUpstream}. */
+ private void killAndDrainLeftUpstream() {
+   leftUpstream = killAndDrainUpstream(probeBatch, leftUpstream, true);
+ }
+
+ /** Kills and drains the build (right) side, recording the final outcome in {@code rightUpstream}. */
+ private void killAndDrainRightUpstream() {
+   rightUpstream = killAndDrainUpstream(buildBatch, rightUpstream, false);
+ }
+
+
private void setupHashTable() throws SchemaChangeException {
final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
+ if ( skipHashTableBuild ) { return; }
+
// Setup the hash table configuration object
List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
@@ -819,6 +830,11 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
return null;
}
+ if ( skipHashTableBuild ) { // No hash table needed - then consume all the right upstream
+ killAndDrainRightUpstream();
+ return null;
+ }
+
HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
boolean firstCycle = cycleNum == 0;
@@ -1013,7 +1029,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
final MajorType outputType;
// If left or full outer join, then the output type must be nullable. However, map types are
// not nullable so we must exclude them from the check below (see DRILL-2197).
- if ((joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+ if (joinIsLeftOrFull && inputType.getMode() == DataMode.REQUIRED
&& inputType.getMinorType() != TypeProtos.MinorType.MAP) {
outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
} else {
@@ -1034,7 +1050,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// If right or full outer join then the output type should be optional. However, map types are
// not nullable so we must exclude them from the check below (see DRILL-2771, DRILL-2197).
- if ((joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) && inputType.getMode() == DataMode.REQUIRED
+ if (joinIsRightOrFull && inputType.getMode() == DataMode.REQUIRED
&& inputType.getMinorType() != TypeProtos.MinorType.MAP) {
outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
} else {
@@ -1074,6 +1090,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
this.buildBatch = right;
this.probeBatch = left;
joinType = popConfig.getJoinType();
+ joinIsLeftOrFull = joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL;
+ joinIsRightOrFull = joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL;
conditions = popConfig.getConditions();
this.popConfig = popConfig;
rightExpr = new ArrayList<>(conditions.size());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
index e7fa4e6b5..486fb1e1a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractBinaryRecordBatch.java
@@ -113,7 +113,7 @@ public abstract class AbstractBinaryRecordBatch<T extends PhysicalOperator> exte
return verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
}
- /*
+ /**
* Checks for the operator specific early terminal condition.
* @return true if the further processing can stop.
* false if the further processing is needed.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
index 6d77d639f..5487d9508 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -105,7 +105,7 @@ public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
@Override
public void close() {
if (!isTerminated() && context.getExecutorState().shouldContinue()) {
- final String msg = String.format("Cleanup before finished. %d out of %d strams have finished", completedStreams(), fragmentCount);
+ final String msg = String.format("Cleanup before finished. %d out of %d streams have finished", completedStreams(), fragmentCount);
throw new IllegalStateException(msg);
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
index 349a29511..5beb7cbdd 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinOutcome.java
@@ -209,4 +209,47 @@ public class TestHashJoinOutcome extends PhysicalOpUnitTestBase {
public void testHashJoinNoneOutcomeUninitLeftSide() {
testHashJoinOutcomes(UninitializedSide.Left, RecordBatch.IterOutcome.NONE, RecordBatch.IterOutcome.NONE);
}
+
+ /**
+  * Testing for DRILL-6755: No Hash Table is built when the first probe batch is NONE.
+  * <p>
+  * Runs an INNER hash join whose probe (left) input reports NONE immediately while the build
+  * (right) input is given several batches, and checks that the join answers OK_NEW_SCHEMA on
+  * the first call to next() and NONE on the second.
+  */
+ @Test
+ public void testHashJoinWhenProbeIsNONE() {
+   // Probe (left) side: finished from the very first call
+   inputOutcomesLeft.add(RecordBatch.IterOutcome.NONE);
+
+   // Build (right) side outcomes: a schema batch, a data batch, then end of data
+   inputOutcomesRight.add(RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+   inputOutcomesRight.add(RecordBatch.IterOutcome.OK);
+   inputOutcomesRight.add(RecordBatch.IterOutcome.NONE);
+
+   // Extra build-side row sets, so that there are multiple batches to be cleared/drained
+   RowSet.SingleRowSet extraRowSetA = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(456).build();
+   RowSet.SingleRowSet extraRowSetB = operatorFixture.rowSetBuilder(inputSchemaRight).addRow(789).build();
+
+   // Build-side input containers, in the order they are handed to the mock batch
+   final List<VectorContainer> buildContainers = new ArrayList<>(5);
+   buildContainers.add(emptyInputRowSetRight.container());
+   buildContainers.add(nonEmptyInputRowSetRight.container());
+   buildContainers.add(extraRowSetA.container());
+   buildContainers.add(extraRowSetB.container());
+
+   final MockRecordBatch rightMock = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+       buildContainers, inputOutcomesRight, batchSchemaRight);
+   final MockRecordBatch leftMock = new MockRecordBatch(operatorFixture.getFragmentContext(), opContext,
+       inputContainerLeft, inputOutcomesLeft, batchSchemaLeft);
+
+   // Single equality condition: leftcol = rightcol
+   List<JoinCondition> joinConditions = Lists.newArrayList();
+   joinConditions.add(new JoinCondition(SqlKind.EQUALS.toString(),
+       FieldReference.getWithQuotedRef("leftcol"), FieldReference.getWithQuotedRef("rightcol")));
+
+   HashJoinPOP joinConfig = new HashJoinPOP(null, null, joinConditions, JoinRelType.INNER);
+   HashJoinBatch joinBatch = new HashJoinBatch(joinConfig, operatorFixture.getFragmentContext(), leftMock, rightMock);
+
+   // First call delivers the schema, second call reports the end of data
+   assertTrue(joinBatch.next() == RecordBatch.IterOutcome.OK_NEW_SCHEMA);
+   assertTrue(joinBatch.next() == RecordBatch.IterOutcome.NONE);
+
+   // Release the row sets created by this test
+   extraRowSetA.clear();
+   extraRowSetB.clear();
+   buildContainers.clear();
+ }
}