aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/record
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/record')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java84
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java36
2 files changed, 96 insertions, 24 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 4344e1374..a06e2c35d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -40,7 +40,30 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
this.columnsToExclude = excludedColumns;
}
- private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
+ /**
+ * Update the memory manager parameters based on the new incoming batch
+ *
+ * Notice three (possibly) different "row counts" for the outgoing batches:
+ *
+ * 1. The rowCount that the current outgoing batch was allocated with (always a power of 2; e.g. 8192)
+ * 2. The new rowCount computed based on the newly seen input rows (always a power of 2); may be bigger than (1) if the
+ * new input rows are much smaller than before (e.g. 16384), or smaller (e.g. 4096) if the new rows are much wider.
+ * Subsequent outgoing batches would be allocated based on this (2) new rowCount.
+ * 3. The target rowCount for the current outgoing batch. While initially (1), it may be resized down if the new rows
+ * are getting bigger. In any case it won't be resized above (1) (to avoid IOOB) or below the current number of rows
+ * in that batch (i.e., outputPosition). (Need not be a power of two; e.g., 7983).
+ *
+ * After every call to update() while the outgoing batch is active, the current target should be updated with (3) by
+ * calling getCurrentOutgoingMaxRowCount() .
+ *
+ * Comment: The "power of 2" in the above (1) and (2) is actually "power of 2 minus 1" (e.g. 65535, or 8191) in order
+ * to avoid memory waste in case offset vectors are used (see DRILL-5446)
+ *
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the current output batch
+ * @param useAggregate If true, compute using average row width (else based on allocated sizes)
+ */
+ private void updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
updateIncomingStats(inputIndex);
rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
@@ -60,7 +83,7 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
// This is possible for empty batches or
// when first set of batches come with OK_NEW_SCHEMA and no data.
if (newOutgoingRowWidth == 0 || newOutgoingRowWidth == getOutgoingRowWidth()) {
- return getOutputRowCount();
+ return;
}
// Adjust for the current batch.
@@ -75,34 +98,73 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
// These are number of rows we can fit in remaining memory based on new outgoing row width.
final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
+ final int currentOutputBatchRowCount = getOutputRowCount();
+
// update the value to be used for next batch(es)
setOutputRowCount(configOutputBatchSize, newOutgoingRowWidth);
// set the new row width
setOutgoingRowWidth(newOutgoingRowWidth);
- return adjustOutputRowCount(outputPosition + numOutputRowsRemaining);
+ int newOutputRowCount = getOutputRowCount();
+
+ if ( currentOutputBatchRowCount != newOutputRowCount ) {
+ logger.debug("Memory manager update changed the output row count from {} to {}",currentOutputBatchRowCount,newOutputRowCount);
+ }
+
+ // The current outgoing batch target count (i.e., max number of rows to put there) is modified to be the current number of rows there
+ // plus as many of the future new rows that would fit in the remaining memory (e.g., if the new rows are wider, fewer would fit), but
+ // in any case no larger than the size the batch was allocated for (to avoid IOOB on the allocated vectors)
+ setCurrentOutgoingMaxRowCount(Math.min(currentOutputBatchRowCount, outputPosition + numOutputRowsRemaining ));
}
+ /**
+ * Update the memory manager parameters based on the new incoming batch
+ *
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ * @param useAggregate Compute using average row width (else based on allocated sizes)
+ */
@Override
- public int update(int inputIndex, int outputPosition, boolean useAggregate) {
+ public void update(int inputIndex, int outputPosition, boolean useAggregate) {
setRecordBatchSizer(inputIndex, new RecordBatchSizer(recordBatch[inputIndex]));
- return updateInternal(inputIndex, outputPosition, useAggregate);
+ updateInternal(inputIndex, outputPosition, useAggregate);
}
+ /**
+ * Update the memory manager parameters based on the new incoming batch (based on allocated sizes, not average row size)
+ *
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ */
@Override
- public int update(int inputIndex, int outputPosition) {
- return update(inputIndex, outputPosition, false);
+ public void update(int inputIndex, int outputPosition) {
+ update(inputIndex, outputPosition, false);
}
+ /**
+ * Update the memory manager parameters based on the given (incoming) batch
+ *
+ * @param batch Update based on the data in this batch
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ * @param useAggregate Compute using average row width (else based on allocated sizes)
+ */
@Override
- public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
+ public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
setRecordBatchSizer(inputIndex, new RecordBatchSizer(batch));
- return updateInternal(inputIndex, outputPosition, useAggregate);
+ updateInternal(inputIndex, outputPosition, useAggregate);
}
+ /**
+ * Update the memory manager parameters based on the given (incoming) batch (based on allocated sizes, not average row size)
+ *
+ * @param batch Update based on the data in this batch
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ */
@Override
- public int update(RecordBatch batch, int inputIndex, int outputPosition) {
- return update(batch, inputIndex, outputPosition, false);
+ public void update(RecordBatch batch, int inputIndex, int outputPosition) {
+ update(batch, inputIndex, outputPosition, false);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 64f225ca5..a6b6ed510 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -24,10 +24,16 @@ import org.apache.drill.exec.vector.ValueVector;
import java.util.List;
public class RecordBatchMemoryManager {
- protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
+ // The " - 1 " in the number of rows below was chosen to avoid a waste of (possible) offset vectors memory
+ // where a power-of-2 rows size needlessly doubles the offset vector (see DRILL-5446)
+ protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT - 1;
protected static final int MIN_NUM_ROWS = 1;
protected static final int DEFAULT_INPUT_INDEX = 0;
private int outputRowCount = MAX_NUM_ROWS;
+ // max row count allowed in the current output batch; it would never exceed the allocated size (i.e. outputRowCount)
+ // but may go lower (e.g., when early rows are narrow, the outgoing batch can hold many output rows, but if later
+ // the incoming rows become wide, then less (than planned) would fit into the remaining current allocated memory)
+ private int currentOutgoingMaxRowCount = MAX_NUM_ROWS;
private int outgoingRowWidth;
private int outputBatchSize;
private RecordBatchSizer[] sizer;
@@ -144,11 +150,6 @@ public class RecordBatchMemoryManager {
outputBatchStats = new BatchStats();
}
- public int update(int inputIndex, int outputPosition) {
- // by default just return the outputRowCount
- return getOutputRowCount();
- }
-
public void update(int inputIndex) {
}
@@ -166,17 +167,20 @@ public class RecordBatchMemoryManager {
updateIncomingStats(index);
}
- public int update(int inputIndex, int outputPosition, boolean useAggregate) {
- // by default just return the outputRowCount
- return getOutputRowCount();
+ public void update(int inputIndex, int outputPosition, boolean useAggregate) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
+ }
+
+ public void update(int inputIndex, int outputPosition) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
- public int update(RecordBatch batch, int inputIndex, int outputPosition) {
- return getOutputRowCount();
+ public void update(RecordBatch batch, int inputIndex, int outputPosition) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
- public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
- return getOutputRowCount();
+ public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
public boolean updateIfNeeded(int newOutgoingRowWidth) {
@@ -199,6 +203,7 @@ public class RecordBatchMemoryManager {
return outputRowCount;
}
+ public int getCurrentOutgoingMaxRowCount() { return currentOutgoingMaxRowCount; }
/**
* Given batchSize and rowWidth, this will set output rowCount taking into account
* the min and max that is allowed.
@@ -209,8 +214,13 @@ public class RecordBatchMemoryManager {
public void setOutputRowCount(int outputRowCount) {
this.outputRowCount = outputRowCount;
+ if ( outputRowCount > MIN_NUM_ROWS && Integer.highestOneBit(outputRowCount) == outputRowCount ) {
+ this.outputRowCount--;
+ }
}
+ public void setCurrentOutgoingMaxRowCount(int newTargetOutputCount) { this.currentOutgoingMaxRowCount = newTargetOutputCount; }
+
/**
* This will adjust rowCount taking into account the min and max that is allowed.
* We will round down to nearest power of two - 1 for better memory utilization.