aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java84
1 files changed, 73 insertions, 11 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 4344e1374..a06e2c35d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -40,7 +40,30 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
this.columnsToExclude = excludedColumns;
}
- private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
+ /**
+ * Update the memory manager parameters based on the new incoming batch
+ *
+ * Notice three (possibly) different "row counts" for the outgoing batches:
+ *
+ * 1. The rowCount that the current outgoing batch was allocated with (always a power of 2; e.g. 8192)
+ * 2. The new rowCount computed based on the newly seen input rows (always a power of 2); may be bigger than (1)
+ * (e.g. 16384) if the new input rows are much narrower than before, or smaller (e.g. 4096) if they are much wider.
+ * Subsequent outgoing batches would be allocated based on this (2) new rowCount.
+ * 3. The target rowCount for the current outgoing batch. While initially (1), it may be resized down if the new rows
+ * are getting wider. In any case it won't be resized above (1) (to avoid an index-out-of-bounds error) or below the
+ * current number of rows in that batch (i.e., outputPosition). It need not be a power of two (e.g., 7983).
+ *
+ * After every call to update() while the outgoing batch is active, the current target should be updated to (3) by
+ * calling getCurrentOutgoingMaxRowCount().
+ *
+ * Note: The "power of 2" in (1) and (2) above is actually "power of 2 minus 1" (e.g. 65535 or 8191), in order
+ * to avoid memory waste in case offset vectors are used (see DRILL-5446).
+ *
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the current output batch
+ * @param useAggregate If true, compute using average row width (else based on allocated sizes)
+ */
+ private void updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
updateIncomingStats(inputIndex);
rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
@@ -60,7 +83,7 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
// This is possible for empty batches or
// when first set of batches come with OK_NEW_SCHEMA and no data.
if (newOutgoingRowWidth == 0 || newOutgoingRowWidth == getOutgoingRowWidth()) {
- return getOutputRowCount();
+ return;
}
// Adjust for the current batch.
@@ -75,34 +98,73 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
// These are number of rows we can fit in remaining memory based on new outgoing row width.
final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
+ final int currentOutputBatchRowCount = getOutputRowCount();
+
// update the value to be used for next batch(es)
setOutputRowCount(configOutputBatchSize, newOutgoingRowWidth);
// set the new row width
setOutgoingRowWidth(newOutgoingRowWidth);
- return adjustOutputRowCount(outputPosition + numOutputRowsRemaining);
+ int newOutputRowCount = getOutputRowCount();
+
+ if ( currentOutputBatchRowCount != newOutputRowCount ) {
+ logger.debug("Memory manager update changed the output row count from {} to {}",currentOutputBatchRowCount,newOutputRowCount);
+ }
+
+ // The current outgoing batch target count (i.e., max number of rows to put there) is set to the current number of rows there
+ // plus as many of the future new rows as would fit in the remaining memory (e.g., if the new rows are wider, fewer would fit), but
+ // in any case no larger than the size the batch was allocated for (to avoid an index-out-of-bounds error on the allocated vectors)
+ setCurrentOutgoingMaxRowCount(Math.min(currentOutputBatchRowCount, outputPosition + numOutputRowsRemaining ));
}
+ /**
+ * Update the memory manager parameters based on the new incoming batch
+ *
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ * @param useAggregate Compute using average row width (else based on allocated sizes)
+ */
@Override
- public int update(int inputIndex, int outputPosition, boolean useAggregate) {
+ public void update(int inputIndex, int outputPosition, boolean useAggregate) {
setRecordBatchSizer(inputIndex, new RecordBatchSizer(recordBatch[inputIndex]));
- return updateInternal(inputIndex, outputPosition, useAggregate);
+ updateInternal(inputIndex, outputPosition, useAggregate);
}
+ /**
+ * Update the memory manager parameters based on the new incoming batch (based on allocated sizes, not average row size)
+ *
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ */
@Override
- public int update(int inputIndex, int outputPosition) {
- return update(inputIndex, outputPosition, false);
+ public void update(int inputIndex, int outputPosition) {
+ update(inputIndex, outputPosition, false);
}
+ /**
+ * Update the memory manager parameters based on the given (incoming) batch
+ *
+ * @param batch Update based on the data in this batch
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ * @param useAggregate Compute using average row width (else based on allocated sizes)
+ */
@Override
- public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
+ public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
setRecordBatchSizer(inputIndex, new RecordBatchSizer(batch));
- return updateInternal(inputIndex, outputPosition, useAggregate);
+ updateInternal(inputIndex, outputPosition, useAggregate);
}
+ /**
+ * Update the memory manager parameters based on the given (incoming) batch (based on allocated sizes, not average row size)
+ *
+ * @param batch Update based on the data in this batch
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ */
@Override
- public int update(RecordBatch batch, int inputIndex, int outputPosition) {
- return update(batch, inputIndex, outputPosition, false);
+ public void update(RecordBatch batch, int inputIndex, int outputPosition) {
+ update(batch, inputIndex, outputPosition, false);
}
}