aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
blob: 6a805c142257af3d3fb496cf12132e0fc33ff81c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.util;

import java.util.LinkedList;
import java.util.List;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.server.options.OptionManager;

public class MemoryAllocationUtilities {

  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryAllocationUtilities.class);

  /**
   * Helper method to setup Memory Allocations
   * <p>
   * Plan the memory for buffered operators (the only ones that can spill in this release)
   * based on assumptions. These assumptions are the amount of memory per node to give
   * to each query and the number of sort operators per node.
   * <p>
   * The reason the total
   * memory is an assumption is that we have know knowledge of the number of queries
   * that can run, so we need the user to tell use that information by configuring the
   * amount of memory to be assumed available to each query.
   * <p>
   * The number of sorts per node could be calculated, but we instead simply take
   * the worst case: the maximum per-query, per-node parallization and assume that
   * all sorts appear in all fragments &mdash; a gross oversimplification, but one
   * that Drill has long made.
   * <p>
   * since this method can be used in multiple places adding it in this class
   * rather than keeping it in Foreman
   * @param plan
   * @param queryContext
   */
  public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {

    // Test plans may already have a pre-defined memory plan.
    // Otherwise, determine memory allocation.

    if (plan.getProperties().hasResourcePlan) {
      return;
    }
    // look for external sorts
    final List<PhysicalOperator> bufferedOpList = new LinkedList<>();
    for (final PhysicalOperator op : plan.getSortedOperators()) {
      if (op.isBufferedOperator()) {
        bufferedOpList.add(op);
      }
    }

    // if there are any sorts, compute the maximum allocation, and set it on them
    if (bufferedOpList.size() > 0) {
      final OptionManager optionManager = queryContext.getOptions();
      double cpu_load_average = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
      final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
      final long maxWidthPerNode = ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpu_load_average,maxWidth);
      long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
          queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
      maxAllocPerNode = Math.min(maxAllocPerNode,
          optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
      final long maxOperatorAlloc = maxAllocPerNode / (bufferedOpList.size() * maxWidthPerNode);
      logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);

      // User configurable option to allow forcing minimum memory.
      // Ensure that the buffered ops receive the minimum memory needed to make progress.
      // Without this, the math might work out to allocate too little memory.
      final long opMinMem = queryContext.getOptions().getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP_KEY).num_val;

      for(final PhysicalOperator op : bufferedOpList) {

        long alloc = Math.max(maxOperatorAlloc, op.getInitialAllocation());
        alloc = Math.max(alloc, opMinMem);
        op.setMaxAllocation(alloc);
      }
    }
    plan.getProperties().hasResourcePlan = true;
  }
}