path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
author    Timothy Farkas <timothyfarkas@apache.org>    2018-07-16 15:33:23 -0700
committer Vitalii Diravka <vitalii.diravka@gmail.com>  2018-08-13 13:33:02 +0300
commit    6ad0f9f1bab8bdda18f3eaaf29445bc94355156e (patch)
tree      e17e21895006a209f25d0994e06a7e0c15bd5b73 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
parent    93a1c5aba6f17b93a52ab4367f625103a904f1ad (diff)
DRILL-6453: Fix deadlock caused by reading from left and right inputs in HashJoin simultaneously.
closes #1408
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java  358
1 file changed, 203 insertions(+), 155 deletions(-)
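
The heart of this fix is that the join no longer reads both inputs simultaneously: it prefetches the build side first and the probe side only when the build phase actually needs it, with a per-side MutableBoolean guard making each prefetch a one-shot operation. Below is a minimal, self-contained sketch of that guard pattern; the Outcome enum, the iterator-based upstreams, and all names are illustrative stand-ins, not Drill's actual IterOutcome/RecordBatch API.

import org.apache.commons.lang3.mutable.MutableBoolean;

import java.util.Arrays;
import java.util.Iterator;

// Hypothetical model of the patch's one-shot prefetch guard.
public class PrefetchSketch {

  // Simplified stand-in for Drill's IterOutcome (error states omitted).
  enum Outcome { OK_NEW_SCHEMA, OK, NONE, NOT_YET }

  // One guard and one emptiness flag per side, mirroring prefetchedBuild /
  // prefetchedProbe and buildSideIsEmpty / probeSideIsEmpty in the patch.
  private final MutableBoolean prefetchedBuild = new MutableBoolean(false);
  private final MutableBoolean prefetchedProbe = new MutableBoolean(false);
  private final MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
  private final MutableBoolean probeSideIsEmpty = new MutableBoolean(false);

  // Shared one-shot prefetch: because the guard makes repeated calls no-ops,
  // the caller can prefetch the build side first and the probe side only when
  // it is actually needed - the two sides are never read at the same time,
  // which is what removes the deadlock.
  private Outcome prefetchFirstBatch(Outcome current,
                                     MutableBoolean prefetched,
                                     MutableBoolean isEmpty,
                                     Iterator<Outcome> upstream,
                                     Runnable memoryManagerUpdate) {
    if (prefetched.booleanValue()) {
      return current;                  // already prefetched; nothing to do
    }
    prefetched.setValue(true);
    if (current != Outcome.NONE) {
      do {                             // keep asking until a real result arrives
        current = upstream.hasNext() ? upstream.next() : Outcome.NONE;
      } while (current == Outcome.NOT_YET);
    }
    isEmpty.setValue(current == Outcome.NONE);  // NONE means no data at all
    memoryManagerUpdate.run();                  // side-specific stats update
    return current;
  }

  public static void main(String[] args) {
    PrefetchSketch join = new PrefetchSketch();
    Iterator<Outcome> build = Arrays.asList(Outcome.NOT_YET, Outcome.OK).iterator();
    Iterator<Outcome> probe = Arrays.asList(Outcome.OK).iterator();

    // Build side first; the probe side is untouched until we ask for it.
    Outcome right = join.prefetchFirstBatch(Outcome.OK_NEW_SCHEMA,
        join.prefetchedBuild, join.buildSideIsEmpty, build,
        () -> System.out.println("update build-side batch sizer"));
    System.out.println("build prefetch -> " + right + ", empty=" + join.buildSideIsEmpty);

    // Probe side later, on demand; a second call would be a cheap no-op.
    Outcome left = join.prefetchFirstBatch(Outcome.OK_NEW_SCHEMA,
        join.prefetchedProbe, join.probeSideIsEmpty, probe,
        () -> System.out.println("update probe-side batch sizer"));
    System.out.println("probe prefetch -> " + left + ", empty=" + join.probeSideIsEmpty);
  }
}

The same guard is what lets executeBuildPhase() safely call prefetchFirstProbeBatch() mid-build: if innerNext() reaches the probe side later and calls it again, the guard turns the second call into a no-op.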
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index b1ea96f05..0bd6fe624 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.PathSegment;
@@ -68,6 +69,9 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.calcite.rel.core.JoinRelType;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
/**
* This class implements the runtime execution for the Hash-Join operator
* supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
@@ -114,6 +118,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Fields used for partitioning
+ private long maxIncomingBatchSize;
/**
* The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
*/
@@ -125,7 +130,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* The master class used to generate {@link HashTable}s.
*/
private ChainedHashTable baseHashTable;
- private boolean buildSideIsEmpty = true;
+ private MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
+ private MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
private boolean canSpill = true;
private boolean wasKilled; // a kill was received, may need to clean spilled partns
@@ -138,7 +144,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private int outputRecords;
// Schema of the build side
- private BatchSchema rightSchema;
+ private BatchSchema buildSchema;
// Schema of the probe side
private BatchSchema probeSchema;
@@ -150,9 +156,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private RecordBatch probeBatch;
/**
- * Flag indicating whether or not the first data holding batch needs to be fetched.
+ * Flag indicating whether or not the first data holding build batch needs to be fetched.
+ */
+ private MutableBoolean prefetchedBuild = new MutableBoolean(false);
+ /**
+ * Flag indicating whether or not the first data holding probe batch needs to be fetched.
*/
- private boolean prefetched;
+ private MutableBoolean prefetchedProbe = new MutableBoolean(false);
// For handling spilling
private SpillSet spillSet;
@@ -220,123 +230,120 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
protected void buildSchema() throws SchemaChangeException {
// We must first get the schemas from upstream operators before we can build
// our schema.
- boolean validSchema = sniffNewSchemas();
+ boolean validSchema = prefetchFirstBatchFromBothSides();
if (validSchema) {
// We are able to construct a valid schema from the upstream data.
// Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
state = BatchState.BUILD_SCHEMA;
- } else {
- verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
+
+ if (leftUpstream == OK_NEW_SCHEMA) {
+ probeSchema = left.getSchema();
+ }
+
+ if (rightUpstream == OK_NEW_SCHEMA) {
+ buildSchema = right.getSchema();
+ // position of the new "column" for keeping the hash values (after the real columns)
+ rightHVColPosition = right.getContainer().getNumberOfColumns();
+ // We only need the hash tables if we have data on the build side.
+ setupHashTable();
+ }
+
+ try {
+ hashJoinProbe = setupHashJoinProbe();
+ } catch (IOException | ClassTransformationException e) {
+ throw new SchemaChangeException(e);
+ }
}
// If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
- // we still need to build a dummy schema. These code handles both cases for us.
+ // we still need to build a dummy schema. This code handles both cases for us.
setupOutputContainerSchema();
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-
- // Initialize the hash join helper context
- if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
- // We only need the hash tables if we have data on the build side.
- setupHashTable();
- }
-
- try {
- hashJoinProbe = setupHashJoinProbe();
- } catch (IOException | ClassTransformationException e) {
- throw new SchemaChangeException(e);
- }
}
- @Override
- protected boolean prefetchFirstBatchFromBothSides() {
- if (leftUpstream != IterOutcome.NONE) {
- // We can only get data if there is data available
- leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left);
- }
-
- if (rightUpstream != IterOutcome.NONE) {
- // We can only get data if there is data available
- rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right);
- }
-
- buildSideIsEmpty = rightUpstream == IterOutcome.NONE;
-
- if (verifyOutcomeToSetBatchState(leftUpstream, rightUpstream)) {
- // For build side, use aggregate i.e. average row width across batches
- batchMemoryManager.update(LEFT_INDEX, 0);
- batchMemoryManager.update(RIGHT_INDEX, 0, true);
-
- logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
- logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+ /**
+ * Prefetches the first build side data holding batch.
+ */
+ private void prefetchFirstBuildBatch() {
+ rightUpstream = prefetchFirstBatch(rightUpstream,
+ prefetchedBuild,
+ buildSideIsEmpty,
+ RIGHT_INDEX,
+ right,
+ () -> {
+ batchMemoryManager.update(RIGHT_INDEX, 0, true);
+ logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+ });
+ }
- // Got our first batche(s)
- state = BatchState.FIRST;
- return true;
- } else {
- return false;
- }
+ /**
+ * Prefetches the first probe side data holding batch.
+ */
+ private void prefetchFirstProbeBatch() {
+ leftUpstream = prefetchFirstBatch(leftUpstream,
+ prefetchedProbe,
+ probeSideIsEmpty,
+ LEFT_INDEX,
+ left,
+ () -> {
+ batchMemoryManager.update(LEFT_INDEX, 0);
+ logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+ });
}
/**
- * Sniffs all data necessary to construct a schema.
- * @return True if all the data necessary to construct a schema has been retrieved. False otherwise.
+ * Used to fetch the first data holding batch from either the build or probe side.
+ * @param outcome The current upstream outcome for either the build or probe side.
+ * @param prefetched A flag indicating if we have already done a prefetch of the first data holding batch for the probe or build side.
+ * @param isEmpty A flag indicating if the probe or build side is empty.
+ * @param index The upstream index of the probe or build batch.
+ * @param batch The probe or build batch itself.
+ * @param memoryManagerUpdate A lambda function to execute the memory manager update for the probe or build batch.
+ * @return The current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
*/
- private boolean sniffNewSchemas() {
- do {
- // Ask for data until we get a valid result.
- leftUpstream = next(LEFT_INDEX, left);
- } while (leftUpstream == IterOutcome.NOT_YET);
+ private IterOutcome prefetchFirstBatch(IterOutcome outcome,
+ final MutableBoolean prefetched,
+ final MutableBoolean isEmpty,
+ final int index,
+ final RecordBatch batch,
+ final Runnable memoryManagerUpdate) {
+ if (prefetched.booleanValue()) {
+ // We have already prefetched the first data holding batch
+ return outcome;
+ }
- boolean isValidLeft = false;
+ // If we didn't retrieve our first data holding batch, we need to do it now.
+ prefetched.setValue(true);
- switch (leftUpstream) {
- case OK_NEW_SCHEMA:
- probeSchema = probeBatch.getSchema();
- case NONE:
- isValidLeft = true;
- break;
- case OK:
- case EMIT:
- throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
- default:
- // Termination condition
+ if (outcome != IterOutcome.NONE) {
+ // We can only get data if there is data available
+ outcome = sniffNonEmptyBatch(outcome, index, batch);
}
- do {
- // Ask for data until we get a valid result.
- rightUpstream = next(RIGHT_INDEX, right);
- } while (rightUpstream == IterOutcome.NOT_YET);
-
- boolean isValidRight = false;
+ isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there is no data.
- switch (rightUpstream) {
- case OK_NEW_SCHEMA:
- // We need to have the schema of the build side even when the build side is empty
- rightSchema = buildBatch.getSchema();
- // position of the new "column" for keeping the hash values (after the real columns)
- rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
- case NONE:
- isValidRight = true;
- break;
- case OK:
- case EMIT:
- throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
- default:
- // Termination condition
+ if (outcome == IterOutcome.OUT_OF_MEMORY) {
+ // We reached a termination state
+ state = BatchState.OUT_OF_MEMORY;
+ } else if (outcome == IterOutcome.STOP) {
+ // We reached a termination state
+ state = BatchState.STOP;
+ } else {
+ // Got our first batch(es)
+ memoryManagerUpdate.run();
+ state = BatchState.FIRST;
}
- // Left and right sides must return a valid response and both sides cannot be NONE.
- return (isValidLeft && isValidRight) &&
- (leftUpstream != IterOutcome.NONE && rightUpstream != IterOutcome.NONE);
+ return outcome;
}
/**
- * Currently in order to accurately predict memory usage for spilling, the first non-empty build side and probe side batches are needed. This method
- * fetches the first non-empty batch from the left or right side.
+ * Currently, in order to accurately predict memory usage for spilling, the first non-empty build or probe side batch is needed. This method
+ * fetches the first non-empty batch from the probe or build side.
* @param curr The current outcome.
- * @param inputIndex Index specifying whether to work with the left or right input.
- * @param recordBatch The left or right record batch.
+ * @param inputIndex Index specifying whether to work with the probe or build input.
+ * @param recordBatch The probe or build record batch.
* @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch.
*/
private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex, RecordBatch recordBatch) {
@@ -354,8 +361,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
case NOT_YET:
// We need to try again
break;
+ case EMIT:
+ throw new UnsupportedOperationException("We do not support " + EMIT);
default:
- // Other cases termination conditions
+ // Other cases are termination conditions
return curr;
}
}
@@ -381,96 +390,119 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
@Override
public IterOutcome innerNext() {
- if (!prefetched) {
- // If we didn't retrieve our first data hold batch, we need to do it now.
- prefetched = true;
- prefetchFirstBatchFromBothSides();
-
- // Handle emitting the correct outcome for termination conditions
- // Use the state set by prefetchFirstBatchFromBothSides to emit the correct termination outcome.
- switch (state) {
- case DONE:
- return IterOutcome.NONE;
- case STOP:
- return IterOutcome.STOP;
- case OUT_OF_MEMORY:
- return IterOutcome.OUT_OF_MEMORY;
- default:
- // No termination condition so continue processing.
- }
- }
-
- if ( wasKilled ) {
+ if (wasKilled) {
+ // We have received a kill signal. We need to stop processing.
this.cleanup();
super.close();
return IterOutcome.NONE;
}
+ prefetchFirstBuildBatch();
+
+ if (rightUpstream.isError()) {
+ // A termination condition was reached while prefetching the first build side data holding batch.
+ // We need to terminate.
+ return rightUpstream;
+ }
+
try {
/* If we are here for the first time, execute the build phase of the
* hash join and setup the run time generated class for the probe side
*/
if (state == BatchState.FIRST) {
// Build the hash table, using the build side record batches.
- executeBuildPhase();
+ final IterOutcome buildExecuteTermination = executeBuildPhase();
+
+ if (buildExecuteTermination != null) {
+ // A termination condition was reached while executing the build phase.
+ // We need to terminate.
+ return buildExecuteTermination;
+ }
+
// Update the hash table related stats for the operator
updateStats();
- // Initialize various settings for the probe side
- hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType, leftUpstream, partitions, cycleNum, container, spilledInners, buildSideIsEmpty, numPartitions, rightHVColPosition);
}
// Try to probe and project, or recursively handle a spilled partition
- if ( ! buildSideIsEmpty || // If there are build-side rows
- joinType != JoinRelType.INNER) { // or if this is a left/full outer join
-
- // Allocate the memory for the vectors in the output container
- batchMemoryManager.allocateVectors(container);
- hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
+ if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
+ joinType != JoinRelType.INNER) { // or if this is a left/full outer join
- outputRecords = hashJoinProbe.probeAndProject();
+ prefetchFirstProbeBatch();
- for (final VectorWrapper<?> v : container) {
- v.getValueVector().getMutator().setValueCount(outputRecords);
+ if (leftUpstream.isError()) {
+ // A termination condition was reached while prefetching the first probe side data holding batch.
+ // We need to terminate.
+ return leftUpstream;
}
- container.setRecordCount(outputRecords);
- batchMemoryManager.updateOutgoingStats(outputRecords);
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
- }
+ if (!buildSideIsEmpty.booleanValue() || !probeSideIsEmpty.booleanValue()) {
+ // Only allocate outgoing vectors and execute probing logic if there is data
- /* We are here because of one the following
- * 1. Completed processing of all the records and we are done
- * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
- * Either case build the output container's schema and return
- */
- if (outputRecords > 0 || state == BatchState.FIRST) {
if (state == BatchState.FIRST) {
- state = BatchState.NOT_FIRST;
+ // Initialize various settings for the probe side
+ hashJoinProbe.setupHashJoinProbe(probeBatch,
+ this,
+ joinType,
+ leftUpstream,
+ partitions,
+ cycleNum,
+ container,
+ spilledInners,
+ buildSideIsEmpty.booleanValue(),
+ numPartitions,
+ rightHVColPosition);
+ }
+
+ // Allocate the memory for the vectors in the output container
+ batchMemoryManager.allocateVectors(container);
+
+ hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
+
+ outputRecords = hashJoinProbe.probeAndProject();
+
+ for (final VectorWrapper<?> v : container) {
+ v.getValueVector().getMutator().setValueCount(outputRecords);
+ }
+ container.setRecordCount(outputRecords);
+
+ batchMemoryManager.updateOutgoingStats(outputRecords);
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}
- return IterOutcome.OK;
+ /* We are here because of one of the following
+ * 1. Completed processing of all the records and we are done
+ * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
+ * Either case build the output container's schema and return
+ */
+ if (outputRecords > 0 || state == BatchState.FIRST) {
+ if (state == BatchState.FIRST) {
+ state = BatchState.NOT_FIRST;
+ }
+
+ return IterOutcome.OK;
+ }
}
// Free all partitions' in-memory data structures
// (In case need to start processing spilled partitions)
- for ( HashPartition partn : partitions ) {
+ for (HashPartition partn : partitions) {
partn.cleanup(false); // clean, but do not delete the spill files !!
}
//
// (recursively) Handle the spilled partitions, if any
//
- if ( !buildSideIsEmpty && !spilledPartitionsList.isEmpty()) {
+ if (!buildSideIsEmpty.booleanValue() && !spilledPartitionsList.isEmpty()) {
// Get the next (previously) spilled partition to handle as incoming
HJSpilledPartition currSp = spilledPartitionsList.remove(0);
// Create a BUILD-side "incoming" out of the inner spill file of that partition
- buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, rightSchema, oContext, spillSet);
+ buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
// The above ctor call also got the first batch; need to update the outcome
rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
- if ( currSp.outerSpilledBatches > 0 ) {
+ if (currSp.outerSpilledBatches > 0) {
// Create a PROBE-side "incoming" out of the outer spill file of that partition
probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
// The above ctor call also got the first batch; need to update the outcome
@@ -644,13 +676,14 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
buildBatch,
probeBatch,
buildJoinColumns,
+ leftUpstream == IterOutcome.NONE, // probeEmpty
allocator.getLimit(),
+ maxIncomingBatchSize,
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
maxBatchSize,
maxBatchSize,
- batchMemoryManager.getOutputRowCount(),
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);
@@ -689,12 +722,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* Execute the BUILD phase; first read incoming and split rows into partitions;
* may decide to spill some of the partitions
*
+ * @return Returns an {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a termination condition is reached. Otherwise returns null.
* @throws SchemaChangeException
*/
- public void executeBuildPhase() throws SchemaChangeException {
- if (rightUpstream == IterOutcome.NONE) {
+ public IterOutcome executeBuildPhase() throws SchemaChangeException {
+ if (buildSideIsEmpty.booleanValue()) {
// empty right
- return;
+ return null;
}
HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
@@ -716,13 +750,14 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
buildBatch,
probeBatch,
buildJoinColumns,
+ leftUpstream == IterOutcome.NONE, // probeEmpty
allocator.getLimit(),
+ maxIncomingBatchSize,
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
maxBatchSize,
maxBatchSize,
- batchMemoryManager.getOutputRowCount(),
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);
@@ -754,8 +789,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
continue;
case OK_NEW_SCHEMA:
- if (!rightSchema.equals(buildBatch.getSchema())) {
- throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", rightSchema, buildBatch.getSchema());
+ if (!buildSchema.equals(buildBatch.getSchema())) {
+ throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", buildSchema, buildBatch.getSchema());
}
for (HashPartition partn : partitions) { partn.updateBatches(); }
// Fall through
@@ -801,8 +836,16 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
}
}
+ prefetchFirstProbeBatch();
+
+ if (leftUpstream.isError()) {
+ // A termination condition was reached while prefetching the first probe side data holding batch.
+ // We need to terminate.
+ return leftUpstream;
+ }
+
HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
- postBuildCalc.initialize();
+ postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
//
// Traverse all the in-memory partitions' incoming batches, and build their hash tables
@@ -849,14 +892,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
partn.closeWriter();
+
+ partn.updateProbeRecordsPerBatch(postBuildCalc.getProbeRecordsPerBatch());
}
}
+
+ return null;
}
private void setupOutputContainerSchema() {
- if (rightSchema != null) {
- for (final MaterializedField field : rightSchema) {
+ if (buildSchema != null) {
+ for (final MaterializedField field : buildSchema) {
final MajorType inputType = field.getType();
final MajorType outputType;
// If left or full outer join, then the output type must be nullable. However, map types are
@@ -938,6 +985,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
this.allocator = oContext.getAllocator();
+ maxIncomingBatchSize = context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE);
numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
if ( numPartitions == 1 ) { //
disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1");
@@ -976,7 +1024,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* spillSet.
*/
private void cleanup() {
- if ( buildSideIsEmpty ) { return; } // not set up; nothing to clean
+ if ( buildSideIsEmpty.booleanValue() ) { return; } // not set up; nothing to clean
if ( spillSet.getWriteBytes() > 0 ) {
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
(int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
@@ -1027,7 +1075,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* written is updated at close time in {@link #cleanup()}.
*/
private void updateStats() {
- if ( buildSideIsEmpty ) { return; } // no stats when the right side is empty
+ if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty
if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
final HashTableStats htStats = new HashTableStats();
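
A second pattern worth pulling out of the diff is termination propagation: executeBuildPhase() now returns a nullable IterOutcome instead of void, and innerNext() checks each prefetch result with isError() and returns early instead of relying on state flags alone. The following is a compact, hypothetical model of that control flow; the class, the trimmed Outcome enum, and the constructor-injected results are illustrative, not Drill's API.

// Hypothetical model of the patch's early-return termination handling.
public class TerminationSketch {

  enum Outcome {
    OK, NONE, STOP, OUT_OF_MEMORY;
    // Stand-in for IterOutcome.isError(), which the patch relies on.
    boolean isError() { return this == STOP || this == OUT_OF_MEMORY; }
  }

  private final Outcome buildPrefetchResult;
  private final Outcome buildPhaseResult;  // null means the build phase succeeded

  TerminationSketch(Outcome buildPrefetchResult, Outcome buildPhaseResult) {
    this.buildPrefetchResult = buildPrefetchResult;
    this.buildPhaseResult = buildPhaseResult;
  }

  Outcome innerNext() {
    Outcome right = buildPrefetchResult;     // models prefetchFirstBuildBatch()
    if (right.isError()) {
      return right;                          // terminate before touching the probe side
    }
    Outcome termination = buildPhaseResult;  // models executeBuildPhase()
    if (termination != null) {
      return termination;                    // propagate STOP / OUT_OF_MEMORY upstream
    }
    return Outcome.OK;                       // probe-and-project would run here
  }

  public static void main(String[] args) {
    System.out.println(new TerminationSketch(Outcome.OK, null).innerNext());                  // OK
    System.out.println(new TerminationSketch(Outcome.STOP, null).innerNext());                // STOP
    System.out.println(new TerminationSketch(Outcome.OK, Outcome.OUT_OF_MEMORY).innerNext()); // OUT_OF_MEMORY
  }
}

Returning null for "no termination" keeps the happy path cheap, at the cost of a nullable return that every caller of executeBuildPhase() must remember to check.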