diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java | 37 |
1 files changed, 28 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index c916c95da..04a459987 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -232,6 +232,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // vectors and throws NPE. The actual checks are done in updateMemoryManager updateMemoryManager(RIGHT_INDEX); + if (outputIndex > 0) { + // this means batch is already allocated but because of new incoming the width and output row count might have + // changed. So update the maxOutputRowCount with new value + if (useMemoryManager) { + setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount()); + } + } + // if output is not allocated then maxRowCount will be set correctly below // allocate space for the outgoing batch allocateVectors(); @@ -700,6 +708,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> } // Since schema has change so we have new empty vectors in output container hence allocateMemory for them allocateVectors(); + } else { + // means we are using already allocated output batch so row count may have changed based on new incoming + // batch hence update it + if (useMemoryManager) { + setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount()); + } } } } // output batch is full to its max capacity @@ -882,11 +896,19 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> * Simple method to allocate space for all the vectors in the container. */ private void allocateVectors() { + // This check is here and will be true only in case of left join where the pending rows from previous left batch is + // copied to the new output batch. Then same output batch is used to fill remaining memory using new left & right + // batches. if (outputIndex > 0) { logger.trace("Allocation is already done for output container vectors since it already holds some record"); return; } + // Set this as max output rows to be filled in output batch since memory for that many rows are allocated + if (useMemoryManager) { + setMaxOutputRowCount(batchMemoryManager.getOutputRowCount()); + } + for (VectorWrapper w : container) { RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName()); colSize.allocateVector(w.getValueVector(), maxOutputRowCount); @@ -1153,6 +1175,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> */ @VisibleForTesting public void setMaxOutputRowCount(int outputRowCount) { + if (isRecordBatchStatsLoggingEnabled()) { + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, outputRowCount); + } maxOutputRowCount = outputRowCount; } @@ -1183,18 +1209,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // For cases where all the previous input were consumed and send with previous output batch. But now we are building // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0 batchMemoryManager.update(inputIndex, outputIndex); - final int newOutputRowCount = batchMemoryManager.getCurrentOutgoingMaxRowCount(); - if (isRecordBatchStatsLoggingEnabled()) { RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT; - RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), getRecordBatchStatsContext()); - RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), - "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, newOutputRowCount); - } - - if (useMemoryManager) { - maxOutputRowCount = newOutputRowCount; + RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), + getRecordBatchStatsContext()); } } |