From 3b85694be4c37bb217366287c182d50ceadda4ab Mon Sep 17 00:00:00 2001
From: Ben-Zvi
Date: Mon, 4 Mar 2019 21:05:32 -0800
Subject: DRILL-6707: Update target outgoing batch row count between current
 position and allocated size

---
 .../exec/physical/impl/join/HashJoinBatch.java     |  2 +-
 .../physical/impl/join/HashJoinProbeTemplate.java  |  3 +-
 .../exec/physical/impl/join/LateralJoinBatch.java  |  4 +-
 .../exec/physical/impl/join/MergeJoinBatch.java    | 13 ++--
 .../physical/impl/join/NestedLoopJoinTemplate.java |  3 +-
 .../drill/exec/record/JoinBatchMemoryManager.java  | 84 +++++++++++++++++++---
 .../exec/record/RecordBatchMemoryManager.java      | 36 ++++++----
 7 files changed, 110 insertions(+), 35 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 24fb1d8c3..429d15f4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -587,7 +587,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch implem
 
         // Allocate the memory for the vectors in the output container
         batchMemoryManager.allocateVectors(container);
 
-        hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
+        hashJoinProbe.setTargetOutputCount(batchMemoryManager.getCurrentOutgoingMaxRowCount());
 
         outputRecords = hashJoinProbe.probeAndProject();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index c549143d5..2836794d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -271,7 +271,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
               probeBatch.getSchema());
         }
         case OK:
-          setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords));
+          outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords);
+          setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
          recordsToProcess = probeBatch.getRecordCount();
          recordsProcessed = 0;
          // If we received an empty batch do nothing
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 735f11f36..c916c95da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -1182,7 +1182,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch
 
     // For cases where all the previous input were consumed and send with previous output batch. But now we are building
     // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
-    final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex);
+    batchMemoryManager.update(inputIndex, outputIndex);
+    final int newOutputRowCount = batchMemoryManager.getCurrentOutgoingMaxRowCount();
+
     if (isRecordBatchStatsLoggingEnabled()) {
       RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index d502c4f9d..36320ce2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -124,7 +124,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch {
      */
     @Override
     public void update(int inputIndex) {
-      status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
+      super.update(inputIndex, status.getOutPosition());
+      status.setTargetOutputRowCount(super.getCurrentOutgoingMaxRowCount()); // calculated by update()
       RecordBatchIOType type = inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
       RecordBatchStats.logRecordBatchStats(type, getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
     }
@@ -184,15 +185,13 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch {
     status.prepare();
     // loop so we can start over again if we find a new batch was created.
     while (true) {
+      boolean isNewSchema = false;
       // Check result of last iteration.
       switch (status.getOutcome()) {
-        case BATCH_RETURNED:
-          allocateBatch(false);
-          status.resetOutputPos();
-          status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
-          break;
         case SCHEMA_CHANGED:
-          allocateBatch(true);
+          isNewSchema = true;
+        case BATCH_RETURNED:
+          allocateBatch(isNewSchema);
           status.resetOutputPos();
           status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
           break;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index 3b8ab8d2e..95c6acd5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -192,7 +192,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
         leftRecordCount = 0;
         break;
       case OK:
-        setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex));
+        outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex);
+        setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
         RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
           outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
           outgoing.getRecordBatchStatsContext());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 4344e1374..a06e2c35d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -40,7 +40,30 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
     this.columnsToExclude = excludedColumns;
   }
 
-  private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
+  /**
+   * Update the memory manager parameters based on the new incoming batch
+   *
+   * Notice three (possibly) different "row counts" for the outgoing batches:
+   *
+   * 1. The rowCount that the current outgoing batch was allocated with (always a power of 2; e.g. 8192)
+   * 2. The new rowCount computed based on the newly seen input rows (always a power of 2); may be bigger than (1) if the
+   *    new input rows are much smaller than before (e.g. 16384), or smaller (e.g. 4096) if the new rows are much wider.
+   *    Subsequent outgoing batches would be allocated based on this (2) new rowCount.
+   * 3. The target rowCount for the current outgoing batch. While initially (1), it may be resized down if the new rows
+   *    are getting bigger. In any case it won't be resized above (1) (to avoid IOOB) or below the current number of rows
+   *    in that batch (i.e., outputPosition). (Need not be a power of two; e.g., 7983).
+   *
+   * After every call to update() while the outgoing batch is active, the current target should be updated with (3) by
+   * calling getCurrentOutgoingMaxRowCount() .
+   *
+   * Comment: The "power of 2" in the above (1) and (2) is actually "power of 2 minus 1" (e.g. 65535, or 8191) in order
+   * to avoid memory waste in case offset vectors are used (see DRILL-5446)
+   *
+   * @param inputIndex Left (0) or Right (1)
+   * @param outputPosition Position (i.e. number of inserted rows) in the current output batch
+   * @param useAggregate If true, compute using average row width (else based on allocated sizes)
+   */
+  private void updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
     updateIncomingStats(inputIndex);
     rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
 
@@ -60,7 +83,7 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
     // This is possible for empty batches or
     // when first set of batches come with OK_NEW_SCHEMA and no data.
     if (newOutgoingRowWidth == 0 || newOutgoingRowWidth == getOutgoingRowWidth()) {
-      return getOutputRowCount();
+      return;
     }
 
     // Adjust for the current batch.
@@ -75,34 +98,73 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
     // These are number of rows we can fit in remaining memory based on new outgoing row width.
     final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
 
+    final int currentOutputBatchRowCount = getOutputRowCount();
+
     // update the value to be used for next batch(es)
     setOutputRowCount(configOutputBatchSize, newOutgoingRowWidth);
 
     // set the new row width
     setOutgoingRowWidth(newOutgoingRowWidth);
 
-    return adjustOutputRowCount(outputPosition + numOutputRowsRemaining);
+    int newOutputRowCount = getOutputRowCount();
+
+    if ( currentOutputBatchRowCount != newOutputRowCount ) {
+      logger.debug("Memory manager update changed the output row count from {} to {}",currentOutputBatchRowCount,newOutputRowCount);
+    }
+
+    // The current outgoing batch target count (i.e., max number of rows to put there) is modified to be the current number of rows there
+    // plus as many of the future new rows that would fit in the remaining memory (e.g., if the new rows are wider, fewer would fit), but
+    // in any case no larger than the size the batch was allocated for (to avoid IOOB on the allocated vectors)
+    setCurrentOutgoingMaxRowCount(Math.min(currentOutputBatchRowCount, outputPosition + numOutputRowsRemaining ));
   }
 
+  /**
+   * Update the memory manager parameters based on the new incoming batch
+   *
+   * @param inputIndex Left (0) or Right (1)
+   * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+   * @param useAggregate Compute using average row width (else based on allocated sizes)
+   */
   @Override
-  public int update(int inputIndex, int outputPosition, boolean useAggregate) {
+  public void update(int inputIndex, int outputPosition, boolean useAggregate) {
     setRecordBatchSizer(inputIndex, new RecordBatchSizer(recordBatch[inputIndex]));
-    return updateInternal(inputIndex, outputPosition, useAggregate);
+    updateInternal(inputIndex, outputPosition, useAggregate);
   }
 
+  /**
+   * Update the memory manager parameters based on the new incoming batch (based on allocated sizes, not average row size)
+   *
+   * @param inputIndex Left (0) or Right (1)
+   * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+   */
   @Override
-  public int update(int inputIndex, int outputPosition) {
-    return update(inputIndex, outputPosition, false);
+  public void update(int inputIndex, int outputPosition) {
+    update(inputIndex, outputPosition, false);
   }
 
+  /**
+   * Update the memory manager parameters based on the given (incoming) batch
+   *
+   * @param batch Update based on the data in this batch
+   * @param inputIndex Left (0) or Right (1)
+   * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+   * @param useAggregate Compute using average row width (else based on allocated sizes)
+   */
   @Override
-  public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
+  public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
     setRecordBatchSizer(inputIndex, new RecordBatchSizer(batch));
-    return updateInternal(inputIndex, outputPosition, useAggregate);
+    updateInternal(inputIndex, outputPosition, useAggregate);
   }
 
+  /**
+   * Update the memory manager parameters based on the given (incoming) batch (based on allocated sizes, not average row size)
+   *
+   * @param batch Update based on the data in this batch
+   * @param inputIndex Left (0) or Right (1)
+   * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+   */
   @Override
-  public int update(RecordBatch batch, int inputIndex, int outputPosition) {
-    return update(batch, inputIndex, outputPosition, false);
+  public void update(RecordBatch batch, int inputIndex, int outputPosition) {
+    update(batch, inputIndex, outputPosition, false);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 64f225ca5..a6b6ed510 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -24,10 +24,16 @@ import org.apache.drill.exec.vector.ValueVector;
 import java.util.List;
 
 public class RecordBatchMemoryManager {
-  protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
+  // The " - 1 " in the number of rows below was chosen to avoid a waste of (possible) offset vectors memory
+  // where a power-of-2 rows size needlessly doubles the offset vector (see DRILL-5446)
+  protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT - 1;
   protected static final int MIN_NUM_ROWS = 1;
   protected static final int DEFAULT_INPUT_INDEX = 0;
   private int outputRowCount = MAX_NUM_ROWS;
+  // max row count allowed in the current output batch; it would never exceed the allocated size (i.e. outputRowCount)
+  // but may go lower (e.g., when early rows are narrow, the outgoing batch can hold many output rows, but if later
+  // the incoming rows become wide, then less (than planned) would fit into the remaining current allocated memory)
+  private int currentOutgoingMaxRowCount = MAX_NUM_ROWS;
   private int outgoingRowWidth;
   private int outputBatchSize;
   private RecordBatchSizer[] sizer;
@@ -144,11 +150,6 @@ public class RecordBatchMemoryManager {
     outputBatchStats = new BatchStats();
   }
 
-  public int update(int inputIndex, int outputPosition) {
-    // by default just return the outputRowCount
-    return getOutputRowCount();
-  }
-
   public void update(int inputIndex) {
   }
 
@@ -166,17 +167,20 @@ public class RecordBatchMemoryManager {
     updateIncomingStats(index);
   }
 
-  public int update(int inputIndex, int outputPosition, boolean useAggregate) {
-    // by default just return the outputRowCount
-    return getOutputRowCount();
+  public void update(int inputIndex, int outputPosition, boolean useAggregate) {
+    throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
+  }
+
+  public void update(int inputIndex, int outputPosition) {
+    throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
   }
 
-  public int update(RecordBatch batch, int inputIndex, int outputPosition) {
-    return getOutputRowCount();
+  public void update(RecordBatch batch, int inputIndex, int outputPosition) {
+    throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
   }
 
-  public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
-    return getOutputRowCount();
+  public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
+    throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
   }
 
   public boolean updateIfNeeded(int newOutgoingRowWidth) {
@@ -199,6 +203,7 @@ public class RecordBatchMemoryManager {
     return outputRowCount;
   }
 
+  public int getCurrentOutgoingMaxRowCount() { return currentOutgoingMaxRowCount; }
   /**
    * Given batchSize and rowWidth, this will set output rowCount taking into account
    * the min and max that is allowed.
@@ -209,8 +214,13 @@ public class RecordBatchMemoryManager {
 
   public void setOutputRowCount(int outputRowCount) {
     this.outputRowCount = outputRowCount;
+    if ( outputRowCount > MIN_NUM_ROWS && Integer.highestOneBit(outputRowCount) == outputRowCount ) {
+      this.outputRowCount--;
+    }
   }
 
+  public void setCurrentOutgoingMaxRowCount(int newTargetOutputCount) { this.currentOutgoingMaxRowCount = newTargetOutputCount; }
+
   /**
    * This will adjust rowCount taking into account the min and max that is allowed.
    * We will round down to nearest power of two - 1 for better memory utilization.
--
cgit v1.2.3
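
For readers following the new logic, below is a minimal, self-contained sketch of the capping arithmetic that updateInternal() performs before calling setCurrentOutgoingMaxRowCount(). It is illustrative only and is not part of the patch: the class, method, and parameter names are invented for this example, and plain integer division stands in for RecordBatchSizer.safeDivide().

// Illustrative sketch only, not Drill code; all names here are invented for the example.
public class OutgoingRowCountSketch {

  // Mirrors the patch's setCurrentOutgoingMaxRowCount(Math.min(currentOutputBatchRowCount,
  // outputPosition + numOutputRowsRemaining)) step.
  static int currentBatchTarget(int allocatedRowCount, int outputPosition,
                                long remainingMemoryBytes, int newRowWidthBytes) {
    // Rows of the (possibly wider) new width that still fit in the memory left for this batch.
    int rowsThatStillFit = (int) (remainingMemoryBytes / newRowWidthBytes);
    // Cap at the count the batch was allocated for, so the allocated vectors are never overrun;
    // since rowsThatStillFit >= 0, the result never drops below the rows already written.
    return Math.min(allocatedRowCount, outputPosition + rowsThatStillFit);
  }

  public static void main(String[] args) {
    // A batch allocated for 8191 rows ("power of 2 minus 1"), with 3000 rows of 100 bytes
    // already written against a 1 MiB output-batch budget.
    long remaining = 1_048_576L - 3000L * 100;
    // New input rows widen to 400 bytes: the target for the current batch shrinks below 8191.
    System.out.println(currentBatchTarget(8191, 3000, remaining, 400)); // prints 4871
    // New input rows shrink to 50 bytes: the cap keeps the target at the allocated 8191.
    System.out.println(currentBatchTarget(8191, 3000, remaining, 50));  // prints 8191
  }
}

Subsequent batches, by contrast, are sized from setOutputRowCount(configOutputBatchSize, newOutgoingRowWidth), which is why the patch distinguishes getOutputRowCount() (allocation size of the next batch) from getCurrentOutgoingMaxRowCount() (target for the batch currently being filled).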