From d69af973d1b91f71dd6314d01dd9399b1f7b9016 Mon Sep 17 00:00:00 2001 From: Sorabh Hamirwasia Date: Tue, 12 Mar 2019 14:25:39 -0700 Subject: DRILL-6707: Removed changes for setOutputRowCount. Modified LateralJoin to use new setCurrentOutgoingMaxRowCount api Limit CurrentOutgoingMaxRowCount to MAX_NUM_ROWS Fix HashJoin to fix failing tests closes #1650 --- .../exec/physical/impl/join/HashJoinBatch.java | 2 +- .../exec/physical/impl/join/LateralJoinBatch.java | 37 ++++++++++++++++------ .../exec/record/RecordBatchMemoryManager.java | 22 ++++++++++--- 3 files changed, 47 insertions(+), 14 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 429d15f4d..24fb1d8c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -587,7 +587,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch implem // Allocate the memory for the vectors in the output container batchMemoryManager.allocateVectors(container); - hashJoinProbe.setTargetOutputCount(batchMemoryManager.getCurrentOutgoingMaxRowCount()); + hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount()); outputRecords = hashJoinProbe.probeAndProject(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index c916c95da..04a459987 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -232,6 +232,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch // vectors and throws NPE. 
The actual checks are done in updateMemoryManager updateMemoryManager(RIGHT_INDEX); + if (outputIndex > 0) { + // The batch is already allocated, but because of the new incoming batch the width and output row count might + // have changed, so update maxOutputRowCount with the new value + if (useMemoryManager) { + setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount()); + } + } + // if output is not allocated then maxRowCount will be set correctly below // allocate space for the outgoing batch allocateVectors(); @@ -700,6 +708,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch } // Since schema has change so we have new empty vectors in output container hence allocateMemory for them allocateVectors(); + } else { + // We are reusing an already allocated output batch, so the row count may have changed based on the new + // incoming batch; hence update it + if (useMemoryManager) { + setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount()); + } } } } // output batch is full to its max capacity @@ -882,11 +896,19 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch * Simple method to allocate space for all the vectors in the container. */ private void allocateVectors() { + // This check is here and will be true only in case of left join where the pending rows from the previous left batch + // are copied to the new output batch. Then the same output batch is used to fill remaining memory using new left & + // right batches.
if (outputIndex > 0) { logger.trace("Allocation is already done for output container vectors since it already holds some record"); return; } + // Set this as max output rows to be filled in output batch since memory for that many rows is allocated + if (useMemoryManager) { + setMaxOutputRowCount(batchMemoryManager.getOutputRowCount()); + } + for (VectorWrapper w : container) { RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName()); colSize.allocateVector(w.getValueVector(), maxOutputRowCount); @@ -1153,6 +1175,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch */ @VisibleForTesting public void setMaxOutputRowCount(int outputRowCount) { + if (isRecordBatchStatsLoggingEnabled()) { + RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), + "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, outputRowCount); + } maxOutputRowCount = outputRowCount; } @@ -1183,18 +1209,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch // For cases where all the previous input were consumed and send with previous output batch. But now we are building // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0 batchMemoryManager.update(inputIndex, outputIndex); - final int newOutputRowCount = batchMemoryManager.getCurrentOutgoingMaxRowCount(); - if (isRecordBatchStatsLoggingEnabled()) { RecordBatchIOType type = inputIndex == LEFT_INDEX ?
RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT; - RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), getRecordBatchStatsContext()); - RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), - "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, newOutputRowCount); - } - - if (useMemoryManager) { - maxOutputRowCount = newOutputRowCount; + RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), + getRecordBatchStatsContext()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java index a6b6ed510..9bd905f88 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java @@ -199,10 +199,19 @@ public class RecordBatchMemoryManager { return true; } + /** + * Should be used as maximum output row count that can be filled in output batch when a new output batch is + * allocated after calling update on BatchMemoryManager. + * @return outputRowCount max output row count + */ public int getOutputRowCount() { return outputRowCount; } + /** + * Should be used as maximum output row count that can be filled in output batch which is already allocated. 
+ * @return currentOutgoingMaxRowCount max output row count for current output batch + */ public int getCurrentOutgoingMaxRowCount() { return currentOutgoingMaxRowCount; } /** * Given batchSize and rowWidth, this will set output rowCount taking into account @@ -213,13 +222,18 @@ public class RecordBatchMemoryManager { } public void setOutputRowCount(int outputRowCount) { + Preconditions.checkArgument(outputRowCount <= MAX_NUM_ROWS); this.outputRowCount = outputRowCount; - if ( outputRowCount > MIN_NUM_ROWS && Integer.highestOneBit(outputRowCount) == outputRowCount ) { - this.outputRowCount--; - } } - public void setCurrentOutgoingMaxRowCount(int newTargetOutputCount) { this.currentOutgoingMaxRowCount = newTargetOutputCount; } + /** + * Set the max row count which the current output batch (already allocated) can contain. Since this setter doesn't + * adjust the input value, we make sure it doesn't go above MAX_NUM_ROWS. + * @param newTargetOutputCount requested max row count for the current output batch; capped at MAX_NUM_ROWS + */ + public void setCurrentOutgoingMaxRowCount(int newTargetOutputCount) { + this.currentOutgoingMaxRowCount = Math.min(MAX_NUM_ROWS, newTargetOutputCount); + } /** * This will adjust rowCount taking into account the min and max that is allowed. -- cgit v1.2.3