/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.util;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.RootAllocatorFactory;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.server.options.OptionManager;
/**
 * Static helper for assigning maximum memory allocations to the buffered
 * operators of a physical plan.
 */
public class MemoryAllocationUtilities {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryAllocationUtilities.class);

  /**
   * Helper method to set up memory allocations.
   * <p>
   * Plans the memory for buffered operators (the only ones that can spill in this release)
   * based on assumptions. These assumptions are the amount of memory per node to give
   * to each query and the number of sort operators per node.
   * <p>
   * The reason the total memory is an assumption is that we have no knowledge of the
   * number of queries that can run, so we need the user to tell us that information by
   * configuring the amount of memory to be assumed available to each query.
   * <p>
   * The number of sorts per node could be calculated, but we instead simply take
   * the worst case: the maximum per-query, per-node parallelization and assume that
   * all sorts appear in all fragments &mdash; a gross oversimplification, but one
   * that Drill has long made.
   * <p>
   * Since this method can be used in multiple places, it lives in this class
   * rather than in Foreman.
   * <p>
   * If the plan's properties already report a resource plan, the method returns
   * without touching any operator. Otherwise the plan is marked as having a
   * resource plan on exit, whether or not it contained buffered operators.
   *
   * @param plan the physical plan whose buffered operators get their maximum
   *          allocation set; its {@code hasResourcePlan} property is set to
   *          {@code true} on completion
   * @param queryContext supplies the option manager and the config that the
   *          per-node memory limit and the parallelization width are read from
   */
  public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
    // Test plans may already have a pre-defined memory plan.
    // Otherwise, determine memory allocation.
    if (plan.getProperties().hasResourcePlan) {
      return;
    }

    // Collect the buffered operators (e.g. external sorts). The list is only
    // appended to and iterated, so an array-backed list is sufficient.
    final List<PhysicalOperator> bufferedOpList = new ArrayList<>();
    for (final PhysicalOperator op : plan.getSortedOperators()) {
      if (op.isBufferedOperator()) {
        bufferedOpList.add(op);
      }
    }

    // If there are any buffered operators, compute the maximum allocation and set it on them.
    if (!bufferedOpList.isEmpty()) {
      final OptionManager optionManager = queryContext.getOptions();
      final double cpuLoadAverage = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
      final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
      final long maxWidthPerNode = ExecConstants.MAX_WIDTH_PER_NODE.computeMaxWidth(cpuLoadAverage, maxWidth);

      // The per-node budget is the smallest of: the maximum direct memory, the
      // configured top-level allocator limit, and the per-query, per-node option.
      final long topLevelMaxAlloc = Math.min(DrillConfig.getMaxDirectMemory(),
          queryContext.getConfig().getLong(RootAllocatorFactory.TOP_LEVEL_MAX_ALLOC));
      final long maxAllocPerNode = Math.min(topLevelMaxAlloc,
          optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);

      // Worst case: every buffered operator is assumed to run in every fragment on the node.
      final long maxOperatorAlloc = maxAllocPerNode / (bufferedOpList.size() * maxWidthPerNode);
      logger.debug("Max buffered operator alloc: {}", maxOperatorAlloc);

      // User configurable option to allow forcing minimum memory.
      // Ensure that the buffered ops receive the minimum memory needed to make progress.
      // Without this, the math might work out to allocate too little memory.
      final long opMinMem = optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP_KEY).num_val;

      for (final PhysicalOperator op : bufferedOpList) {
        // Never go below the operator's own initial allocation or the configured minimum.
        final long floor = Math.max(op.getInitialAllocation(), opMinMem);
        op.setMaxAllocation(Math.max(maxOperatorAlloc, floor));
      }
    }

    plan.getProperties().hasResourcePlan = true;
  }
}
|