diff options
author | Ben-Zvi <bben-zvi@mapr.com> | 2019-03-04 21:05:32 -0800 |
---|---|---|
committer | Sorabh Hamirwasia <sorabh@apache.org> | 2019-03-14 22:25:20 -0700 |
commit | 3b85694be4c37bb217366287c182d50ceadda4ab (patch) | |
tree | f7237d8a32603994353d9a9bbb176bb1876a8068 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java | |
parent | d22e68b83d1d0cc0539d79ae0cb3aa70ae3242ad (diff) |
DRILL-6707: Update target outgoing batch row count between current position and allocated size
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java | 13 |
1 files changed, 6 insertions, 7 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index d502c4f9d..36320ce2e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -124,7 +124,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { */ @Override public void update(int inputIndex) { - status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition())); + super.update(inputIndex, status.getOutPosition()); + status.setTargetOutputRowCount(super.getCurrentOutgoingMaxRowCount()); // calculated by update() RecordBatchIOType type = inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT; RecordBatchStats.logRecordBatchStats(type, getRecordBatchSizer(inputIndex), getRecordBatchStatsContext()); } @@ -184,15 +185,13 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { status.prepare(); // loop so we can start over again if we find a new batch was created. while (true) { + boolean isNewSchema = false; // Check result of last iteration. switch (status.getOutcome()) { - case BATCH_RETURNED: - allocateBatch(false); - status.resetOutputPos(); - status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount()); - break; case SCHEMA_CHANGED: - allocateBatch(true); + isNewSchema = true; + case BATCH_RETURNED: + allocateBatch(isNewSchema); status.resetOutputPos(); status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount()); break; |