diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java | 84 |
1 files changed, 73 insertions, 11 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java index 4344e1374..a06e2c35d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java @@ -40,7 +40,30 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager { this.columnsToExclude = excludedColumns; } - private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) { + /** + * Update the memory manager parameters based on the new incoming batch + * + * Notice three (possibly) different "row counts" for the outgoing batches: + * + * 1. The rowCount that the current outgoing batch was allocated with (always a power of 2; e.g. 8192) + * 2. The new rowCount computed based on the newly seen input rows (always a power of 2); may be bigger than (1) if the + * new input rows are much smaller than before (e.g. 16384), or smaller (e.g. 4096) if the new rows are much wider. + * Subsequent outgoing batches would be allocated based on this (2) new rowCount. + * 3. The target rowCount for the current outgoing batch. While initially (1), it may be resized down if the new rows + * are getting bigger. In any case it won't be resized above (1) (to avoid IOOB) or below the current number of rows + * in that batch (i.e., outputPosition). (Need not be a power of two; e.g., 7983). + * + * After every call to update() while the outgoing batch is active, the current target should be updated with (3) by + * calling getCurrentOutgoingMaxRowCount() . + * + * Comment: The "power of 2" in the above (1) and (2) is actually "power of 2 minus 1" (e.g. 65535, or 8191) in order + * to avoid memory waste in case offset vectors are used (see DRILL-5446) + * + * @param inputIndex Left (0) or Right (1) + * @param outputPosition Position (i.e. number of inserted rows) in the current output batch + * @param useAggregate If true, compute using average row width (else based on allocated sizes) + */ + private void updateInternal(int inputIndex, int outputPosition, boolean useAggregate) { updateIncomingStats(inputIndex); rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth(); @@ -60,7 +83,7 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager { // This is possible for empty batches or // when first set of batches come with OK_NEW_SCHEMA and no data. if (newOutgoingRowWidth == 0 || newOutgoingRowWidth == getOutgoingRowWidth()) { - return getOutputRowCount(); + return; } // Adjust for the current batch. @@ -75,34 +98,73 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager { // These are number of rows we can fit in remaining memory based on new outgoing row width. final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth); + final int currentOutputBatchRowCount = getOutputRowCount(); + // update the value to be used for next batch(es) setOutputRowCount(configOutputBatchSize, newOutgoingRowWidth); // set the new row width setOutgoingRowWidth(newOutgoingRowWidth); - return adjustOutputRowCount(outputPosition + numOutputRowsRemaining); + int newOutputRowCount = getOutputRowCount(); + + if ( currentOutputBatchRowCount != newOutputRowCount ) { + logger.debug("Memory manager update changed the output row count from {} to {}",currentOutputBatchRowCount,newOutputRowCount); + } + + // The current outgoing batch target count (i.e., max number of rows to put there) is modified to be the current number of rows there + // plus as many of the future new rows that would fit in the remaining memory (e.g., if the new rows are wider, fewer would fit), but + // in any case no larger than the size the batch was allocated for (to avoid IOOB on the allocated vectors) + setCurrentOutgoingMaxRowCount(Math.min(currentOutputBatchRowCount, outputPosition + numOutputRowsRemaining )); } + /** + * Update the memory manager parameters based on the new incoming batch + * + * @param inputIndex Left (0) or Right (1) + * @param outputPosition Position (i.e. number of inserted rows) in the output batch + * @param useAggregate Compute using average row width (else based on allocated sizes) + */ @Override - public int update(int inputIndex, int outputPosition, boolean useAggregate) { + public void update(int inputIndex, int outputPosition, boolean useAggregate) { setRecordBatchSizer(inputIndex, new RecordBatchSizer(recordBatch[inputIndex])); - return updateInternal(inputIndex, outputPosition, useAggregate); + updateInternal(inputIndex, outputPosition, useAggregate); } + /** + * Update the memory manager parameters based on the new incoming batch (based on allocated sizes, not average row size) + * + * @param inputIndex Left (0) or Right (1) + * @param outputPosition Position (i.e. number of inserted rows) in the output batch + */ @Override - public int update(int inputIndex, int outputPosition) { - return update(inputIndex, outputPosition, false); + public void update(int inputIndex, int outputPosition) { + update(inputIndex, outputPosition, false); } + /** + * Update the memory manager parameters based on the given (incoming) batch + * + * @param batch Update based on the data in this batch + * @param inputIndex Left (0) or Right (1) + * @param outputPosition Position (i.e. number of inserted rows) in the output batch + * @param useAggregate Compute using average row width (else based on allocated sizes) + */ @Override - public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) { + public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) { setRecordBatchSizer(inputIndex, new RecordBatchSizer(batch)); - return updateInternal(inputIndex, outputPosition, useAggregate); + updateInternal(inputIndex, outputPosition, useAggregate); } + /** + * Update the memory manager parameters based on the given (incoming) batch (based on allocated sizes, not average row size) + * + * @param batch Update based on the data in this batch + * @param inputIndex Left (0) or Right (1) + * @param outputPosition Position (i.e. number of inserted rows) in the output batch + */ @Override - public int update(RecordBatch batch, int inputIndex, int outputPosition) { - return update(batch, inputIndex, outputPosition, false); + public void update(RecordBatch batch, int inputIndex, int outputPosition) { + update(batch, inputIndex, outputPosition, false); } } |