diff options
author | Ben-Zvi <bben-zvi@mapr.com> | 2018-04-27 21:59:25 -0700 |
---|---|---|
committer | Boaz Ben-Zvi <boaz@mapr.com> | 2018-05-17 14:57:50 -0700 |
commit | 89e0fe6b34259a2f51a7c45070935a2a2400eca4 (patch) | |
tree | 76285cbf52ce2de8b177d05397c0ea5700dca9fc /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | |
parent | 349ac5ad431a6814f0a065bf16dee635cc6d3274 (diff) |
DRILL-6027:
- Add a fallback option for HashJoin.
- Avoid copying the incoming batch for a single partition, and avoid HT resize.
- Fix a memory leak when cancelling while a spill file is being read.
- Get the correct schema when the probe side is empty.
- Re-create the HashJoinProbe.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | 468 |
1 file changed, 138 insertions, 330 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 0c46e36e8..ee7a8a38c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -37,8 +37,10 @@ import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.Types; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.memory.BaseAllocator; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.ops.FragmentContext; @@ -66,7 +68,24 @@ import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.calcite.rel.core.JoinRelType; /** + * This class implements the runtime execution for the Hash-Join operator + * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins * + * This implementation splits the incoming Build side rows into multiple Partitions, thus allowing spilling of + * some of these partitions to disk if memory gets tight. Each partition is implemented as a {@link HashPartition}. + * After the build phase is over, in the most general case, some of the partitions were spilled, and the others + * are in memory. Each of the partitions in memory would get a {@link HashTable} built. + * Next the Probe side is read, and each row is key matched with a Build partition. If that partition is in + * memory, then the key is used to probe and perform the join, and the results are added to the outgoing batch. 
+ * But if that build side partition was spilled, then the matching Probe size partition is spilled as well. + * After all the Probe side was processed, we are left with pairs of spilled partitions. Then each pair is + * processed individually (that Build partition should be smaller than the original, hence likely fit whole into + * memory to allow probing; if not -- see below). + * Processing of each spilled pair is EXACTLY like processing the original Build/Probe incomings. (As a fact, + * the {@Link #innerNext() innerNext} method calls itself recursively !!). Thus the spilled build partition is + * read and divided into new partitions, which in turn may spill again (and again...). + * The code tracks these spilling "cycles". Normally any such "again" (i.e. cycle of 2 or greater) is a waste, + * indicating that the number of partitions chosen was too small. */ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { protected static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashJoinBatch.class); @@ -86,6 +105,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Join conditions private final List<JoinCondition> conditions; + + // Runtime generated class implementing HashJoinProbe interface + private HashJoinProbe hashJoinProbe = null; + private final List<NamedExpression> rightExpr; /** @@ -120,6 +143,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Schema of the build side private BatchSchema rightSchema; + // Schema of the probe side + private BatchSchema probeSchema; + private int rightHVColPosition; private BufferAllocator allocator; @@ -131,15 +157,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { private SpillSet spillSet; HashJoinPOP popConfig; - private int cycleNum = 0; // primary, secondary, tertiary, etc. + private int cycleNum = 0; // 1-primary, 2-secondary, 3-tertiary, etc. 
private int originalPartition = -1; // the partition a secondary reads from - IntVector read_HV_vector; // HV vector that was read from the spilled batch + IntVector read_right_HV_vector; // HV vector that was read from the spilled batch private int maxBatchesInMemory; /** * This holds information about the spilled partitions for the build and probe side. */ - private static class HJSpilledPartition { + public static class HJSpilledPartition { public int innerSpilledBatches; public String innerSpillFile; public int outerSpilledBatches; @@ -189,6 +215,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { setupHashTable(); } setupOutputContainerSchema(); + try { + hashJoinProbe = setupHashJoinProbe(); + } catch (IOException | ClassTransformationException e) { + throw new SchemaChangeException(e); + } + // Build the container schema and set the counts for (final VectorWrapper<?> w : container) { w.getValueVector().allocateNew(); @@ -212,7 +244,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { return false; } - if (checkForEarlyFinish()) { + if (checkForEarlyFinish(leftUpstream, rightUpstream)) { state = BatchState.DONE; return false; } @@ -234,11 +266,16 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { switch (outcome) { case OK_NEW_SCHEMA: - // We need to have the schema of the build side even when the build side is empty - rightSchema = buildBatch.getSchema(); - // position of the new "column" for keeping the hash values (after the real columns) - rightHVColPosition = buildBatch.getContainer().getNumberOfColumns(); - // new schema can also contain records + if ( inputIndex == 0 ) { + // Indicate that a schema was seen (in case probe side is empty) + probeSchema = probeBatch.getSchema(); + } else { + // We need to have the schema of the build side even when the build side is empty + rightSchema = buildBatch.getSchema(); + // position of the new "column" for keeping the hash 
values (after the real columns) + rightHVColPosition = buildBatch.getContainer().getNumberOfColumns(); + // new schema can also contain records + } case OK: if (recordBatch.getRecordCount() == 0) { continue; @@ -265,12 +302,19 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { return new HashJoinMemoryCalculatorImpl(safetyFactor, fragmentationFactor, hashTableDoublingFactor, hashTableCalculatorType); } else { - return new MechanicalHashJoinMemoryCalculator(maxBatchesInMemory); + return new HashJoinMechanicalMemoryCalculator(maxBatchesInMemory); } } @Override public IterOutcome innerNext() { + // In case incoming was killed before, just cleanup and return + if ( wasKilled ) { + this.cleanup(); + super.close(); + return IterOutcome.NONE; + } + try { /* If we are here for the first time, execute the build phase of the * hash join and setup the run time generated class for the probe side @@ -280,19 +324,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { executeBuildPhase(); // Update the hash table related stats for the operator updateStats(); - // - setupProbe(); + // Initialize various settings for the probe side + hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType, leftUpstream, partitions, cycleNum, container, spilledInners, buildSideIsEmpty, numPartitions, rightHVColPosition); } - // Store the number of records projected - + // Try to probe and project, or recursively handle a spilled partition if ( ! buildSideIsEmpty || // If there are build-side rows joinType != JoinRelType.INNER) { // or if this is a left/full outer join // Allocate the memory for the vectors in the output container allocateVectors(); - outputRecords = probeAndProject(); + outputRecords = hashJoinProbe.probeAndProject(); /* We are here because of one the following * 1. 
Completed processing of all the records and we are done @@ -314,13 +357,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Free all partitions' in-memory data structures // (In case need to start processing spilled partitions) for ( HashPartition partn : partitions ) { - partn.close(); + partn.cleanup(false); // clean, but do not delete the spill files !! } // // (recursively) Handle the spilled partitions, if any // - if ( !buildSideIsEmpty && !wasKilled && !spilledPartitionsList.isEmpty()) { + if ( !buildSideIsEmpty && !spilledPartitionsList.isEmpty()) { // Get the next (previously) spilled partition to handle as incoming HJSpilledPartition currSp = spilledPartitionsList.remove(0); @@ -337,7 +380,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { } else { probeBatch = left; // if no outer batch then reuse left - needed for updateIncoming() leftUpstream = IterOutcome.NONE; - changeToFinalProbeState(); + hashJoinProbe.changeToFinalProbeState(); } // update the cycle num if needed @@ -356,12 +399,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { this.cleanup(); throw UserException .unsupportedError() - .message("Hash-Join can not partition inner data any further (too many join-key duplicates? - try merge-join)\n" + .message("Hash-Join can not partition the inner data any further (probably due to too many join-key duplicates)\n" + "On cycle num %d mem available %d num partitions %d", cycleNum, allocator.getLimit(), numPartitions) .build(logger); } } - logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches). More {} spilled partitions left.", currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, currSp.innerSpilledBatches, spilledPartitionsList.size()); + logger.debug("Start reading spilled partition {} (prev {}) from cycle {} (with {}-{} batches)." 
+ + " More {} spilled partitions left.", + currSp.origPartn, currSp.prevOrigPartn, currSp.cycleNum, currSp.outerSpilledBatches, + currSp.innerSpilledBatches, spilledPartitionsList.size()); state = BatchState.FIRST; // TODO need to determine if this is still necessary since prefetchFirstBatchFromBothSides sets this @@ -400,12 +446,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { private void setupHashTable() throws SchemaChangeException { final List<Comparator> comparators = Lists.newArrayListWithExpectedSize(conditions.size()); - // When DRILL supports Java 8, use the following instead of the for() loop - // conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond))); - for (int i=0; i<conditions.size(); i++) { - JoinCondition cond = conditions.get(i); - comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond)); - } + conditions.forEach(cond->comparators.add(JoinUtils.checkAndReturnSupportedJoinComparator(cond))); // Setup the hash table configuration object List<NamedExpression> leftExpr = new ArrayList<>(conditions.size()); @@ -441,16 +482,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // // Find out the estimated max batch size, etc // and compute the max numPartitions possible + // See partitionNumTuning() // - // numPartitions = 8; // just for initial work; change later - // partitionMask = 7; - // bitsInMask = 3; - - // SET FROM CONFIGURATION OPTIONS : - // ================================ - // Set the number of partitions from the configuration (raise to a power of two, if needed) - // Based on the number of partitions: Set the mask and bit count partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F bitsInMask = Integer.bitCount(partitionMask); // e.g. 
0x1F -> 5 @@ -471,7 +505,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Recreate the partitions every time build is initialized for (int part = 0; part < numPartitions; part++ ) { partitions[part] = new HashPartition(context, allocator, baseHashTable, buildBatch, probeBatch, - RECORDS_PER_BATCH, spillSet, part, cycleNum); + RECORDS_PER_BATCH, spillSet, part, cycleNum, numPartitions); } spilledInners = new HJSpilledPartition[numPartitions]; @@ -526,15 +560,38 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { TARGET_RECORDS_PER_BATCH, HashTable.DEFAULT_LOAD_FACTOR); - numPartitions = 1; // We are only using one partition - canSpill = false; // We cannot spill - allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory + disableSpilling(null); } return buildCalc; } /** + * Disable spilling - use only a single partition and set the memory limit to the max ( 10GB ) + * @param reason If not null - log this as warning, else check fallback setting to either warn or fail. + */ + private void disableSpilling(String reason) { + // Fail, or just issue a warning if a reason was given, or a fallback option is enabled + if ( reason == null ) { + final boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY).bool_val; + if (fallbackEnabled) { + logger.warn("Spilling is disabled - not enough memory available for internal partitioning. Falling back" + + " to use unbounded memory"); + } else { + throw UserException.resourceError().message(String.format("Not enough memory for internal partitioning and fallback mechanism for " + + "HashJoin to use unbounded memory is disabled. 
Either enable fallback config %s using Alter " + + "session/system command or increase memory limit for Drillbit", ExecConstants.HASHJOIN_FALLBACK_ENABLED_KEY)).build(logger); + } + } else { + logger.warn(reason); + } + + numPartitions = 1; // We are only using one partition + canSpill = false; // We cannot spill + allocator.setLimit(AbstractBase.MAX_ALLOCATION); // Violate framework and force unbounded memory + } + + /** * Execute the BUILD phase; first read incoming and split rows into partitions; * may decide to spill some of the partitions * @@ -560,7 +617,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { calc.initialize(doMemoryCalculation); buildCalc = calc.next(); - // We've sniffed first non empty build and probe batches so we have enough information to createa calculator + // We've sniffed first non empty build and probe batches so we have enough information to create a calculator buildCalc.initialize(firstCycle, true, // TODO Fix after growing hash values bug fixed buildBatch, probeBatch, @@ -608,25 +665,30 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { for (HashPartition partn : partitions) { partn.updateBatches(); } // Fall through case OK: + // Special treatment (when no spill, and single partition) -- use the incoming vectors as they are (no row copy) + if ( numPartitions == 1 ) { + partitions[0].appendBatch(buildBatch); + break; + } final int currentRecordCount = buildBatch.getRecordCount(); if ( cycleNum > 0 ) { - read_HV_vector = (IntVector) buildBatch.getContainer().getLast(); + read_right_HV_vector = (IntVector) buildBatch.getContainer().getLast(); } // For every record in the build batch, hash the key columns and keep the result for (int ind = 0; ind < currentRecordCount; ind++) { int hashCode = ( cycleNum == 0 ) ? 
partitions[0].getBuildHashCode(ind) - : read_HV_vector.getAccessor().get(ind); // get the hash value from the HV column + : read_right_HV_vector.getAccessor().get(ind); // get the hash value from the HV column int currPart = hashCode & partitionMask ; hashCode >>>= bitsInMask; // Append the new inner row to the appropriate partition; spill (that partition) if needed partitions[currPart].appendInnerRow(buildBatch.getContainer(), ind, hashCode, buildCalc); // may spill if needed } - if ( read_HV_vector != null ) { - read_HV_vector.clear(); - read_HV_vector = null; + if ( read_right_HV_vector != null ) { + read_right_HV_vector.clear(); + read_right_HV_vector = null; } break; } @@ -637,8 +699,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Move the remaining current batches into their temp lists, or spill // them if the partition is spilled. Add the spilled partitions into // the spilled partitions list - for (HashPartition partn : partitions) { - partn.completeAnInnerBatch(false, partn.isSpilled() ); + if ( numPartitions > 1 ) { // a single partition needs no completion + for (HashPartition partn : partitions) { + partn.completeAnInnerBatch(false, partn.isSpilled()); + } } HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next(); @@ -715,7 +779,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { } } - if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) { + if (probeSchema != null) { // a probe schema was seen (even though the probe may had no rows) for (final VectorWrapper<?> vv : probeBatch) { final MajorType inputType = vv.getField().getType(); final MajorType outputType; @@ -761,6 +825,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { return spilledInners[part] != null; } + /** + * The constructor + * + * @param popConfig + * @param context + * @param left -- probe/outer side incoming input + * @param right -- 
build/iner side incoming input + * @throws OutOfMemoryException + */ public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, /*Probe side record batch*/ RecordBatch right /*Build side record batch*/ @@ -783,16 +856,15 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { rightExpr.add(new NamedExpression(conditions.get(i).getRight(), new FieldReference(refName))); } + this.allocator = oContext.getAllocator(); + numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR); if ( numPartitions == 1 ) { // - canSpill = false; - logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1"); + disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1"); } numPartitions = BaseAllocator.nextPowerOfTwo(numPartitions); // in case not a power of 2 - this.allocator = oContext.getAllocator(); - final long memLimit = context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_MEMORY_VALIDATOR); if (memLimit != 0) { @@ -802,6 +874,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { RECORDS_PER_BATCH = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_ROWS_IN_BATCH_VALIDATOR); maxBatchesInMemory = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_MAX_BATCHES_IN_MEMORY_VALIDATOR); + logger.info("Memory limit {} bytes", FileUtils.byteCountToDisplaySize(allocator.getLimit())); spillSet = new SpillSet(context, popConfig); // Create empty partitions (in the ctor - covers the case where right side is empty) @@ -818,10 +891,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0)); } - // clean (and deallocate) each partition + // clean (and deallocate) each partition, and delete its spill file for (HashPartition partn : partitions) { 
- partn.clearHashTableAndHelper(); - partn.closeWriterAndDeleteFile(); + partn.close(); } // delete any spill file left in unread spilled partitions @@ -896,288 +968,24 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { @Override public void close() { - for ( HashPartition partn : partitions ) { - partn.close(); + if ( cycleNum > 0 ) { // spilling happened + // In case closing due to cancellation, BaseRootExec.close() does not close the open + // SpilledRecordBatch "scanners" as it only knows about the original left/right ops. + killIncoming(false); } - cleanup(); + this.cleanup(); super.close(); } - // ============================================================== - // - // Methods used for the probe - // - // ============================================================ - private BatchSchema probeSchema; - - enum ProbeState { - PROBE_PROJECT, PROJECT_RIGHT, DONE - } - - private int currRightPartition = 0; // for returning RIGHT/FULL - - // Number of records to process on the probe side - private int recordsToProcess = 0; - - // Number of records processed on the probe side - private int recordsProcessed = 0; - - // Indicate if we should drain the next record from the probe side - private boolean getNextRecord = true; - - // Contains both batch idx and record idx of the matching record in the build side - private int currentCompositeIdx = -1; - - // Current state the hash join algorithm is in - private ProbeState probeState = ProbeState.PROBE_PROJECT; - - // For outer or right joins, this is a list of unmatched records that needs to be projected - private List<Integer> unmatchedBuildIndexes = null; + public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException { + final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getOptions()); + cg.plainJavaCapable(true); + // cg.saveCodeForDebugging(true); - // While probing duplicates, retain current build-side partition 
in case need to continue - // probing later on the same chain of duplicates - private HashPartition currPartition; + // No real code generation !! - /** - * Various initialization needed to perform the probe - * Must be called AFTER the build completes - */ - private void setupProbe() { - currRightPartition = 0; // In case it's a Right/Full outer join - recordsProcessed = 0; - recordsToProcess = 0; - - probeSchema = probeBatch.getSchema(); - probeState = ProbeState.PROBE_PROJECT; - - // A special case - if the left was an empty file - if ( leftUpstream == IterOutcome.NONE ){ - changeToFinalProbeState(); - } else { - this.recordsToProcess = probeBatch.getRecordCount(); - } - - // for those outer partitions that need spilling (cause their matching inners spilled) - // initialize those partitions' current batches and hash-value vectors - for ( HashPartition partn : partitions ) { - partn.allocateNewCurrentBatchAndHV(); - } - - if ( cycleNum > 0 ) { - if ( read_HV_vector != null ) { read_HV_vector.clear();} - if ( leftUpstream != IterOutcome.NONE ) { // Skip when outer spill was empty - read_HV_vector = (IntVector) probeBatch.getContainer().getLast(); - } - } + final HashJoinProbe hj = context.getImplementationClass(cg); + return hj; } - private void executeProjectRightPhase(int currBuildPart) { - while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < recordsToProcess) { - outputRecords = - container.appendRow(partitions[currBuildPart].getContainers(), unmatchedBuildIndexes.get(recordsProcessed), - null /* no probeBatch */, 0 /* no probe index */ ); - recordsProcessed++; - } - } - - private void executeProbePhase() throws SchemaChangeException { - - while (outputRecords < TARGET_RECORDS_PER_BATCH && probeState != ProbeState.DONE && probeState != ProbeState.PROJECT_RIGHT) { - - // Check if we have processed all records in this batch we need to invoke next - if (recordsProcessed == recordsToProcess) { - - // Done processing all records in the previous 
batch, clean up! - for (VectorWrapper<?> wrapper : probeBatch) { - wrapper.getValueVector().clear(); - } - - IterOutcome leftUpstream = next(HashJoinHelper.LEFT_INPUT, probeBatch); - - switch (leftUpstream) { - case NONE: - case NOT_YET: - case STOP: - recordsProcessed = 0; - recordsToProcess = 0; - changeToFinalProbeState(); - // in case some outer partitions were spilled, need to spill their last batches - for ( HashPartition partn : partitions ) { - if ( ! partn.isSpilled() ) { continue; } // skip non-spilled - partn.completeAnOuterBatch(false); - // update the partition's spill record with the outer side - HJSpilledPartition sp = spilledInners[partn.getPartitionNum()]; - sp.outerSpillFile = partn.getSpillFile(); - sp.outerSpilledBatches = partn.getPartitionBatchesCount(); - - partn.closeWriter(); - } - - continue; - - case OK_NEW_SCHEMA: - if (probeBatch.getSchema().equals(probeSchema)) { - for ( HashPartition partn : partitions ) { partn.updateBatches(); } - - } else { - throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in probe side.", - probeSchema, - probeBatch.getSchema()); - } - case OK: - recordsToProcess = probeBatch.getRecordCount(); - recordsProcessed = 0; - // If we received an empty batch do nothing - if (recordsToProcess == 0) { - continue; - } - if ( cycleNum > 0 ) { - read_HV_vector = (IntVector) probeBatch.getContainer().getLast(); // Needed ? - } - } - } - int probeIndex = -1; - // Check if we need to drain the next row in the probe side - if (getNextRecord) { - - if ( !buildSideIsEmpty ) { - int hashCode = ( cycleNum == 0 ) ? 
- partitions[0].getProbeHashCode(recordsProcessed) - : read_HV_vector.getAccessor().get(recordsProcessed); - int currBuildPart = hashCode & partitionMask ; - hashCode >>>= bitsInMask; - - // Set and keep the current partition (may be used again on subsequent probe calls as - // inner rows of duplicate key are processed) - currPartition = partitions[currBuildPart]; // inner if not spilled, else outer - - // If the matching inner partition was spilled - if ( isSpilledInner(currBuildPart) ) { - // add this row to its outer partition (may cause a spill, when the batch is full) - - currPartition.appendOuterRow(hashCode, recordsProcessed); - - recordsProcessed++; // done with this outer record - continue; // on to the next outer record - } - - probeIndex = currPartition.probeForKey(recordsProcessed, hashCode); - - } - - if (probeIndex != -1) { - - /* The current probe record has a key that matches. Get the index - * of the first row in the build side that matches the current key - * (and record this match in the bitmap, in case of a FULL/RIGHT join) - */ - currentCompositeIdx = currPartition.getStartIndex(probeIndex); - - outputRecords = - container.appendRow(currPartition.getContainers(), currentCompositeIdx, - probeBatch.getContainer(), recordsProcessed); - - /* Projected single row from the build side with matching key but there - * may be more rows with the same key. Check if that's the case - */ - currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx); - if (currentCompositeIdx == -1) { - /* We only had one row in the build side that matched the current key - * from the probe side. Drain the next row in the probe side. 
- */ - recordsProcessed++; - } else { - /* There is more than one row with the same key on the build side - * don't drain more records from the probe side till we have projected - * all the rows with this key - */ - getNextRecord = false; - } - } else { // No matching key - - // If we have a left outer join, project the outer side - if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) { - - outputRecords = - container.appendOuterRow(probeBatch.getContainer(), recordsProcessed, rightHVColPosition); - } - recordsProcessed++; - } - } - else { // match the next inner row with the same key - - currPartition.setRecordMatched(currentCompositeIdx); - - outputRecords = - container.appendRow(currPartition.getContainers(), currentCompositeIdx, - probeBatch.getContainer(), recordsProcessed); - - currentCompositeIdx = currPartition.getNextIndex(currentCompositeIdx); - - if (currentCompositeIdx == -1) { - // We don't have any more rows matching the current key on the build side, move on to the next probe row - getNextRecord = true; - recordsProcessed++; - } - } - } - } - - /** - * Perform the probe and project the results - * - * @return number of output records - * @throws SchemaChangeException - */ - private int probeAndProject() throws SchemaChangeException { - - outputRecords = 0; - - // When handling spilled partitions, the state becomes DONE at the end of each partition - if ( probeState == ProbeState.DONE ) { - return outputRecords; // that is zero - } - - if (probeState == ProbeState.PROBE_PROJECT) { - executeProbePhase(); - } - - if (probeState == ProbeState.PROJECT_RIGHT) { - // Inner probe is done; now we are here because we still have a RIGHT OUTER (or a FULL) join - - do { - - if (unmatchedBuildIndexes == null) { // first time for this partition ? 
- if ( buildSideIsEmpty ) { return outputRecords; } // in case of an empty right - // Get this partition's list of build indexes that didn't match any record on the probe side - unmatchedBuildIndexes = partitions[currRightPartition].getNextUnmatchedIndex(); - recordsProcessed = 0; - recordsToProcess = unmatchedBuildIndexes.size(); - } - - // Project the list of unmatched records on the build side - executeProjectRightPhase(currRightPartition); - - if ( recordsProcessed < recordsToProcess ) { // more records in this partition? - return outputRecords; // outgoing is full; report and come back later - } else { - currRightPartition++; // on to the next right partition - unmatchedBuildIndexes = null; - } - - } while ( currRightPartition < numPartitions ); - - probeState = ProbeState.DONE; // last right partition was handled; we are done now - } - - return outputRecords; - } - - private void changeToFinalProbeState() { - // We are done with the (left) probe phase. - // If it's a RIGHT or a FULL join then need to get the unmatched indexes from the build side - probeState = - (joinType == JoinRelType.RIGHT || joinType == JoinRelType.FULL) ? ProbeState.PROJECT_RIGHT : - ProbeState.DONE; // else we're done - } } |