aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java173
1 files changed, 173 insertions, 0 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java
new file mode 100644
index 000000000..c986fca29
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.util.function.CheckedConsumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.cost.NodeResource;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Paralellizer specialized for managing resources for a query based on Queues. This parallelizer
+ * does not deal with increase/decrease of the parallelization of a query plan based on the current
+ * cluster state. However, the memory assignment for each operator, minor fragment and major
+ * fragment is based on the cluster state and provided queue configuration.
+ */
+public class QueueQueryParallelizer extends SimpleParallelizer {
+ private final boolean planHasMemory;
+ private final QueryContext queryContext;
+ private final Map<DrillbitEndpoint, Map<PhysicalOperator, Long>> operators;
+
+ public QueueQueryParallelizer(boolean memoryPlanning, QueryContext queryContext) {
+ super(queryContext);
+ this.planHasMemory = memoryPlanning;
+ this.queryContext = queryContext;
+ this.operators = new HashMap<>();
+ }
+
+ // return the memory computed for a physical operator on a drillbitendpoint.
+ public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
+ return (endpoint, operator) -> {
+ if (planHasMemory) {
+ return operators.get(endpoint).get(operator);
+ }
+ else {
+ return operator.getMaxAllocation();
+ }
+ };
+ }
+
+ /**
+ * Function called by the SimpleParallelizer to adjust the memory post parallelization.
+ * The overall logic is to traverse the fragment tree and call the MemoryCalculator on
+ * each major fragment. Once the memory is computed, resource requirement are accumulated
+ * per drillbit.
+ *
+ * The total resource requirements are used to select a queue. If the selected queue's
+ * resource limit is more/less than the query's requirement than the memory will be re-adjusted.
+ *
+ * @param planningSet context of the fragments.
+ * @param roots root fragments.
+ * @param activeEndpoints currently active endpoints.
+ * @throws PhysicalOperatorSetupException
+ */
+ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
+ Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+
+ if (planHasMemory) {
+ return;
+ }
+ // total node resources for the query plan maintained per drillbit.
+ final Map<DrillbitEndpoint, NodeResource> totalNodeResources =
+ activeEndpoints.stream().collect(Collectors.toMap(x ->x,
+ x -> NodeResource.create()));
+
+ // list of the physical operators and their memory requirements per drillbit.
+ final Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> operators =
+ activeEndpoints.stream().collect(Collectors.toMap(x -> x,
+ x -> new ArrayList<>()));
+
+ for (Wrapper wrapper : roots) {
+ traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragment) -> {
+ MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext);
+ fragment.getNode().getRoot().accept(calculator, fragment);
+ NodeResource.merge(totalNodeResources, fragment.getResourceMap());
+ operators.entrySet()
+ .stream()
+ .forEach((entry) -> entry.getValue()
+ .addAll(calculator.getBufferedOperators(entry.getKey())));
+ }));
+ }
+ //queryrm.selectQueue( pass the max node Resource) returns queue configuration.
+ Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators = ensureOperatorMemoryWithinLimits(operators, totalNodeResources, 10);
+ memoryAdjustedOperators.entrySet().stream().forEach((x) -> {
+ Map<PhysicalOperator, Long> memoryPerOperator = x.getValue().stream()
+ .collect(Collectors.toMap(operatorLongPair -> operatorLongPair.getLeft(),
+ operatorLongPair -> operatorLongPair.getRight(),
+ (mem_1, mem_2) -> (mem_1 + mem_2)));
+ this.operators.put(x.getKey(), memoryPerOperator);
+ });
+ }
+
+
+ /**
+ * Helper method to adjust the memory for the buffered operators.
+ * @param memoryPerOperator list of physical operators per drillbit
+ * @param nodeResourceMap resources per drillbit.
+ * @param nodeLimit permissible node limit.
+ * @return list of operators which contain adjusted memory limits.
+ */
+ private Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>>
+ ensureOperatorMemoryWithinLimits(Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryPerOperator,
+ Map<DrillbitEndpoint, NodeResource> nodeResourceMap, int nodeLimit) {
+ // Get the physical operators which are above the node memory limit.
+ Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> onlyMemoryAboveLimitOperators = new HashMap<>();
+ memoryPerOperator.entrySet().stream().forEach((entry) -> {
+ onlyMemoryAboveLimitOperators.putIfAbsent(entry.getKey(), new ArrayList<>());
+ if (nodeResourceMap.get(entry.getKey()).getMemory() > nodeLimit) {
+ onlyMemoryAboveLimitOperators.get(entry.getKey()).addAll(entry.getValue());
+ }
+ });
+
+
+ // Compute the total memory required by the physical operators on the drillbits which are above node limit.
+ // Then use the total memory to adjust the memory requirement based on the permissible node limit.
+ Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryAdjustedDrillbits = new HashMap<>();
+ onlyMemoryAboveLimitOperators.entrySet().stream().forEach(
+ entry -> {
+ Long totalMemory = entry.getValue().stream().mapToLong(Pair::getValue).sum();
+ List<Pair<PhysicalOperator, Long>> adjustedMemory = entry.getValue().stream().map(operatorMemory -> {
+ // formula to adjust the memory is (optimalMemory / totalMemory(this is for all the operators)) * permissible_node_limit.
+ return Pair.of(operatorMemory.getKey(), (long) Math.ceil(operatorMemory.getValue()/totalMemory * nodeLimit));
+ }).collect(Collectors.toList());
+ memoryAdjustedDrillbits.put(entry.getKey(), adjustedMemory);
+ }
+ );
+
+ // Get all the operations on drillbits which were adjusted for memory and merge them with operators which are not
+ // adjusted for memory.
+ Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> allDrillbits = new HashMap<>();
+ memoryPerOperator.entrySet().stream().filter((entry) -> !memoryAdjustedDrillbits.containsKey(entry.getKey())).forEach(
+ operatorMemory -> {
+ allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue());
+ }
+ );
+
+ memoryAdjustedDrillbits.entrySet().stream().forEach(
+ operatorMemory -> {
+ allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue());
+ }
+ );
+
+ // At this point allDrillbits contains the operators on all drillbits. The memory also is adjusted based on the nodeLimit and
+ // the ratio of their requirements.
+ return allDrillbits;
+ }
+} \ No newline at end of file