aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
diff options
context:
space:
mode:
authorPadma Penumarthy <padma@padmas-mbp.attlocal.net>2018-07-01 09:43:40 -0700
committerVolodymyr Vysotskyi <vvovyk@gmail.com>2018-07-01 21:26:18 +0300
commit482a63549e1bfe2b238ea9bdaf7d42312e1f51f6 (patch)
tree4cb9187ec84efbf2a25d15c2f844f4715a98fb6b /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
parentdcc25800902dea81e87e2c7e3ddfb7fd9b281b42 (diff)
DRILL-6537: Limit the batch size for buffering operators based on how much memory they get
closes #1342
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java10
1 files changed, 7 insertions, 3 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 428a47ebf..047c59705 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -886,9 +886,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
partitions = new HashPartition[0];
// get the output batch size from config.
- int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
- batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right);
- logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize);
+ final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR);
+ final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR);
+ int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor)));
+ logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}",
+ configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize);
+
+ batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right);
}
/**