path: root/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.work.foreman.rm;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;

import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;

/**
 * Global resource manager that provides basic admission control (AC) via a
 * configured queue: either the Zookeeper-based distributed queue or the
 * in-process embedded Drillbit queue. The queue places an upper limit on the
 * number of running queries. This limit then "slices" memory and CPU between
 * queries: each gets the same share of resources.
 * <p>
 * This is a "basic" implementation. Clearly, a more advanced implementation
 * could look at query cost to determine whether to give a given query more or
 * less than the "standard" share. That is left as a future exercise; in this
 * version we just want to get the basics working.
 * <p>
 * This class provides only the resource-manager level; it is paired with a
 * queue implementation to produce a complete solution. This composition-based
 * approach allows sharing of functionality across queue implementations.
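 * <p>
 * As a rough sketch of how the pieces compose (the queue construction is
 * illustrative only; the concrete queue type is chosen by configuration
 * elsewhere):
 * <pre>{@code
 * QueryQueue queue = ...; // distributed (ZK-based) or embedded queue
 * ThrottledResourceManager rm =
 *     new ThrottledResourceManager(drillbitContext, queue);
 *
 * // Planning-only path: estimate and assign operator memory.
 * QueryResourceAllocator allocator = rm.newResourceAllocator(queryContext);
 *
 * // Full path, tied to a Foreman: adds queueing (admission control).
 * QueryResourceManager queryRM = rm.newQueryRM(foreman);
 * }</pre>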
 */

public class ThrottledResourceManager extends AbstractResourceManager {

  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
      .getLogger(ThrottledResourceManager.class);

  public static class QueuedResourceAllocator
      implements QueryResourceAllocator {

    protected final ThrottledResourceManager rm;
    protected QueryContext queryContext;
    protected PhysicalPlan plan;
    protected QueryWorkUnit work;
    protected double queryCost;

    protected QueuedResourceAllocator(final ThrottledResourceManager rm,
        QueryContext queryContext) {
      this.rm = rm;
      this.queryContext = queryContext;
    }

    @Override
    public void visitAbstractPlan(PhysicalPlan plan) {
      this.plan = plan;
      queryCost = plan.totalCost();
    }

    @Override
    public void visitPhysicalPlan(final QueryWorkUnit work) {
      this.work = work;
      planMemory();
    }

    private void planMemory() {
      if (plan.getProperties().hasResourcePlan) {
        logger.debug("Memory already planned.");
        return;
      }

      // Group fragments by node.

      Map<String, Collection<PhysicalOperator>> nodeMap = buildBufferedOpMap();

      // Memory must be symmetric to avoid bottlenecks in which one node has
      // sorts (say) with less memory than another, causing skew in data arrival
      // rates for downstream operators.

      int width = countBufferingOperators(nodeMap);

      // Then, share memory evenly across all the buffered operators on each
      // node. This handles asymmetric distribution, such as occurs if a sort
      // appears in the root fragment (the one with the screen), which is
      // never parallelized.
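      // For example (illustrative numbers only): if node A hosts three
      // buffered operators and node B hosts just one, the width computed
      // above is 3, so both nodes divide the per-query memory by three and
      // B's lone operator does not get a larger share than each of A's.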

      for (Entry<String, Collection<PhysicalOperator>> entry : nodeMap.entrySet()) {
        planNodeMemory(entry.getKey(), entry.getValue(), width);
      }
    }

    private int countBufferingOperators(
        Map<String, Collection<PhysicalOperator>> nodeMap) {
      int width = 0;
      for (Collection<PhysicalOperator> fragSorts : nodeMap.values()) {
        width = Math.max(width, fragSorts.size());
      }
      return width;
    }

    /**
     * Given the set of buffered operators (from any number of fragments) on a
     * single node, share the per-query memory equally across all the
     * operators.
     *
     * @param nodeAddr address of the node whose memory is being planned
     * @param bufferedOps buffered operators scheduled on that node
     * @param width maximum number of buffered operators on any single node
     */

    private void planNodeMemory(String nodeAddr,
        Collection<PhysicalOperator> bufferedOps, int width) {

      // If no buffering operators, nothing to plan.

      if (bufferedOps.isEmpty()) {
        return;
      }

      // Divide node memory evenly among the set of operators, in any minor
      // fragment, on the node. This is not very sophisticated: it does not
      // deal with, say, three stacked sorts in which, if sort A runs, then
      // B may be using memory, but C cannot be active. That kind of analysis
      // is left as a later exercise.

      long nodeMemory = queryMemoryPerNode();

      // Set a floor on the amount of memory per operator based on the
      // configured minimum. This is likely unhelpful because we are trying
      // to work around constrained memory by assuming more than we actually
      // have. This may lead to an OOM at run time.

      long preferredOpMemory = nodeMemory / width;
      long perOpMemory = Math.max(preferredOpMemory, rm.minimumOperatorMemory());
      if (preferredOpMemory < perOpMemory) {
        logger.warn("Preferred per-operator memory: {}, actual amount: {}",
            preferredOpMemory, perOpMemory);
      }
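
      // Worked example (illustrative numbers only): with 8 GB of query
      // memory on this node and a width of 4 buffered operators, the
      // preferred share is 2 GB per operator. If the configured minimum
      // operator memory were 3 GB, the floor above would raise the share
      // to 3 GB and log the warning; the cap and floor below then adjust
      // each operator individually.
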
      logger.debug(
          "Query: {}, Node: {}, allocating {} bytes each for {} buffered operator(s).",
          QueryIdHelper.getQueryId(queryContext.getQueryId()), nodeAddr,
          perOpMemory, width);

      for (PhysicalOperator op : bufferedOps) {

        // Limit the memory to the maximum in the plan. Doing so is
        // likely unnecessary, and perhaps harmful, because the pre-planned
        // allocation is the default maximum hard-coded to 10 GB. This means
        // that even if 20 GB is available to the sort, it won't use more
        // than 10GB. This is probably more of a bug than a feature.

        long alloc = Math.min(perOpMemory, op.getMaxAllocation());

        // Place a floor on the memory that is the initial allocation,
        // since we don't want the operator to run out of memory when it
        // first starts.

        alloc = Math.max(alloc, op.getInitialAllocation());

        if (alloc > preferredOpMemory && alloc != perOpMemory) {
          logger.warn("Allocated memory of {} for {} exceeds available memory of {} " +
                      "due to operator minimum",
              alloc, op.getClass().getSimpleName(), preferredOpMemory);
        }
        else if (alloc < preferredOpMemory) {
          logger.warn("Allocated memory of {} for {} is less than available memory " +
              "of {} due to operator limit",
              alloc, op.getClass().getSimpleName(), preferredOpMemory);
        }
        op.setMaxAllocation(alloc);
      }
    }

    protected long queryMemoryPerNode() {
      return rm.defaultQueryMemoryPerNode(plan.totalCost());
    }

    /**
     * Build a map of buffered operators (such as external sorts) grouped by
     * node. We start with a list of minor fragments, each with an assigned
     * endpoint (node). Multiple minor fragments may appear on each node, and
     * each minor fragment may have zero, one or more buffered operators.
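     * <p>
     * As an illustration (node names are hypothetical), the result might map
     * {@code "drillbit-1"} to two buffered operators (say, two external
     * sorts from different minor fragments) and {@code "drillbit-2"} to one.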
     *
     * @return buffered operators grouped by the address of the node on which
     *         they will run
     */

    private Map<String, Collection<PhysicalOperator>> buildBufferedOpMap() {
      Multimap<String, PhysicalOperator> map = ArrayListMultimap.create();
      getBufferedOps(map, work.getRootFragmentDefn());
      for (MinorFragmentDefn defn : work.getMinorFragmentDefns()) {
        getBufferedOps(map, defn);
      }
      return map.asMap();
    }

    /**
     * Searches a fragment's operator tree to find the buffered operators
     * within that fragment.
     */

    protected static class BufferedOpFinder extends
        AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
      @Override
      public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
          throws RuntimeException {
        if (op.isBufferedOperator(null)) {
          value.add(op);
        }
        visitChildren(op, value);
        return null;
      }
    }

    private void getBufferedOps(Multimap<String, PhysicalOperator> map,
        MinorFragmentDefn defn) {
      List<PhysicalOperator> bufferedOps = getBufferedOps(defn.root());
      if (!bufferedOps.isEmpty()) {
        map.putAll(defn.fragment().getAssignment().getAddress(), bufferedOps);
      }
    }

    /**
     * Search an individual fragment tree to find any buffered operators it may
     * contain.
     *
     * @param root the root operator of the fragment to search
     * @return the buffered operators found in the fragment, possibly empty
     */

    private List<PhysicalOperator> getBufferedOps(FragmentRoot root) {
      List<PhysicalOperator> bufferedOps = new ArrayList<>();
      BufferedOpFinder finder = new BufferedOpFinder();
      root.accept(finder, bufferedOps);
      return bufferedOps;
    }
  }

  /**
   * Per-query resource manager. Handles resources and optional queue lease for
   * a single query. As such, this is a non-shared resource: it is associated
   * with a Foreman: a single thread at plan time, and a single event (in some
   * thread) at query completion time. Because of these semantics, no
   * synchronization is needed within this class.
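   * <p>
   * A rough sketch of the intended call sequence (error handling omitted;
   * in practice the Foreman drives these calls):
   * <pre>{@code
   * QueryResourceManager queryRM = rm.newQueryRM(foreman);
   * queryRM.setCost(plan.totalCost());
   * queryRM.admit();  // acquires a queue lease; may block or time out
   * // ... plan memory and execute; memory comes from the lease ...
   * queryRM.exit();   // releases the lease, if one was acquired
   * }</pre>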
   */

  public static class QueuedQueryResourceManager extends QueuedResourceAllocator
      implements QueryResourceManager {

    private final Foreman foreman;
    private QueueLease lease;

    public QueuedQueryResourceManager(final ThrottledResourceManager rm,
        final Foreman foreman) {
      super(rm, foreman.getQueryContext());
      this.foreman = foreman;
    }

    @Override
    public void setCost(double cost) {
      this.queryCost = cost;
    }

    @Override
    public void admit() throws QueueTimeoutException, QueryQueueException {
      lease = rm.queue().enqueue(foreman.getQueryId(), queryCost);
    }

    @Override
    protected long queryMemoryPerNode() {

      // No lease: use static estimate.

      if (lease == null) {
        return super.queryMemoryPerNode();
      }

      // Use actual memory assigned to this query.

      return lease.queryMemoryPerNode();
    }

    @Override
    public void exit() {
      if (lease != null) {
        lease.release();
      }
      lease = null;
    }

    @Override
    public boolean hasQueue() { return true; }

    @Override
    public String queueName() {
      return lease == null ? null : lease.queueName();
    }
  }

  private final QueryQueue queue;

  public ThrottledResourceManager(final DrillbitContext drillbitContext,
      final QueryQueue queue) {
    super(drillbitContext);
    this.queue = queue;
    queue.setMemoryPerNode(memoryPerNode());
  }

  public long minimumOperatorMemory() {
    return queue.minimumOperatorMemory();
  }

  public long defaultQueryMemoryPerNode(double cost) {
    return queue.defaultQueryMemoryPerNode(cost);
  }

  public QueryQueue queue() { return queue; }

  @Override
  public QueryResourceAllocator newResourceAllocator(
      QueryContext queryContext) {
    return new QueuedResourceAllocator(this, queryContext);
  }

  @Override
  public QueryResourceManager newQueryRM(Foreman foreman) {
    return new QueuedQueryResourceManager(this, foreman);
  }

  @Override
  public void close() {
    queue.close();
  }
}