aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java3
5 files changed, 14 insertions, 11 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 24fb1d8c3..429d15f4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -587,7 +587,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
// Allocate the memory for the vectors in the output container
batchMemoryManager.allocateVectors(container);
- hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
+ hashJoinProbe.setTargetOutputCount(batchMemoryManager.getCurrentOutgoingMaxRowCount());
outputRecords = hashJoinProbe.probeAndProject();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index c549143d5..2836794d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -271,7 +271,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
probeBatch.getSchema());
}
case OK:
- setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords));
+ outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords);
+ setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
recordsToProcess = probeBatch.getRecordCount();
recordsProcessed = 0;
// If we received an empty batch do nothing
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 735f11f36..c916c95da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -1182,7 +1182,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// For cases where all the previous input were consumed and send with previous output batch. But now we are building
// a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
- final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex);
+ batchMemoryManager.update(inputIndex, outputIndex);
+ final int newOutputRowCount = batchMemoryManager.getCurrentOutgoingMaxRowCount();
+
if (isRecordBatchStatsLoggingEnabled()) {
RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index d502c4f9d..36320ce2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -124,7 +124,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
*/
@Override
public void update(int inputIndex) {
- status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
+ super.update(inputIndex, status.getOutPosition());
+ status.setTargetOutputRowCount(super.getCurrentOutgoingMaxRowCount()); // calculated by update()
RecordBatchIOType type = inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
RecordBatchStats.logRecordBatchStats(type, getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
}
@@ -184,15 +185,13 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
status.prepare();
// loop so we can start over again if we find a new batch was created.
while (true) {
+ boolean isNewSchema = false;
// Check result of last iteration.
switch (status.getOutcome()) {
- case BATCH_RETURNED:
- allocateBatch(false);
- status.resetOutputPos();
- status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
- break;
case SCHEMA_CHANGED:
- allocateBatch(true);
+ isNewSchema = true;
+ case BATCH_RETURNED:
+ allocateBatch(isNewSchema);
status.resetOutputPos();
status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
break;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index 3b8ab8d2e..95c6acd5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -192,7 +192,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
leftRecordCount = 0;
break;
case OK:
- setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex));
+ outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex);
+ setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
outgoing.getRecordBatchStatsContext());