diff options
author | Padma Penumarthy <padma@padmas-mbp.attlocal.net> | 2018-07-01 09:43:40 -0700 |
---|---|---|
committer | Volodymyr Vysotskyi <vvovyk@gmail.com> | 2018-07-01 21:26:18 +0300 |
commit | 482a63549e1bfe2b238ea9bdaf7d42312e1f51f6 (patch) | |
tree | 4cb9187ec84efbf2a25d15c2f844f4715a98fb6b /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | |
parent | dcc25800902dea81e87e2c7e3ddfb7fd9b281b42 (diff) |
DRILL-6537: Limit the batch size for buffering operators based on how much memory they get
closes #1342
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | 10 |
1 files changed, 7 insertions, 3 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 428a47ebf..047c59705 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -886,9 +886,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { partitions = new HashPartition[0]; // get the output batch size from config. - int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); - batchMemoryManager = new JoinBatchMemoryManager(configuredBatchSize, left, right); - logger.debug("BATCH_STATS, configured output batch size: {}", configuredBatchSize); + final int configuredBatchSize = (int) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR); + final double avail_mem_factor = (double) context.getOptions().getOption(ExecConstants.OUTPUT_BATCH_SIZE_AVAIL_MEM_FACTOR_VALIDATOR); + int outputBatchSize = Math.min(configuredBatchSize, Integer.highestOneBit((int)(allocator.getLimit() * avail_mem_factor))); + logger.debug("BATCH_STATS, configured output batch size: {}, allocated memory {}, avail mem factor {}, output batch size: {}", + configuredBatchSize, allocator.getLimit(), avail_mem_factor, outputBatchSize); + + batchMemoryManager = new JoinBatchMemoryManager(outputBatchSize, left, right); } /** |