diff options
author | Timothy Farkas <timothyfarkas@apache.org> | 2018-07-16 15:33:23 -0700 |
---|---|---|
committer | Vitalii Diravka <vitalii.diravka@gmail.com> | 2018-08-13 13:33:02 +0300 |
commit | 6ad0f9f1bab8bdda18f3eaaf29445bc94355156e (patch) | |
tree | e17e21895006a209f25d0994e06a7e0c15bd5b73 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | |
parent | 93a1c5aba6f17b93a52ab4367f625103a904f1ad (diff) |
DRILL-6453: Fix deadlock caused by reading from left and right inputs in HashJoin simultaneously.
closes #1408
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | 358 |
1 file changed, 203 insertions, 155 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index b1ea96f05..0bd6fe624 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.FieldReference; import org.apache.drill.common.expression.PathSegment; @@ -68,6 +69,9 @@ import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; import org.apache.calcite.rel.core.JoinRelType; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT; +import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA; + /** * This class implements the runtime execution for the Hash-Join operator * supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins @@ -114,6 +118,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { // Fields used for partitioning + private long maxIncomingBatchSize; /** * The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}. */ @@ -125,7 +130,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { * The master class used to generate {@link HashTable}s. 
*/ private ChainedHashTable baseHashTable; - private boolean buildSideIsEmpty = true; + private MutableBoolean buildSideIsEmpty = new MutableBoolean(false); + private MutableBoolean probeSideIsEmpty = new MutableBoolean(false); private boolean canSpill = true; private boolean wasKilled; // a kill was received, may need to clean spilled partns @@ -138,7 +144,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { private int outputRecords; // Schema of the build side - private BatchSchema rightSchema; + private BatchSchema buildSchema; // Schema of the probe side private BatchSchema probeSchema; @@ -150,9 +156,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { private RecordBatch probeBatch; /** - * Flag indicating whether or not the first data holding batch needs to be fetched. + * Flag indicating whether or not the first data holding build batch needs to be fetched. + */ + private MutableBoolean prefetchedBuild = new MutableBoolean(false); + /** + * Flag indicating whether or not the first data holding probe batch needs to be fetched. */ - private boolean prefetched; + private MutableBoolean prefetchedProbe = new MutableBoolean(false); // For handling spilling private SpillSet spillSet; @@ -220,123 +230,120 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { protected void buildSchema() throws SchemaChangeException { // We must first get the schemas from upstream operators before we can build // our schema. - boolean validSchema = sniffNewSchemas(); + boolean validSchema = prefetchFirstBatchFromBothSides(); if (validSchema) { // We are able to construct a valid schema from the upstream data. 
// Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA state = BatchState.BUILD_SCHEMA; - } else { - verifyOutcomeToSetBatchState(leftUpstream, rightUpstream); + + if (leftUpstream == OK_NEW_SCHEMA) { + probeSchema = left.getSchema(); + } + + if (rightUpstream == OK_NEW_SCHEMA) { + buildSchema = right.getSchema(); + // position of the new "column" for keeping the hash values (after the real columns) + rightHVColPosition = right.getContainer().getNumberOfColumns(); + // We only need the hash tables if we have data on the build side. + setupHashTable(); + } + + try { + hashJoinProbe = setupHashJoinProbe(); + } catch (IOException | ClassTransformationException e) { + throw new SchemaChangeException(e); + } } // If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema, - // we still need to build a dummy schema. These code handles both cases for us. + // we still need to build a dummy schema. This code handles both cases for us. setupOutputContainerSchema(); container.buildSchema(BatchSchema.SelectionVectorMode.NONE); - - // Initialize the hash join helper context - if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) { - // We only need the hash tables if we have data on the build side. 
- setupHashTable(); - } - - try { - hashJoinProbe = setupHashJoinProbe(); - } catch (IOException | ClassTransformationException e) { - throw new SchemaChangeException(e); - } } - @Override - protected boolean prefetchFirstBatchFromBothSides() { - if (leftUpstream != IterOutcome.NONE) { - // We can only get data if there is data available - leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left); - } - - if (rightUpstream != IterOutcome.NONE) { - // We can only get data if there is data available - rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right); - } - - buildSideIsEmpty = rightUpstream == IterOutcome.NONE; - - if (verifyOutcomeToSetBatchState(leftUpstream, rightUpstream)) { - // For build side, use aggregate i.e. average row width across batches - batchMemoryManager.update(LEFT_INDEX, 0); - batchMemoryManager.update(RIGHT_INDEX, 0, true); - - logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX)); - logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX)); + /** + * Prefetches the first build side data holding batch. + */ + private void prefetchFirstBuildBatch() { + rightUpstream = prefetchFirstBatch(rightUpstream, + prefetchedBuild, + buildSideIsEmpty, + RIGHT_INDEX, + right, + () -> { + batchMemoryManager.update(RIGHT_INDEX, 0, true); + logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX)); + }); + } - // Got our first batche(s) - state = BatchState.FIRST; - return true; - } else { - return false; - } + /** + * Prefetches the first build side data holding batch. 
+ */ + private void prefetchFirstProbeBatch() { + leftUpstream = prefetchFirstBatch(leftUpstream, + prefetchedProbe, + probeSideIsEmpty, + LEFT_INDEX, + left, + () -> { + batchMemoryManager.update(LEFT_INDEX, 0); + logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX)); + }); } /** - * Sniffs all data necessary to construct a schema. - * @return True if all the data necessary to construct a schema has been retrieved. False otherwise. + * Used to fetch the first data holding batch from either the build or probe side. + * @param outcome The current upstream outcome for either the build or probe side. + * @param prefetched A flag indicating if we have already done a prefetch of the first data holding batch for the probe or build side. + * @param isEmpty A flag indicating if the probe or build side is empty. + * @param index The upstream index of the probe or build batch. + * @param batch The probe or build batch itself. + * @param memoryManagerUpdate A lambda function to execute the memory manager update for the probe or build batch. + * @return The current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}. */ - private boolean sniffNewSchemas() { - do { - // Ask for data until we get a valid result. - leftUpstream = next(LEFT_INDEX, left); - } while (leftUpstream == IterOutcome.NOT_YET); + private IterOutcome prefetchFirstBatch(IterOutcome outcome, + final MutableBoolean prefetched, + final MutableBoolean isEmpty, + final int index, + final RecordBatch batch, + final Runnable memoryManagerUpdate) { + if (prefetched.booleanValue()) { + // We have already prefetch the first data holding batch + return outcome; + } - boolean isValidLeft = false; + // If we didn't retrieve our first data holding batch, we need to do it now. 
+ prefetched.setValue(true); - switch (leftUpstream) { - case OK_NEW_SCHEMA: - probeSchema = probeBatch.getSchema(); - case NONE: - isValidLeft = true; - break; - case OK: - case EMIT: - throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream); - default: - // Termination condition + if (outcome != IterOutcome.NONE) { + // We can only get data if there is data available + outcome = sniffNonEmptyBatch(outcome, index, batch); } - do { - // Ask for data until we get a valid result. - rightUpstream = next(RIGHT_INDEX, right); - } while (rightUpstream == IterOutcome.NOT_YET); - - boolean isValidRight = false; + isEmpty.setValue(outcome == IterOutcome.NONE); // If we recieved NONE there is no data. - switch (rightUpstream) { - case OK_NEW_SCHEMA: - // We need to have the schema of the build side even when the build side is empty - rightSchema = buildBatch.getSchema(); - // position of the new "column" for keeping the hash values (after the real columns) - rightHVColPosition = buildBatch.getContainer().getNumberOfColumns(); - case NONE: - isValidRight = true; - break; - case OK: - case EMIT: - throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream); - default: - // Termination condition + if (outcome == IterOutcome.OUT_OF_MEMORY) { + // We reached a termination state + state = BatchState.OUT_OF_MEMORY; + } else if (outcome == IterOutcome.STOP) { + // We reached a termination state + state = BatchState.STOP; + } else { + // Got our first batch(es) + memoryManagerUpdate.run(); + state = BatchState.FIRST; } - // Left and right sides must return a valid response and both sides cannot be NONE. - return (isValidLeft && isValidRight) && - (leftUpstream != IterOutcome.NONE && rightUpstream != IterOutcome.NONE); + return outcome; } /** - * Currently in order to accurately predict memory usage for spilling, the first non-empty build side and probe side batches are needed. 
This method - * fetches the first non-empty batch from the left or right side. + * Currently in order to accurately predict memory usage for spilling, the first non-empty build or probe side batch is needed. This method + * fetches the first non-empty batch from the probe or build side. * @param curr The current outcome. - * @param inputIndex Index specifying whether to work with the left or right input. - * @param recordBatch The left or right record batch. + * @param inputIndex Index specifying whether to work with the prorbe or build input. + * @param recordBatch The probe or build record batch. * @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch. */ private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex, RecordBatch recordBatch) { @@ -354,8 +361,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { case NOT_YET: // We need to try again break; + case EMIT: + throw new UnsupportedOperationException("We do not support " + EMIT); default: - // Other cases termination conditions + // Other cases are termination conditions return curr; } } @@ -381,96 +390,119 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { @Override public IterOutcome innerNext() { - if (!prefetched) { - // If we didn't retrieve our first data hold batch, we need to do it now. - prefetched = true; - prefetchFirstBatchFromBothSides(); - - // Handle emitting the correct outcome for termination conditions - // Use the state set by prefetchFirstBatchFromBothSides to emit the correct termination outcome. - switch (state) { - case DONE: - return IterOutcome.NONE; - case STOP: - return IterOutcome.STOP; - case OUT_OF_MEMORY: - return IterOutcome.OUT_OF_MEMORY; - default: - // No termination condition so continue processing. - } - } - - if ( wasKilled ) { + if (wasKilled) { + // We have recieved a kill signal. We need to stop processing. 
this.cleanup(); super.close(); return IterOutcome.NONE; } + prefetchFirstBuildBatch(); + + if (rightUpstream.isError()) { + // A termination condition was reached while prefetching the first build side data holding batch. + // We need to terminate. + return rightUpstream; + } + try { /* If we are here for the first time, execute the build phase of the * hash join and setup the run time generated class for the probe side */ if (state == BatchState.FIRST) { // Build the hash table, using the build side record batches. - executeBuildPhase(); + final IterOutcome buildExecuteTermination = executeBuildPhase(); + + if (buildExecuteTermination != null) { + // A termination condition was reached while executing the build phase. + // We need to terminate. + return buildExecuteTermination; + } + // Update the hash table related stats for the operator updateStats(); - // Initialize various settings for the probe side - hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType, leftUpstream, partitions, cycleNum, container, spilledInners, buildSideIsEmpty, numPartitions, rightHVColPosition); } // Try to probe and project, or recursively handle a spilled partition - if ( ! buildSideIsEmpty || // If there are build-side rows - joinType != JoinRelType.INNER) { // or if this is a left/full outer join - - // Allocate the memory for the vectors in the output container - batchMemoryManager.allocateVectors(container); - hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount()); + if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows + joinType != JoinRelType.INNER) { // or if this is a left/full outer join - outputRecords = hashJoinProbe.probeAndProject(); + prefetchFirstProbeBatch(); - for (final VectorWrapper<?> v : container) { - v.getValueVector().getMutator().setValueCount(outputRecords); + if (leftUpstream.isError()) { + // A termination condition was reached while prefetching the first probe side data holding batch. + // We need to terminate. 
+ return leftUpstream; } - container.setRecordCount(outputRecords); - batchMemoryManager.updateOutgoingStats(outputRecords); - if (logger.isDebugEnabled()) { - logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); - } + if (!buildSideIsEmpty.booleanValue() || !probeSideIsEmpty.booleanValue()) { + // Only allocate outgoing vectors and execute probing logic if there is data - /* We are here because of one the following - * 1. Completed processing of all the records and we are done - * 2. We've filled up the outgoing batch to the maximum and we need to return upstream - * Either case build the output container's schema and return - */ - if (outputRecords > 0 || state == BatchState.FIRST) { if (state == BatchState.FIRST) { - state = BatchState.NOT_FIRST; + // Initialize various settings for the probe side + hashJoinProbe.setupHashJoinProbe(probeBatch, + this, + joinType, + leftUpstream, + partitions, + cycleNum, + container, + spilledInners, + buildSideIsEmpty.booleanValue(), + numPartitions, + rightHVColPosition); + } + + // Allocate the memory for the vectors in the output container + batchMemoryManager.allocateVectors(container); + + hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount()); + + outputRecords = hashJoinProbe.probeAndProject(); + + for (final VectorWrapper<?> v : container) { + v.getValueVector().getMutator().setValueCount(outputRecords); + } + container.setRecordCount(outputRecords); + + batchMemoryManager.updateOutgoingStats(outputRecords); + if (logger.isDebugEnabled()) { + logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this)); } - return IterOutcome.OK; + /* We are here because of one the following + * 1. Completed processing of all the records and we are done + * 2. 
We've filled up the outgoing batch to the maximum and we need to return upstream + * Either case build the output container's schema and return + */ + if (outputRecords > 0 || state == BatchState.FIRST) { + if (state == BatchState.FIRST) { + state = BatchState.NOT_FIRST; + } + + return IterOutcome.OK; + } } // Free all partitions' in-memory data structures // (In case need to start processing spilled partitions) - for ( HashPartition partn : partitions ) { + for (HashPartition partn : partitions) { partn.cleanup(false); // clean, but do not delete the spill files !! } // // (recursively) Handle the spilled partitions, if any // - if ( !buildSideIsEmpty && !spilledPartitionsList.isEmpty()) { + if (!buildSideIsEmpty.booleanValue() && !spilledPartitionsList.isEmpty()) { // Get the next (previously) spilled partition to handle as incoming HJSpilledPartition currSp = spilledPartitionsList.remove(0); // Create a BUILD-side "incoming" out of the inner spill file of that partition - buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, rightSchema, oContext, spillSet); + buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet); // The above ctor call also got the first batch; need to update the outcome rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome(); - if ( currSp.outerSpilledBatches > 0 ) { + if (currSp.outerSpilledBatches > 0) { // Create a PROBE-side "incoming" out of the outer spill file of that partition probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet); // The above ctor call also got the first batch; need to update the outcome @@ -644,13 +676,14 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { buildBatch, probeBatch, buildJoinColumns, + leftUpstream == IterOutcome.NONE, // probeEmpty allocator.getLimit(), + 
maxIncomingBatchSize, numPartitions, RECORDS_PER_BATCH, RECORDS_PER_BATCH, maxBatchSize, maxBatchSize, - batchMemoryManager.getOutputRowCount(), batchMemoryManager.getOutputBatchSize(), HashTable.DEFAULT_LOAD_FACTOR); @@ -689,12 +722,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { * Execute the BUILD phase; first read incoming and split rows into partitions; * may decide to spill some of the partitions * + * @return Returns an {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a termination condition is reached. Otherwise returns null. * @throws SchemaChangeException */ - public void executeBuildPhase() throws SchemaChangeException { - if (rightUpstream == IterOutcome.NONE) { + public IterOutcome executeBuildPhase() throws SchemaChangeException { + if (buildSideIsEmpty.booleanValue()) { // empty right - return; + return null; } HashJoinMemoryCalculator.BuildSidePartitioning buildCalc; @@ -716,13 +750,14 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { buildBatch, probeBatch, buildJoinColumns, + leftUpstream == IterOutcome.NONE, // probeEmpty allocator.getLimit(), + maxIncomingBatchSize, numPartitions, RECORDS_PER_BATCH, RECORDS_PER_BATCH, maxBatchSize, maxBatchSize, - batchMemoryManager.getOutputRowCount(), batchMemoryManager.getOutputBatchSize(), HashTable.DEFAULT_LOAD_FACTOR); @@ -754,8 +789,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { continue; case OK_NEW_SCHEMA: - if (!rightSchema.equals(buildBatch.getSchema())) { - throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", rightSchema, buildBatch.getSchema()); + if (!buildSchema.equals(buildBatch.getSchema())) { + throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", buildSchema, buildBatch.getSchema()); } for (HashPartition partn : partitions) { partn.updateBatches(); } // Fall through @@ -801,8 +836,16 @@ 
public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { } } + prefetchFirstProbeBatch(); + + if (leftUpstream.isError()) { + // A termination condition was reached while prefetching the first build side data holding batch. + // We need to terminate. + return leftUpstream; + } + HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next(); - postBuildCalc.initialize(); + postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty // // Traverse all the in-memory partitions' incoming batches, and build their hash tables @@ -849,14 +892,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later partn.closeWriter(); + + partn.updateProbeRecordsPerBatch(postBuildCalc.getProbeRecordsPerBatch()); } } + + return null; } private void setupOutputContainerSchema() { - if (rightSchema != null) { - for (final MaterializedField field : rightSchema) { + if (buildSchema != null) { + for (final MaterializedField field : buildSchema) { final MajorType inputType = field.getType(); final MajorType outputType; // If left or full outer join, then the output type must be nullable. However, map types are @@ -938,6 +985,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { this.allocator = oContext.getAllocator(); + maxIncomingBatchSize = context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE); numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR); if ( numPartitions == 1 ) { // disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1"); @@ -976,7 +1024,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { * spillSet. 
*/ private void cleanup() { - if ( buildSideIsEmpty ) { return; } // not set up; nothing to clean + if ( buildSideIsEmpty.booleanValue() ) { return; } // not set up; nothing to clean if ( spillSet.getWriteBytes() > 0 ) { stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0)); @@ -1027,7 +1075,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { * written is updated at close time in {@link #cleanup()}. */ private void updateStats() { - if ( buildSideIsEmpty ) { return; } // no stats when the right side is empty + if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files final HashTableStats htStats = new HashTableStats(); |