aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache
diff options
context:
space:
mode:
authorSorabh Hamirwasia <shamirwasia@maprtech.com>2019-02-07 13:54:14 -0800
committerkarthik <kmanivannan@maprtech.com>2019-03-08 13:43:37 -0800
commit362cded3fbcb99c0c4f951bb97016cb2b0542f94 (patch)
treecc16b473832dd4c24a3d614a2dc0d6d5297ffb28 /exec/java-exec/src/main/java/org/apache
parent75ca5238e627e8d8612d56f654da06992837f6af (diff)
DRILL-7046: Support for loading and parsing new RM config file
closes #1652
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java66
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java71
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java171
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java79
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java39
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java70
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java268
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java45
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java164
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java31
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java31
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java108
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java41
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java93
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java52
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java48
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java53
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java51
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java45
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java40
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java285
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java56
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java60
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java48
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java62
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java58
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java58
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java63
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java76
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java59
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java70
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java47
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java33
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java34
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java24
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java82
40 files changed, 2666 insertions, 69 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 6fce8b707..93adda17c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -666,6 +666,7 @@ public final class ExecConstants {
public static final String ORDERED_MUX_EXCHANGE = "planner.enable_ordered_mux_exchange";
// Resource management boot-time options.
+ public static final String RM_ENABLED = "drill.exec.rm.enabled";
public static final String MAX_MEMORY_PER_NODE = "drill.exec.rm.memory_per_node";
public static final String MAX_CPUS_PER_NODE = "drill.exec.rm.cpus_per_node";
@@ -685,6 +686,16 @@ public final class ExecConstants {
public static final LongValidator QUEUE_TIMEOUT = new PositiveLongValidator("exec.queue.timeout_millis", Long.MAX_VALUE,
new OptionDescription("Indicates how long a query can wait in queue before the query fails. Range: 0-9223372036854775807"));
+ // New Smart RM boot time configs
+ public static final String RM_QUERY_TAGS_KEY = "exec.rm.queryTags";
+ public static final StringValidator RM_QUERY_TAGS_VALIDATOR = new StringValidator(RM_QUERY_TAGS_KEY,
+ new OptionDescription("Allows user to set coma separated list of tags for all the queries submitted over a session"));
+
+ public static final String RM_QUEUES_WAIT_FOR_PREFERRED_NODES_KEY = "exec.rm.queues.wait_for_preferred_nodes";
+ public static final BooleanValidator RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR = new BooleanValidator
+ (RM_QUEUES_WAIT_FOR_PREFERRED_NODES_KEY, new OptionDescription("Allows user to enable/disable " +
+ "wait_for_preferred_nodes configuration across rm queues for all the queries submitted over a session"));
+
// Ratio of memory for small queries vs. large queries.
// Each small query gets 1 unit, each large query gets QUEUE_MEMORY_RATIO units.
// A lower limit of 1 enforces the intuition that a large query should never get
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index 137969a34..143560529 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -39,7 +39,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.drill.shaded.guava.com.google.common.io.Files;
import com.typesafe.config.ConfigFactory;
import org.apache.commons.io.FileUtils;
-import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.ConfigConstants;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.FunctionCall;
@@ -445,7 +445,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
*/
private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) throws IOException {
Enumeration<URL> markerFileEnumeration = classLoader.getResources(
- CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME);
+ ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME);
while (markerFileEnumeration.hasMoreElements()) {
URL markerFile = markerFileEnumeration.nextElement();
if (markerFile.getPath().contains(path.toUri().getPath())) {
@@ -462,7 +462,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
}
}
throw new JarValidationException(String.format("Marker file %s is missing in %s",
- CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, path.getName()));
+ ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, path.getName()));
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java
new file mode 100644
index 000000000..485702c8f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr;
+
+/**
+ * Provides resources for a node in cluster. Currently it is used to support only 2 kind of resources:
+ * <ul>
+ * <li>Memory</li>
+ * <li>Virtual CPU count</li>
+ * </ul>
+ * It also has a version field to support extensibility in future to add other resources like network, disk, etc
+ */
+public class NodeResources {
+
+ private final int version;
+
+ private final long memoryInBytes;
+
+ private final int numVirtualCpu;
+
+ public NodeResources(long memoryInBytes, int numVirtualCpu) {
+ this.memoryInBytes = memoryInBytes;
+ this.numVirtualCpu = numVirtualCpu;
+ this.version = 1;
+ }
+
+ public NodeResources(long memoryInBytes, int numPhysicalCpu, int vFactor) {
+ this(memoryInBytes, numPhysicalCpu * vFactor);
+ }
+
+ public long getMemoryInBytes() {
+ return memoryInBytes;
+ }
+
+ public long getMemoryInMB() {
+ return Math.round((memoryInBytes / 1024L) / 1024L);
+ }
+
+ public long getMemoryInGB() {
+ return Math.round(getMemoryInMB() / 1024L);
+ }
+
+ public int getNumVirtualCpu() {
+ return numVirtualCpu;
+ }
+
+ @Override
+ public String toString() {
+ return "{ Version: " + version + ", MemoryInBytes: " + memoryInBytes + ", VirtualCPU: " + numVirtualCpu + " }";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java
new file mode 100644
index 000000000..db79ec573
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+/**
+ * Interface which defines an implementation for managing queue configuration of a leaf {@link ResourcePool}
+ */
+public interface QueryQueueConfig {
+ String getQueueId();
+
+ String getQueueName();
+
+ /**
+ * Given number of available nodes in the cluster what is the total memory resource share of this queue cluster wide
+ * @param numClusterNodes number of available cluster nodes
+ * @return Queue's total cluster memory resource share
+ */
+ long getQueueTotalMemoryInMB(int numClusterNodes);
+
+ /**
+ * @return Maximum query memory (in MB) that a query in this queue can consume on a node
+ */
+ long getMaxQueryMemoryInMBPerNode();
+
+ /**
+ * Given number of available nodes in the cluster what is the max memory in MB a query in this queue can be assigned
+ * cluster wide
+ * @param numClusterNodes number of available cluster nodes
+ * @return Maximum query memory (in MB) in this queue
+ */
+ long getMaxQueryTotalMemoryInMB(int numClusterNodes);
+
+ /**
+ * Determines if admitted queries in this queue should wait in the queue if resources on the preferred assigned
+ * nodes of a query determined by planner is unavailable.
+ * @return <tt>true</tt> indicates an admitted query to wait until resources on all the preferred nodes are available or
+ * wait until timeout is reached, <tt>false</tt> indicates an admitted query to not wait and find other nodes with
+ * available resources if resources on a preferred node is unavailable.
+ */
+ boolean waitForPreferredNodes();
+
+ /**
+ * @return Maximum number of queries that can be admitted in the queue
+ */
+ int getMaxAdmissibleQueries();
+
+ /**
+ * @return Maximum number of queries that will be allowed to wait in the queue before failing a query right away
+ */
+ int getMaxWaitingQueries();
+
+ /**
+ * @return Maximum time in milliseconds for which a query can be in waiting state inside a queue
+ */
+ int getWaitTimeoutInMs();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java
new file mode 100644
index 000000000..948d1b943
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.util.MemoryConfigParser;
+
+import java.util.UUID;
+
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.MAX_ADMISSIBLE_QUERY_COUNT;
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.MAX_WAITING_QUERY_COUNT;
+
+/**
+ * Parses and initialize QueueConfiguration for a {@link ResourcePool}. It also generates a unique UUID for each queue
+ * which will be later used by Drillbit to store in Zookeeper along with
+ * {@link org.apache.drill.exec.proto.beans.DrillbitEndpoint}. This UUID is used in the leader election mechanism in
+ * which Drillbit participates for each of the configured rm queue.
+ */
+public class QueryQueueConfigImpl implements QueryQueueConfig {
+
+ // Optional queue configurations
+ private static final String MAX_ADMISSIBLE_KEY = "max_admissible";
+
+ private static final String MAX_WAITING_KEY = "max_waiting";
+
+ private static final String MAX_WAIT_TIMEOUT_KEY = "max_wait_timeout";
+
+ private static final String WAIT_FOR_PREFERRED_NODES_KEY = "wait_for_preferred_nodes";
+
+ private static final String MAX_QUERY_MEMORY_PER_NODE_FORMAT = "([0-9]+)\\s*([kKmMgG]?)\\s*$";
+
+ // Required queue configurations in MAX_QUERY_MEMORY_PER_NODE_FORMAT pattern
+ private static final String MAX_QUERY_MEMORY_PER_NODE_KEY = "max_query_memory_per_node";
+
+ private final String queueUUID;
+
+ private final String queueName;
+
+ private int maxAdmissibleQuery;
+
+ private int maxWaitingQuery;
+
+ private int maxWaitingTimeout;
+
+ private boolean waitForPreferredNodes;
+
+ private final NodeResources queueResourceShare;
+
+ private NodeResources queryPerNodeResourceShare;
+
+ public QueryQueueConfigImpl(Config queueConfig, String poolName,
+ NodeResources queueNodeResource) throws RMConfigException {
+ this.queueUUID = UUID.randomUUID().toString();
+ this.queueName = poolName;
+ this.queueResourceShare = queueNodeResource;
+ parseQueueConfig(queueConfig);
+ }
+
+ /**
+ * Assigns either supplied or default values for the optional queue configuration. For required configuration uses
+ * the supplied value in queueConfig object or throws proper exception if not present
+ * @param queueConfig Config object for ResourcePool queue
+ * @throws RMConfigException in case of error while parsing config
+ */
+ private void parseQueueConfig(Config queueConfig) throws RMConfigException {
+ this.maxAdmissibleQuery = queueConfig.hasPath(MAX_ADMISSIBLE_KEY) ?
+ queueConfig.getInt(MAX_ADMISSIBLE_KEY) : MAX_ADMISSIBLE_QUERY_COUNT;
+ this.maxWaitingQuery = queueConfig.hasPath(MAX_WAITING_KEY) ?
+ queueConfig.getInt(MAX_WAITING_KEY) : MAX_WAITING_QUERY_COUNT;
+ this.maxWaitingTimeout = queueConfig.hasPath(MAX_WAIT_TIMEOUT_KEY) ?
+ queueConfig.getInt(MAX_WAIT_TIMEOUT_KEY) : RMCommonDefaults.MAX_WAIT_TIMEOUT_IN_MS;
+ this.waitForPreferredNodes = queueConfig.hasPath(WAIT_FOR_PREFERRED_NODES_KEY) ?
+ queueConfig.getBoolean(WAIT_FOR_PREFERRED_NODES_KEY) : RMCommonDefaults.WAIT_FOR_PREFERRED_NODES;
+ this.queryPerNodeResourceShare = parseAndGetNodeShare(queueConfig);
+ }
+
+ @Override
+ public String getQueueId() {
+ return queueUUID;
+ }
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
+
+ /**
+ * Total memory share of this queue in the cluster based on per node resource share. It assumes that the cluster is
+ * made up of homogeneous nodes in terms of resources
+ * @param numClusterNodes total number of available cluster nodes which can participate in this queue
+ * @return queue memory share in MB
+ */
+ @Override
+ public long getQueueTotalMemoryInMB(int numClusterNodes) {
+ return queueResourceShare.getMemoryInMB() * numClusterNodes;
+ }
+
+ @Override
+ public long getMaxQueryMemoryInMBPerNode() {
+ return queryPerNodeResourceShare.getMemoryInMB();
+ }
+
+ @Override
+ public long getMaxQueryTotalMemoryInMB(int numClusterNodes) {
+ return queryPerNodeResourceShare.getMemoryInMB() * numClusterNodes;
+ }
+
+ @Override
+ public boolean waitForPreferredNodes() {
+ return waitForPreferredNodes;
+ }
+
+ @Override
+ public int getMaxAdmissibleQueries() {
+ return maxAdmissibleQuery;
+ }
+
+ @Override
+ public int getMaxWaitingQueries() {
+ return maxWaitingQuery;
+ }
+
+ @Override
+ public int getWaitTimeoutInMs() {
+ return maxWaitingTimeout;
+ }
+
+ /**
+ * Parses queues {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} configuration using
+ * {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_FORMAT} pattern to get memory value in bytes using
+ * {@link MemoryConfigParser#parseMemoryConfigString(String, String)} utility.
+ * @param queueConfig Queue Configuration object
+ * @return NodeResources created with value of {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} in bytes
+ * @throws RMConfigException in case the config value is absent or doesn't follow the specified format
+ */
+ private NodeResources parseAndGetNodeShare(Config queueConfig) throws RMConfigException {
+ try {
+ long memoryPerNodeInBytes = MemoryConfigParser.parseMemoryConfigString(
+ queueConfig.getString(MAX_QUERY_MEMORY_PER_NODE_KEY), MAX_QUERY_MEMORY_PER_NODE_FORMAT);
+ return new NodeResources(memoryPerNodeInBytes, Integer.MAX_VALUE);
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("Failed while parsing %s for queue %s", MAX_QUERY_MEMORY_PER_NODE_KEY,
+ queueName), ex);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "{ QueueName: " + queueName + ", QueueId: " + queueUUID + ", QueuePerNodeResource(MB): " +
+ queryPerNodeResourceShare.toString() + ", MaxQueryMemPerNode(MB): " + queryPerNodeResourceShare.toString() +
+ ", MaxAdmissible: " + maxAdmissibleQuery + ", MaxWaiting: " + maxWaitingQuery + ", MaxWaitTimeout: " +
+ maxWaitingTimeout + ", WaitForPreferredNodes: " + waitForPreferredNodes + "}";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java
new file mode 100644
index 000000000..5de2e34fe
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Used to keep track of selected leaf and all rejected {@link ResourcePool} for the provided query.
+ * It is used by {@link ResourcePoolImpl#visitAndSelectPool(QueueAssignmentResult, QueryContext)} to store
+ * information about all the matching and non-matching ResourcePools for a query when ResourcePool selector is
+ * evaluated against query metadata. Later it is used by
+ * {@link ResourcePool#visitAndSelectPool(QueueAssignmentResult, QueryContext)} to apply
+ * {@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy} to select only one queue out of all
+ * the selected queues for a query. It also provides an API to dump all the debug information to know which pools were
+ * selected and rejected for a query.
+ */
+public class QueueAssignmentResult {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueueAssignmentResult.class);
+
+ private final List<ResourcePool> selectedLeafPools = new ArrayList<>();
+
+ private final List<ResourcePool> rejectedPools = new ArrayList<>();
+
+ public void addSelectedPool(ResourcePool pool) {
+ Preconditions.checkState(pool.isLeafPool(), "Selected pool %s is not a leaf pool",
+ pool.getPoolName());
+ selectedLeafPools.add(pool);
+ }
+
+ public void addRejectedPool(ResourcePool pool) {
+ rejectedPools.add(pool);
+ }
+
+ public List<ResourcePool> getSelectedLeafPools() {
+ return selectedLeafPools;
+ }
+
+ public List<ResourcePool> getRejectedPools() {
+ return rejectedPools;
+ }
+
+ public void logAssignmentResult(String queryId) {
+ logger.debug("For query {}. Details[{}]", queryId, toString());
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Selected Leaf Pools: {");
+ for (ResourcePool pool : selectedLeafPools) {
+ sb.append(pool.getPoolName()).append(", ");
+ }
+ sb.append("} and Rejected pools: {");
+ for (ResourcePool pool : rejectedPools) {
+ sb.append(pool.getPoolName()).append(", ");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java
new file mode 100644
index 000000000..dc486fe09
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy.SelectionPolicy;
+
+/**
+ * Defines all the default values used for the optional configurations for ResourceManagement
+ */
+public final class RMCommonDefaults {
+
+ public static final int MAX_ADMISSIBLE_QUERY_COUNT = 10;
+
+ public static final int MAX_WAITING_QUERY_COUNT = 10;
+
+ public static final int MAX_WAIT_TIMEOUT_IN_MS = 30_000;
+
+ public static final boolean WAIT_FOR_PREFERRED_NODES = true;
+
+ public static final double ROOT_POOL_DEFAULT_MEMORY_PERCENT = 0.9;
+
+ public static final SelectionPolicy ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY = SelectionPolicy.BESTFIT;
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java
new file mode 100644
index 000000000..c2f851224
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector;
+
+import java.util.List;
+
+/**
+ * Interface which defines an implementation of ResourcePool configuration for {@link ResourcePoolTree}
+ */
+public interface ResourcePool {
+ String getPoolName();
+
+ boolean isLeafPool();
+
+ boolean isDefaultPool();
+
+ // Only valid for leaf pool since it will have a queue assigned to it with this configuration
+ long getMaxQueryMemoryPerNode();
+
+ /**
+ * Evaluates this pool selector to see if the query can be admitted in this pool. If yes then evaluates all
+ * the child pools selectors as well. During traversal it builds the QueueAssignment result which consists of all
+ * the selected leaf pools and all rejected intermediate pools.
+ * @param assignmentResult
+ * @param queryContext
+ */
+ void visitAndSelectPool(QueueAssignmentResult assignmentResult, QueryContext queryContext);
+
+ /**
+ * @return Percentage of memory share assigned to this pool
+ */
+ double getPoolMemoryShare();
+
+ long getPoolMemoryInMB(int numClusterNodes);
+
+ /**
+ * Only valid for leaf pool.
+ * @return Returns queue configuration assigned to this leaf pool
+ */
+ QueryQueueConfig getQueryQueue();
+
+ /**
+ * @return Full path of the resource pool from root to this pool
+ */
+ String getFullPath();
+
+ ResourcePool getParentPool();
+
+ List<ResourcePool> getChildPools();
+
+ ResourcePoolSelector getSelector();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java
new file mode 100644
index 000000000..79cf6c82f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.config.selectors.DefaultSelector;
+import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector;
+import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelectorFactory;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses and initializes all the provided configuration for a ResourcePool defined in RM configuration. It takes
+ * care of creating all the child ResourcePools belonging to this Resource Pool, {@link ResourcePoolSelector} for this
+ * pool and a {@link QueryQueueConfig} if it's a leaf pool.
+ */
+public class ResourcePoolImpl implements ResourcePool {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolImpl.class);
+
+ public static final String POOL_NAME_KEY = "pool_name";
+
+ public static final String POOL_MEMORY_SHARE_KEY = "memory";
+
+ public static final String POOL_CHILDREN_POOLS_KEY = "child_pools";
+
+ public static final String POOL_SELECTOR_KEY = "selector";
+
+ public static final String POOL_QUEUE_KEY = "queue";
+
+ private String poolName;
+
+ private List<ResourcePool> childPools;
+
+ private final double parentResourceShare;
+
+ private final double poolResourceShare;
+
+ private QueryQueueConfig assignedQueue;
+
+ private final ResourcePoolSelector assignedSelector;
+
+ private NodeResources poolResourcePerNode;
+
+ private final ResourcePool parentPool;
+
+ ResourcePoolImpl(Config poolConfig, double poolAbsResourceShare, double parentResourceShare,
+ NodeResources parentNodeResource, ResourcePool parentPool,
+ Map<String, QueryQueueConfig> leafQueueCollector) throws RMConfigException {
+ try {
+ this.poolName = poolConfig.getString(POOL_NAME_KEY);
+ this.parentResourceShare = parentResourceShare;
+ this.poolResourceShare = poolAbsResourceShare * this.parentResourceShare;
+ this.parentPool = parentPool;
+ assignedSelector = ResourcePoolSelectorFactory.createSelector(poolConfig.hasPath(POOL_SELECTOR_KEY)
+ ? poolConfig.getConfig(POOL_SELECTOR_KEY) : null);
+ parseAndCreateChildPools(poolConfig, parentNodeResource, leafQueueCollector);
+ } catch (RMConfigException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("Failure while parsing configuration for pool: %s. [Details: " +
+ "PoolConfig: %s]", poolName, poolConfig), ex);
+ }
+ }
+
+ @Override
+ public String getPoolName() {
+ return poolName;
+ }
+
+ /**
+ * Determines if this ResourcePool is a leaf pool or not which will have a queue associated with it
+ * @return <tt>true</tt> If a leaf pool, <tt>false</tt> otherwise
+ */
+ @Override
+ public boolean isLeafPool() {
+ return childPools == null && assignedQueue != null;
+ }
+
+ /**
+ * Determines if this ResourcePool is a default pool or not which will act as a sink for all the queries
+ * @return <tt>true</tt> If a Default pool, <tt>false</tt> otherwise
+ */
+ @Override
+ public boolean isDefaultPool() {
+ return (assignedSelector instanceof DefaultSelector);
+ }
+
+ @Override
+ public long getMaxQueryMemoryPerNode() {
+ Preconditions.checkState(isLeafPool() && assignedQueue != null, "max_query_memory_per_node is " +
+ "only valid for leaf level pools which has a queue assigned to it [Details: PoolName: %s]", poolName);
+ return assignedQueue.getMaxQueryMemoryInMBPerNode();
+ }
+
+ /**
+ * Used to determine if a ResourcePool is selected for a given query or not. It uses the assigned selector of this
+ * ResourcePool which takes in query metadata to determine if a query is allowed in this pool.
+ * @param assignmentResult Used to keep track of all selected leaf pools and all rejected pools for given query
+ * @param queryContext Contains query metadata like user, groups, tags, etc used by ResourcePoolSelector
+ */
+ @Override
+ public void visitAndSelectPool(QueueAssignmentResult assignmentResult, QueryContext queryContext) {
+ if (assignedSelector.isQuerySelected(queryContext)) {
+ if (isLeafPool()) {
+ assignmentResult.addSelectedPool(this);
+ } else {
+ // Check for each of the child pools
+ for (ResourcePool childPool : childPools) {
+ childPool.visitAndSelectPool(assignmentResult, queryContext);
+ }
+ }
+ } else {
+ assignmentResult.addRejectedPool(this);
+ }
+ }
+
+ /**
+ * Actual percentage share of memory assigned to this ResourcePool
+ * @return Pool memory share in percentage
+ */
+ @Override
+ public double getPoolMemoryShare() {
+ return poolResourceShare;
+ }
+
+ /**
+ * Total memory share in MB assigned to this ResourcePool
+ * @param numClusterNodes number of available cluster nodes for this pool
+ * @return Pool memory share in MB
+ */
+ @Override
+ public long getPoolMemoryInMB(int numClusterNodes) {
+ return poolResourcePerNode.getMemoryInMB() * numClusterNodes;
+ }
+
+ /**
+ * Parses and creates all the child ResourcePools if this is an intermediate pool or QueryQueueConfig is its a leaf
+ * resource pool
+ * @param poolConfig Config object for this ResourcePool
+ * @param parentResource Parent ResourcePool NodeResources
+ * @param leafQueueCollector Collector which keeps track of all leaf queues
+ * @throws RMConfigException in case of bad configuration
+ */
+ private void parseAndCreateChildPools(Config poolConfig, NodeResources parentResource,
+ Map<String, QueryQueueConfig> leafQueueCollector) throws RMConfigException {
+ this.poolResourcePerNode = new NodeResources(Math.round(parentResource.getMemoryInBytes() * poolResourceShare),
+ parentResource.getNumVirtualCpu());
+ if (poolConfig.hasPath(POOL_CHILDREN_POOLS_KEY)) {
+ childPools = Lists.newArrayList();
+ List<? extends Config> childPoolsConfig = poolConfig.getConfigList(POOL_CHILDREN_POOLS_KEY);
+ logger.debug("Creating {} child pools for parent pool {}", childPoolsConfig.size(), poolName);
+ for (Config childConfig : childPoolsConfig) {
+ try {
+ final ResourcePool childPool = new ResourcePoolImpl(childConfig, childConfig.getDouble(POOL_MEMORY_SHARE_KEY),
+ poolResourceShare, poolResourcePerNode, this, leafQueueCollector);
+ childPools.add(childPool);
+ } catch (RMConfigException ex) {
+ logger.error("Failure while configuring child ResourcePool. [Details: PoolName: {}, ChildPoolConfig with " +
+ "error: {}]", poolName, childConfig);
+ throw ex;
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("Failure while configuring the child ResourcePool. [Details: " +
+ "PoolName: %s, ChildPoolConfig with error: %s]", poolName, childConfig), ex);
+ }
+ }
+
+ if (childPools.isEmpty()) {
+ throw new RMConfigException(String.format("Empty config for child_pools is not allowed. Please configure the " +
+ "child_pools property of pool %s correctly or associate a queue with it with no child_pools", poolName));
+ }
+ } else {
+ logger.info("Resource Pool {} is a leaf level pool with queue assigned to it", poolName);
+
+ if (leafQueueCollector.containsKey(poolName)) {
+ throw new RMConfigException(String.format("Found non-unique leaf pools with name: %s and config: %s. Leaf " +
+ "pool names has to be unique since they represent a queue.", poolName, poolConfig));
+ }
+ assignedQueue = new QueryQueueConfigImpl(poolConfig.getConfig(POOL_QUEUE_KEY), poolName, poolResourcePerNode);
+ leafQueueCollector.put(poolName, assignedQueue);
+ }
+ }
+
+ /**
+ * If this a leaf pool then returns the {@link QueryQueueConfig} for the queue associated with this pool
+ * @return {@link QueryQueueConfig} object for this pool
+ */
+ @Override
+ public QueryQueueConfig getQueryQueue() {
+ Preconditions.checkState(isLeafPool() && assignedQueue != null, "QueryQueue is only " +
+ "valid for leaf level pools.[Details: PoolName: %s]", poolName);
+ return assignedQueue;
+ }
+
+ @Override
+ public ResourcePool getParentPool() {
+ return parentPool;
+ }
+
+ /**
+ * Returns full path in terms of concatenated pool names from root pool to this pool in {@link ResourcePoolTree}
+ * @return String with pool names from root to this pool
+ */
+ @Override
+ public String getFullPath() {
+ StringBuilder pathBuilder = new StringBuilder(poolName);
+ ResourcePool parent = parentPool;
+ while(parent != null) {
+ pathBuilder.append(parent.getPoolName());
+ parent = parent.getParentPool();
+ }
+
+ return pathBuilder.toString();
+ }
+
+ @Override
+ public List<ResourcePool> getChildPools() {
+ Preconditions.checkState(!isLeafPool() && assignedQueue == null,
+ "There are no child pools for a leaf ResourcePool");
+ return childPools;
+ }
+
+ @Override
+ public ResourcePoolSelector getSelector() {
+ return assignedSelector;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{PoolName: ").append(poolName);
+ sb.append(", PoolResourceShare: ").append(poolResourceShare);
+ sb.append(", Selector: ").append(assignedSelector.getSelectorType());
+ if (isLeafPool()) {
+ sb.append(", Queue: [").append(assignedQueue.toString()).append("]");
+ } else {
+ sb.append(", ChildPools: [");
+
+ for (ResourcePool childPool : childPools) {
+ sb.append(childPool.toString());
+ sb.append(", ");
+ }
+ sb.append("]");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java
new file mode 100644
index 000000000..309bb24db
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy;
+
+import java.util.Map;
+
+/**
+ * Interface which defines the implementation of a hierarchical configuration for all the ResourcePool that will be
+ * used for ResourceManagement
+ */
+public interface ResourcePoolTree {
+
+ ResourcePool getRootPool();
+
+ Map<String, QueryQueueConfig> getAllLeafQueues();
+
+ double getResourceShare();
+
+ QueueAssignmentResult selectAllQueues(QueryContext queryContext);
+
+ QueryQueueConfig selectOneQueue(QueryContext queryContext, NodeResources queryMaxNodeResource)
+ throws QueueSelectionException;
+
+ QueueSelectionPolicy getSelectionPolicyInUse();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java
new file mode 100644
index 000000000..cc12a0945
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy.SelectionPolicy;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicyFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_MEMORY_PERCENT;
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY;
+
+/**
+ * Parses and initializes configuration for ResourceManagement in Drill. It takes care of creating all the ResourcePools
+ * recursively maintaining the n-ary tree hierarchy defined in the configuration.
+ */
+public class ResourcePoolTreeImpl implements ResourcePoolTree {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolTreeImpl.class);
+
+ private static final String ROOT_POOL_MEMORY_SHARE_KEY = "memory";
+
+ private static final String ROOT_POOL_QUEUE_SELECTION_POLICY_KEY = "queue_selection_policy";
+
+ public static final String ROOT_POOL_CONFIG_KEY = "drill.exec.rm";
+
+ private final ResourcePool rootPool;
+
+ private final Config rmConfig;
+
+ private final NodeResources totalNodeResources;
+
+ private final double resourceShare;
+
+ private final Map<String, QueryQueueConfig> leafQueues = Maps.newHashMap();
+
+ private final QueueSelectionPolicy selectionPolicy;
+
+ public ResourcePoolTreeImpl(Config rmConfig, long totalNodeMemory,
+ int totalNodePhysicalCpu, int vFactor) throws RMConfigException {
+ this(rmConfig, new NodeResources(totalNodeMemory, totalNodePhysicalCpu, vFactor));
+ }
+
+ private ResourcePoolTreeImpl(Config rmConfig, NodeResources totalNodeResources) throws RMConfigException {
+ try {
+ this.rmConfig = rmConfig;
+ this.totalNodeResources = totalNodeResources;
+ this.resourceShare = this.rmConfig.hasPath(ROOT_POOL_MEMORY_SHARE_KEY) ?
+ this.rmConfig.getDouble(ROOT_POOL_MEMORY_SHARE_KEY) : ROOT_POOL_DEFAULT_MEMORY_PERCENT;
+ this.selectionPolicy = QueueSelectionPolicyFactory.createSelectionPolicy(
+ this.rmConfig.hasPath(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY) ?
+ SelectionPolicy.valueOf(rmConfig.getString(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY)) :
+ ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY);
+ rootPool = new ResourcePoolImpl(this.rmConfig.getConfig(ROOT_POOL_CONFIG_KEY), resourceShare, 1.0,
+ totalNodeResources, null, leafQueues);
+ logger.debug("Dumping RM configuration {}", toString());
+ } catch (RMConfigException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("Failure while parsing root pool configuration. " +
+ "[Details: Config: %s]", rmConfig), ex);
+ }
+ }
+
+ /**
+ * @return root {@link ResourcePool}
+ */
+ @Override
+ public ResourcePool getRootPool() {
+ return rootPool;
+ }
+
+ /**
+ * @return Map containing all the configured leaf queues
+ */
+ @Override
+ public Map<String, QueryQueueConfig> getAllLeafQueues() {
+ return leafQueues;
+ }
+
+ @Override
+ public double getResourceShare() {
+ return resourceShare;
+ }
+
+ /**
+ * Creates {@link QueueAssignmentResult} which contains list of all selected leaf ResourcePools and all the rejected
+ * ResourcePools for the provided query. Performs DFS of the ResourcePoolTree to traverse and find
+ * selected/rejected ResourcePools.
+ * @param queryContext {@link QueryContext} which contains metadata required for the given query
+ * @return {@link QueueAssignmentResult} populated with selected/rejected ResourcePools
+ */
+ @Override
+ public QueueAssignmentResult selectAllQueues(QueryContext queryContext) {
+ QueueAssignmentResult assignmentResult = new QueueAssignmentResult();
+ rootPool.visitAndSelectPool(assignmentResult, queryContext);
+ return assignmentResult;
+ }
+
+ /**
+ * Selects a leaf queue out of all the possibles leaf queues returned by
+ * {@link ResourcePoolTree#selectAllQueues(QueryContext)} for a given query based on the configured
+ * {@link QueueSelectionPolicy}. If none of the queue qualifies then it throws {@link QueueSelectionException}
+ * @param queryContext {@link QueryContext} which contains metadata for given query
+ * @param queryMaxNodeResource Max resources on a node required for given query
+ * @return {@link QueryQueueConfig} for the selected leaf queue
+ * @throws QueueSelectionException If no leaf queue is selected
+ */
+ @Override
+ public QueryQueueConfig selectOneQueue(QueryContext queryContext, NodeResources queryMaxNodeResource)
+ throws QueueSelectionException {
+ final QueueAssignmentResult assignmentResult = selectAllQueues(queryContext);
+ final List<ResourcePool> selectedPools = assignmentResult.getSelectedLeafPools();
+ if (selectedPools.size() == 0) {
+ throw new QueueSelectionException(String.format("No resource pools to choose from for the query: %s",
+ queryContext.getQueryId()));
+ } else if (selectedPools.size() == 1) {
+ return selectedPools.get(0).getQueryQueue();
+ }
+
+ return selectionPolicy.selectQueue(selectedPools, queryContext, queryMaxNodeResource).getQueryQueue();
+ }
+
+ /**
+ * @return {@link QueueSelectionPolicy} which is used to chose one queue out of all the available options for a query
+ */
+ @Override
+ public QueueSelectionPolicy getSelectionPolicyInUse() {
+ return selectionPolicy;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ NodeResources: ").append(totalNodeResources.toString());
+ sb.append(", ResourcePercent: ").append(resourceShare);
+ sb.append(", SelectionPolicy: ").append(selectionPolicy.getSelectionPolicy());
+ sb.append(", RootPool: ").append(rootPool.toString());
+ sb.append("}");
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java
new file mode 100644
index 000000000..8f0275dd2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.exception;
+
+/**
+ * Used in case of error while selecting a queue for a given query
+ */
+public class QueueSelectionException extends Exception {
+ public QueueSelectionException(String msg) {
+ super(msg);
+ }
+
+ public QueueSelectionException(String msg, Exception ex) {
+ super(msg, ex);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java
new file mode 100644
index 000000000..d6648333d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.exception;
+
+/**
+ * Used in cases of any error with the ResourceManagement configuration
+ */
+public class RMConfigException extends Exception {
+ public RMConfigException(String message) {
+ super(message);
+ }
+
+ public RMConfigException(String message, Exception e) {
+ super(message, e);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java
new file mode 100644
index 000000000..6829e452d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains the configuration components of ResourceManagement feature in Drill. ResourceManagement will
+ * have it's own configuration file supporting the similar hierarchy of files as supported by Drill's current
+ * configuration and supports HOCON format. All the supported files for ResourceManagement is listed in
+ * {@link org.apache.drill.common.config.ConfigConstants}. However whether the feature is enabled/disabled is still
+ * controlled by a configuration {@link org.apache.drill.exec.ExecConstants#RM_ENABLED} available in the Drill's main
+ * configuration file. The rm config files will be parsed and loaded only when the feature is enabled. The
+ * configuration is a hierarchical tree {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTree} of
+ * {@link org.apache.drill.exec.resourcemgr.config.ResourcePool}. At the top will be the root pool which represents
+ * the entire resources (only memory in version 1) which is available to ResourceManager to use for admitting queries.
+ * It is assumed that all the nodes in the Drill cluster is homogeneous and given same amount of memory resources.
+ * The root pool can be further divided into child ResourcePools to divide the resources among multiple child pools.
+ * Each child pool get's a resource share from it's parent resource pool. In theory there is no limit on the number
+ * of ResourcePools that can be configured to divide the cluster resources.
+ * <p>
+ * In addition to other parameters defined later root ResourcePool also supports a configuration
+ * {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl#ROOT_POOL_QUEUE_SELECTION_POLICY_KEY} which
+ * helps to select exactly one leaf pool out of all the possible options available for a query. For details please
+ * see package-info.java of {@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy}.
+ * {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTree#selectOneQueue(org.apache.drill.exec.ops.QueryContext,
+ * org.apache.drill.exec.resourcemgr.NodeResources)} method is used by parallelizer to get a queue which will be used
+ * to admit a query. The selected queue resource constraints are used by parallelizer to allocate proper resources
+ * to a query so that it remains within the bounds.
+ * </p>
+ * <p>
+ * The ResourcePools falls under 2 category:
+ * <ul>
+ * <li>Intermediate Pool: As the name suggests all the pools between root and leaf pool falls under this
+ * category. It helps to navigate a query through the ResourcePoolTree hierarchy to find leaf pools using selectors.
+ * The intermediate ResourcePool help to subdivide a parent resource pool resource and doesn't have an actual queue
+ * associated with it. A query will only be executed in a queue associated with a ResourcePool not the ResourcePool
+ * itself.
+ * </li>
+ * <li>Leaf Pool: All the ResourcePools which doesn't have any child pools associated with it are leaf
+ * ResourcePools. All the leaf pools should have a unique name associated with it and should always have exactly one
+ * queue configured with it. The queue of a leaf pool is where the queries will be admitted and a resource slice will
+ * be given to it. All the leaf ResourcePools will collectively comprise of all the resource share available to
+ * Drill's ResourceManager to allocate to all the queries.
+ * </li>
+ * </ul>
+ * Configurations Supported by ResourcePool:
+ * <ul>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_MEMORY_SHARE_KEY}: Percentage of
+ * memory share of parent ResourcePool assigned to this pool</li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_SELECTOR_KEY}: A selector assigned
+ * to this pool. For details please see package-info.java of
+ * {@link org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector}
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_QUEUE_KEY}: Queue configuration
+ * associated with this pool. It should always be configured for a leaf pool only. If configured with an
+ * intermediate pool then it will be ignored.
+ * </li>
+ * </ul>
+ * </p>
+ * <p>
+ * A queue always have 1:1 relationship with a leaf pool. Queries are admitted and executed with a resource slice
+ * from the queue. It supports following configurations:
+ * <ul>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_ADMISSIBLE_KEY}: Upper bound on the
+ * total number of queries that can be admitted inside a queue. After this limit is reached all the queries
+ * will be moved to waiting state.</li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_WAITING_KEY}: Limits the
+ * total number of queries that can be in waiting state inside a queue. After this limit is reached all the new
+ * queries will be failed immediately.</li>
+ * <li> {@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY}:
+ * Limits the maximum memory any query in this queue can consume on any node in the cluster. This is to limit a
+ * query from a queue to consume all the resources on a node so that other queues query can also have some
+ * resources available for it. Ideally it's advised that sum of value of this parameter for all queues should not
+ * exceed the total memory on a node.
+ * </li>
+ * <li> {@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#WAIT_FOR_PREFERRED_NODES_KEY}: This
+ * configuration helps to decide if an admitted query in a queue should wait until it has available resources on all
+ * the nodes assigned to it by planner for its execution. By default it's true. When set to false then for the nodes
+ * which doesn't have available resources for a query will be replaced with another node with enough resources.
+ * </li>
+ * </ul>
+ * </p>
+ * Once all the configuration are parsed an in-memory structures are created then for each query planner will select
+ * a queue where a query can be admitted. The queue selection process happens by traversing the ResourcePoolTree. During
+ * traversal process the query metadata is evaluated against assigned selector of a ResourcePool. If the selector
+ * returns true then traversal continues to it's child pools otherwise it stops there and tries another pool. With
+ * the traversal it finds all the leaf pools which are eligible for admitting the query and store that information in
+ * {@link org.apache.drill.exec.resourcemgr.config.QueueAssignmentResult}. Later the selected pools are passed to
+ * configured QueueSelectionPolicy to select one queue for the query. Planner uses that selected queue's max query
+ * memory per node parameter to limit resource assignment to all the fragments of a query on a node. After a query is
+ * planned with resource constraints it is sent to leader of that queue to ask for admission. If admitted the query
+ * required resources are reserved in global state store and query is executed on the cluster. For details please see
+ * the design document and functional spec linked in <a href="https://issues.apache.org/jira/browse/DRILL-7026">
+ * DRILL-7026</a>
+ */
+package org.apache.drill.exec.resourcemgr.config; \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java
new file mode 100644
index 000000000..7e9efe7e5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+
+import java.util.List;
+
+public abstract class AbstractQueueSelectionPolicy implements QueueSelectionPolicy {
+ private final SelectionPolicy selectionPolicy;
+
+ public AbstractQueueSelectionPolicy(SelectionPolicy selectionPolicy) {
+ this.selectionPolicy = selectionPolicy;
+ }
+
+ @Override
+ public SelectionPolicy getSelectionPolicy() {
+ return selectionPolicy;
+ }
+
+ public abstract ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext,
+ NodeResources maxResourcePerNode) throws QueueSelectionException;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java
new file mode 100644
index 000000000..a76f19336
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Helps to select a queue whose {@link QueryQueueConfig#getMaxQueryMemoryInMBPerNode()} is nearest to the max memory
+ * on a node required by the given query. Nearest is found by following rule:
+ * <ul>
+ * <li>
+ * Queue whose {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} is
+ * equal to max memory per node of given query
+ * </li>
+ * <li>
+ * Queue whose {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} is
+ * just greater than max memory per node of given query. From all queues whose max_query_memory_per_node is
+ * greater than what is needed by the query, the queue with minimum value is chosen.
+ * </li>
+ * <li>
+ * Queue whose {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} is
+ * just less than max memory per node of given query. From all queues whose max_query_memory_per_node is
+ * less than what is needed by the query, the queue with maximum value is chosen.
+ * </li>
+ * </ul>
+ */
+public class BestFitQueueSelection extends AbstractQueueSelectionPolicy {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BestFitQueueSelection.class);
+
+ public BestFitQueueSelection() {
+ super(SelectionPolicy.BESTFIT);
+ }
+
+ /**
+ * Comparator used to sort the leaf ResourcePool lists based on
+ * {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY}
+ */
+ private static class BestFitComparator implements Comparator<ResourcePool> {
+ @Override
+ public int compare(ResourcePool o1, ResourcePool o2) {
+ long pool1Value = o1.getQueryQueue().getMaxQueryMemoryInMBPerNode();
+ long pool2Value = o2.getQueryQueue().getMaxQueryMemoryInMBPerNode();
+ return Long.compare(pool1Value, pool2Value);
+ }
+ }
+
+ @Override
+ public ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext,
+ NodeResources maxResourcePerNode) throws QueueSelectionException {
+ if (allPools.isEmpty()) {
+ throw new QueueSelectionException(String.format("There are no pools to apply %s selection policy pool for the " +
+ "query: %s", getSelectionPolicy().toString(), queryContext.getQueryId()));
+ }
+
+ allPools.sort(new BestFitComparator());
+ final long queryMaxNodeMemory = maxResourcePerNode.getMemoryInMB();
+ ResourcePool selectedPool = allPools.get(0);
+ for (ResourcePool pool : allPools) {
+ selectedPool = pool;
+ long poolMaxNodeMem = pool.getQueryQueue().getMaxQueryMemoryInMBPerNode();
+ if (poolMaxNodeMem >= queryMaxNodeMemory) {
+ break;
+ }
+ }
+ logger.debug("Selected pool {} based on {} policy for query {}", selectedPool.getPoolName(),
+ getSelectionPolicy().toString(),
+ queryContext.getQueryId());
+ return selectedPool;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java
new file mode 100644
index 000000000..f1c03c3f5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+
+import java.util.List;
+
+/**
+ * Helps to select the first default queue in the list of all the provided queues. If there is no default queue
+ * present it throws {@link QueueSelectionException}. Default queue is a queue associated with {@link ResourcePool}
+ * which has {@link org.apache.drill.exec.resourcemgr.config.selectors.DefaultSelector} assigned to it
+ */
+public class DefaultQueueSelection extends AbstractQueueSelectionPolicy {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultQueueSelection.class);
+
+ public DefaultQueueSelection() {
+ super(SelectionPolicy.DEFAULT);
+ }
+
+ @Override
+ public ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext,
+ NodeResources maxResourcePerNode) throws QueueSelectionException {
+ for (ResourcePool pool : allPools) {
+ if (pool.isDefaultPool()) {
+ logger.debug("Selected default pool: {} for the query: {}", pool.getPoolName(), queryContext.getQueryId());
+ return pool;
+ }
+ }
+
+ throw new QueueSelectionException(String.format("There is no default pool to select from list of pools provided " +
+ "for the query: %s", queryContext.getQueryId()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java
new file mode 100644
index 000000000..5fab2d44c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+
+import java.util.List;
+
+/**
+ * Interface that defines all the implementation of a QueueSelectionPolicy supported by ResourceManagement
+ */
+public interface QueueSelectionPolicy {
+
+ enum SelectionPolicy {
+ UNKNOWN,
+ DEFAULT,
+ RANDOM,
+ BESTFIT;
+
+ @Override
+ public String toString() {
+ return name().toLowerCase();
+ }
+ }
+
+ SelectionPolicy getSelectionPolicy();
+
+ ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext, NodeResources maxResourcePerNode)
+ throws QueueSelectionException;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java
new file mode 100644
index 000000000..6ae96e7a6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+/**
+ * Factory to return an instance of {@link QueueSelectionPolicy} based on the configured policy name. By default if
+ * the configured policy name doesn't matches any supported policies then it returns {@link BestFitQueueSelection}
+ */
+public class QueueSelectionPolicyFactory {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueueSelectionPolicyFactory.class);
+
+ public static QueueSelectionPolicy createSelectionPolicy(QueueSelectionPolicy.SelectionPolicy policy) {
+ logger.debug("Creating SelectionPolicy of type {}", policy);
+ QueueSelectionPolicy selectionPolicy;
+ switch (policy) {
+ case DEFAULT:
+ selectionPolicy = new DefaultQueueSelection();
+ break;
+ case BESTFIT:
+ selectionPolicy = new BestFitQueueSelection();
+ break;
+ case RANDOM:
+ selectionPolicy = new RandomQueueSelection();
+ break;
+ default:
+ logger.info("QueueSelectionPolicy is not configured so proceeding with the bestfit as default policy");
+ selectionPolicy = new BestFitQueueSelection();
+ break;
+ }
+
+ return selectionPolicy;
+ }
+
+ // prevents from instantiation
+ private QueueSelectionPolicyFactory() {
+ // no-op
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java
new file mode 100644
index 000000000..63c51f29d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Randomly selects a queue from the list of all the provided queues. If no pools are provided then it throws
+ * {@link QueueSelectionException}
+ */
+public class RandomQueueSelection extends AbstractQueueSelectionPolicy {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomQueueSelection.class);
+
+ public RandomQueueSelection() {
+ super(SelectionPolicy.RANDOM);
+ }
+
+ @Override
+ public ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext,
+ NodeResources maxResourcePerNode) throws QueueSelectionException {
+ if (allPools.size() == 0) {
+ throw new QueueSelectionException(String.format("Input pool list is empty to apply %s selection policy",
+ getSelectionPolicy().toString()));
+ }
+ Collections.shuffle(allPools);
+ ResourcePool selectedPool = allPools.get(0);
+ logger.debug("Selected random pool: {} for query: {}", selectedPool.getPoolName(), queryContext.getQueryId());
+ return selectedPool;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java
new file mode 100644
index 000000000..8c1e257c2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines all the selection policy implementation which can be configured with Resource Management. The
+ * configuration which is used to specify a policy is
+ * {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl#ROOT_POOL_QUEUE_SELECTION_POLICY_KEY}. Selection Policy
+ * helps to select a single leaf ResourcePool out of all the eligible pools whose queue can be used to admit this query.
+ * Currently there are 3 types of supported policies. In future more policies can be supported by implementing
+ * {@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy} interface.
+ * <ul>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.DefaultQueueSelection}: Out of all the eligible pools
+ * this policy will choose a default pool in the list. If there are multiple default pools present in the list then
+ * it will return the first default pool. If there is no default pool present then it throws
+ * {@link org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException}
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.RandomQueueSelection}: Out of all the eligible pools
+ * this policy will choose a pool at random. If there are no pools to select from then it throws
+ * {@link org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException}
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.BestFitQueueSelection}: Out of all the eligible pools
+ * this policy will choose a pool whose queue configuration
+ * {@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} value is closest to
+ * the max memory on a node required by the query. It tries to find a pool whose value for MAX_QUERY_MEMORY_PER_NODE
+ * is equal to queries max memory per node requirement. If there is no such pool then find the pool with config value
+ * just greater than queries max memory per node. Otherwise find a pool with config value just less than queries
+ * max memory per node.
+ * </li>
+ * </ul>
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy; \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java
new file mode 100644
index 000000000..6f474e3a0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import org.apache.drill.exec.ops.QueryContext;
+
+public abstract class AbstractResourcePoolSelector implements ResourcePoolSelector {
+
+ protected final SelectorType SELECTOR_TYPE;
+
+ AbstractResourcePoolSelector(SelectorType type) {
+ SELECTOR_TYPE = type;
+ }
+
+ public SelectorType getSelectorType() {
+ return SELECTOR_TYPE;
+ }
+
+ public abstract boolean isQuerySelected(QueryContext queryContext);
+
+ @Override
+ public String toString() {
+ return SELECTOR_TYPE.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java
new file mode 100644
index 000000000..50acb860d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import avro.shaded.com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Evaluates if a query can be admitted to a ResourcePool or not by comparing query user/groups with the
+ * configured users/groups policies for this selector. AclSelector can be configured using both long-form syntax or
+ * short-form syntax as defined below:
+ * <ul>
+ * <li>Long-Form Syntax: Allows to use identifiers to specify allowed and disallowed users/groups. For example:
+ * users: [alice:+, bob:-] means alice is allowed whereas bob is denied access to the pool</li>
+ * <li>Short-Form Syntax: Allows to specify lists of allowed users/groups only. For example: users: [alice, bob]
+ * means only alice and bob are allowed access to this pool</li>
+ * </ul>
+ * The selector also supports * as a wildcard for both long and short form syntax to allow/deny all users/groups.
+ * Example configuration is of form:
+ * <code><pre>
+ * selector: {
+ * acl: {
+ * users: [alice:+, bob:-],
+ * groups: [sales, marketing]
+ * }
+ * }
+ * </pre></code>
+ */
+public class AclSelector extends AbstractResourcePoolSelector {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AclSelector.class);
+
+ private final Set<String> allowedUsers = Sets.newHashSet();
+
+ private final Set<String> allowedGroups = Sets.newHashSet();
+
+ private final Set<String> deniedUsers = Sets.newHashSet();
+
+ private final Set<String> deniedGroups = Sets.newHashSet();
+
+ private final Config aclSelectorValue;
+
+ private static final String ACL_VALUE_GROUPS_KEY = "groups";
+
+ private static final String ACL_VALUE_USERS_KEY = "users";
+
+ private static final String ACL_LONG_SYNTAX_SEPARATOR = ":";
+
+ private static final String ACL_LONG_ALLOWED_IDENTIFIER = "+";
+
+ private static final String ACL_LONG_DISALLOWED_IDENTIFIER = "-";
+
+ private static final String ACL_ALLOW_ALL = "*";
+
+
+ AclSelector(Config configValue) throws RMConfigException {
+ super(SelectorType.ACL);
+ this.aclSelectorValue = configValue;
+ validateAndParseACL(aclSelectorValue);
+ }
+
+ /**
+ * Determines if a given query is selected by this ACL selector of a Resource Pool or not. Following rules are
+ * followed to evaluate the selection. Assumption: There is an assumption made that if a user or group is configured
+ * in both +ve/-ve respective lists then it will be treated to be present in -ve list.
+ *
+ * Rules:
+ * 1) Check if query user is present in -ve users list, If yes then query is not selected else go to 2
+ * 2) Check if query user is present in +ve users list, If yes then query is selected else go to 3
+ * 3) Check if * is present in -ve users list, if yes then query is not selected else go to 4
+ * 4) Check if * is present in +ve users list, if yes then query is selected else go to 5
+ * 5) If here that means query user or * is absent in both +ve and -ve users list so check for groups of query user
+ * in step 6
+ * 6) Check if any of groups of query user is present in -ve groups list, If yes then query is not selected else go
+ * to 7
+ * 7) Check if any of groups of query user is present in +ve groups list, If yes then query selected else go to 8
+ * 8) Check if * is present in -ve groups list, If yes then query is not selected else go to 9
+ * 9) Check if * is present in +ve groups list, If yes then query is selected else go to 10
+ * 10) Query user and groups of it is neither present is +ve/-ve users list not +ve/-ve groups list hence the query
+ * is not selected
+ *
+ * @param queryContext QueryContext to get information about query user
+ * @return true if a query is selected by this selector, false otherwise
+ */
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ final String queryUser = queryContext.getQueryUserName();
+ final UserGroupInformation queryUserUGI = ImpersonationUtil.createProxyUgi(queryUser);
+ final Set<String> queryGroups = Sets.newHashSet(queryUserUGI.getGroupNames());
+ return checkQueryUserGroups(queryUser, queryGroups);
+ }
+
+ @VisibleForTesting
+ public boolean checkQueryUserGroups(String queryUser, Set<String> queryGroups) {
+ // Check for +ve/-ve users information with query user
+ if (deniedUsers.contains(queryUser)) {
+ logger.debug("Query user is present in configured ACL -ve users list");
+ return false;
+ } else if (allowedUsers.contains(queryUser)) {
+ logger.debug("Query user is present in configured ACL +ve users list");
+ return true;
+ } else if (isStarInDisAllowedUsersList()) {
+ logger.debug("Query user is absent in configured ACL +ve/-ve users list but * is in -ve users list");
+ return false;
+ } else if (isStarInAllowedUsersList()) {
+ logger.debug("Query user is absent in configured ACL +ve/-ve users list but * is in +ve users list");
+ return true;
+ }
+
+ // Check for +ve/-ve groups information with groups of query user
+ if (Sets.intersection(queryGroups, deniedGroups).size() > 0) {
+ logger.debug("Groups of Query user is present in configured ACL -ve groups list");
+ return false;
+ } else if (Sets.intersection(queryGroups, allowedGroups).size() > 0) {
+ logger.debug("Groups of Query user is present in configured ACL +ve groups list");
+ return true;
+ } else if (isStarInDisAllowedGroupsList()) {
+ logger.debug("Groups of Query user is absent in configured ACL +ve/-ve groups list but * is in -ve groups list");
+ return false;
+ } else if (isStarInAllowedGroupsList()) {
+ logger.debug("Groups of Query user is absent in configured ACL +ve/-ve groups list but * is in +ve groups list");
+ return true;
+ }
+
+ logger.debug("Neither query user or group is present in configured ACL users/groups list");
+ return false;
+ }
+
+ /**
+ * Parses the acl selector config value for users and groups to populate list of allowed/denied users and groups.
+ * @param aclConfig Acl config to parse
+ * @throws RMConfigException in case of invalid config for either users/groups
+ */
+ private void validateAndParseACL(Config aclConfig) throws RMConfigException {
+
+ // ACL config doesn't have either group or user list
+ if (!aclConfig.hasPath(ACL_VALUE_GROUPS_KEY) && !aclConfig.hasPath(ACL_VALUE_USERS_KEY)) {
+ throw new RMConfigException(String.format("ACL Selector config is missing both group and user list information." +
+ " Please configure either of groups or users list. [Details: aclConfig: %s]", aclConfig));
+ }
+
+ if (aclConfig.hasPath(ACL_VALUE_USERS_KEY)) {
+ final List<String> users = aclSelectorValue.getStringList(ACL_VALUE_USERS_KEY);
+ parseACLInput(users, allowedUsers, deniedUsers);
+ }
+
+ if (aclConfig.hasPath(ACL_VALUE_GROUPS_KEY)) {
+ final List<String> groups = aclSelectorValue.getStringList(ACL_VALUE_GROUPS_KEY);
+ parseACLInput(groups, allowedGroups, deniedGroups);
+ }
+
+ // If no valid configuration is seen for this selector
+ if (allowedGroups.size() == 0 && deniedGroups.size() == 0 &&
+ deniedUsers.size() == 0 && allowedUsers.size() == 0) {
+ throw new RMConfigException("No valid users or groups information is configured for this ACL selector. Either " +
+ "use * or valid users/groups");
+ }
+
+ // Check if there is any intersection between allowed and disallowed users/groups
+ Set<String> wrongConfig = Sets.intersection(allowedUsers, deniedUsers);
+ if (wrongConfig.size() > 0) {
+ logger.warn("These users are configured both in allowed and disallowed list. They will be treated as disallowed" +
+ ". [Details: users: {}]", wrongConfig);
+ allowedUsers.removeAll(wrongConfig);
+ }
+
+ wrongConfig = Sets.intersection(allowedGroups, deniedGroups);
+ if (wrongConfig.size() > 0) {
+ logger.warn("These groups are configured both in allowed and disallowed list. They will be treated as " +
+ "disallowed. [Details: groups: {}]", wrongConfig);
+ allowedGroups.removeAll(wrongConfig);
+ }
+ }
+
+ public Set<String> getAllowedUsers() {
+ return allowedUsers;
+ }
+
+ public Set<String> getAllowedGroups() {
+ return allowedGroups;
+ }
+
+ public Set<String> getDeniedUsers() {
+ return deniedUsers;
+ }
+
+ public Set<String> getDeniedGroups() {
+ return deniedGroups;
+ }
+
+ private boolean isStarInAllowedUsersList() {
+ return allowedUsers.contains(ACL_ALLOW_ALL);
+ }
+
+ private boolean isStarInAllowedGroupsList() {
+ return allowedGroups.contains(ACL_ALLOW_ALL);
+ }
+
+ private boolean isStarInDisAllowedUsersList() {
+ return deniedUsers.contains(ACL_ALLOW_ALL);
+ }
+
+ private boolean isStarInDisAllowedGroupsList() {
+ return deniedGroups.contains(ACL_ALLOW_ALL);
+ }
+
+ private void parseACLInput(List<String> acls, Set<String> allowedIdentity, Set<String> disAllowedIdentity) {
+ for (String aclValue : acls) {
+
+ if (aclValue.isEmpty()) {
+ continue;
+ }
+ // Check if it's long form syntax or shortForm syntax
+ String[] aclValueSplits = aclValue.split(ACL_LONG_SYNTAX_SEPARATOR);
+ if (aclValueSplits.length == 1) {
+ // short form
+ if (!allowedIdentity.add(aclValueSplits[0])) {
+ logger.info("Duplicate acl identity: {} found in configured list will be ignored", aclValueSplits[0]);
+ }
+ } else {
+ // long form
+ final String identifier = aclValueSplits[1];
+ if (identifier.equals(ACL_LONG_ALLOWED_IDENTIFIER)) {
+ if (!allowedIdentity.add(aclValueSplits[0])) {
+ logger.info("Duplicate acl identity: {} found in configured list will be ignored", aclValueSplits[0]);
+ }
+ } else if (identifier.equals(ACL_LONG_DISALLOWED_IDENTIFIER)) {
+ if (!disAllowedIdentity.add(aclValueSplits[0])) {
+ logger.info("Duplicate acl identity: {} found in configured list will be ignored", aclValueSplits[0]);
+ }
+ } else {
+ logger.error("Invalid long form syntax encountered hence ignoring ACL string {} . Details[Allowed " +
+ "identifiers are `+` and `-`. Encountered: {}]", aclValue, identifier);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ SelectorType: ").append(super.toString());
+ sb.append(", AllowedUsers: [");
+ for (String positiveUser : allowedUsers) {
+ sb.append(positiveUser).append(", ");
+ }
+ sb.append("], AllowedGroups: [");
+ for (String positiveGroup : allowedGroups) {
+ sb.append(positiveGroup).append(", ");
+ }
+ sb.append("], DisallowedUsers: [");
+ for (String negativeUser : deniedUsers) {
+ sb.append(negativeUser).append(", ");
+ }
+ sb.append("], DisallowedGroups: [");
+ for (String negativeGroup : deniedGroups) {
+ sb.append(negativeGroup).append(", ");
+ }
+ sb.append("]}");
+
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java
new file mode 100644
index 000000000..82b51ee46
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+
+import java.util.List;
+
+/**
+ * Complex selector whose value is list of other Simple or Complex Selectors. There has to be at least 2 other
+ * selectors configured in the value list for this selector. It does AND operation on result of all the child
+ * selectors configured with it to evaluate if a query can be admitted to it's ResourcePool or not.
+ *
+ * Example configuration is of form:
+ * <code><pre>
+ * selector: {
+ * and: [{tag: "BITool"},{tag: "operational"}]
+ * }
+ * </pre></code>
+ */
+public class AndSelector extends ComplexSelectors {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AndSelector.class);
+
+ AndSelector(List<? extends Config> configValue) throws RMConfigException {
+ super(SelectorType.AND, configValue);
+ }
+
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ for (ResourcePoolSelector childSelector : childSelectors) {
+ if (!childSelector.isQuerySelected(queryContext)) {
+ logger.debug("Query {} is not selected by the child selector of type {} in this complex AndSelector",
+ queryContext.getQueryId(), childSelector.getSelectorType().toString());
+ return false;
+ }
+ }
+ return true;
+ }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java
new file mode 100644
index 000000000..fe9ab52ae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class ComplexSelectors extends AbstractResourcePoolSelector {
+
+ protected final List<ResourcePoolSelector> childSelectors = new ArrayList<>();
+
+ ComplexSelectors(SelectorType type, List<? extends Config> selectorConfig) throws RMConfigException {
+ super(type);
+ parseAndCreateChildSelectors(selectorConfig);
+ }
+
+ private void parseAndCreateChildSelectors(List<? extends Config> childConfigs) throws RMConfigException {
+ for (Config childConfig : childConfigs) {
+ childSelectors.add(ResourcePoolSelectorFactory.createSelector(childConfig));
+ }
+
+ if (childSelectors.size() < 2) {
+ throw new RMConfigException(String.format("For complex selector OR and AND it is expected to have atleast 2 " +
+ "selectors in the list but found %d", childSelectors.size()));
+ }
+ }
+
+ public abstract boolean isQuerySelected(QueryContext queryContext);
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ SelectorType: ").append(super.toString());
+ sb.append(", of selectors [");
+ for (ResourcePoolSelector childSelector : childSelectors) {
+ sb.append(childSelector.toString()).append(", ");
+ }
+ sb.append("]}");
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java
new file mode 100644
index 000000000..2336d03e2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import org.apache.drill.exec.ops.QueryContext;
+
+/**
+ * When selector configuration is absent for a ResourcePool then it is associated with a DefaultSelector. It acts as
+ * a sink for all the queries which means all the queries will be selected by this default selector.
+ */
+public class DefaultSelector extends AbstractResourcePoolSelector {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultSelector.class);
+
+ DefaultSelector() {
+ super(SelectorType.DEFAULT);
+ }
+
+ @Override
+ public SelectorType getSelectorType() {
+ return SELECTOR_TYPE;
+ }
+
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ logger.debug("Query {} is selected by this Default selector", queryContext.getQueryId());
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "{SelectorType: " + super.toString() + "}";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java
new file mode 100644
index 000000000..653c3b538
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import avro.shaded.com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+
+/**
+ * Simple selector whose value is another Simple or Complex Selectors. It does NOT of result of its child selector
+ * configured with it to evaluate if a query can be admitted to it's ResourcePool or not.
+ *
+ * Example configuration is of form:
+ * <code><pre>
+ * selector: {
+ * not_equal: {tag: "BITool"}
+ * }
+ * </pre></code>
+ */
+public class NotEqualSelector extends AbstractResourcePoolSelector {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NotEqualSelector.class);
+
+ private final ResourcePoolSelector poolSelector;
+
+ NotEqualSelector(Config selectorValue) throws RMConfigException {
+ super(SelectorType.NOT_EQUAL);
+ poolSelector = ResourcePoolSelectorFactory.createSelector(selectorValue);
+ }
+
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ logger.debug("Query {} is evaluated for not_equal of selector type {}", queryContext.getQueryId(),
+ poolSelector.getSelectorType().toString());
+ return !poolSelector.isQuerySelected(queryContext);
+ }
+
+ @VisibleForTesting
+ public ResourcePoolSelector getPoolSelector() {
+ return poolSelector;
+ }
+
+ @Override
+ public String toString() {
+ return "{ SelectorType: " + super.toString() + ", of selector " + poolSelector.toString() + " }";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java
new file mode 100644
index 000000000..5e8b2f9e0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+
+import java.util.List;
+
+/**
+ * Complex selector whose value is list of other Simple or Complex Selectors. There has to be at least 2 other
+ * selectors configured in the value list for this selector. It does OR operation on result of all the child selectors
+ * configured with it to evaluate if a query can be admitted to it's ResourcePool or not.
+ *
+ * Example configuration is of form:
+ * <code></><pre>
+ * selector: {
+ * or: [{tag: "BITool"},{tag: "operational"}]
+ * }
+ * </pre></code>
+ */
+public class OrSelector extends ComplexSelectors {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrSelector.class);
+
+ OrSelector(List<? extends Config> configValue) throws RMConfigException {
+ super(SelectorType.OR, configValue);
+ }
+
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ for (ResourcePoolSelector childSelector : childSelectors) {
+ // If we find any selector evaluating to true then no need to evaluate other selectors in the list
+ if (childSelector.isQuerySelected(queryContext)) {
+ logger.debug("Query {} is selected by the child selector of type {} in this OrSelector",
+ queryContext.getQueryId(), childSelector.getSelectorType().toString());
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java
new file mode 100644
index 000000000..f5dbf0db1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import org.apache.drill.exec.ops.QueryContext;
+
+/**
+ * Interface that defines implementation for selectors assigned to a ResourcePool. ResourcePoolSelector helps to
+ * evaluate if a given query can be admitted into a ResourcePool or not. Based on the assigned selector type to a
+ * ResourcePool it uses the query metadata with it's own configured values and make a decision for a query. The
+ * SelectorType defines all the supported ResourcePoolSelector which can be assigned to a ResourcePool. The
+ * configuration of a selector is of type:
+ * <code><pre>
+ * selector: {
+ * SelectorType:SelectorValue
+ * }
+ * where SelectorValue can be a string (for SelectorType tag),
+ * object (for SelectorType acl and not_equal) and
+ * list of objects (for SelectorType and, or)
+ * when selector config is absent then a DefaultSelector is associated with the ResourcePool
+ * </pre></code>
+ */
+public interface ResourcePoolSelector {
+
+ enum SelectorType {
+ UNKNOWN,
+ DEFAULT,
+ TAG,
+ ACL,
+ OR,
+ AND,
+ NOT_EQUAL;
+
+ @Override
+ public String toString() {
+ return name().toLowerCase();
+ }
+ }
+
+ SelectorType getSelectorType();
+
+ boolean isQuerySelected(QueryContext queryContext);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java
new file mode 100644
index 000000000..0ad130269
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector.SelectorType;
+
+public class ResourcePoolSelectorFactory {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolSelectorFactory.class);
+
+ public static ResourcePoolSelector createSelector(Config selectorConfig) throws RMConfigException {
+ ResourcePoolSelector poolSelector = null;
+ String selectorType = SelectorType.DEFAULT.toString();
+ try {
+ if (selectorConfig == null) {
+ poolSelector = new DefaultSelector();
+ } else if (selectorConfig.hasPath(SelectorType.TAG.toString())) {
+ selectorType = SelectorType.TAG.toString();
+ poolSelector = new TagSelector(selectorConfig.getString(selectorType));
+ } else if (selectorConfig.hasPath(SelectorType.ACL.toString())) {
+ selectorType = SelectorType.ACL.toString();
+ poolSelector = new AclSelector(selectorConfig.getConfig(selectorType));
+ } else if (selectorConfig.hasPath(SelectorType.OR.toString())) {
+ selectorType = SelectorType.OR.toString();
+ poolSelector = new OrSelector(selectorConfig.getConfigList(selectorType));
+ } else if (selectorConfig.hasPath(SelectorType.AND.toString())) {
+ selectorType = SelectorType.AND.toString();
+ poolSelector = new AndSelector(selectorConfig.getConfigList(selectorType));
+ } else if (selectorConfig.hasPath(SelectorType.NOT_EQUAL.toString())) {
+ selectorType = SelectorType.NOT_EQUAL.toString();
+ poolSelector = new NotEqualSelector(selectorConfig.getConfig(selectorType));
+ }
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("There is an error with value configuration for selector type %s",
+ selectorType), ex);
+ }
+
+ // if here means either a selector is chosen or wrong configuration
+ if (poolSelector == null) {
+ throw new RMConfigException(String.format("Configured selector is either empty or not supported. [Details: " +
+ "SelectorConfig: %s]", selectorConfig));
+ }
+
+ logger.debug("Created selector of type {}", poolSelector.getSelectorType().toString());
+ return poolSelector;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java
new file mode 100644
index 000000000..79237123c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+
+/**
+ * Simple selector whose value is a string representing a tag. It tries to match it's configured tag with one of
+ * the tags configured for the query using connection/session parameter. If a query posses at least one tag same as
+ * this selector tag then it will be admitted in the respective ResourcePool.
+ *
+ * Example configuration is of form:
+ * <code><pre>
+ * selector: {
+ * tag: "BITool"
+ * }
+ * </pre></code>
+ */
+public class TagSelector extends AbstractResourcePoolSelector {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TagSelector.class);
+
+ private final String configuredTag;
+
+ TagSelector(String selectorValue) throws RMConfigException {
+ super(SelectorType.TAG);
+
+ if (selectorValue == null || selectorValue.isEmpty()) {
+ throw new RMConfigException("Tag value of this selector is either null or empty. Please configure a valid tag " +
+ "as string.");
+ }
+ configuredTag = selectorValue;
+ }
+
+ @Override
+ public SelectorType getSelectorType() {
+ return SELECTOR_TYPE;
+ }
+
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ String[] queryTags = queryContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY).string_val.split(",");
+ for (String queryTag : queryTags) {
+ if (queryTag.equals(configuredTag)) {
+ logger.debug("Query {} tag {} matches the selector tag {}", queryContext.getQueryId(), queryTag, configuredTag);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public String getTagValue() {
+ return configuredTag;
+ }
+
+ @Override
+ public String toString() {
+ return "{ SelectorType: " + super.toString() + ", TagValue: [" + configuredTag + "]}";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java
new file mode 100644
index 000000000..441c24aed
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines all the Selectors which can be assigned to a ResourcePool in the ResourceManagement configuration. A
+ * selector helps to evaluate that given a query and it's metadata if that query can be admitted inside associated
+ * ResourcePool or not. Selectors are associated with both intermediate and leaf level ResourcePools. The intermediate
+ * pool selectors helps to navigate the ResourcePool hierarchy to reach a leaf level ResourcePool where query will
+ * actually be admitted in the queue associated with a leaf pool. Whereas leaf pool selector will help to choose all
+ * the leaf pools which can be considered to admit a query. A selector can be configured for a ResourcePool
+ * using the {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_SELECTOR_KEY} configuration. If the
+ * selector configuration is missing for a ResourcePool then it is associated with a Default Selector making it
+ * a Default ResourcePool. Selectors are configured as a key value pair where key represents it's type and
+ * value is what it uses to evaluate a query. Currently there are 6 different types of supported selectors. In future
+ * more selectos can be supported by implementing
+ * {@link org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector} interface.
+ * <ul>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.DefaultSelector}: It acts as a sink and will always return
+ * true for all the queries. It is associated with a ResourcePool which is not assigned any selector in the
+ * configuration making it a default pool.
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.TagSelector}: A selector which has a tag (String) value
+ * associated with it. It evaluates all the tags associated with a query to see if there is a match or not. A query is
+ * only selected by this selector if it has a tag same as that configured for this selector.
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.AclSelector}: A selector which has users/groups policies
+ * value associated with it. It evaluates these policies against the users/groups of query session to check if the
+ * query can be selected or not. It supports long/short form syntax to configure the acl policies. It also supports *
+ * as wildcard character to allow/deny all users/groups in its policies. Please see AclSelector class javadoc for more
+ * details.
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.OrSelector}: A selector which can have lists of 2 or more
+ * other selectors configured as it's value except for Default selector. It performs || operation on the selection
+ * result of all other configured selectors.</li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.AndSelector}: A selector which can have lists of 2 or more
+ * other selectors configured as it's value except for Default selector. It performs && operation on the selection
+ * result of all other configured selectors.</li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.NotEqualSelector}: A selector which can have any other
+ * selector defined above configured as it's value except for Default selector. It will compare the query metadata
+ * against the configured selector and will return ! of that as a selection result. For example: if a TagSelector
+ * with value "sales" is configured for this NotSelector then it will select queries whose doesn't have sales tag
+ * associated with it.</li>
+ * </ul>
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors; \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java
new file mode 100644
index 000000000..0658172f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package will contain all the components of resource manager in Drill.
+ */
+package org.apache.drill.exec.resourcemgr; \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java
new file mode 100644
index 000000000..8d5f5f26f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Utility class which helps in parsing memory configuration string using the passed in pattern to get memory value in
+ * bytes.
+ */
+public class MemoryConfigParser {
+
+ /**
+ * @param memoryConfig Memory configuration string
+ * @param patternToUse Pattern which will be used to parse the memory configuration string
+ * @return memory value in bytes
+ */
+ public static long parseMemoryConfigString(String memoryConfig, String patternToUse) {
+ Pattern pattern = Pattern.compile(patternToUse);
+ Matcher patternMatcher = pattern.matcher(memoryConfig);
+
+ long memoryPerNodeInBytes = 0;
+ if (patternMatcher.matches()) {
+ memoryPerNodeInBytes = Long.parseLong(patternMatcher.group(1));
+
+ // group 2 can be optional
+ String group2 = patternMatcher.group(2);
+ if (!group2.isEmpty()) {
+ switch (group2.charAt(0)) {
+ case 'G':
+ case 'g':
+ memoryPerNodeInBytes *= 1073741824L;
+ break;
+ case 'K':
+ case 'k':
+ memoryPerNodeInBytes *= 1024L;
+ break;
+ case 'M':
+ case 'm':
+ memoryPerNodeInBytes *= 1048576L;
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Memory Configuration %s didn't matched any of the" +
+ " supported suffixes. [Details: Supported: kKmMgG, Actual: %s", memoryConfig, group2));
+ }
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("Memory Configuration %s didn't matched supported format. " +
+ "Supported format is %s?", memoryConfig, patternToUse));
+ }
+
+ return memoryPerNodeInBytes;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index dc1e9ccf6..e2fd1e845 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -17,17 +17,16 @@
*/
package org.apache.drill.exec.rpc.user;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.net.ssl.SSLEngine;
-import javax.security.sasl.SaslException;
-
+import com.google.protobuf.MessageLite;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.common.config.DrillProperties;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.DrillbitStartupException;
@@ -65,17 +64,15 @@ import org.apache.hadoop.security.HadoopKerberosName;
import org.joda.time.DateTime;
import org.slf4j.Logger;
-import com.google.protobuf.MessageLite;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import javax.net.ssl.SSLEngine;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
@@ -197,7 +194,11 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
}
/**
- * {@link AbstractRemoteConnection} implementation for user connection. Also implements {@link UserClientConnection}.
+ * It represents a client connection accepted by Foreman Drillbit's UserServer from a DrillClient. This connection
+ * is used to get hold of {@link UserSession} which stores all session related information like session options
+ * changed over the lifetime of this connection. There is a 1:1 mapping between a BitToUserConnection and a
+ * UserSession. This connection object is also used to send query data and result back to the client submitted as part
+ * of the session tied to this connection.
*/
public class BitToUserConnection extends AbstractServerConnection<BitToUserConnection>
implements UserClientConnection {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index e856ac58d..0798deabb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
@@ -119,14 +120,34 @@ public class UserSession implements AutoCloseable {
return this;
}
- public UserSession build() {
+ private boolean canApplyUserProperty() {
+ final StringBuilder sb = new StringBuilder();
if (userSession.properties.containsKey(DrillProperties.QUOTING_IDENTIFIERS)) {
- if (userSession.sessionOptions != null) {
+ sb.append(DrillProperties.QUOTING_IDENTIFIERS).append(",");
+ }
+
+ if (userSession.properties.containsKey(DrillProperties.QUERY_TAGS)) {
+ sb.append(DrillProperties.QUERY_TAGS);
+ }
+
+ if (userSession.sessionOptions == null && sb.length() > 0) {
+ logger.warn("User property {} can't be installed as a server option without the session option manager",
+ sb.toString());
+ return false;
+ }
+ return true;
+ }
+
+ public UserSession build() {
+ if (canApplyUserProperty()) {
+ if (userSession.properties.containsKey(DrillProperties.QUOTING_IDENTIFIERS)) {
userSession.setSessionOption(PlannerSettings.QUOTING_IDENTIFIERS_KEY,
- userSession.properties.getProperty(DrillProperties.QUOTING_IDENTIFIERS));
- } else {
- logger.warn("User property {} can't be installed as a server option without the session option manager",
- DrillProperties.QUOTING_IDENTIFIERS);
+ userSession.properties.getProperty(DrillProperties.QUOTING_IDENTIFIERS));
+ }
+
+ if (userSession.properties.containsKey(DrillProperties.QUERY_TAGS)) {
+ userSession.setSessionOption(ExecConstants.RM_QUERY_TAGS_KEY,
+ userSession.properties.getProperty(DrillProperties.QUERY_TAGS));
}
}
UserSession session = userSession;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 73a039b72..89061763b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -17,19 +17,6 @@
*/
package org.apache.drill.exec.server;
-import java.io.IOException;
-import java.nio.file.InvalidPathException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardWatchEventKinds;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.tools.ToolProvider;
-
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.StackTrace;
import org.apache.drill.common.concurrent.ExtendedLatch;
@@ -43,6 +30,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.exec.server.DrillbitStateManager.DrillbitState;
import org.apache.drill.exec.server.options.OptionDefinition;
import org.apache.drill.exec.server.options.OptionValue;
@@ -51,21 +39,31 @@ import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.server.rest.WebServer;
import org.apache.drill.exec.service.ServiceEngine;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
-import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
import org.apache.drill.exec.util.GuavaPatcher;
import org.apache.drill.exec.util.ProtobufPatcher;
import org.apache.drill.exec.work.WorkManager;
-import org.apache.zookeeper.Environment;
-
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.zookeeper.Environment;
import org.slf4j.bridge.SLF4JBridgeHandler;
+import javax.tools.ToolProvider;
+import java.io.IOException;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Starts, tracks and stops all the required services for a Drillbit daemon to work.
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 030e1a346..9cd670e99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -17,15 +17,6 @@
*/
package org.apache.drill.exec.server.options;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
import org.apache.commons.collections.IteratorUtils;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.config.LogicalPlanPersistence;
@@ -39,10 +30,18 @@ import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
import org.apache.drill.exec.util.AssertionUtil;
-
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
/**
* <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
* Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
@@ -277,7 +276,10 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.HLL_ACCURACY_VALIDATOR),
new OptionDefinition(ExecConstants.DETERMINISTIC_SAMPLING_VALIDATOR),
new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_ELEMENTS_VALIDATOR),
- new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_FPOS_PROB_VALIDATOR)
+ new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_FPOS_PROB_VALIDATOR),
+ new OptionDefinition(ExecConstants.RM_QUERY_TAGS_VALIDATOR,
+ new OptionMetaData(OptionValue.AccessibleScopes.SESSION_AND_QUERY, false, false)),
+ new OptionDefinition(ExecConstants.RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR)
};
CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java
index f5fa122fe..3551b6769 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.io.Resources;
import com.jasonclawson.jackson.dataformat.hocon.HoconFactory;
-import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.ConfigConstants;
import org.apache.drill.common.config.LogicalPlanPersistence;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -45,10 +45,10 @@ import static org.apache.drill.exec.store.StoragePluginRegistry.ACTION_ON_STORAG
/**
* Drill plugins handler, which allows to update storage plugins configs from the
- * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} conf file
+ * {@link ConfigConstants#STORAGE_PLUGINS_OVERRIDE_CONF} conf file
*
* TODO: DRILL-6564: It can be improved with configs versioning and service of creating
- * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF}
+ * {@link ConfigConstants#STORAGE_PLUGINS_OVERRIDE_CONF}
*/
public class StoragePluginsHandlerService implements StoragePluginsHandler {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginsHandlerService.class);
@@ -124,17 +124,17 @@ public class StoragePluginsHandlerService implements StoragePluginsHandler {
}
/**
- * Get the new storage plugins from the {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} file if it exists,
+ * Get the new storage plugins from the {@link ConfigConstants#STORAGE_PLUGINS_OVERRIDE_CONF} file if it exists,
* null otherwise
*
* @return storage plugins
*/
private StoragePlugins getNewStoragePlugins() {
- Set<URL> urlSet = ClassPathScanner.forResource(CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, false);
+ Set<URL> urlSet = ClassPathScanner.forResource(ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF, false);
if (!urlSet.isEmpty()) {
if (urlSet.size() != 1) {
DrillRuntimeException.format("More than one %s file is placed in Drill's classpath: %s",
- CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, urlSet);
+ ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF, urlSet);
}
pluginsOverrideFileUrl = urlSet.iterator().next();
try {
@@ -142,11 +142,11 @@ public class StoragePluginsHandlerService implements StoragePluginsHandler {
return lpPersistence.getMapper().readValue(newPluginsData, StoragePlugins.class);
} catch (IOException e) {
logger.error("Failures are obtained while loading %s file. Proceed without update",
- CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, e);
+ ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF, e);
}
}
logger.trace("The {} file is absent. Proceed without updating of the storage plugins configs",
- CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF);
+ ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF);
return null;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java
new file mode 100644
index 000000000..af9ba00f4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.ResourcePoolTree;
+import org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.foreman.Foreman;
+
+public class DistributedResourceManager implements ResourceManager {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedResourceManager.class);
+
+ private final ResourcePoolTree rmPoolTree;
+
+ private final DrillbitContext context;
+
+ private final DrillConfig rmConfig;
+
+ private final ResourceManager delegatedRM;
+
+ public DistributedResourceManager(DrillbitContext context) throws DrillRuntimeException {
+ try {
+ this.context = context;
+ this.rmConfig = DrillConfig.createForRM();
+ rmPoolTree = new ResourcePoolTreeImpl(rmConfig, DrillConfig.getMaxDirectMemory(),
+ Runtime.getRuntime().availableProcessors(), 1);
+ logger.debug("Successfully parsed RM config \n{}", rmConfig.getConfig(ResourcePoolTreeImpl.ROOT_POOL_CONFIG_KEY));
+ this.delegatedRM = new DefaultResourceManager();
+ } catch (RMConfigException ex) {
+ throw new DrillRuntimeException(String.format("Failed while parsing Drill RM Configs. Drillbit won't be started" +
+ " unless config is fixed or RM is disabled by setting %s to false", ExecConstants.RM_ENABLED), ex);
+ }
+ }
+ @Override
+ public long memoryPerNode() {
+ return delegatedRM.memoryPerNode();
+ }
+
+ @Override
+ public int cpusPerNode() {
+ return delegatedRM.cpusPerNode();
+ }
+
+ @Override
+ public QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
+ return delegatedRM.newResourceAllocator(queryContext);
+ }
+
+ @Override
+ public QueryResourceManager newQueryRM(Foreman foreman) {
+ return delegatedRM.newQueryRM(foreman);
+ }
+
+ public ResourcePoolTree getRmPoolTree() {
+ return rmPoolTree;
+ }
+
+ @Override
+ public void close() {
+ delegatedRM.close();
+ }
+}