aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen-Zvi <bben-zvi@mapr.com>2019-03-04 21:05:32 -0800
committerSorabh Hamirwasia <sorabh@apache.org>2019-03-14 22:25:20 -0700
commit3b85694be4c37bb217366287c182d50ceadda4ab (patch)
treef7237d8a32603994353d9a9bbb176bb1876a8068
parentd22e68b83d1d0cc0539d79ae0cb3aa70ae3242ad (diff)
downloaddrill-3b85694be4c37bb217366287c182d50ceadda4ab.tar.gz
DRILL-6707: Update target outgoing batch row count between current position and allocated size
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java84
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java36
7 files changed, 110 insertions, 35 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 24fb1d8c3..429d15f4d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -587,7 +587,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
// Allocate the memory for the vectors in the output container
batchMemoryManager.allocateVectors(container);
- hashJoinProbe.setTargetOutputCount(batchMemoryManager.getOutputRowCount());
+ hashJoinProbe.setTargetOutputCount(batchMemoryManager.getCurrentOutgoingMaxRowCount());
outputRecords = hashJoinProbe.probeAndProject();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index c549143d5..2836794d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -271,7 +271,8 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
probeBatch.getSchema());
}
case OK:
- setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords));
+ outgoingJoinBatch.getBatchMemoryManager().update(probeBatch, LEFT_INDEX,outputRecords);
+ setTargetOutputCount(outgoingJoinBatch.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
recordsToProcess = probeBatch.getRecordCount();
recordsProcessed = 0;
// If we received an empty batch do nothing
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
index 735f11f36..c916c95da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/LateralJoinBatch.java
@@ -1182,7 +1182,9 @@ public class LateralJoinBatch extends AbstractBinaryRecordBatch<LateralJoinPOP>
// For cases where all the previous input were consumed and send with previous output batch. But now we are building
// a new output batch with new incoming then it will not cause any problem since outputIndex will be 0
- final int newOutputRowCount = batchMemoryManager.update(inputIndex, outputIndex);
+ batchMemoryManager.update(inputIndex, outputIndex);
+ final int newOutputRowCount = batchMemoryManager.getCurrentOutgoingMaxRowCount();
+
if (isRecordBatchStatsLoggingEnabled()) {
RecordBatchIOType type = inputIndex == LEFT_INDEX ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
index d502c4f9d..36320ce2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java
@@ -124,7 +124,8 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
*/
@Override
public void update(int inputIndex) {
- status.setTargetOutputRowCount(super.update(inputIndex, status.getOutPosition()));
+ super.update(inputIndex, status.getOutPosition());
+ status.setTargetOutputRowCount(super.getCurrentOutgoingMaxRowCount()); // calculated by update()
RecordBatchIOType type = inputIndex == 0 ? RecordBatchIOType.INPUT_LEFT : RecordBatchIOType.INPUT_RIGHT;
RecordBatchStats.logRecordBatchStats(type, getRecordBatchSizer(inputIndex), getRecordBatchStatsContext());
}
@@ -184,15 +185,13 @@ public class MergeJoinBatch extends AbstractBinaryRecordBatch<MergeJoinPOP> {
status.prepare();
// loop so we can start over again if we find a new batch was created.
while (true) {
+ boolean isNewSchema = false;
// Check result of last iteration.
switch (status.getOutcome()) {
- case BATCH_RETURNED:
- allocateBatch(false);
- status.resetOutputPos();
- status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
- break;
case SCHEMA_CHANGED:
- allocateBatch(true);
+ isNewSchema = true;
+ case BATCH_RETURNED:
+ allocateBatch(isNewSchema);
status.resetOutputPos();
status.setTargetOutputRowCount(batchMemoryManager.getOutputRowCount());
break;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
index 3b8ab8d2e..95c6acd5e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/NestedLoopJoinTemplate.java
@@ -192,7 +192,8 @@ public abstract class NestedLoopJoinTemplate implements NestedLoopJoin {
leftRecordCount = 0;
break;
case OK:
- setTargetOutputCount(outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex));
+ outgoing.getBatchMemoryManager().update(left, LEFT_INDEX,outputIndex);
+ setTargetOutputCount(outgoing.getBatchMemoryManager().getCurrentOutgoingMaxRowCount()); // calculated by update()
RecordBatchStats.logRecordBatchStats(RecordBatchIOType.INPUT_LEFT,
outgoing.getBatchMemoryManager().getRecordBatchSizer(LEFT_INDEX),
outgoing.getRecordBatchStatsContext());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
index 4344e1374..a06e2c35d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/JoinBatchMemoryManager.java
@@ -40,7 +40,30 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
this.columnsToExclude = excludedColumns;
}
- private int updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
+ /**
+ * Update the memory manager parameters based on the new incoming batch
+ *
+ * Notice three (possibly) different "row counts" for the outgoing batches:
+ *
+ * 1. The rowCount that the current outgoing batch was allocated with (always a power of 2; e.g. 8192)
+ * 2. The new rowCount computed based on the newly seen input rows (always a power of 2); may be bigger than (1) if the
+ * new input rows are much smaller than before (e.g. 16384), or smaller (e.g. 4096) if the new rows are much wider.
+ * Subsequent outgoing batches would be allocated based on this (2) new rowCount.
+ * 3. The target rowCount for the current outgoing batch. While initially (1), it may be resized down if the new rows
+ * are getting bigger. In any case it won't be resized above (1) (to avoid IOOB) or below the current number of rows
+ * in that batch (i.e., outputPosition). (Need not be a power of two; e.g., 7983).
+ *
+ * After every call to update() while the outgoing batch is active, the current target should be updated with (3) by
+ * calling getCurrentOutgoingMaxRowCount() .
+ *
+ * Comment: The "power of 2" in the above (1) and (2) is actually "power of 2 minus 1" (e.g. 65535, or 8191) in order
+ * to avoid memory waste in case offset vectors are used (see DRILL-5446)
+ *
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the current output batch
+ * @param useAggregate If true, compute using average row width (else based on allocated sizes)
+ */
+ private void updateInternal(int inputIndex, int outputPosition, boolean useAggregate) {
updateIncomingStats(inputIndex);
rowWidth[inputIndex] = useAggregate ? (int) getAvgInputRowWidth(inputIndex) : getRecordBatchSizer(inputIndex).getRowAllocWidth();
@@ -60,7 +83,7 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
// This is possible for empty batches or
// when first set of batches come with OK_NEW_SCHEMA and no data.
if (newOutgoingRowWidth == 0 || newOutgoingRowWidth == getOutgoingRowWidth()) {
- return getOutputRowCount();
+ return;
}
// Adjust for the current batch.
@@ -75,34 +98,73 @@ public class JoinBatchMemoryManager extends RecordBatchMemoryManager {
// These are number of rows we can fit in remaining memory based on new outgoing row width.
final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
+ final int currentOutputBatchRowCount = getOutputRowCount();
+
// update the value to be used for next batch(es)
setOutputRowCount(configOutputBatchSize, newOutgoingRowWidth);
// set the new row width
setOutgoingRowWidth(newOutgoingRowWidth);
- return adjustOutputRowCount(outputPosition + numOutputRowsRemaining);
+ int newOutputRowCount = getOutputRowCount();
+
+ if ( currentOutputBatchRowCount != newOutputRowCount ) {
+ logger.debug("Memory manager update changed the output row count from {} to {}",currentOutputBatchRowCount,newOutputRowCount);
+ }
+
+ // The current outgoing batch target count (i.e., max number of rows to put there) is modified to be the current number of rows there
+ // plus as many of the future new rows that would fit in the remaining memory (e.g., if the new rows are wider, fewer would fit), but
+ // in any case no larger than the size the batch was allocated for (to avoid IOOB on the allocated vectors)
+ setCurrentOutgoingMaxRowCount(Math.min(currentOutputBatchRowCount, outputPosition + numOutputRowsRemaining ));
}
+ /**
+ * Update the memory manager parameters based on the new incoming batch
+ *
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ * @param useAggregate Compute using average row width (else based on allocated sizes)
+ */
@Override
- public int update(int inputIndex, int outputPosition, boolean useAggregate) {
+ public void update(int inputIndex, int outputPosition, boolean useAggregate) {
setRecordBatchSizer(inputIndex, new RecordBatchSizer(recordBatch[inputIndex]));
- return updateInternal(inputIndex, outputPosition, useAggregate);
+ updateInternal(inputIndex, outputPosition, useAggregate);
}
+ /**
+ * Update the memory manager parameters based on the new incoming batch (based on allocated sizes, not average row size)
+ *
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ */
@Override
- public int update(int inputIndex, int outputPosition) {
- return update(inputIndex, outputPosition, false);
+ public void update(int inputIndex, int outputPosition) {
+ update(inputIndex, outputPosition, false);
}
+ /**
+ * Update the memory manager parameters based on the given (incoming) batch
+ *
+ * @param batch Update based on the data in this batch
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ * @param useAggregate Compute using average row width (else based on allocated sizes)
+ */
@Override
- public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
+ public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
setRecordBatchSizer(inputIndex, new RecordBatchSizer(batch));
- return updateInternal(inputIndex, outputPosition, useAggregate);
+ updateInternal(inputIndex, outputPosition, useAggregate);
}
+ /**
+ * Update the memory manager parameters based on the given (incoming) batch (based on allocated sizes, not average row size)
+ *
+ * @param batch Update based on the data in this batch
+ * @param inputIndex Left (0) or Right (1)
+ * @param outputPosition Position (i.e. number of inserted rows) in the output batch
+ */
@Override
- public int update(RecordBatch batch, int inputIndex, int outputPosition) {
- return update(batch, inputIndex, outputPosition, false);
+ public void update(RecordBatch batch, int inputIndex, int outputPosition) {
+ update(batch, inputIndex, outputPosition, false);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
index 64f225ca5..a6b6ed510 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchMemoryManager.java
@@ -24,10 +24,16 @@ import org.apache.drill.exec.vector.ValueVector;
import java.util.List;
public class RecordBatchMemoryManager {
- protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT;
+ // The " - 1 " in the number of rows below was chosen to avoid a waste of (possible) offset vectors memory
+ // where a power-of-2 rows size needlessly doubles the offset vector (see DRILL-5446)
+ protected static final int MAX_NUM_ROWS = ValueVector.MAX_ROW_COUNT - 1;
protected static final int MIN_NUM_ROWS = 1;
protected static final int DEFAULT_INPUT_INDEX = 0;
private int outputRowCount = MAX_NUM_ROWS;
+ // max row count allowed in the current output batch; it would never exceed the allocated size (i.e. outputRowCount)
+ // but may go lower (e.g., when early rows are narrow, the outgoing batch can hold many output rows, but if later
+ // the incoming rows become wide, then less (than planned) would fit into the remaining current allocated memory)
+ private int currentOutgoingMaxRowCount = MAX_NUM_ROWS;
private int outgoingRowWidth;
private int outputBatchSize;
private RecordBatchSizer[] sizer;
@@ -144,11 +150,6 @@ public class RecordBatchMemoryManager {
outputBatchStats = new BatchStats();
}
- public int update(int inputIndex, int outputPosition) {
- // by default just return the outputRowCount
- return getOutputRowCount();
- }
-
public void update(int inputIndex) {
}
@@ -166,17 +167,20 @@ public class RecordBatchMemoryManager {
updateIncomingStats(index);
}
- public int update(int inputIndex, int outputPosition, boolean useAggregate) {
- // by default just return the outputRowCount
- return getOutputRowCount();
+ public void update(int inputIndex, int outputPosition, boolean useAggregate) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
+ }
+
+ public void update(int inputIndex, int outputPosition) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
- public int update(RecordBatch batch, int inputIndex, int outputPosition) {
- return getOutputRowCount();
+ public void update(RecordBatch batch, int inputIndex, int outputPosition) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
- public int update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
- return getOutputRowCount();
+ public void update(RecordBatch batch, int inputIndex, int outputPosition, boolean useAggregate) {
+ throw new IllegalStateException("Should only be called on JoinBatchMemoryManager");
}
public boolean updateIfNeeded(int newOutgoingRowWidth) {
@@ -199,6 +203,7 @@ public class RecordBatchMemoryManager {
return outputRowCount;
}
+ public int getCurrentOutgoingMaxRowCount() { return currentOutgoingMaxRowCount; }
/**
* Given batchSize and rowWidth, this will set output rowCount taking into account
* the min and max that is allowed.
@@ -209,8 +214,13 @@ public class RecordBatchMemoryManager {
public void setOutputRowCount(int outputRowCount) {
this.outputRowCount = outputRowCount;
+ if ( outputRowCount > MIN_NUM_ROWS && Integer.highestOneBit(outputRowCount) == outputRowCount ) {
+ this.outputRowCount--;
+ }
}
+ public void setCurrentOutgoingMaxRowCount(int newTargetOutputCount) { this.currentOutgoingMaxRowCount = newTargetOutputCount; }
+
/**
* This will adjust rowCount taking into account the min and max that is allowed.
* We will round down to nearest power of two - 1 for better memory utilization.