author     Timothy Farkas <timothyfarkas@apache.org>    2018-07-16 15:33:23 -0700
committer  Vitalii Diravka <vitalii.diravka@gmail.com>  2018-08-13 13:33:02 +0300
commit     6ad0f9f1bab8bdda18f3eaaf29445bc94355156e
tree       e17e21895006a209f25d0994e06a7e0c15bd5b73 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl
parent     93a1c5aba6f17b93a52ab4367f625103a904f1ad
DRILL-6453: Fix deadlock caused by reading from left and right inputs in HashJoin simultaneously.
closes #1408
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java                        |  14
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/BatchSizePredictor.java                      |  79
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/BatchSizePredictorImpl.java                  | 165
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java                           | 358
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java      |  22
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java                |  21
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java            | 396
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java |   2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java         |   2
9 files changed, 711 insertions, 348 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index eaccd3355..fbdc4f3b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl.common;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.exceptions.UserException;
@@ -122,6 +123,7 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
private List<HashJoinMemoryCalculator.BatchStat> inMemoryBatchStats = Lists.newArrayList();
private long partitionInMemorySize;
private long numInMemoryRecords;
+ private boolean updatedRecordsPerBatch = false;
public HashPartition(FragmentContext context, BufferAllocator allocator, ChainedHashTable baseHashTable,
RecordBatch buildBatch, RecordBatch probeBatch,
@@ -156,6 +158,18 @@ public class HashPartition implements HashJoinMemoryCalculator.PartitionStat {
}
/**
+ * Configure a different temporary batch size when spilling probe batches.
+ * @param newRecordsPerBatch The new temporary batch size to use.
+ */
+ public void updateProbeRecordsPerBatch(int newRecordsPerBatch) {
+ Preconditions.checkArgument(newRecordsPerBatch > 0);
+ Preconditions.checkState(!updatedRecordsPerBatch); // Only allow updating once
+ Preconditions.checkState(processingOuter); // We can only update the records per batch when probing.
+
+ recordsPerBatch = newRecordsPerBatch;
+ }
+
+ /**
* Allocate a new vector container for either right or left record batch
* Add an additional special vector for the hash values
* Note: this call may OOM !!
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/BatchSizePredictor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/BatchSizePredictor.java
new file mode 100644
index 000000000..912e4feaf
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/BatchSizePredictor.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import org.apache.drill.exec.record.RecordBatch;
+
+/**
+ * This class predicts the sizes of batches given an input batch.
+ *
+ * <h4>Invariants</h4>
+ * <ul>
+ * <li>The {@link BatchSizePredictor} assumes that a {@link RecordBatch} is in a state where it can return a valid record count.</li>
+ * </ul>
+ */
+public interface BatchSizePredictor {
+ /**
+ * Gets the batchSize computed in the call to {@link #updateStats()}. Returns 0 if {@link #hadDataLastTime()} is false.
+ * @return Gets the batchSize computed in the call to {@link #updateStats()}. Returns 0 if {@link #hadDataLastTime()} is false.
+ * @throws IllegalStateException if {@link #updateStats()} was never called.
+ */
+ long getBatchSize();
+
+ /**
+ * Gets the number of records computed in the call to {@link #updateStats()}. Returns 0 if {@link #hadDataLastTime()} is false.
+ * @return Gets the number of records computed in the call to {@link #updateStats()}. Returns 0 if {@link #hadDataLastTime()} is false.
+ * @throws IllegalStateException if {@link #updateStats()} was never called.
+ */
+ int getNumRecords();
+
+ /**
+ * True if the input batch had records in the last call to {@link #updateStats()}. False otherwise.
+ * @return True if the input batch had records in the last call to {@link #updateStats()}. False otherwise.
+ */
+ boolean hadDataLastTime();
+
+ /**
+ * This method can be called multiple times to collect stats about the latest data in the provided record batch. These
+ * stats are used to predict batch sizes. If the batch currently has no data, this method is a noop. This method must be
+ * called at least once before {@link #predictBatchSize(int, boolean)}.
+ */
+ void updateStats();
+
+ /**
+ * Predicts the size of a batch using the current collected stats.
+ * @param desiredNumRecords The number of records contained in the batch whose size we want to predict.
+ * @param reserveHash Whether or not to include a column containing hash values.
+ * @return The size of the predicted batch.
+ * @throws IllegalStateException if {@link #hadDataLastTime()} is false or {@link #updateStats()} was not called.
+ */
+ long predictBatchSize(int desiredNumRecords, boolean reserveHash);
+
+ /**
+ * A factory for creating {@link BatchSizePredictor}s.
+ */
+ interface Factory {
+ /**
+ * Creates a predictor with a batch whose data needs to be used to predict other batch sizes.
+ * @param batch The batch whose size needs to be predicted.
+ * @param fragmentationFactor A constant used to predict value vector doubling.
+ * @param safetyFactor A constant used to leave padding for unpredictable incoming batches.
+ */
+ BatchSizePredictor create(RecordBatch batch, double fragmentationFactor, double safetyFactor);
+ }
+}
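The interface above implies a strict calling order: updateStats() must run before any getter or prediction. A minimal usage sketch of that contract, assuming a caller that holds an incoming RecordBatch named incomingBatch and uses the concrete BatchSizePredictorImpl factory added below (the factor values 1.33 and 1.15 are illustrative, not taken from this patch):

    BatchSizePredictor predictor =
        BatchSizePredictorImpl.Factory.INSTANCE.create(incomingBatch, 1.33, 1.15);
    predictor.updateStats();                          // collect stats from the current contents of incomingBatch
    if (predictor.hadDataLastTime()) {
      long observedSize = predictor.getBatchSize();   // size measured during updateStats()
      // Predict the size of a 1024-record batch, without reserving a hash-value column.
      long predictedSize = predictor.predictBatchSize(1024, false);
    }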
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/BatchSizePredictorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/BatchSizePredictorImpl.java
new file mode 100644
index 000000000..bbebd2bd7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/BatchSizePredictorImpl.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.join;
+
+import com.google.common.base.Preconditions;
+import org.apache.drill.exec.record.RecordBatch;
+import org.apache.drill.exec.record.RecordBatchSizer;
+import org.apache.drill.exec.vector.IntVector;
+
+import java.util.Map;
+
+public class BatchSizePredictorImpl implements BatchSizePredictor {
+ private RecordBatch batch;
+ private double fragmentationFactor;
+ private double safetyFactor;
+
+ private long batchSize;
+ private int numRecords;
+ private boolean updatedStats;
+ private boolean hasData;
+
+ public BatchSizePredictorImpl(final RecordBatch batch,
+ final double fragmentationFactor,
+ final double safetyFactor) {
+ this.batch = Preconditions.checkNotNull(batch);
+ this.fragmentationFactor = fragmentationFactor;
+ this.safetyFactor = safetyFactor;
+ }
+
+ @Override
+ public long getBatchSize() {
+ Preconditions.checkState(updatedStats);
+ return hasData? batchSize: 0;
+ }
+
+ @Override
+ public int getNumRecords() {
+ Preconditions.checkState(updatedStats);
+ return hasData? numRecords: 0;
+ }
+
+ @Override
+ public boolean hadDataLastTime() {
+ return hasData;
+ }
+
+ @Override
+ public void updateStats() {
+ final RecordBatchSizer batchSizer = new RecordBatchSizer(batch);
+ numRecords = batchSizer.rowCount();
+ updatedStats = true;
+ hasData = numRecords > 0;
+
+ if (hasData) {
+ batchSize = getBatchSizeEstimate(batch);
+ }
+ }
+
+ @Override
+ public long predictBatchSize(int desiredNumRecords, boolean reserveHash) {
+ Preconditions.checkState(hasData);
+ // Safety factor can be multiplied at the end since these batches are coming from exchange operators, so no excess value vector doubling
+ return computeMaxBatchSize(batchSize,
+ numRecords,
+ desiredNumRecords,
+ fragmentationFactor,
+ safetyFactor,
+ reserveHash);
+ }
+
+ public static long computeValueVectorSize(long numRecords, long byteSize) {
+ long naiveSize = numRecords * byteSize;
+ return roundUpToPowerOf2(naiveSize);
+ }
+
+ public static long computeValueVectorSize(long numRecords, long byteSize, double safetyFactor) {
+ long naiveSize = RecordBatchSizer.multiplyByFactor(numRecords * byteSize, safetyFactor);
+ return roundUpToPowerOf2(naiveSize);
+ }
+
+ public static long roundUpToPowerOf2(long num) {
+ Preconditions.checkArgument(num >= 1);
+ return num == 1 ? 1 : Long.highestOneBit(num - 1) << 1;
+ }
+
+ public static long computeMaxBatchSizeNoHash(final long incomingBatchSize,
+ final int incomingNumRecords,
+ final int desiredNumRecords,
+ final double fragmentationFactor,
+ final double safetyFactor) {
+ long maxBatchSize = computePartitionBatchSize(incomingBatchSize, incomingNumRecords, desiredNumRecords);
+ // Multiply by fragmentation factor
+ return RecordBatchSizer.multiplyByFactors(maxBatchSize, fragmentationFactor, safetyFactor);
+ }
+
+ public static long computeMaxBatchSize(final long incomingBatchSize,
+ final int incomingNumRecords,
+ final int desiredNumRecords,
+ final double fragmentationFactor,
+ final double safetyFactor,
+ final boolean reserveHash) {
+ long size = computeMaxBatchSizeNoHash(incomingBatchSize,
+ incomingNumRecords,
+ desiredNumRecords,
+ fragmentationFactor,
+ safetyFactor);
+
+ if (!reserveHash) {
+ return size;
+ }
+
+ long hashSize = desiredNumRecords * ((long) IntVector.VALUE_WIDTH);
+ hashSize = RecordBatchSizer.multiplyByFactors(hashSize, fragmentationFactor);
+
+ return size + hashSize;
+ }
+
+ public static long computePartitionBatchSize(final long incomingBatchSize,
+ final int incomingNumRecords,
+ final int desiredNumRecords) {
+ return (long) Math.ceil((((double) incomingBatchSize) /
+ ((double) incomingNumRecords)) *
+ ((double) desiredNumRecords));
+ }
+
+ public static long getBatchSizeEstimate(final RecordBatch recordBatch) {
+ final RecordBatchSizer sizer = new RecordBatchSizer(recordBatch);
+ long size = 0L;
+
+ for (Map.Entry<String, RecordBatchSizer.ColumnSize> column : sizer.columns().entrySet()) {
+ size += computeValueVectorSize(recordBatch.getRecordCount(), column.getValue().getStdNetOrNetSizePerEntry());
+ }
+
+ return size;
+ }
+
+ public static class Factory implements BatchSizePredictor.Factory {
+ public static final Factory INSTANCE = new Factory();
+
+ private Factory() {
+ }
+
+ @Override
+ public BatchSizePredictor create(final RecordBatch batch,
+ final double fragmentationFactor,
+ final double safetyFactor) {
+ return new BatchSizePredictorImpl(batch, fragmentationFactor, safetyFactor);
+ }
+ }
+}
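The static helpers above keep the prediction arithmetic easy to check by hand. An illustrative example (the numbers are made up, not taken from the patch):

    // 4 MB spread over 4096 records averages ~1 KB per record, so a partition batch of
    // 1024 records is estimated at 1 MB before the fragmentation/safety factors are applied.
    long partitionSize = BatchSizePredictorImpl.computePartitionBatchSize(
        4L * 1024 * 1024, // incomingBatchSize
        4096,             // incomingNumRecords
        1024);            // desiredNumRecords -> 1,048,576 bytes

    // Value vector allocations are rounded up to the next power of two:
    // 1000 records * 4 bytes = 4000 bytes, rounded up to 4096.
    long vectorSize = BatchSizePredictorImpl.computeValueVectorSize(1000, 4);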
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index b1ea96f05..0bd6fe624 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.PathSegment;
@@ -68,6 +69,9 @@ import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
import org.apache.calcite.rel.core.JoinRelType;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.EMIT;
+import static org.apache.drill.exec.record.RecordBatch.IterOutcome.OK_NEW_SCHEMA;
+
/**
* This class implements the runtime execution for the Hash-Join operator
* supporting INNER, LEFT OUTER, RIGHT OUTER, and FULL OUTER joins
@@ -114,6 +118,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
// Fields used for partitioning
+ private long maxIncomingBatchSize;
/**
* The number of {@link HashPartition}s. This is configured via a system option and set in {@link #partitionNumTuning(int, HashJoinMemoryCalculator.BuildSidePartitioning)}.
*/
@@ -125,7 +130,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* The master class used to generate {@link HashTable}s.
*/
private ChainedHashTable baseHashTable;
- private boolean buildSideIsEmpty = true;
+ private MutableBoolean buildSideIsEmpty = new MutableBoolean(false);
+ private MutableBoolean probeSideIsEmpty = new MutableBoolean(false);
private boolean canSpill = true;
private boolean wasKilled; // a kill was received, may need to clean spilled partns
@@ -138,7 +144,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private int outputRecords;
// Schema of the build side
- private BatchSchema rightSchema;
+ private BatchSchema buildSchema;
// Schema of the probe side
private BatchSchema probeSchema;
@@ -150,9 +156,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
private RecordBatch probeBatch;
/**
- * Flag indicating whether or not the first data holding batch needs to be fetched.
+ * Flag indicating whether or not the first data holding build batch needs to be fetched.
+ */
+ private MutableBoolean prefetchedBuild = new MutableBoolean(false);
+ /**
+ * Flag indicating whether or not the first data holding probe batch needs to be fetched.
*/
- private boolean prefetched;
+ private MutableBoolean prefetchedProbe = new MutableBoolean(false);
// For handling spilling
private SpillSet spillSet;
@@ -220,123 +230,120 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
protected void buildSchema() throws SchemaChangeException {
// We must first get the schemas from upstream operators before we can build
// our schema.
- boolean validSchema = sniffNewSchemas();
+ boolean validSchema = prefetchFirstBatchFromBothSides();
if (validSchema) {
// We are able to construct a valid schema from the upstream data.
// Setting the state here makes sure AbstractRecordBatch returns OK_NEW_SCHEMA
state = BatchState.BUILD_SCHEMA;
- } else {
- verifyOutcomeToSetBatchState(leftUpstream, rightUpstream);
+
+ if (leftUpstream == OK_NEW_SCHEMA) {
+ probeSchema = left.getSchema();
+ }
+
+ if (rightUpstream == OK_NEW_SCHEMA) {
+ buildSchema = right.getSchema();
+ // position of the new "column" for keeping the hash values (after the real columns)
+ rightHVColPosition = right.getContainer().getNumberOfColumns();
+ // We only need the hash tables if we have data on the build side.
+ setupHashTable();
+ }
+
+ try {
+ hashJoinProbe = setupHashJoinProbe();
+ } catch (IOException | ClassTransformationException e) {
+ throw new SchemaChangeException(e);
+ }
}
// If we have a valid schema, this will build a valid container. If we were unable to obtain a valid schema,
- // we still need to build a dummy schema. These code handles both cases for us.
+ // we still need to build a dummy schema. This code handles both cases for us.
setupOutputContainerSchema();
container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
-
- // Initialize the hash join helper context
- if (rightUpstream == IterOutcome.OK_NEW_SCHEMA) {
- // We only need the hash tables if we have data on the build side.
- setupHashTable();
- }
-
- try {
- hashJoinProbe = setupHashJoinProbe();
- } catch (IOException | ClassTransformationException e) {
- throw new SchemaChangeException(e);
- }
}
- @Override
- protected boolean prefetchFirstBatchFromBothSides() {
- if (leftUpstream != IterOutcome.NONE) {
- // We can only get data if there is data available
- leftUpstream = sniffNonEmptyBatch(leftUpstream, LEFT_INDEX, left);
- }
-
- if (rightUpstream != IterOutcome.NONE) {
- // We can only get data if there is data available
- rightUpstream = sniffNonEmptyBatch(rightUpstream, RIGHT_INDEX, right);
- }
-
- buildSideIsEmpty = rightUpstream == IterOutcome.NONE;
-
- if (verifyOutcomeToSetBatchState(leftUpstream, rightUpstream)) {
- // For build side, use aggregate i.e. average row width across batches
- batchMemoryManager.update(LEFT_INDEX, 0);
- batchMemoryManager.update(RIGHT_INDEX, 0, true);
-
- logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
- logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+ /**
+ * Prefetches the first build side data holding batch.
+ */
+ private void prefetchFirstBuildBatch() {
+ rightUpstream = prefetchFirstBatch(rightUpstream,
+ prefetchedBuild,
+ buildSideIsEmpty,
+ RIGHT_INDEX,
+ right,
+ () -> {
+ batchMemoryManager.update(RIGHT_INDEX, 0, true);
+ logger.debug("BATCH_STATS, incoming right: {}", batchMemoryManager.getRecordBatchSizer(RIGHT_INDEX));
+ });
+ }
- // Got our first batche(s)
- state = BatchState.FIRST;
- return true;
- } else {
- return false;
- }
+ /**
+ * Prefetches the first probe side data holding batch.
+ */
+ private void prefetchFirstProbeBatch() {
+ leftUpstream = prefetchFirstBatch(leftUpstream,
+ prefetchedProbe,
+ probeSideIsEmpty,
+ LEFT_INDEX,
+ left,
+ () -> {
+ batchMemoryManager.update(LEFT_INDEX, 0);
+ logger.debug("BATCH_STATS, incoming left: {}", batchMemoryManager.getRecordBatchSizer(LEFT_INDEX));
+ });
}
/**
- * Sniffs all data necessary to construct a schema.
- * @return True if all the data necessary to construct a schema has been retrieved. False otherwise.
+ * Used to fetch the first data holding batch from either the build or probe side.
+ * @param outcome The current upstream outcome for either the build or probe side.
+ * @param prefetched A flag indicating if we have already done a prefetch of the first data holding batch for the probe or build side.
+ * @param isEmpty A flag indicating if the probe or build side is empty.
+ * @param index The upstream index of the probe or build batch.
+ * @param batch The probe or build batch itself.
+ * @param memoryManagerUpdate A lambda function to execute the memory manager update for the probe or build batch.
+ * @return The current {@link org.apache.drill.exec.record.RecordBatch.IterOutcome}.
*/
- private boolean sniffNewSchemas() {
- do {
- // Ask for data until we get a valid result.
- leftUpstream = next(LEFT_INDEX, left);
- } while (leftUpstream == IterOutcome.NOT_YET);
+ private IterOutcome prefetchFirstBatch(IterOutcome outcome,
+ final MutableBoolean prefetched,
+ final MutableBoolean isEmpty,
+ final int index,
+ final RecordBatch batch,
+ final Runnable memoryManagerUpdate) {
+ if (prefetched.booleanValue()) {
+ // We have already prefetched the first data holding batch
+ return outcome;
+ }
- boolean isValidLeft = false;
+ // If we didn't retrieve our first data holding batch, we need to do it now.
+ prefetched.setValue(true);
- switch (leftUpstream) {
- case OK_NEW_SCHEMA:
- probeSchema = probeBatch.getSchema();
- case NONE:
- isValidLeft = true;
- break;
- case OK:
- case EMIT:
- throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
- default:
- // Termination condition
+ if (outcome != IterOutcome.NONE) {
+ // We can only get data if there is data available
+ outcome = sniffNonEmptyBatch(outcome, index, batch);
}
- do {
- // Ask for data until we get a valid result.
- rightUpstream = next(RIGHT_INDEX, right);
- } while (rightUpstream == IterOutcome.NOT_YET);
-
- boolean isValidRight = false;
+ isEmpty.setValue(outcome == IterOutcome.NONE); // If we received NONE there is no data.
- switch (rightUpstream) {
- case OK_NEW_SCHEMA:
- // We need to have the schema of the build side even when the build side is empty
- rightSchema = buildBatch.getSchema();
- // position of the new "column" for keeping the hash values (after the real columns)
- rightHVColPosition = buildBatch.getContainer().getNumberOfColumns();
- case NONE:
- isValidRight = true;
- break;
- case OK:
- case EMIT:
- throw new IllegalStateException("Unsupported outcome while building schema " + leftUpstream);
- default:
- // Termination condition
+ if (outcome == IterOutcome.OUT_OF_MEMORY) {
+ // We reached a termination state
+ state = BatchState.OUT_OF_MEMORY;
+ } else if (outcome == IterOutcome.STOP) {
+ // We reached a termination state
+ state = BatchState.STOP;
+ } else {
+ // Got our first batch(es)
+ memoryManagerUpdate.run();
+ state = BatchState.FIRST;
}
- // Left and right sides must return a valid response and both sides cannot be NONE.
- return (isValidLeft && isValidRight) &&
- (leftUpstream != IterOutcome.NONE && rightUpstream != IterOutcome.NONE);
+ return outcome;
}
/**
- * Currently in order to accurately predict memory usage for spilling, the first non-empty build side and probe side batches are needed. This method
- * fetches the first non-empty batch from the left or right side.
+ * Currently in order to accurately predict memory usage for spilling, the first non-empty build or probe side batch is needed. This method
+ * fetches the first non-empty batch from the probe or build side.
* @param curr The current outcome.
- * @param inputIndex Index specifying whether to work with the left or right input.
- * @param recordBatch The left or right record batch.
+ * @param inputIndex Index specifying whether to work with the probe or build input.
+ * @param recordBatch The probe or build record batch.
* @return The {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} for the left or right record batch.
*/
private IterOutcome sniffNonEmptyBatch(IterOutcome curr, int inputIndex, RecordBatch recordBatch) {
@@ -354,8 +361,10 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
case NOT_YET:
// We need to try again
break;
+ case EMIT:
+ throw new UnsupportedOperationException("We do not support " + EMIT);
default:
- // Other cases termination conditions
+ // Other cases are termination conditions
return curr;
}
}
@@ -381,96 +390,119 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
@Override
public IterOutcome innerNext() {
- if (!prefetched) {
- // If we didn't retrieve our first data hold batch, we need to do it now.
- prefetched = true;
- prefetchFirstBatchFromBothSides();
-
- // Handle emitting the correct outcome for termination conditions
- // Use the state set by prefetchFirstBatchFromBothSides to emit the correct termination outcome.
- switch (state) {
- case DONE:
- return IterOutcome.NONE;
- case STOP:
- return IterOutcome.STOP;
- case OUT_OF_MEMORY:
- return IterOutcome.OUT_OF_MEMORY;
- default:
- // No termination condition so continue processing.
- }
- }
-
- if ( wasKilled ) {
+ if (wasKilled) {
+ // We have received a kill signal. We need to stop processing.
this.cleanup();
super.close();
return IterOutcome.NONE;
}
+ prefetchFirstBuildBatch();
+
+ if (rightUpstream.isError()) {
+ // A termination condition was reached while prefetching the first build side data holding batch.
+ // We need to terminate.
+ return rightUpstream;
+ }
+
try {
/* If we are here for the first time, execute the build phase of the
* hash join and setup the run time generated class for the probe side
*/
if (state == BatchState.FIRST) {
// Build the hash table, using the build side record batches.
- executeBuildPhase();
+ final IterOutcome buildExecuteTermination = executeBuildPhase();
+
+ if (buildExecuteTermination != null) {
+ // A termination condition was reached while executing the build phase.
+ // We need to terminate.
+ return buildExecuteTermination;
+ }
+
// Update the hash table related stats for the operator
updateStats();
- // Initialize various settings for the probe side
- hashJoinProbe.setupHashJoinProbe(probeBatch, this, joinType, leftUpstream, partitions, cycleNum, container, spilledInners, buildSideIsEmpty, numPartitions, rightHVColPosition);
}
// Try to probe and project, or recursively handle a spilled partition
- if ( ! buildSideIsEmpty || // If there are build-side rows
- joinType != JoinRelType.INNER) { // or if this is a left/full outer join
-
- // Allocate the memory for the vectors in the output container
- batchMemoryManager.allocateVectors(container);
- hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
+ if (!buildSideIsEmpty.booleanValue() || // If there are build-side rows
+ joinType != JoinRelType.INNER) { // or if this is a left/full outer join
- outputRecords = hashJoinProbe.probeAndProject();
+ prefetchFirstProbeBatch();
- for (final VectorWrapper<?> v : container) {
- v.getValueVector().getMutator().setValueCount(outputRecords);
+ if (leftUpstream.isError()) {
+ // A termination condition was reached while prefetching the first probe side data holding batch.
+ // We need to terminate.
+ return leftUpstream;
}
- container.setRecordCount(outputRecords);
- batchMemoryManager.updateOutgoingStats(outputRecords);
- if (logger.isDebugEnabled()) {
- logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
- }
+ if (!buildSideIsEmpty.booleanValue() || !probeSideIsEmpty.booleanValue()) {
+ // Only allocate outgoing vectors and execute probing logic if there is data
- /* We are here because of one the following
- * 1. Completed processing of all the records and we are done
- * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
- * Either case build the output container's schema and return
- */
- if (outputRecords > 0 || state == BatchState.FIRST) {
if (state == BatchState.FIRST) {
- state = BatchState.NOT_FIRST;
+ // Initialize various settings for the probe side
+ hashJoinProbe.setupHashJoinProbe(probeBatch,
+ this,
+ joinType,
+ leftUpstream,
+ partitions,
+ cycleNum,
+ container,
+ spilledInners,
+ buildSideIsEmpty.booleanValue(),
+ numPartitions,
+ rightHVColPosition);
+ }
+
+ // Allocate the memory for the vectors in the output container
+ batchMemoryManager.allocateVectors(container);
+
+ hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
+
+ outputRecords = hashJoinProbe.probeAndProject();
+
+ for (final VectorWrapper<?> v : container) {
+ v.getValueVector().getMutator().setValueCount(outputRecords);
+ }
+ container.setRecordCount(outputRecords);
+
+ batchMemoryManager.updateOutgoingStats(outputRecords);
+ if (logger.isDebugEnabled()) {
+ logger.debug("BATCH_STATS, outgoing: {}", new RecordBatchSizer(this));
}
- return IterOutcome.OK;
+ /* We are here because of one of the following
+ * 1. Completed processing of all the records and we are done
+ * 2. We've filled up the outgoing batch to the maximum and we need to return upstream
+ * In either case, build the output container's schema and return
+ */
+ if (outputRecords > 0 || state == BatchState.FIRST) {
+ if (state == BatchState.FIRST) {
+ state = BatchState.NOT_FIRST;
+ }
+
+ return IterOutcome.OK;
+ }
}
// Free all partitions' in-memory data structures
// (In case need to start processing spilled partitions)
- for ( HashPartition partn : partitions ) {
+ for (HashPartition partn : partitions) {
partn.cleanup(false); // clean, but do not delete the spill files !!
}
//
// (recursively) Handle the spilled partitions, if any
//
- if ( !buildSideIsEmpty && !spilledPartitionsList.isEmpty()) {
+ if (!buildSideIsEmpty.booleanValue() && !spilledPartitionsList.isEmpty()) {
// Get the next (previously) spilled partition to handle as incoming
HJSpilledPartition currSp = spilledPartitionsList.remove(0);
// Create a BUILD-side "incoming" out of the inner spill file of that partition
- buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, rightSchema, oContext, spillSet);
+ buildBatch = new SpilledRecordbatch(currSp.innerSpillFile, currSp.innerSpilledBatches, context, buildSchema, oContext, spillSet);
// The above ctor call also got the first batch; need to update the outcome
rightUpstream = ((SpilledRecordbatch) buildBatch).getInitialOutcome();
- if ( currSp.outerSpilledBatches > 0 ) {
+ if (currSp.outerSpilledBatches > 0) {
// Create a PROBE-side "incoming" out of the outer spill file of that partition
probeBatch = new SpilledRecordbatch(currSp.outerSpillFile, currSp.outerSpilledBatches, context, probeSchema, oContext, spillSet);
// The above ctor call also got the first batch; need to update the outcome
@@ -644,13 +676,14 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
buildBatch,
probeBatch,
buildJoinColumns,
+ leftUpstream == IterOutcome.NONE, // probeEmpty
allocator.getLimit(),
+ maxIncomingBatchSize,
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
maxBatchSize,
maxBatchSize,
- batchMemoryManager.getOutputRowCount(),
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);
@@ -689,12 +722,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* Execute the BUILD phase; first read incoming and split rows into partitions;
* may decide to spill some of the partitions
*
+ * @return Returns an {@link org.apache.drill.exec.record.RecordBatch.IterOutcome} if a termination condition is reached. Otherwise returns null.
* @throws SchemaChangeException
*/
- public void executeBuildPhase() throws SchemaChangeException {
- if (rightUpstream == IterOutcome.NONE) {
+ public IterOutcome executeBuildPhase() throws SchemaChangeException {
+ if (buildSideIsEmpty.booleanValue()) {
// empty right
- return;
+ return null;
}
HashJoinMemoryCalculator.BuildSidePartitioning buildCalc;
@@ -716,13 +750,14 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
buildBatch,
probeBatch,
buildJoinColumns,
+ leftUpstream == IterOutcome.NONE, // probeEmpty
allocator.getLimit(),
+ maxIncomingBatchSize,
numPartitions,
RECORDS_PER_BATCH,
RECORDS_PER_BATCH,
maxBatchSize,
maxBatchSize,
- batchMemoryManager.getOutputRowCount(),
batchMemoryManager.getOutputBatchSize(),
HashTable.DEFAULT_LOAD_FACTOR);
@@ -754,8 +789,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
continue;
case OK_NEW_SCHEMA:
- if (!rightSchema.equals(buildBatch.getSchema())) {
- throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", rightSchema, buildBatch.getSchema());
+ if (!buildSchema.equals(buildBatch.getSchema())) {
+ throw SchemaChangeException.schemaChanged("Hash join does not support schema changes in build side.", buildSchema, buildBatch.getSchema());
}
for (HashPartition partn : partitions) { partn.updateBatches(); }
// Fall through
@@ -801,8 +836,16 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
}
}
+ prefetchFirstProbeBatch();
+
+ if (leftUpstream.isError()) {
+ // A termination condition was reached while prefetching the first probe side data holding batch.
+ // We need to terminate.
+ return leftUpstream;
+ }
+
HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
- postBuildCalc.initialize();
+ postBuildCalc.initialize(probeSideIsEmpty.booleanValue()); // probeEmpty
//
// Traverse all the in-memory partitions' incoming batches, and build their hash tables
@@ -849,14 +892,18 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
spilledInners[partn.getPartitionNum()] = sp; // for the outer to find the SP later
partn.closeWriter();
+
+ partn.updateProbeRecordsPerBatch(postBuildCalc.getProbeRecordsPerBatch());
}
}
+
+ return null;
}
private void setupOutputContainerSchema() {
- if (rightSchema != null) {
- for (final MaterializedField field : rightSchema) {
+ if (buildSchema != null) {
+ for (final MaterializedField field : buildSchema) {
final MajorType inputType = field.getType();
final MajorType outputType;
// If left or full outer join, then the output type must be nullable. However, map types are
@@ -938,6 +985,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
this.allocator = oContext.getAllocator();
+ maxIncomingBatchSize = context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE);
numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHJOIN_NUM_PARTITIONS_VALIDATOR);
if ( numPartitions == 1 ) { //
disableSpilling("Spilling is disabled due to configuration setting of num_partitions to 1");
@@ -976,7 +1024,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* spillSet.
*/
private void cleanup() {
- if ( buildSideIsEmpty ) { return; } // not set up; nothing to clean
+ if ( buildSideIsEmpty.booleanValue() ) { return; } // not set up; nothing to clean
if ( spillSet.getWriteBytes() > 0 ) {
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
(int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
@@ -1027,7 +1075,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
* written is updated at close time in {@link #cleanup()}.
*/
private void updateStats() {
- if ( buildSideIsEmpty ) { return; } // no stats when the right side is empty
+ if ( buildSideIsEmpty.booleanValue() ) { return; } // no stats when the right side is empty
if ( cycleNum > 0 ) { return; } // These stats are only for before processing spilled files
final HashTableStats htStats = new HashTableStats();
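Taken together, the HashJoinBatch changes above remove the up-front simultaneous read of both inputs that caused the DRILL-6453 deadlock: the probe (left) side is now prefetched only after the build (right) side has been consumed, and the MutableBoolean prefetch flags make repeated prefetch calls no-ops. A condensed sketch of the reordered flow, using the method names from the diff with error handling and spill recursion elided:

    public IterOutcome innerNext() {
      prefetchFirstBuildBatch();                     // only the build (right) input is read up front
      if (rightUpstream.isError()) { return rightUpstream; }
      if (state == BatchState.FIRST) {
        IterOutcome term = executeBuildPhase();      // drains the build side, then prefetches the probe side
        if (term != null) { return term; }
      }
      prefetchFirstProbeBatch();                     // no-op if executeBuildPhase() already prefetched it
      if (leftUpstream.isError()) { return leftUpstream; }
      // ... probe-and-project as in the hunks above ...
      return IterOutcome.OK;
    }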
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
index fb087a0fd..af6be8bfe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMechanicalMemoryCalculator.java
@@ -59,6 +59,7 @@ public class HashJoinMechanicalMemoryCalculator implements HashJoinMemoryCalcula
private int initialPartitions;
private PartitionStatSet partitionStatSet;
+ private int recordsPerPartitionBatchProbe;
public MechanicalBuildSidePartitioning(int maxNumInMemBatches) {
this.maxNumInMemBatches = maxNumInMemBatches;
@@ -70,16 +71,18 @@ public class HashJoinMechanicalMemoryCalculator implements HashJoinMemoryCalcula
RecordBatch buildSideBatch,
RecordBatch probeSideBatch,
Set<String> joinColumns,
+ boolean probeEmpty,
long memoryAvailable,
+ long maxIncomingBatchSize,
int initialPartitions,
int recordsPerPartitionBatchBuild,
int recordsPerPartitionBatchProbe,
int maxBatchNumRecordsBuild,
int maxBatchNumRecordsProbe,
- int outputBatchNumRecords,
int outputBatchSize,
double loadFactor) {
this.initialPartitions = initialPartitions;
+ this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
}
@Override
@@ -115,7 +118,7 @@ public class HashJoinMechanicalMemoryCalculator implements HashJoinMemoryCalcula
@Nullable
@Override
public PostBuildCalculations next() {
- return new MechanicalPostBuildCalculations(maxNumInMemBatches, partitionStatSet);
+ return new MechanicalPostBuildCalculations(maxNumInMemBatches, partitionStatSet, recordsPerPartitionBatchProbe);
}
@Override
@@ -127,16 +130,23 @@ public class HashJoinMechanicalMemoryCalculator implements HashJoinMemoryCalcula
public static class MechanicalPostBuildCalculations implements PostBuildCalculations {
private final int maxNumInMemBatches;
private final PartitionStatSet partitionStatSet;
+ private final int recordsPerPartitionBatchProbe;
- public MechanicalPostBuildCalculations(int maxNumInMemBatches,
- PartitionStatSet partitionStatSet) {
+ public MechanicalPostBuildCalculations(final int maxNumInMemBatches,
+ final PartitionStatSet partitionStatSet,
+ final int recordsPerPartitionBatchProbe) {
this.maxNumInMemBatches = maxNumInMemBatches;
this.partitionStatSet = Preconditions.checkNotNull(partitionStatSet);
+ this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
}
@Override
- public void initialize() {
- // Do nothing
+ public void initialize(boolean probeEmpty) {
+ }
+
+ @Override
+ public int getProbeRecordsPerBatch() {
+ return recordsPerPartitionBatchProbe;
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
index 868fbfd10..0ccd912d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculator.java
@@ -34,7 +34,7 @@ import java.util.Set;
* different memory calculations at each phase. The phases of execution have been broken down
* into an explicit state machine diagram below. What ocurrs in each state is described in
* the documentation of the {@link HashJoinState} class below. <b>Note:</b> the transition from Probing
- * and Partitioning back to Build Side Partitioning. This happens we had to spill probe side
+ * and Partitioning back to Build Side Partitioning. This happens when we had to spill probe side
* partitions and we needed to recursively process spilled partitions. This recursion is
* described in more detail in the example below.
* </p>
@@ -86,6 +86,14 @@ public interface HashJoinMemoryCalculator extends HashJoinStateCalculator<HashJo
/**
* The interface representing the {@link HashJoinStateCalculator} corresponding to the
* {@link HashJoinState#BUILD_SIDE_PARTITIONING} state.
+ *
+ * <h4>Invariants</h4>
+ * <ul>
+ * <li>
+ * This calculator will only be used when there is build side data. If there is no build side data, the caller
+ * should not invoke this calculator.
+ * </li>
+ * </ul>
*/
interface BuildSidePartitioning extends HashJoinStateCalculator<PostBuildCalculations> {
void initialize(boolean autoTune,
@@ -93,13 +101,14 @@ public interface HashJoinMemoryCalculator extends HashJoinStateCalculator<HashJo
RecordBatch buildSideBatch,
RecordBatch probeSideBatch,
Set<String> joinColumns,
+ boolean probeEmpty,
long memoryAvailable,
+ long maxIncomingBatchSize,
int initialPartitions,
int recordsPerPartitionBatchBuild,
int recordsPerPartitionBatchProbe,
int maxBatchNumRecordsBuild,
int maxBatchNumRecordsProbe,
- int outputBatchNumRecords,
int outputBatchSize,
double loadFactor);
@@ -121,7 +130,13 @@ public interface HashJoinMemoryCalculator extends HashJoinStateCalculator<HashJo
* {@link HashJoinState#POST_BUILD_CALCULATIONS} state.
*/
interface PostBuildCalculations extends HashJoinStateCalculator<HashJoinMemoryCalculator> {
- void initialize();
+ /**
+ * Initializes the calculator with additional information needed.
+ * @param probeEmpty True if the probe is empty. False otherwise.
+ */
+ void initialize(boolean probeEmpty);
+
+ int getProbeRecordsPerBatch();
boolean shouldSpill();
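The expanded PostBuildCalculations contract is what lets HashJoinBatch shrink the temporary probe-side batch size for spilled partitions. A minimal sketch of the hand-off, mirroring the calls already visible in the HashJoinBatch hunk above (partition bookkeeping simplified):

    HashJoinMemoryCalculator.PostBuildCalculations postBuildCalc = buildCalc.next();
    postBuildCalc.initialize(probeSideIsEmpty.booleanValue());  // probeEmpty
    // ... later, for each partition that was spilled during the build phase ...
    partn.closeWriter();
    // Switch the spilled partition to the probe batch size chosen by the calculator.
    partn.updateProbeRecordsPerBatch(postBuildCalc.getProbeRecordsPerBatch());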
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
index 37f33295e..a351cbcaf 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinMemoryCalculatorImpl.java
@@ -73,7 +73,9 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
throw new IllegalArgumentException("Invalid calc type: " + hashTableCalculatorType);
}
- return new BuildSidePartitioningImpl(hashTableSizeCalculator,
+ return new BuildSidePartitioningImpl(
+ BatchSizePredictorImpl.Factory.INSTANCE,
+ hashTableSizeCalculator,
HashJoinHelperSizeCalculatorImpl.INSTANCE,
fragmentationFactor, safetyFactor);
} else {
@@ -86,65 +88,28 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
return INITIALIZING;
}
- public static long computeMaxBatchSizeNoHash(final long incomingBatchSize,
- final int incomingNumRecords,
- final int desiredNumRecords,
- final double fragmentationFactor,
- final double safetyFactor) {
- long maxBatchSize = HashJoinMemoryCalculatorImpl
- .computePartitionBatchSize(incomingBatchSize, incomingNumRecords, desiredNumRecords);
- // Multiple by fragmentation factor
- return RecordBatchSizer.multiplyByFactors(maxBatchSize, fragmentationFactor, safetyFactor);
- }
-
- public static long computeMaxBatchSize(final long incomingBatchSize,
- final int incomingNumRecords,
- final int desiredNumRecords,
- final double fragmentationFactor,
- final double safetyFactor,
- final boolean reserveHash) {
- long size = computeMaxBatchSizeNoHash(incomingBatchSize,
- incomingNumRecords,
- desiredNumRecords,
- fragmentationFactor,
- safetyFactor);
-
- if (!reserveHash) {
- return size;
- }
-
- long hashSize = desiredNumRecords * ((long) IntVector.VALUE_WIDTH);
- hashSize = RecordBatchSizer.multiplyByFactors(hashSize, fragmentationFactor);
-
- return size + hashSize;
- }
-
- public static long computePartitionBatchSize(final long incomingBatchSize,
- final int incomingNumRecords,
- final int desiredNumRecords) {
- return (long) Math.ceil((((double) incomingBatchSize) /
- ((double) incomingNumRecords)) *
- ((double) desiredNumRecords));
- }
-
public static class NoopBuildSidePartitioningImpl implements BuildSidePartitioning {
private int initialPartitions;
+ private int recordsPerPartitionBatchProbe;
@Override
public void initialize(boolean autoTune,
boolean reserveHash,
RecordBatch buildSideBatch,
- RecordBatch probeSideBatch, Set<String> joinColumns,
+ RecordBatch probeSideBatch,
+ Set<String> joinColumns,
+ boolean probeEmpty,
long memoryAvailable,
+ long maxIncomingBatchSize,
int initialPartitions,
int recordsPerPartitionBatchBuild,
int recordsPerPartitionBatchProbe,
int maxBatchNumRecordsBuild,
int maxBatchNumRecordsProbe,
- int outputBatchNumRecords,
int outputBatchSize,
double loadFactor) {
this.initialPartitions = initialPartitions;
+ this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
}
@Override
@@ -180,7 +145,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
@Nullable
@Override
public PostBuildCalculations next() {
- return new NoopPostBuildCalculationsImpl();
+ return new NoopPostBuildCalculationsImpl(recordsPerPartitionBatchProbe);
}
@Override
@@ -204,7 +169,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
* <h1>Life Cycle</h1>
* <p>
* <ul>
- * <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, long, int, int, int, int, int, int, int, double)}.
+ * <li><b>Step 0:</b> Call {@link #initialize(boolean, boolean, RecordBatch, RecordBatch, Set, boolean, long, long, int, int, int, int, int, int, double)}.
* This will initialize the StateCalculate with the additional information it needs.</li>
* <li><b>Step 1:</b> Call {@link #getNumPartitions()} to see the number of partitions that fit in memory.</li>
* <li><b>Step 2:</b> Call {@link #shouldSpill()} To determine if spilling needs to occurr.</li>
@@ -215,6 +180,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
public static class BuildSidePartitioningImpl implements BuildSidePartitioning {
public static final Logger log = LoggerFactory.getLogger(BuildSidePartitioning.class);
+ private final BatchSizePredictor.Factory batchSizePredictorFactory;
private final HashTableSizeCalculator hashTableSizeCalculator;
private final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator;
private final double fragmentationFactor;
@@ -223,10 +189,8 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
private int maxBatchNumRecordsBuild;
private int maxBatchNumRecordsProbe;
private long memoryAvailable;
- private long buildBatchSize;
- private long probeBatchSize;
- private int buildNumRecords;
- private int probeNumRecords;
+ private boolean probeEmpty;
+ private long maxIncomingBatchSize;
private long maxBuildBatchSize;
private long maxProbeBatchSize;
private long maxOutputBatchSize;
@@ -246,13 +210,17 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
private long reservedMemory;
private long maxReservedMemory;
+ private BatchSizePredictor buildSizePredictor;
+ private BatchSizePredictor probeSizePredictor;
private boolean firstInitialized;
private boolean initialized;
- public BuildSidePartitioningImpl(final HashTableSizeCalculator hashTableSizeCalculator,
+ public BuildSidePartitioningImpl(final BatchSizePredictor.Factory batchSizePredictorFactory,
+ final HashTableSizeCalculator hashTableSizeCalculator,
final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator,
final double fragmentationFactor,
final double safetyFactor) {
+ this.batchSizePredictorFactory = Preconditions.checkNotNull(batchSizePredictorFactory);
this.hashTableSizeCalculator = Preconditions.checkNotNull(hashTableSizeCalculator);
this.hashJoinHelperSizeCalculator = Preconditions.checkNotNull(hashJoinHelperSizeCalculator);
this.fragmentationFactor = fragmentationFactor;
@@ -262,35 +230,33 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
@Override
public void initialize(boolean autoTune,
boolean reserveHash,
- RecordBatch buildSideBatch,
- RecordBatch probeSideBatch,
+ RecordBatch buildBatch,
+ RecordBatch probeBatch,
Set<String> joinColumns,
+ boolean probeEmpty,
long memoryAvailable,
+ long maxIncomingBatchSize,
int initialPartitions,
int recordsPerPartitionBatchBuild,
int recordsPerPartitionBatchProbe,
int maxBatchNumRecordsBuild,
int maxBatchNumRecordsProbe,
- int outputBatchNumRecords,
int outputBatchSize,
double loadFactor) {
- Preconditions.checkNotNull(buildSideBatch);
- Preconditions.checkNotNull(probeSideBatch);
+ Preconditions.checkNotNull(probeBatch);
+ Preconditions.checkNotNull(buildBatch);
Preconditions.checkNotNull(joinColumns);
- final RecordBatchSizer buildSizer = new RecordBatchSizer(buildSideBatch);
- final RecordBatchSizer probeSizer = new RecordBatchSizer(probeSideBatch);
+ final BatchSizePredictor buildSizePredictor =
+ batchSizePredictorFactory.create(buildBatch, fragmentationFactor, safetyFactor);
+ final BatchSizePredictor probeSizePredictor =
+ batchSizePredictorFactory.create(probeBatch, fragmentationFactor, safetyFactor);
- long buildBatchSize = getBatchSizeEstimate(buildSideBatch);
- long probeBatchSize = getBatchSizeEstimate(probeSideBatch);
+ buildSizePredictor.updateStats();
+ probeSizePredictor.updateStats();
- int buildNumRecords = buildSizer.rowCount();
- int probeNumRecords = probeSizer.rowCount();
+ final RecordBatchSizer buildSizer = new RecordBatchSizer(buildBatch);
- final CaseInsensitiveMap<Long> buildValueSizes = getNotExcludedColumnSizes(
- joinColumns, buildSizer);
- final CaseInsensitiveMap<Long> probeValueSizes = getNotExcludedColumnSizes(
- joinColumns, probeSizer);
final CaseInsensitiveMap<Long> keySizes = CaseInsensitiveMap.newHashMap();
for (String joinColumn: joinColumns) {
@@ -302,11 +268,11 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
reserveHash,
keySizes,
memoryAvailable,
+ maxIncomingBatchSize,
initialPartitions,
- buildBatchSize,
- probeBatchSize,
- buildNumRecords,
- probeNumRecords,
+ probeEmpty,
+ buildSizePredictor,
+ probeSizePredictor,
recordsPerPartitionBatchBuild,
recordsPerPartitionBatchProbe,
maxBatchNumRecordsBuild,
@@ -316,47 +282,15 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
}
@VisibleForTesting
- protected static CaseInsensitiveMap<Long> getNotExcludedColumnSizes(
- final Set<String> excludedColumns,
- final RecordBatchSizer batchSizer) {
- final CaseInsensitiveMap<Long> columnSizes = CaseInsensitiveMap.newHashMap();
- final CaseInsensitiveMap<Boolean> excludedSet = CaseInsensitiveMap.newHashMap();
-
- for (final String excludedColumn: excludedColumns) {
- excludedSet.put(excludedColumn, true);
- }
-
- for (final Map.Entry<String, RecordBatchSizer.ColumnSize> entry: batchSizer.columns().entrySet()) {
- final String columnName = entry.getKey();
- final RecordBatchSizer.ColumnSize columnSize = entry.getValue();
-
- columnSizes.put(columnName, (long) columnSize.getStdNetOrNetSizePerEntry());
- }
-
- return columnSizes;
- }
-
- public static long getBatchSizeEstimate(final RecordBatch recordBatch) {
- final RecordBatchSizer sizer = new RecordBatchSizer(recordBatch);
- long size = 0L;
-
- for (Map.Entry<String, RecordBatchSizer.ColumnSize> column: sizer.columns().entrySet()) {
- size += PostBuildCalculationsImpl.computeValueVectorSize(recordBatch.getRecordCount(), column.getValue().getStdNetOrNetSizePerEntry());
- }
-
- return size;
- }
-
- @VisibleForTesting
protected void initialize(boolean autoTune,
boolean reserveHash,
CaseInsensitiveMap<Long> keySizes,
long memoryAvailable,
+ long maxIncomingBatchSize,
int initialPartitions,
- long buildBatchSize,
- long probeBatchSize,
- int buildNumRecords,
- int probeNumRecords,
+ boolean probeEmpty,
+ BatchSizePredictor buildSizePredictor,
+ BatchSizePredictor probeSizePredictor,
int recordsPerPartitionBatchBuild,
int recordsPerPartitionBatchProbe,
int maxBatchNumRecordsBuild,
@@ -365,6 +299,9 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
double loadFactor) {
Preconditions.checkState(!firstInitialized);
Preconditions.checkArgument(initialPartitions >= 1);
+ // If we had probe data before there should still be probe data now.
+ // If we didn't have probe data before we could get some new data now.
+ Preconditions.checkState(!(probeEmpty && probeSizePredictor.hadDataLastTime()));
firstInitialized = true;
this.loadFactor = loadFactor;
@@ -372,10 +309,10 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
this.reserveHash = reserveHash;
this.keySizes = Preconditions.checkNotNull(keySizes);
this.memoryAvailable = memoryAvailable;
- this.buildBatchSize = buildBatchSize;
- this.probeBatchSize = probeBatchSize;
- this.buildNumRecords = buildNumRecords;
- this.probeNumRecords = probeNumRecords;
+ this.probeEmpty = probeEmpty;
+ this.maxIncomingBatchSize = maxIncomingBatchSize;
+ this.buildSizePredictor = buildSizePredictor;
+ this.probeSizePredictor = probeSizePredictor;
this.initialPartitions = initialPartitions;
this.recordsPerPartitionBatchBuild = recordsPerPartitionBatchBuild;
this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
@@ -420,31 +357,32 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
private void calculateMemoryUsage()
{
// Adjust based on number of records
- maxBuildBatchSize = computeMaxBatchSizeNoHash(buildBatchSize, buildNumRecords,
- maxBatchNumRecordsBuild, fragmentationFactor, safetyFactor);
- maxProbeBatchSize = computeMaxBatchSizeNoHash(probeBatchSize, probeNumRecords,
- maxBatchNumRecordsProbe, fragmentationFactor, safetyFactor);
-
- // Safety factor can be multiplied at the end since these batches are coming from exchange operators, so no excess value vector doubling
- partitionBuildBatchSize = computeMaxBatchSize(buildBatchSize,
- buildNumRecords,
- recordsPerPartitionBatchBuild,
- fragmentationFactor,
- safetyFactor,
- reserveHash);
+ maxBuildBatchSize = buildSizePredictor.predictBatchSize(maxBatchNumRecordsBuild, false);
- // Safety factor can be multiplied at the end since these batches are coming from exchange operators, so no excess value vector doubling
- partitionProbeBatchSize = computeMaxBatchSize(
- probeBatchSize,
- probeNumRecords,
- recordsPerPartitionBatchProbe,
- fragmentationFactor,
- safetyFactor,
- reserveHash);
+ if (probeSizePredictor.hadDataLastTime()) {
+ // We have probe data and we can compute the max incoming size.
+ maxProbeBatchSize = probeSizePredictor.predictBatchSize(maxBatchNumRecordsProbe, false);
+ } else {
+ // We don't have probe data
+ if (probeEmpty) {
+ // We know the probe has no data, so we don't need to reserve any space for the incoming probe
+ maxProbeBatchSize = 0;
+ } else {
+ // The probe side may have data, so assume it is the max incoming batch size. This assumption
+ // can fail in some cases since the batch sizing project is incomplete.
+ maxProbeBatchSize = maxIncomingBatchSize;
+ }
+ }
+
+ partitionBuildBatchSize = buildSizePredictor.predictBatchSize(recordsPerPartitionBatchBuild, reserveHash);
+
+ if (probeSizePredictor.hadDataLastTime()) {
+ partitionProbeBatchSize = probeSizePredictor.predictBatchSize(recordsPerPartitionBatchProbe, reserveHash);
+ }
maxOutputBatchSize = (long) ((double)outputBatchSize * fragmentationFactor * safetyFactor);
- long probeReservedMemory;
+ long probeReservedMemory = 0;
for (partitions = initialPartitions;; partitions /= 2) {
// The total amount of memory to reserve for incomplete batches across all partitions
@@ -455,13 +393,19 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
// they will have a well defined size.
reservedMemory = incompletePartitionsBatchSizes + maxBuildBatchSize + maxProbeBatchSize;
- probeReservedMemory = PostBuildCalculationsImpl.calculateReservedMemory(
- partitions,
- maxProbeBatchSize,
- maxOutputBatchSize,
- partitionProbeBatchSize);
+ if (probeSizePredictor.hadDataLastTime()) {
+ // If we have probe data, use it in our memory reservation calculations.
+ probeReservedMemory = PostBuildCalculationsImpl.calculateReservedMemory(
+ partitions,
+ maxProbeBatchSize,
+ maxOutputBatchSize,
+ partitionProbeBatchSize);
- maxReservedMemory = Math.max(reservedMemory, probeReservedMemory);
+ maxReservedMemory = Math.max(reservedMemory, probeReservedMemory);
+ } else {
+ // If we do not have probe data, do our best effort at estimating the number of partitions without it.
+ maxReservedMemory = reservedMemory;
+ }
if (!autoTune || maxReservedMemory <= memoryAvailable) {
// Stop the tuning loop if we are not doing auto tuning, or if we are living within our memory limit
@@ -488,19 +432,19 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
"partitionProbeBatchSize = %d\n" +
"recordsPerPartitionBatchProbe = %d\n",
reservedMemory, memoryAvailable, partitions, initialPartitions,
- buildBatchSize,
- buildNumRecords,
+ buildSizePredictor.getBatchSize(),
+ buildSizePredictor.getNumRecords(),
partitionBuildBatchSize,
recordsPerPartitionBatchBuild,
- probeBatchSize,
- probeNumRecords,
+ probeSizePredictor.getBatchSize(),
+ probeSizePredictor.getNumRecords(),
partitionProbeBatchSize,
recordsPerPartitionBatchProbe);
String phase = "Probe phase: ";
if (reservedMemory > memoryAvailable) {
- if (probeReservedMemory > memoryAvailable) {
+ if (probeSizePredictor.hadDataLastTime() && probeReservedMemory > memoryAvailable) {
phase = "Build and Probe phases: ";
} else {
phase = "Build phase: ";
@@ -531,10 +475,12 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
public PostBuildCalculations next() {
Preconditions.checkState(initialized);
- return new PostBuildCalculationsImpl(memoryAvailable,
- partitionProbeBatchSize,
- maxProbeBatchSize,
+ return new PostBuildCalculationsImpl(
+ probeSizePredictor,
+ memoryAvailable,
maxOutputBatchSize,
+ maxBatchNumRecordsProbe,
+ recordsPerPartitionBatchProbe,
partitionStatsSet,
keySizes,
hashTableSizeCalculator,
@@ -572,9 +518,19 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
}
public static class NoopPostBuildCalculationsImpl implements PostBuildCalculations {
+ private final int recordsPerPartitionBatchProbe;
+
+ public NoopPostBuildCalculationsImpl(final int recordsPerPartitionBatchProbe) {
+ this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
+ }
+
@Override
- public void initialize() {
+ public void initialize(boolean hasProbeData) {
+ }
+ @Override
+ public int getProbeRecordsPerBatch() {
+ return recordsPerPartitionBatchProbe;
}
@Override
@@ -610,7 +566,7 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
* <h1>Lifecycle</h1>
* <p>
* <ul>
- * <li><b>Step 1:</b> Call {@link #initialize()}. This
+ * <li><b>Step 1:</b> Call {@link #initialize(boolean)}. This
* gives the {@link HashJoinStateCalculator} additional information it needs to compute memory requirements.</li>
* <li><b>Step 2:</b> Call {@link #shouldSpill()}. This tells
* you which build side partitions need to be spilled in order to make room for probing.</li>
@@ -620,10 +576,15 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
* </p>
*/
public static class PostBuildCalculationsImpl implements PostBuildCalculations {
+ private static final Logger log = LoggerFactory.getLogger(PostBuildCalculationsImpl.class);
+
+ public static final int MIN_RECORDS_PER_PARTITION_BATCH_PROBE = 10;
+
+ private final BatchSizePredictor probeSizePredictor;
private final long memoryAvailable;
- private final long partitionProbeBatchSize;
- private final long maxProbeBatchSize;
private final long maxOutputBatchSize;
+ private final int maxBatchNumRecordsProbe;
+ private final int recordsPerPartitionBatchProbe;
private final PartitionStatSet buildPartitionStatSet;
private final Map<String, Long> keySizes;
private final HashTableSizeCalculator hashTableSizeCalculator;
@@ -632,26 +593,30 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
private final double safetyFactor;
private final double loadFactor;
private final boolean reserveHash;
- // private final long maxOutputBatchSize;
private boolean initialized;
private long consumedMemory;
+ private boolean probeEmpty;
+ private long maxProbeBatchSize;
+ private long partitionProbeBatchSize;
+ private int computedProbeRecordsPerBatch;
- public PostBuildCalculationsImpl(final long memoryAvailable,
- final long partitionProbeBatchSize,
- final long maxProbeBatchSize,
- final long maxOutputBatchSize,
- final PartitionStatSet buildPartitionStatSet,
- final Map<String, Long> keySizes,
- final HashTableSizeCalculator hashTableSizeCalculator,
- final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator,
- final double fragmentationFactor,
- final double safetyFactor,
- final double loadFactor,
- final boolean reserveHash) {
+ @VisibleForTesting
+ public PostBuildCalculationsImpl(final BatchSizePredictor probeSizePredictor,
+ final long memoryAvailable,
+ final long maxOutputBatchSize,
+ final int maxBatchNumRecordsProbe,
+ final int recordsPerPartitionBatchProbe,
+ final PartitionStatSet buildPartitionStatSet,
+ final Map<String, Long> keySizes,
+ final HashTableSizeCalculator hashTableSizeCalculator,
+ final HashJoinHelperSizeCalculator hashJoinHelperSizeCalculator,
+ final double fragmentationFactor,
+ final double safetyFactor,
+ final double loadFactor,
+ final boolean reserveHash) {
+ this.probeSizePredictor = Preconditions.checkNotNull(probeSizePredictor);
this.memoryAvailable = memoryAvailable;
- this.partitionProbeBatchSize = partitionProbeBatchSize;
- this.maxProbeBatchSize = maxProbeBatchSize;
this.maxOutputBatchSize = maxOutputBatchSize;
this.buildPartitionStatSet = Preconditions.checkNotNull(buildPartitionStatSet);
this.keySizes = Preconditions.checkNotNull(keySizes);
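The constructor and the lifecycle javadoc above translate into roughly the following hypothetical driver code; buildCalc, probeSideIsEmpty, and spillSomeBuildPartition are invented names for illustration and are not part of the patch.
// Hypothetical usage of the PostBuildCalculations lifecycle once the build side is consumed.
PostBuildCalculations postBuildCalc = buildCalc.next();
postBuildCalc.initialize(probeSideIsEmpty);   // Step 1: say whether the probe side has any data
while (postBuildCalc.shouldSpill()) {         // Step 2: spill build partitions until probing fits
  spillSomeBuildPartition();                  // placeholder for the operator's spill decision
}
// Possibly scaled down by computeProbeRecordsPerBatch() when memory is tight.
int probeRecordsPerBatch = postBuildCalc.getProbeRecordsPerBatch();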
@@ -661,38 +626,100 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
this.safetyFactor = safetyFactor;
this.loadFactor = loadFactor;
this.reserveHash = reserveHash;
+ this.maxBatchNumRecordsProbe = maxBatchNumRecordsProbe;
+ this.recordsPerPartitionBatchProbe = recordsPerPartitionBatchProbe;
+ this.computedProbeRecordsPerBatch = recordsPerPartitionBatchProbe;
}
- // TODO take an incoming Probe RecordBatch
@Override
- public void initialize() {
+ public void initialize(boolean probeEmpty) {
Preconditions.checkState(!initialized);
+ // If we had probe data before, there should still be probe data now.
+ // If we didn't have probe data before, we could get some new data now.
+ Preconditions.checkState(probeSizePredictor.hadDataLastTime() && !probeEmpty || !probeSizePredictor.hadDataLastTime());
initialized = true;
+ this.probeEmpty = probeEmpty;
+
+ if (probeEmpty) {
+ // We know there is no probe side data, so we don't need to calculate anything.
+ return;
+ }
+
+ // We need to compute sizes of probe side data.
+ if (!probeSizePredictor.hadDataLastTime()) {
+ probeSizePredictor.updateStats();
+ }
+
+ maxProbeBatchSize = probeSizePredictor.predictBatchSize(maxBatchNumRecordsProbe, false);
+ partitionProbeBatchSize = probeSizePredictor.predictBatchSize(recordsPerPartitionBatchProbe, reserveHash);
+
+ long worstCaseProbeMemory = calculateReservedMemory(
+ buildPartitionStatSet.getSize(),
+ maxProbeBatchSize,
+ maxOutputBatchSize,
+ partitionProbeBatchSize);
+
+ if (worstCaseProbeMemory > memoryAvailable) {
+ // We don't have enough memory for the probe data if all the partitions are spilled, so we need
+ // to adjust the records per probe partition batch in order to make this work.
+
+ computedProbeRecordsPerBatch = computeProbeRecordsPerBatch(memoryAvailable,
+ buildPartitionStatSet.getSize(),
+ recordsPerPartitionBatchProbe,
+ MIN_RECORDS_PER_PARTITION_BATCH_PROBE,
+ maxProbeBatchSize,
+ maxOutputBatchSize,
+ partitionProbeBatchSize);
+
+ partitionProbeBatchSize = probeSizePredictor.predictBatchSize(computedProbeRecordsPerBatch, reserveHash);
+ }
}
- public long getConsumedMemory() {
+ @Override
+ public int getProbeRecordsPerBatch() {
Preconditions.checkState(initialized);
- return consumedMemory;
+ return computedProbeRecordsPerBatch;
}
- // TODO move this somewhere else that makes sense
- public static long computeValueVectorSize(long numRecords, long byteSize)
- {
- long naiveSize = numRecords * byteSize;
- return roundUpToPowerOf2(naiveSize);
+ @VisibleForTesting
+ public long getMaxProbeBatchSize() {
+ return maxProbeBatchSize;
}
- public static long computeValueVectorSize(long numRecords, long byteSize, double safetyFactor)
- {
- long naiveSize = RecordBatchSizer.multiplyByFactor(numRecords * byteSize, safetyFactor);
- return roundUpToPowerOf2(naiveSize);
+ @VisibleForTesting
+ public long getPartitionProbeBatchSize() {
+ return partitionProbeBatchSize;
}
- // TODO move to drill common
- public static long roundUpToPowerOf2(long num)
- {
- Preconditions.checkArgument(num >= 1);
- return num == 1 ? 1 : Long.highestOneBit(num - 1) << 1;
+ public long getConsumedMemory() {
+ Preconditions.checkState(initialized);
+ return consumedMemory;
+ }
+
+ public static int computeProbeRecordsPerBatch(final long memoryAvailable,
+ final int numPartitions,
+ final int defaultProbeRecordsPerBatch,
+ final int minProbeRecordsPerBatch,
+ final long maxProbeBatchSize,
+ final long maxOutputBatchSize,
+ final long defaultPartitionProbeBatchSize) {
+ long memoryForPartitionBatches = memoryAvailable - maxProbeBatchSize - maxOutputBatchSize;
+
+ if (memoryForPartitionBatches < 0) {
+ // We just don't have enough memory, so do the best we can by using the minimum batch size.
+ log.warn("Not enough memory for probing:\n" +
+ "Memory available: {}\n" +
+ "Max probe batch size: {}\n" +
+ "Max output batch size: {}",
+ memoryAvailable,
+ maxProbeBatchSize,
+ maxOutputBatchSize);
+ return minProbeRecordsPerBatch;
+ }
+
+ long memoryForPartitionBatch = (memoryForPartitionBatches + numPartitions - 1) / numPartitions;
+ long scaleFactor = (defaultPartitionProbeBatchSize + memoryForPartitionBatch - 1) / memoryForPartitionBatch;
+ return Math.max((int) (defaultProbeRecordsPerBatch / scaleFactor), minProbeRecordsPerBatch);
}
public static long calculateReservedMemory(final int numSpilledPartitions,
@@ -710,6 +737,11 @@ public class HashJoinMemoryCalculatorImpl implements HashJoinMemoryCalculator {
public boolean shouldSpill() {
Preconditions.checkState(initialized);
+ if (probeEmpty) {
+ // If the probe is empty, we should not trigger any spills.
+ return false;
+ }
+
long reservedMemory = calculateReservedMemory(
buildPartitionStatSet.getNumSpilledPartitions(),
maxProbeBatchSize,
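As a worked illustration of computeProbeRecordsPerBatch above (the numbers are arbitrary and the units are whatever byte counts the caller passes in):
// With 100 units of memory, a 16-unit incoming probe batch and a 16-unit output batch,
// 68 units remain for 8 partitions, i.e. 9 units per partition (ceiling division).
// A predicted 32-unit partition probe batch gives a scale factor of ceil(32 / 9) = 4,
// so the default 1024 records per partition batch are cut to 256 (never below the minimum of 10).
int records = HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeProbeRecordsPerBatch(
    100L,  // memoryAvailable
    8,     // numPartitions
    1024,  // defaultProbeRecordsPerBatch
    10,    // minProbeRecordsPerBatch (MIN_RECORDS_PER_PARTITION_BATCH_PROBE)
    16L,   // maxProbeBatchSize
    16L,   // maxOutputBatchSize
    32L);  // defaultPartitionProbeBatchSize
// records == 256 with these inputs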
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
index 85750210c..a366eeafc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorConservativeImpl.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.vector.IntVector;
import java.util.Map;
-import static org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize;
+import static org.apache.drill.exec.physical.impl.join.BatchSizePredictorImpl.computeValueVectorSize;
public class HashTableSizeCalculatorConservativeImpl implements HashTableSizeCalculator {
public static final String TYPE = "CONSERVATIVE";
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
index 4f9e5855e..265b0e337 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashTableSizeCalculatorLeanImpl.java
@@ -23,7 +23,7 @@ import org.apache.drill.exec.vector.IntVector;
import java.util.Map;
-import static org.apache.drill.exec.physical.impl.join.HashJoinMemoryCalculatorImpl.PostBuildCalculationsImpl.computeValueVectorSize;
+import static org.apache.drill.exec.physical.impl.join.BatchSizePredictorImpl.computeValueVectorSize;
public class HashTableSizeCalculatorLeanImpl implements HashTableSizeCalculator {
public static final String TYPE = "LEAN";
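Both size calculators above now take computeValueVectorSize from BatchSizePredictorImpl. Assuming the helper was moved verbatim (the implementation removed from PostBuildCalculationsImpl earlier in this diff rounds numRecords * byteSize up to the next power of two), a call looks like:
// 1000 records of a 4-byte vector: naive size 4000 bytes, rounded up to the next power of two.
long vectorSize = BatchSizePredictorImpl.computeValueVectorSize(1000L, 4L);
// vectorSize == 4096, reflecting the power-of-two rounding shown in the removed helper.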