aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
diff options
context:
space:
mode:
authorBen-Zvi <bben-zvi@mapr.com>2019-03-04 21:05:32 -0800
committerSorabh Hamirwasia <sorabh@apache.org>2019-03-14 22:25:20 -0700
commit3b85694be4c37bb217366287c182d50ceadda4ab (patch)
treef7237d8a32603994353d9a9bbb176bb1876a8068 /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
parentd22e68b83d1d0cc0539d79ae0cb3aa70ae3242ad (diff)
DRILL-6707: Update target outgoing batch row count between current position and allocated size
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java13
1 files changed, 6 insertions, 7 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index d502c4f9d..36320ce2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -124,7 +124,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
*/
@Override
public void update(int inputIndex) {
- status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
+ super.update(inputIndex, status.getOutPosition());
+ status.setTargetOutputRowCount(super.getCurrentOutgoingMaxRowCount()); // calculated by update()
RecordBatchIOType type = inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
RecordBatchStats.logRecordBatchStats(type, getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
}
@@ -184,15 +185,13 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
status.prepare();
// loop so we can start over again if we find a new batch was created.
while (true) {
+ boolean isNewSchema = false;
// Check result of last iteration.
switch (status.getOutcome()) {
- case BATCH_RETURNED:
- allocateBatch(false);
- status.resetOutputPos();
- status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
- break;
case SCHEMA_CHANGED:
- allocateBatch(true);
+ isNewSchema = true;
+ case BATCH_RETURNED:
+ allocateBatch(isNewSchema);
status.resetOutputPos();
status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
break;