diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl')
5 files changed, 14 insertions, 11 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 24fb1d8c3..429d15f4d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -587,7 +587,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem // Allocate the memory for the vectors in the output container batchMemoryManager.allocateVectors(container); - hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount()); + hashJoinProbe.setTargetOutputCount(batchMemoryManager.getCurrentOutgoingMaxRowCount()); outputRecords = hashJoinProbe.probeAndProject(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java index c549143d5..2836794d8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java @@ -271,7 +271,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe { probeBatch.getSchema()); } case OK: - setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords)); + outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords); + setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update() recordsToProcess = probeBatch.getRecordCount(); recordsProcessed = 0; // If we received an empty batch do nothing diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java index 735f11f36..c916c95da 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java @@ -1182,7 +1182,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP> // For cases where all the previous input were consumed and send with previous output batch. But now we are building // a new output batch with new incoming then it will not cause any problem since outputIndex will be 0 - final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex); + batchMemoryManager.update(inputIndex, outputIndex); + final int newOutputRowCount = batchMemoryManager.getCurrentOutgoingMaxRowCount(); + if (isRecordBatchStatsLoggingEnabled()) { RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index d502c4f9d..36320ce2e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -124,7 +124,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { */ @Override public void update(int inputIndex) { - status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition())); + super.update(inputIndex, status.getOutPosition()); + status.setTargetOutputRowCount(super.getCurrentOutgoingMaxRowCount()); // calculated by update() RecordBatchIOType type = inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT; RecordBatchStats.logRecordBatchStats(type, getRecordBatchSizer(inputIndex), getRecordBatchStatsContext()); } @@ -184,15 +185,13 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> { status.prepare(); // loop so we can start over again if we find a new batch was created. while (true) { + boolean isNewSchema = false; // Check result of last iteration. switch (status.getOutcome()) { - case BATCH_RETURNED: - allocateBatch(false); - status.resetOutputPos(); - status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount()); - break; case SCHEMA_CHANGED: - allocateBatch(true); + isNewSchema = true; + case BATCH_RETURNED: + allocateBatch(isNewSchema); status.resetOutputPos(); status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount()); break; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java index 3b8ab8d2e..95c6acd5e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java @@ -192,7 +192,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin { leftRecordCount = 0; break; case OK: - setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex)); + outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex); + setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update() RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT, outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX), outgoing.getRecordBatchStatsContext()); |