aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java37
1 files changed, 28 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index c916c95da..04a459987 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -232,6 +232,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// vectors and throws NPE. The actual checks are done in updateMemoryManager
updateMemoryManager(RIGHT_INDEX);
+ if (outputIndex > 0) {
+ // this means batch is already allocated but because of new incoming the width and output row count might have
+ // changed. So update the maxOutputRowCount with new value
+ if (useMemoryManager) {
+ setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount());
+ }
+ }
+ // if output is not allocated then maxRowCount will be set correctly below
// allocate space for the outgoing batch
allocateVectors();
@@ -700,6 +708,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
}
// Since schema has change so we have new empty vectors in output container hence allocateMemory for them
allocateVectors();
+ } else {
+ // means we are using already allocated output batch so row count may have changed based on new incoming
+ // batch hence update it
+ if (useMemoryManager) {
+ setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount());
+ }
}
}
} // output batch is full to its max capacity
@@ -882,11 +896,19 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
* Simple method to allocate space for all the vectors in the container.
*/
private void allocateVectors() {
+ // This check is here and will be true only in case of left join where the pending rows from previous left batch is
+ // copied to the new output batch. Then same output batch is used to fill remaining memory using new left & right
+ // batches.
if (outputIndex > 0) {
logger.trace("Allocation is already done for output container vectors since it already holds some record");
return;
}
+ // Set this as max output rows to be filled in output batch since memory for that many rows are allocated
+ if (useMemoryManager) {
+ setMaxOutputRowCount(batchMemoryManager.getOutputRowCount());
+ }
+
for (VectorWrapper w : container) {
RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
@@ -1153,6 +1175,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
*/
@VisibleForTesting
public void setMaxOutputRowCount(int outputRowCount) {
+ if (isRecordBatchStatsLoggingEnabled()) {
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, outputRowCount);
+ }
maxOutputRowCount = outputRowCount;
}
@@ -1183,18 +1209,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// For cases where all the previous input were consumed and send with previous output batch. But now we are building
// a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
batchMemoryManager.update(inputIndex, outputIndex);
- final int newOutputRowCount = batchMemoryManager.getCurrentOutgoingMaxRowCount();
-
if (isRecordBatchStatsLoggingEnabled()) {
RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
- RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
- RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
- "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, newOutputRowCount);
- }
-
- if (useMemoryManager) {
- maxOutputRowCount = newOutputRowCount;
+ RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex),
+ getRecordBatchStatsContext());
}
}