aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java36
1 files changed, 23 insertions, 13 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 64f225ca5..a6b6ed510 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -24,10 +24,16 @@ import org.apache.drill.exec.vector.ValueVector;
import java.util.List;
public class RecordBatchMemoryManager {
- protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
+ // The " - 1 " in the number of rows below was chosen to avoid a waste of (possible) offset vectors memory
+ // where a power-of-2 rows size needlessly doubles the offset vector (see DRILL-5446)
+ protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT - 1;
protected static final int MIN_NUM_ROWS = 1;
protected static final int DEFAULT_INPUT_INDEX = 0;
private int outputRowCount = MAX_NUM_ROWS;
+ // max row count allowed in the current output batch; it would never exceed the allocated size (i.e. outputRowCount)
+ // but may go lower (e.g., when early rows are narrow, the outgoing batch can hold many output rows, but if later
+ // the incoming rows become wide, then less (than planned) would fit into the remaining current allocated memory)
+ private int currentOutgoingMaxRowCount = MAX_NUM_ROWS;
private int outgoingRowWidth;
private int outputBatchSize;
private RecordBatchSizer[] sizer;
@@ -144,11 +150,6 @@ public class RecordBatchMemoryManager {
outputBatchStats = new BatchStats();
}
- public int update(int inputIndex, int outputPosition) {
- // by default just return the outputRowCount
- return getOutputRowCount();
- }
-
public void update(int inputIndex) {
}
@@ -166,17 +167,20 @@ public class RecordBatchMemoryManager {
updateIncomingStats(index);
}
- public int update(int inputIndex, int outputPosition, boolean useAggregate) {
- // by default just return the outputRowCount
- return getOutputRowCount();
+ public void update(int inputIndex, int outputPosition, boolean useAggregate) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
+ }
+
+ public void update(int inputIndex, int outputPosition) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
- public int update(RecordBatch batch, int inputIndex, int outputPosition) {
- return getOutputRowCount();
+ public void update(RecordBatch batch, int inputIndex, int outputPosition) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
- public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
- return getOutputRowCount();
+ public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
public boolean updateIfNeeded(int newOutgoingRowWidth) {
@@ -199,6 +203,7 @@ public class RecordBatchMemoryManager {
return outputRowCount;
}
+ public int getCurrentOutgoingMaxRowCount() { return currentOutgoingMaxRowCount; }
/**
* Given batchSize and rowWidth, this will set output rowCount taking into account
* the min and max that is allowed.
@@ -209,8 +214,13 @@ public class RecordBatchMemoryManager {
public void setOutputRowCount(int outputRowCount) {
this.outputRowCount = outputRowCount;
+ if ( outputRowCount > MIN_NUM_ROWS && Integer.highestOneBit(outputRowCount) == outputRowCount ) {
+ this.outputRowCount--;
+ }
}
+ public void setCurrentOutgoingMaxRowCount(int newTargetOutputCount) { this.currentOutgoingMaxRowCount = newTargetOutputCount; }
+
/**
* This will adjust rowCount taking into account the min and max that is allowed.
* We will round down to nearest power of two - 1 for better memory utilization.