author    Ben-Zvi <bben-zvi@mapr.com>  2018-04-27 21:59:25 -0700
committer Boaz Ben-Zvi <boaz@mapr.com>  2018-05-17 14:57:50 -0700
commit    89e0fe6b34259a2f51a7c45070935a2a2400eca4 (patch)
tree      76285cbf52ce2de8b177d05397c0ea5700dca9fc /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
parent    349ac5ad431a6814f0a065bf16dee635cc6d3274 (diff)
DRILL-6027:
- Added fallback option for HashJoin.
- No copy of incoming for single partition, and avoid HT resize.
- Fix memory leak when cancelling while a spill file is read.
- Get the correct schema when the probe side is empty.
- Re-create the HashJoinProbe.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | 468
1 file changed, 138 insertions(+), 330 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 0c46e36e8..ee7a8a38c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -37,8 +37,10 @@ import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.memory.BaseAllocator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.ops.FragmentContext;
@@ -66,7 +68,24 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.calcite.rel.core.JoinRelType;
/**
+ * This class implements the runtime execution for the Hash-Join operator
+ * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
*
+ * This implementation splits the incoming Build side rows into multiple Partitions, thus allowing spilling of
+ * some of these partitions to disk if memory gets tight. Each partition is implemented as a {@link HashPartition}.
+ * After the build phase is over, in the most general case, some of the partitions were spilled, and the others
+ * are in memory. Each of the partitions in memory would get a {@link HashTable} built.
+ * Next the Probe side is read, and each row is key matched with a Build partition. If that partition is in
+ * memory, then the key is used to probe and perform the join, and the results are added to the outgoing batch.
+ * But if that build side partition was spilled, then the matching Probe side partition is spilled as well.
+ * After all of the Probe side is processed, we are left with pairs of spilled partitions. Then each pair is
+ * processed individually (that Build partition should be smaller than the original, hence likely to fit whole into
+ * memory to allow probing; if not -- see below).
+ * Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (In fact,
+ * the {@link #innerNext() innerNext} method calls itself recursively!) Thus the spilled build partition is
+ * read and divided into new partitions, which in turn may spill again (and again...).
+ * The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste,
+ * indicating that the number of partitions chosen was too small.
*/
public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class);
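
The control flow described in the new Javadoc can be condensed into a short sketch. Everything below is hypothetical: the names are invented, the real operator recurses through innerNext() rather than looping, and its give-up condition is based on memory rather than a fixed cycle cap.

```java
// Hypothetical sketch of the spill-and-recurse control flow described in the
// Javadoc above; none of these identifiers are Drill's, only the loop shape is.
import java.util.ArrayDeque;
import java.util.Deque;

class SpillCycleSketch {
  record SpilledPair(String buildFile, String probeFile, int cycleNum) {}

  static final int MAX_CYCLES = 4;              // guard against extreme key skew
  private final Deque<SpilledPair> pending = new ArrayDeque<>();

  void run() {
    buildAndProbe(null, 0);                     // cycle 0: the real incomings
    while (!pending.isEmpty()) {                // replay each spilled pair
      SpilledPair pair = pending.removeFirst();
      if (pair.cycleNum() >= MAX_CYCLES) {      // partitioning stopped helping
        throw new IllegalStateException("cannot partition any further");
      }
      buildAndProbe(pair, pair.cycleNum() + 1); // processed EXACTLY like cycle 0
    }
  }

  void buildAndProbe(SpilledPair input, int cycleNum) {
    // Build: split rows into partitions, spilling some if memory gets tight.
    // Probe: join against the in-memory partitions; rows whose build partition
    // spilled are spilled too, and each such pair is added to 'pending'.
  }
}
```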
@@ -86,6 +105,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Join conditions
private final List<JoinCondition> conditions;
+
+ // Runtime generated class implementing HashJoinProbe interface
+ private HashJoinProbe hashJoinProbe = null;
+
private final List<NamedExpression> rightExpr;
/**
@@ -120,6 +143,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Schema of the build side
private BatchSchema rightSchema;
+ // Schema of the probe side
+ private BatchSchema probeSchema;
+
private int rightHVColPosition;
private BufferAllocator allocator;
@@ -131,15 +157,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private SpillSet spillSet;
HashJoinPOP popConfig;
- private int cycleNum = 0; // primary, secondary, tertiary, etc.
+ private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc.
private int originalPartition = -1; // the partition a secondary reads from
- IntVector read_HV_vector; // HV vector that was read from the spilled batch
+ IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
private int maxBatchesInMemory;
/**
* This holds information about the spilled partitions for the build and probe side.
*/
- private static class HJSpilledPartition {
+ public static class HJSpilledPartition {
public int innerSpilledBatches;
public String innerSpillFile;
public int outerSpilledBatches;
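
For concreteness, here is what filling one of these records could look like; the field names are taken from this diff (outerSpillFile and the origin/cycle fields appear in later hunks), while the path and counts are invented:

```java
// Illustrative values only -- the spill file path and batch counts are made up.
HJSpilledPartition sp = new HJSpilledPartition();
sp.innerSpillFile = "/tmp/drill/spill/q1_hj_p3_inner";   // build-side run file
sp.innerSpilledBatches = 12;
sp.outerSpillFile = "/tmp/drill/spill/q1_hj_p3_outer";   // probe-side run file
sp.outerSpilledBatches = 7;
spilledPartitionsList.add(sp);  // later removed and replayed, one pair per cycle
```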
@@ -189,6 +215,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
setupHashTable();
}
setupOutputContainerSchema();
+ try {
+ hashJoinProbe = setupHashJoinProbe();
+ } catch (IOException | ClassTransformationException e) {
+ throw new SchemaChangeException(e);
+ }
+
// Build the container schema and set the counts
for (final VectorWrapper<?> w : container) {
w.getValueVector().allocateNew();
@@ -212,7 +244,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
return false;
}
- if (checkForEarlyFinish()) {
+ if (checkForEarlyFinish(leftUpstream, rightUpstream)) {
state = BatchState.DONE;
return false;
}
@@ -234,11 +266,16 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
switch (outcome) {
case OK_NEW_SCHEMA:
- // We need to have the schema of the build side even when the build side is empty
- rightSchema = buildBatch.getSchema();
- // position of the new "column" for keeping the hash values (after the real columns)
- rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
- // new schema can also contain records
+ if ( inputIndex == 0 ) {
+ // Indicate that a schema was seen (in case probe side is empty)
+ probeSchema = probeBatch.getSchema();
+ } else {
+ // We need to have the schema of the build side even when the build side is empty
+ rightSchema = buildBatch.getSchema();
+ // position of the new "column" for keeping the hash values (after the real columns)
+ rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
+ // new schema can also contain records
+ }
case OK:
if (recordBatch.getRecordCount() == 0) {
continue;
@@ -265,12 +302,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType);
} else {
- return new MechanicalHashJoinMemoryCalculator(maxBatchesInMemory);
+ return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory);
}
}
@Override
public IterOutcome innerNext() {
+ // In case incoming was killed before, just cleanup and return
+ if ( wasKilled ) {
+ this.cleanup();
+ super.close();
+ return IterOutcome.NONE;
+ }
+
try {
/* If we are here for the first time, execute the build phase of the
* hash join and setup the run time generated class for the probe side
@@ -280,19 +324,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
executeBuildPhase();
// Update the hash table related stats for the operator
updateStats();
- //
- setupProbe();
+ // Initialize various settings for the probe side
+ hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType, leftUpstream, partitions, cycleNum, container, spilledInners, buildSideIsEmpty, numPartitions, rightHVColPosition);
}
- // Store the number of records projected
-
+ // Try to probe and project, or recursively handle a spilled partition
if ( ! buildSideIsEmpty || // If there are build-side rows
joinType != JoinRelType.INNER) { // or if this is a left/full outer join
// Allocate the memory for the vectors in the output container
allocateVectors();
- outputRecords = probeAndProject();
+ outputRecords = hashJoinProbe.probeAndProject();
/* We are here because of one the following
* 1. Completed processing of all the records and we are done
@@ -314,13 +357,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Free all partitions' in-memory data structures
// (In case need to start processing spilled partitions)
for ( HashPartition partn : partitions ) {
- partn.close();
+ partn.cleanup(false); // clean, but do not delete the spill files !!
}
//
// (recursively) Handle the spilled partitions, if any
//
- if ( !buildSideIsEmpty && !wasKilled && !spilledPartitionsList.isEmpty()) {
+ if ( !buildSideIsEmpty && !spilledPartitionsList.isEmpty()) {
// Get the next (previously) spilled partition to handle as incoming
HJSpilledPartition currSp = spilledPartitionsList.remove(0);
@@ -337,7 +380,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
} else {
probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming()
leftUpstream = IterOutcome.NONE;
- changeToFinalProbeState();
+ hashJoinProbe.changeToFinalProbeState();
}
// update the cycle num if needed
@@ -356,12 +399,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
this.cleanup();
throw UserException
.unsupportedError()
- .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)\n"
+ .message("Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates)\n"
+ "On cycle num %d mem available %d num partitions %d", cycleNum, allocator.getLimit(), numPartitions)
.build(logger);
}
}
- logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches). More {} spilled partitions left.", currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, currSp.innerSpilledBatches, spilledPartitionsList.size());
+ logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches)." +
+ " More {} spilled partitions left.",
+ currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches,
+ currSp.innerSpilledBatches, spilledPartitionsList.size());
state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this
@@ -400,12 +446,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private void setupHashTable() throws SchemaChangeException {
final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size());
- // When DRILL supports Java 8, use the following instead of the for() loop
- // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
- for (int i=0; i<conditions.size(); i++) {
- JoinCondition cond = conditions.get(i);
- comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond));
- }
+ conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)));
// Setup the hash table configuration object
List<NamedExpression> leftExpr = new ArrayList<>(conditions.size());
@@ -441,16 +482,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
//
// Find out the estimated max batch size, etc
// and compute the max numPartitions possible
+ // See partitionNumTuning()
//
- // numPartitions = 8; // just for initial work; change later
- // partitionMask = 7;
- // bitsInMask = 3;
-
- // SET FROM CONFIGURATION OPTIONS :
- // ================================
- // Set the number of partitions from the configuration (raise to a power of two, if needed)
- // Based on the number of partitions: Set the mask and bit count
partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
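
A quick worked example of the two lines above, in plain self-contained Java (the hash constant is arbitrary):

```java
int numPartitions = 32;                            // already a power of two here
int partitionMask = numPartitions - 1;             // 32 -> 0x1F (binary 11111)
int bitsInMask = Integer.bitCount(partitionMask);  // 0x1F -> 5

int hashCode = 0x9E3779B9;                         // some row's hash value
int part = hashCode & partitionMask;               // low 5 bits -> partition 25
int rest = hashCode >>> bitsInMask;                // kept for any later cycle
```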
@@ -471,7 +505,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Recreate the partitions every time build is initialized
for (int part = 0; part < numPartitions; part++ ) {
partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch,
- RECORDS_PER_BATCH, spillSet, part, cycleNum);
+ RECORDS_PER_BATCH, spillSet, part, cycleNum, numPartitions);
}
spilledInners = new HJSpilledPartition[numPartitions];
@@ -526,15 +560,38 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
TARGET_RECORDS_PER_BATCH,
HashTable.DEFAULT_LOAD_FACTOR);
- numPartitions = 1; // We are only using one partition
- canSpill = false; // We cannot spill
- allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory
+ disableSpilling(null);
}
return buildCalc;
}
/**
+ * Disable spilling - use only a single partition and set the memory limit to the max ( 10GB )
+ * @param reason If not null - log this as warning, else check fallback setting to either warn or fail.
+ */
+ private void disableSpilling(String reason) {
+ // Fail, or just issue a warning if a reason was given, or a fallback option is enabled
+ if ( reason == null ) {
+ final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val;
+ if (fallbackEnabled) {
+ logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back" +
+ " to use unbounded memory");
+ } else {
+ throw UserException.resourceError().message(String.format("Not enough memory for internal partitioning and fallback mechanism for " +
+ "HashJoin to use unbounded memory is disabled. Either enable fallback config %s using Alter " +
+ "session/system command or increase memory limit for Drillbit", ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger);
+ }
+ } else {
+ logger.warn(reason);
+ }
+
+ numPartitions = 1; // We are only using one partition
+ canSpill = false; // We cannot spill
+ allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory
+ }
+
+ /**
* Execute the BUILD phase; first read incoming and split rows into partitions;
* may decide to spill some of the partitions
*
@@ -560,7 +617,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
calc.initialize(doMemoryCalculation);
buildCalc = calc.next();
- // We've sniffed first non empty build and probe batches so we have enough information to createa calculator
+ // We've sniffed first non empty build and probe batches so we have enough information to create a calculator
buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash values bug fixed
buildBatch,
probeBatch,
@@ -608,25 +665,30 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
for (HashPartition partn : partitions) { partn.updateBatches(); }
// Fall through
case OK:
+ // Special treatment (when no spill, and single partition) -- use the incoming vectors as they are (no row copy)
+ if ( numPartitions == 1 ) {
+ partitions[0].appendBatch(buildBatch);
+ break;
+ }
final int currentRecordCount = buildBatch.getRecordCount();
if ( cycleNum > 0 ) {
- read_HV_vector = (IntVector) buildBatch.getContainer().getLast();
+ read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast();
}
// For every record in the build batch, hash the key columns and keep the result
for (int ind = 0; ind < currentRecordCount; ind++) {
int hashCode = ( cycleNum == 0 ) ? partitions[0].getBuildHashCode(ind)
- : read_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
+ : read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column
int currPart = hashCode & partitionMask ;
hashCode >>>= bitsInMask;
// Append the new inner row to the appropriate partition; spill (that partition) if needed
partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed
}
- if ( read_HV_vector != null ) {
- read_HV_vector.clear();
- read_HV_vector = null;
+ if ( read_right_HV_vector != null ) {
+ read_right_HV_vector.clear();
+ read_right_HV_vector = null;
}
break;
}
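
The cycleNum branch above is what makes recursive spilling converge: cycle 0 hashes the key columns, while later cycles read the already-shifted hash back from the HV column, so each cycle partitions on a fresh group of bits. A small illustration with made-up numbers:

```java
int mask = 0x7, bits = 3;     // 8 partitions, for the illustration
int hash = 0b101_110_011;     // a row's hash, truncated to 9 bits for display

int p0  = hash & mask;        // cycle 0: partition 0b011 = 3
int hv0 = hash >>> bits;      // stored in the HV column if partition 3 spills

int p1  = hv0 & mask;         // cycle 1: partition 0b110 = 6 -- different bits,
                              // so the old partition's rows now spread apart
```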
@@ -637,8 +699,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Move the remaining current batches into their temp lists, or spill
// them if the partition is spilled. Add the spilled partitions into
// the spilled partitions list
- for (HashPartition partn : partitions) {
- partn.completeAnInnerBatch(false, partn.isSpilled() );
+ if ( numPartitions > 1 ) { // a single partition needs no completion
+ for (HashPartition partn : partitions) {
+ partn.completeAnInnerBatch(false, partn.isSpilled());
+ }
}
HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
@@ -715,7 +779,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
}
}
- if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
+ if (probeSchema != null) { // a probe schema was seen (even though the probe may have had no rows)
for (final VectorWrapper<?> vv : probeBatch) {
final MajorType inputType = vv.getField().getType();
final MajorType outputType;
@@ -761,6 +825,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
return spilledInners[part] != null;
}
+ /**
+ * The constructor
+ *
+ * @param popConfig
+ * @param context
+ * @param left -- probe/outer side incoming input
+ * @param right -- build/inner side incoming input
+ * @throws OutOfMemoryException
+ */
public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context,
RecordBatch left, /*Probe side record batch*/
RecordBatch right /*Build side record batch*/
@@ -783,16 +856,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName)));
}
+ this.allocator = oContext.getAllocator();
+
numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
if ( numPartitions == 1 ) { //
- canSpill = false;
- logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
+ disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1");
}
numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2
- this.allocator = oContext.getAllocator();
-
final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR);
if (memLimit != 0) {
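
As the comment a few lines up notes, the configured partition count is rounded up to a power of two so the mask arithmetic stays valid. A plain-Java sketch of that rounding (BaseAllocator.nextPowerOfTwo is the call the patch actually uses):

```java
int configured = 12;   // e.g. a user-set num_partitions option value
int rounded = 1 << (32 - Integer.numberOfLeadingZeros(configured - 1));
System.out.println(rounded);   // prints 16; 16 stays 16, 33 would become 64
```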
@@ -802,6 +874,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR);
maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR);
+ logger.info("Memory limit {} bytes", FileUtils.byteCountToDisplaySize(allocator.getLimit()));
spillSet = new SpillSet(context, popConfig);
// Create empty partitions (in the ctor - covers the case where right side is empty)
@@ -818,10 +891,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
(int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
}
- // clean (and deallocate) each partition
+ // clean (and deallocate) each partition, and delete its spill file
for (HashPartition partn : partitions) {
- partn.clearHashTableAndHelper();
- partn.closeWriterAndDeleteFile();
+ partn.close();
}
// delete any spill file left in unread spilled partitions
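
Note that the patch now splits partition teardown into two distinct paths; condensing the two calls as they appear above:

```java
// Between spill cycles -- free in-memory structures but KEEP the spill files,
// since the spilled pairs still have to be read back and re-joined:
for (HashPartition partn : partitions) { partn.cleanup(false); }

// At final cleanup -- free the memory AND delete each partition's spill file:
for (HashPartition partn : partitions) { partn.close(); }
```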
@@ -896,288 +968,24 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
@Override
public void close() {
- for ( HashPartition partn : partitions ) {
- partn.close();
+ if ( cycleNum > 0 ) { // spilling happened
+ // In case closing due to cancellation, BaseRootExec.close() does not close the open
+ // SpilledRecordBatch "scanners" as it only knows about the original left/right ops.
+ killIncoming(false);
}
- cleanup();
+ this.cleanup();
super.close();
}
- // ==============================================================
- //
- // Methods used for the probe
- //
- // ============================================================
- private BatchSchema probeSchema;
-
- enum ProbeState {
- PROBE_PROJECT, PROJECT_RIGHT, DONE
- }
-
- private int currRightPartition = 0; // for returning RIGHT/FULL
-
- // Number of records to process on the probe side
- private int recordsToProcess = 0;
-
- // Number of records processed on the probe side
- private int recordsProcessed = 0;
-
- // Indicate if we should drain the next record from the probe side
- private boolean getNextRecord = true;
-
- // Contains both batch idx and record idx of the matching record in the build side
- private int currentCompositeIdx = -1;
-
- // Current state the hash join algorithm is in
- private ProbeState probeState = ProbeState.PROBE_PROJECT;
-
- // For outer or right joins, this is a list of unmatched records that needs to be projected
- private List<Integer> unmatchedBuildIndexes = null;
+ public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {
+ final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions());
+ cg.plainJavaCapable(true);
+ // cg.saveCodeForDebugging(true);
- // While probing duplicates, retain current build-side partition in case need to continue
- // probing later on the same chain of duplicates
- private HashPartition currPartition;
+ // No real code generation !!
- /**
- * Various initialization needed to perform the probe
- * Must be called AFTER the build completes
- */
- private void setupProbe() {
- currRightPartition = 0; // In case it's a Right/Full outer join
- recordsProcessed = 0;
- recordsToProcess = 0;
-
- probeSchema = probeBatch.getSchema();
- probeState = ProbeState.PROBE_PROJECT;
-
- // A special case - if the left was an empty file
- if ( leftUpstream == IterOutcome.NONE ){
- changeToFinalProbeState();
- } else {
- this.recordsToProcess = probeBatch.getRecordCount();
- }
-
- // for those outer partitions that need spilling (cause their matching inners spilled)
- // initialize those partitions' current batches and hash-value vectors
- for ( HashPartition partn : partitions ) {
- partn.allocateNewCurrentBatchAndHV();
- }
-
- if ( cycleNum > 0 ) {
- if ( read_HV_vector != null ) { read_HV_vector.clear();}
- if ( leftUpstream != IterOutcome.NONE ) { // Skip when outer spill was empty
- read_HV_vector = (IntVector) probeBatch.getContainer().getLast();
- }
- }
+ final HashJoinProbe hj = context.getImplementationClass(cg);
+ return hj;
}
- private void executeProjectRightPhase(int currBuildPart) {
- while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) {
- outputRecords =
- container.appendRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed),
- null /* no probeBatch */, 0 /* no probe index */ );
- recordsProcessed++;
- }
- }
-
- private void executeProbePhase() throws SchemaChangeException {
-
- while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) {
-
- // Check if we have processed all records in this batch we need to invoke next
- if (recordsProcessed == recordsToProcess) {
-
- // Done processing all records in the previous batch, clean up!
- for (VectorWrapper<?> wrapper : probeBatch) {
- wrapper.getValueVector().clear();
- }
-
- IterOutcome leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch);
-
- switch (leftUpstream) {
- case NONE:
- case NOT_YET:
- case STOP:
- recordsProcessed = 0;
- recordsToProcess = 0;
- changeToFinalProbeState();
- // in case some outer partitions were spilled, need to spill their last batches
- for ( HashPartition partn : partitions ) {
- if ( ! partn.isSpilled() ) { continue; } // skip non-spilled
- partn.completeAnOuterBatch(false);
- // update the partition's spill record with the outer side
- HJSpilledPartition sp = spilledInners[partn.getPartitionNum()];
- sp.outerSpillFile = partn.getSpillFile();
- sp.outerSpilledBatches = partn.getPartitionBatchesCount();
-
- partn.closeWriter();
- }
-
- continue;
-
- case OK_NEW_SCHEMA:
- if (probeBatch.getSchema().equals(probeSchema)) {
- for ( HashPartition partn : partitions ) { partn.updateBatches(); }
-
- } else {
- throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.",
- probeSchema,
- probeBatch.getSchema());
- }
- case OK:
- recordsToProcess = probeBatch.getRecordCount();
- recordsProcessed = 0;
- // If we received an empty batch do nothing
- if (recordsToProcess == 0) {
- continue;
- }
- if ( cycleNum > 0 ) {
- read_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ?
- }
- }
- }
- int probeIndex = -1;
- // Check if we need to drain the next row in the probe side
- if (getNextRecord) {
-
- if ( !buildSideIsEmpty ) {
- int hashCode = ( cycleNum == 0 ) ?
- partitions[0].getProbeHashCode(recordsProcessed)
- : read_HV_vector.getAccessor().get(recordsProcessed);
- int currBuildPart = hashCode & partitionMask ;
- hashCode >>>= bitsInMask;
-
- // Set and keep the current partition (may be used again on subsequent probe calls as
- // inner rows of duplicate key are processed)
- currPartition = partitions[currBuildPart]; // inner if not spilled, else outer
-
- // If the matching inner partition was spilled
- if ( isSpilledInner(currBuildPart) ) {
- // add this row to its outer partition (may cause a spill, when the batch is full)
-
- currPartition.appendOuterRow(hashCode, recordsProcessed);
-
- recordsProcessed++; // done with this outer record
- continue; // on to the next outer record
- }
-
- probeIndex = currPartition.probeForKey(recordsProcessed, hashCode);
-
- }
-
- if (probeIndex != -1) {
-
- /* The current probe record has a key that matches. Get the index
- * of the first row in the build side that matches the current key
- * (and record this match in the bitmap, in case of a FULL/RIGHT join)
- */
- currentCompositeIdx = currPartition.getStartIndex(probeIndex);
-
- outputRecords =
- container.appendRow(currPartition.getContainers(), currentCompositeIdx,
- probeBatch.getContainer(), recordsProcessed);
-
- /* Projected single row from the build side with matching key but there
- * may be more rows with the same key. Check if that's the case
- */
- currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
- if (currentCompositeIdx == -1) {
- /* We only had one row in the build side that matched the current key
- * from the probe side. Drain the next row in the probe side.
- */
- recordsProcessed++;
- } else {
- /* There is more than one row with the same key on the build side
- * don't drain more records from the probe side till we have projected
- * all the rows with this key
- */
- getNextRecord = false;
- }
- } else { // No matching key
-
- // If we have a left outer join, project the outer side
- if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
-
- outputRecords =
- container.appendOuterRow(probeBatch.getContainer(), recordsProcessed, rightHVColPosition);
- }
- recordsProcessed++;
- }
- }
- else { // match the next inner row with the same key
-
- currPartition.setRecordMatched(currentCompositeIdx);
-
- outputRecords =
- container.appendRow(currPartition.getContainers(), currentCompositeIdx,
- probeBatch.getContainer(), recordsProcessed);
-
- currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx);
-
- if (currentCompositeIdx == -1) {
- // We don't have any more rows matching the current key on the build side, move on to the next probe row
- getNextRecord = true;
- recordsProcessed++;
- }
- }
- }
- }
-
- /**
- * Perform the probe and project the results
- *
- * @return number of output records
- * @throws SchemaChangeException
- */
- private int probeAndProject() throws SchemaChangeException {
-
- outputRecords = 0;
-
- // When handling spilled partitions, the state becomes DONE at the end of each partition
- if ( probeState == ProbeState.DONE ) {
- return outputRecords; // that is zero
- }
-
- if (probeState == ProbeState.PROBE_PROJECT) {
- executeProbePhase();
- }
-
- if (probeState == ProbeState.PROJECT_RIGHT) {
- // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join
-
- do {
-
- if (unmatchedBuildIndexes == null) { // first time for this partition ?
- if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right
- // Get this partition's list of build indexes that didn't match any record on the probe side
- unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex();
- recordsProcessed = 0;
- recordsToProcess = unmatchedBuildIndexes.size();
- }
-
- // Project the list of unmatched records on the build side
- executeProjectRightPhase(currRightPartition);
-
- if ( recordsProcessed < recordsToProcess ) { // more records in this partition?
- return outputRecords; // outgoing is full; report and come back later
- } else {
- currRightPartition++; // on to the next right partition
- unmatchedBuildIndexes = null;
- }
-
- } while ( currRightPartition < numPartitions );
-
- probeState = ProbeState.DONE; // last right partition was handled; we are done now
- }
-
- return outputRecords;
- }
-
- private void changeToFinalProbeState() {
- // We are done with the (left) probe phase.
- // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side
- probeState =
- (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT :
- ProbeState.DONE; // else we're done
- }
}
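
Finally, the large block removed above is the probe state machine that this patch relocates behind the HashJoinProbe interface (instantiated via the template code generator in setupHashJoinProbe()). A condensed, hypothetical restatement of what probeAndProject() cycles through, based on the removed code:

```java
// Hypothetical condensation of the removed probe logic; the real version now
// lives in the generated HashJoinProbe implementation.
class ProbeSketch {
  enum ProbeState { PROBE_PROJECT, PROJECT_RIGHT, DONE }

  int probeAndProject(ProbeState probeState) {
    int outputRecords = 0;
    if (probeState == ProbeState.PROBE_PROJECT) {
      // Match each probe row against its build partition: join in-memory
      // matches into the outgoing batch, but append a row whose build
      // partition spilled to the matching outer spill partition instead.
    }
    if (probeState == ProbeState.PROJECT_RIGHT) {
      // RIGHT/FULL OUTER only: emit the never-matched build-side rows,
      // partition by partition, pausing whenever the outgoing batch fills.
    }
    return outputRecords;  // DONE: nothing left for this (possibly spilled) input
  }
}
```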