aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java164
1 files changed, 164 insertions, 0 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java
new file mode 100644
index 000000000..cc12a0945
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy.SelectionPolicy;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicyFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_MEMORY_PERCENT;
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY;
+
+/**
+ * Parses and initializes configuration for ResourceManagement in Drill. It takes care of creating all the ResourcePools
+ * recursively maintaining the n-ary tree hierarchy defined in the configuration.
+ */
+public class ResourcePoolTreeImpl implements ResourcePoolTree {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolTreeImpl.class);
+
+ private static final String ROOT_POOL_MEMORY_SHARE_KEY = "memory";
+
+ private static final String ROOT_POOL_QUEUE_SELECTION_POLICY_KEY = "queue_selection_policy";
+
+ public static final String ROOT_POOL_CONFIG_KEY = "drill.exec.rm";
+
+ private final ResourcePool rootPool;
+
+ private final Config rmConfig;
+
+ private final NodeResources totalNodeResources;
+
+ private final double resourceShare;
+
+ private final Map<String, QueryQueueConfig> leafQueues = Maps.newHashMap();
+
+ private final QueueSelectionPolicy selectionPolicy;
+
+ public ResourcePoolTreeImpl(Config rmConfig, long totalNodeMemory,
+ int totalNodePhysicalCpu, int vFactor) throws RMConfigException {
+ this(rmConfig, new NodeResources(totalNodeMemory, totalNodePhysicalCpu, vFactor));
+ }
+
+ private ResourcePoolTreeImpl(Config rmConfig, NodeResources totalNodeResources) throws RMConfigException {
+ try {
+ this.rmConfig = rmConfig;
+ this.totalNodeResources = totalNodeResources;
+ this.resourceShare = this.rmConfig.hasPath(ROOT_POOL_MEMORY_SHARE_KEY) ?
+ this.rmConfig.getDouble(ROOT_POOL_MEMORY_SHARE_KEY) : ROOT_POOL_DEFAULT_MEMORY_PERCENT;
+ this.selectionPolicy = QueueSelectionPolicyFactory.createSelectionPolicy(
+ this.rmConfig.hasPath(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY) ?
+ SelectionPolicy.valueOf(rmConfig.getString(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY)) :
+ ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY);
+ rootPool = new ResourcePoolImpl(this.rmConfig.getConfig(ROOT_POOL_CONFIG_KEY), resourceShare, 1.0,
+ totalNodeResources, null, leafQueues);
+ logger.debug("Dumping RM configuration {}", toString());
+ } catch (RMConfigException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("Failure while parsing root pool configuration. " +
+ "[Details: Config: %s]", rmConfig), ex);
+ }
+ }
+
+ /**
+ * @return root {@link ResourcePool}
+ */
+ @Override
+ public ResourcePool getRootPool() {
+ return rootPool;
+ }
+
+ /**
+ * @return Map containing all the configured leaf queues
+ */
+ @Override
+ public Map<String, QueryQueueConfig> getAllLeafQueues() {
+ return leafQueues;
+ }
+
+ @Override
+ public double getResourceShare() {
+ return resourceShare;
+ }
+
+ /**
+ * Creates {@link QueueAssignmentResult} which contains list of all selected leaf ResourcePools and all the rejected
+ * ResourcePools for the provided query. Performs DFS of the ResourcePoolTree to traverse and find
+ * selected/rejected ResourcePools.
+ * @param queryContext {@link QueryContext} which contains metadata required for the given query
+ * @return {@link QueueAssignmentResult} populated with selected/rejected ResourcePools
+ */
+ @Override
+ public QueueAssignmentResult selectAllQueues(QueryContext queryContext) {
+ QueueAssignmentResult assignmentResult = new QueueAssignmentResult();
+ rootPool.visitAndSelectPool(assignmentResult, queryContext);
+ return assignmentResult;
+ }
+
+ /**
+ * Selects a leaf queue out of all the possibles leaf queues returned by
+ * {@link ResourcePoolTree#selectAllQueues(QueryContext)} for a given query based on the configured
+ * {@link QueueSelectionPolicy}. If none of the queue qualifies then it throws {@link QueueSelectionException}
+ * @param queryContext {@link QueryContext} which contains metadata for given query
+ * @param queryMaxNodeResource Max resources on a node required for given query
+ * @return {@link QueryQueueConfig} for the selected leaf queue
+ * @throws QueueSelectionException If no leaf queue is selected
+ */
+ @Override
+ public QueryQueueConfig selectOneQueue(QueryContext queryContext, NodeResources queryMaxNodeResource)
+ throws QueueSelectionException {
+ final QueueAssignmentResult assignmentResult = selectAllQueues(queryContext);
+ final List<ResourcePool> selectedPools = assignmentResult.getSelectedLeafPools();
+ if (selectedPools.size() == 0) {
+ throw new QueueSelectionException(String.format("No resource pools to choose from for the query: %s",
+ queryContext.getQueryId()));
+ } else if (selectedPools.size() == 1) {
+ return selectedPools.get(0).getQueryQueue();
+ }
+
+ return selectionPolicy.selectQueue(selectedPools, queryContext, queryMaxNodeResource).getQueryQueue();
+ }
+
+ /**
+ * @return {@link QueueSelectionPolicy} which is used to chose one queue out of all the available options for a query
+ */
+ @Override
+ public QueueSelectionPolicy getSelectionPolicyInUse() {
+ return selectionPolicy;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ NodeResources: ").append(totalNodeResources.toString());
+ sb.append(", ResourcePercent: ").append(resourceShare);
+ sb.append(", SelectionPolicy: ").append(selectionPolicy.getSelectionPolicy());
+ sb.append(", RootPool: ").append(rootPool.toString());
+ sb.append("}");
+ return sb.toString();
+ }
+}