aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSorabh Hamirwasia <sorabh@apache.org>2019-03-12 14:25:39 -0700
committerSorabh Hamirwasia <sorabh@apache.org>2019-03-14 22:25:33 -0700
commitd69af973d1b91f71dd6314d01dd9399b1f7b9016 (patch)
treed6f6cab474f749c64efb14309248be05baa9f03b
parent3b85694be4c37bb217366287c182d50ceadda4ab (diff)
DRILL-6707: Removed changes for setOutputRowCount.
Modified LateralJoin to use new setCurrentOutgoingMaxRowCount api Limit CurrentOutgoingMaxRowCount to MAX_NUM_ROWS Fix HashJoin to fix failing tests closes #1650
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java37
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java22
3 files changed, 47 insertions, 14 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 429d15f4d..24fb1d8c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -587,7 +587,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
// Allocate the memory for the vectors in the output container
batchMemoryManager.allocateVectors(container);
- hashJoinProbe.setTargetOutputCount(batchMemoryManager.getCurrentOutgoingMaxRowCount());
+ hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
outputRecords = hashJoinProbe.probeAndProject();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index c916c95da..04a459987 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -232,6 +232,14 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// vectors and throws NPE. The actual checks are done in updateMemoryManager
updateMemoryManager(RIGHT_INDEX);
+ if (outputIndex > 0) {
+ // this means batch is already allocated but because of new incoming the width and output row count might have
+ // changed. So update the maxOutputRowCount with new value
+ if (useMemoryManager) {
+ setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount());
+ }
+ }
+ // if output is not allocated then maxRowCount will be set correctly below
// allocate space for the outgoing batch
allocateVectors();
@@ -700,6 +708,12 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
}
// Since schema has change so we have new empty vectors in output container hence allocateMemory for them
allocateVectors();
+ } else {
+ // means we are using already allocated output batch so row count may have changed based on new incoming
+ // batch hence update it
+ if (useMemoryManager) {
+ setMaxOutputRowCount(batchMemoryManager.getCurrentOutgoingMaxRowCount());
+ }
}
}
} // output batch is full to its max capacity
@@ -882,11 +896,19 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
* Simple method to allocate space for all the vectors in the container.
*/
private void allocateVectors() {
+ // This check is here and will be true only in case of left join where the pending rows from previous left batch is
+ // copied to the new output batch. Then same output batch is used to fill remaining memory using new left & right
+ // batches.
if (outputIndex > 0) {
logger.trace("Allocation is already done for output container vectors since it already holds some record");
return;
}
+ // Set this as max output rows to be filled in output batch since memory for that many rows are allocated
+ if (useMemoryManager) {
+ setMaxOutputRowCount(batchMemoryManager.getOutputRowCount());
+ }
+
for (VectorWrapper w : container) {
RecordBatchSizer.ColumnSize colSize = batchMemoryManager.getColumnSize(w.getField().getName());
colSize.allocateVector(w.getValueVector(), maxOutputRowCount);
@@ -1153,6 +1175,10 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
*/
@VisibleForTesting
public void setMaxOutputRowCount(int outputRowCount) {
+ if (isRecordBatchStatsLoggingEnabled()) {
+ RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
+ "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, outputRowCount);
+ }
maxOutputRowCount = outputRowCount;
}
@@ -1183,18 +1209,11 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// For cases where all the previous input were consumed and send with previous output batch. But now we are building
// a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
batchMemoryManager.update(inputIndex, outputIndex);
- final int newOutputRowCount = batchMemoryManager.getCurrentOutgoingMaxRowCount();
-
if (isRecordBatchStatsLoggingEnabled()) {
RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
- RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
- RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
- "Previous OutputRowCount: %d, New OutputRowCount: %d", maxOutputRowCount, newOutputRowCount);
- }
-
- if (useMemoryManager) {
- maxOutputRowCount = newOutputRowCount;
+ RecordBatchStats.logRecordBatchStats(type, batchMemoryManager.getRecordBatchSizer(inputIndex),
+ getRecordBatchStatsContext());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index a6b6ed510..9bd905f88 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -199,10 +199,19 @@ public class RecordBatchMemoryManager {
return true;
}
+ /**
+ * Should be used as maximum output row count that can be filled in output batch when a new output batch is
+ * allocated after calling update on BatchMemoryManager.
+ * @return outputRowCount max output row count
+ */
public int getOutputRowCount() {
return outputRowCount;
}
+ /**
+ * Should be used as maximum output row count that can be filled in output batch which is already allocated.
+ * @return currentOutgoingMaxRowCount max output row count for current output batch
+ */
public int getCurrentOutgoingMaxRowCount() { return currentOutgoingMaxRowCount; }
/**
* Given batchSize and rowWidth, this will set output rowCount taking into account
@@ -213,13 +222,18 @@ public class RecordBatchMemoryManager {
}
public void setOutputRowCount(int outputRowCount) {
+ Preconditions.checkArgument(outputRowCount <= MAX_NUM_ROWS);
this.outputRowCount = outputRowCount;
- if ( outputRowCount > MIN_NUM_ROWS && Integer.highestOneBit(outputRowCount) == outputRowCount ) {
- this.outputRowCount--;
- }
}
- public void setCurrentOutgoingMaxRowCount(int newTargetOutputCount) { this.currentOutgoingMaxRowCount = newTargetOutputCount; }
+ /**
+ * Set the max row count which the current output batch (already allocated) can contain. Since this setter doesn't
+ * adjust the input value we make sure it doesn't go above MAX_NUM_ROWS
+ * @param newTargetOutputCount
+ */
+ public void setCurrentOutgoingMaxRowCount(int newTargetOutputCount) {
+ this.currentOutgoingMaxRowCount = Math.min(MAX_NUM_ROWS, newTargetOutputCount);
+ }
/**
* This will adjust rowCount taking into account the min and max that is allowed.