aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java171
1 files changed, 171 insertions, 0 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java
new file mode 100644
index 000000000..948d1b943
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.util.MemoryConfigParser;
+
+import java.util.UUID;
+
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.MAX_ADMISSIBLE_QUERY_COUNT;
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.MAX_WAITING_QUERY_COUNT;
+
+/**
+ * Parses and initialize QueueConfiguration for a {@link ResourcePool}. It also generates a unique UUID for each queue
+ * which will be later used by Drillbit to store in Zookeeper along with
+ * {@link org.apache.drill.exec.proto.beans.DrillbitEndpoint}. This UUID is used in the leader election mechanism in
+ * which Drillbit participates for each of the configured rm queue.
+ */
+public class QueryQueueConfigImpl implements QueryQueueConfig {
+
+ // Optional queue configurations
+ private static final String MAX_ADMISSIBLE_KEY = "max_admissible";
+
+ private static final String MAX_WAITING_KEY = "max_waiting";
+
+ private static final String MAX_WAIT_TIMEOUT_KEY = "max_wait_timeout";
+
+ private static final String WAIT_FOR_PREFERRED_NODES_KEY = "wait_for_preferred_nodes";
+
+ private static final String MAX_QUERY_MEMORY_PER_NODE_FORMAT = "([0-9]+)\\s*([kKmMgG]?)\\s*$";
+
+ // Required queue configurations in MAX_QUERY_MEMORY_PER_NODE_FORMAT pattern
+ private static final String MAX_QUERY_MEMORY_PER_NODE_KEY = "max_query_memory_per_node";
+
+ private final String queueUUID;
+
+ private final String queueName;
+
+ private int maxAdmissibleQuery;
+
+ private int maxWaitingQuery;
+
+ private int maxWaitingTimeout;
+
+ private boolean waitForPreferredNodes;
+
+ private final NodeResources queueResourceShare;
+
+ private NodeResources queryPerNodeResourceShare;
+
+ public QueryQueueConfigImpl(Config queueConfig, String poolName,
+ NodeResources queueNodeResource) throws RMConfigException {
+ this.queueUUID = UUID.randomUUID().toString();
+ this.queueName = poolName;
+ this.queueResourceShare = queueNodeResource;
+ parseQueueConfig(queueConfig);
+ }
+
+ /**
+ * Assigns either supplied or default values for the optional queue configuration. For required configuration uses
+ * the supplied value in queueConfig object or throws proper exception if not present
+ * @param queueConfig Config object for ResourcePool queue
+ * @throws RMConfigException in case of error while parsing config
+ */
+ private void parseQueueConfig(Config queueConfig) throws RMConfigException {
+ this.maxAdmissibleQuery = queueConfig.hasPath(MAX_ADMISSIBLE_KEY) ?
+ queueConfig.getInt(MAX_ADMISSIBLE_KEY) : MAX_ADMISSIBLE_QUERY_COUNT;
+ this.maxWaitingQuery = queueConfig.hasPath(MAX_WAITING_KEY) ?
+ queueConfig.getInt(MAX_WAITING_KEY) : MAX_WAITING_QUERY_COUNT;
+ this.maxWaitingTimeout = queueConfig.hasPath(MAX_WAIT_TIMEOUT_KEY) ?
+ queueConfig.getInt(MAX_WAIT_TIMEOUT_KEY) : RMCommonDefaults.MAX_WAIT_TIMEOUT_IN_MS;
+ this.waitForPreferredNodes = queueConfig.hasPath(WAIT_FOR_PREFERRED_NODES_KEY) ?
+ queueConfig.getBoolean(WAIT_FOR_PREFERRED_NODES_KEY) : RMCommonDefaults.WAIT_FOR_PREFERRED_NODES;
+ this.queryPerNodeResourceShare = parseAndGetNodeShare(queueConfig);
+ }
+
+ @Override
+ public String getQueueId() {
+ return queueUUID;
+ }
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
+
+ /**
+ * Total memory share of this queue in the cluster based on per node resource share. It assumes that the cluster is
+ * made up of homogeneous nodes in terms of resources
+ * @param numClusterNodes total number of available cluster nodes which can participate in this queue
+ * @return queue memory share in MB
+ */
+ @Override
+ public long getQueueTotalMemoryInMB(int numClusterNodes) {
+ return queueResourceShare.getMemoryInMB() * numClusterNodes;
+ }
+
+ @Override
+ public long getMaxQueryMemoryInMBPerNode() {
+ return queryPerNodeResourceShare.getMemoryInMB();
+ }
+
+ @Override
+ public long getMaxQueryTotalMemoryInMB(int numClusterNodes) {
+ return queryPerNodeResourceShare.getMemoryInMB() * numClusterNodes;
+ }
+
+ @Override
+ public boolean waitForPreferredNodes() {
+ return waitForPreferredNodes;
+ }
+
+ @Override
+ public int getMaxAdmissibleQueries() {
+ return maxAdmissibleQuery;
+ }
+
+ @Override
+ public int getMaxWaitingQueries() {
+ return maxWaitingQuery;
+ }
+
+ @Override
+ public int getWaitTimeoutInMs() {
+ return maxWaitingTimeout;
+ }
+
+ /**
+ * Parses queues {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} configuration using
+ * {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_FORMAT} pattern to get memory value in bytes using
+ * {@link MemoryConfigParser#parseMemoryConfigString(String, String)} utility.
+ * @param queueConfig Queue Configuration object
+ * @return NodeResources created with value of {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} in bytes
+ * @throws RMConfigException in case the config value is absent or doesn't follow the specified format
+ */
+ private NodeResources parseAndGetNodeShare(Config queueConfig) throws RMConfigException {
+ try {
+ long memoryPerNodeInBytes = MemoryConfigParser.parseMemoryConfigString(
+ queueConfig.getString(MAX_QUERY_MEMORY_PER_NODE_KEY), MAX_QUERY_MEMORY_PER_NODE_FORMAT);
+ return new NodeResources(memoryPerNodeInBytes, Integer.MAX_VALUE);
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("Failed while parsing %s for queue %s", MAX_QUERY_MEMORY_PER_NODE_KEY,
+ queueName), ex);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "{ QueueName: " + queueName + ", QueueId: " + queueUUID + ", QueuePerNodeResource(MB): " +
+ queryPerNodeResourceShare.toString() + ", MaxQueryMemPerNode(MB): " + queryPerNodeResourceShare.toString() +
+ ", MaxAdmissible: " + maxAdmissibleQuery + ", MaxWaiting: " + maxWaitingQuery + ", MaxWaitTimeout: " +
+ maxWaitingTimeout + ", WaitForPreferredNodes: " + waitForPreferredNodes + "}";
+ }
+}