aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSorabh Hamirwasia <shamirwasia@maprtech.com>2019-02-07 13:54:14 -0800
committerkarthik <kmanivannan@maprtech.com>2019-03-08 13:43:37 -0800
commit362cded3fbcb99c0c4f951bb97016cb2b0542f94 (patch)
treecc16b473832dd4c24a3d614a2dc0d6d5297ffb28
parent75ca5238e627e8d8612d56f654da06992837f6af (diff)
DRILL-7046: Support for loading and parsing new RM config file
closes #1652
-rw-r--r--common/src/main/java/org/apache/drill/common/config/ConfigConstants.java (renamed from common/src/main/java/org/apache/drill/common/config/CommonConstants.java)24
-rw-r--r--common/src/main/java/org/apache/drill/common/config/ConfigFileInfo.java33
-rw-r--r--common/src/main/java/org/apache/drill/common/config/DrillConfig.java87
-rw-r--r--common/src/main/java/org/apache/drill/common/config/DrillExecConfigFileInfo.java41
-rw-r--r--common/src/main/java/org/apache/drill/common/config/DrillProperties.java7
-rw-r--r--common/src/main/java/org/apache/drill/common/config/DrillRMConfigFileInfo.java41
-rw-r--r--common/src/main/java/org/apache/drill/common/scanner/BuildTimeScan.java3
-rw-r--r--common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java9
-rw-r--r--common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java5
-rw-r--r--common/src/test/java/org/apache/drill/categories/ResourceManagerTest.java21
-rw-r--r--drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java66
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java71
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java171
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java79
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java39
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java70
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java268
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java45
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java164
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java31
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java31
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java108
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java41
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java93
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java52
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java48
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java53
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java51
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java45
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java40
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java285
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java56
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java60
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java48
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java62
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java58
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java58
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java63
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java76
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java59
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java70
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java47
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java33
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java34
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java24
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java82
-rw-r--r--exec/java-exec/src/main/resources/drill-module.conf8
-rw-r--r--exec/java-exec/src/main/resources/drill-rm-default.conf34
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestRMConfigLoad.java117
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestResourcePoolTree.java277
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestBestFitSelectionPolicy.java122
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestDefaultSelectionPolicy.java103
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestAclSelector.java265
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestComplexSelectors.java247
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestNotEqualSelector.java120
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestResourcePoolSelectors.java46
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestTagSelector.java84
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java4
63 files changed, 4316 insertions, 125 deletions
diff --git a/common/src/main/java/org/apache/drill/common/config/CommonConstants.java b/common/src/main/java/org/apache/drill/common/config/ConfigConstants.java
index e203972b8..3283fe07f 100644
--- a/common/src/main/java/org/apache/drill/common/config/CommonConstants.java
+++ b/common/src/main/java/org/apache/drill/common/config/ConfigConstants.java
@@ -17,21 +17,33 @@
*/
package org.apache.drill.common.config;
-public interface CommonConstants {
+public final class ConfigConstants {
/** Default (base) configuration file name. (Classpath resource pathname.) */
- String CONFIG_DEFAULT_RESOURCE_PATHNAME = "drill-default.conf";
+ public static final String CONFIG_DEFAULT_RESOURCE_PATHNAME = "drill-default.conf";
/** Module configuration files name. (Classpath resource pathname.) */
- String DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME = "drill-module.conf";
+ public static final String DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME = "drill-module.conf";
/** Distribution Specific Override configuration file name. (Classpath resource pathname.) */
- String CONFIG_DISTRIBUTION_RESOURCE_PATHNAME = "drill-distrib.conf";
+ public static final String CONFIG_DISTRIBUTION_RESOURCE_PATHNAME = "drill-distrib.conf";
/** Override configuration file name. (Classpath resource pathname.) */
- String CONFIG_OVERRIDE_RESOURCE_PATHNAME = "drill-override.conf";
+ public static final String CONFIG_OVERRIDE_RESOURCE_PATHNAME = "drill-override.conf";
/** Override plugins configs file name. (Classpath resource pathname.) */
- String STORAGE_PLUGINS_OVERRIDE_CONF = "storage-plugins-override.conf";
+ public static final String STORAGE_PLUGINS_OVERRIDE_CONF = "storage-plugins-override.conf";
+ /** Default RM configuration file name. (Classpath resource pathname.) */
+ public static final String RM_CONFIG_DEFAULT_RESOURCE_PATHNAME = "drill-rm-default.conf";
+
+ /** Distribution Specific RM Override configuration file name. (Classpath resource pathname.) */
+ public static final String RM_CONFIG_DISTRIBUTION_RESOURCE_PATHNAME = "drill-rm-distrib.conf";
+
+ /** RM Override configuration file name. (Classpath resource pathname.) */
+ public static final String RM_CONFIG_OVERRIDE_RESOURCE_PATHNAME = "drill-rm-override.conf";
+
+ // suppress default constructor
+ private ConfigConstants() {
+ }
}
diff --git a/common/src/main/java/org/apache/drill/common/config/ConfigFileInfo.java b/common/src/main/java/org/apache/drill/common/config/ConfigFileInfo.java
new file mode 100644
index 000000000..1f0fe971b
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/config/ConfigFileInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.config;
+
+/**
+ * Interface that defines implementation to get all the config files names for default, module specific, distribution
+ * specific and override files.
+ */
+public interface ConfigFileInfo {
+
+ String getDefaultFileName();
+
+ String getModuleFileName();
+
+ String getDistributionFileName();
+
+ String getOverrideFileName();
+}
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
index d7d7340a5..6fa09e5a4 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java
@@ -17,30 +17,30 @@
*/
package org.apache.drill.common.config;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigRenderOptions;
+import io.netty.util.internal.PlatformDependent;
+import org.apache.drill.common.exceptions.DrillConfigurationException;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
+import org.reflections.util.ClasspathHelper;
+
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Constructor;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
-import io.netty.util.internal.PlatformDependent;
-import org.apache.drill.common.exceptions.DrillConfigurationException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.scanner.ClassPathScanner;
-import org.reflections.util.ClasspathHelper;
-
-import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
-import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigRenderOptions;
-
public class DrillConfig extends NestedConfig {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class);
@@ -139,7 +139,7 @@ public class DrillConfig extends NestedConfig {
* @param overrideFileResourcePathname
* the classpath resource pathname of the file to use for
* configuration override purposes; {@code null} specifies to use the
- * default pathname ({@link CommonConstants#CONFIG_OVERRIDE_RESOURCE_PATHNAME}) (does
+ * default pathname ({@link ConfigConstants#CONFIG_OVERRIDE_RESOURCE_PATHNAME}) (does
* <strong>not</strong> specify to suppress trying to load an
* overrides file)
* @return A merged Config object.
@@ -153,7 +153,7 @@ public class DrillConfig extends NestedConfig {
*/
@VisibleForTesting
public static DrillConfig create(Properties testConfigurations) {
- return create(null, testConfigurations, true);
+ return create(null, testConfigurations, true, new DrillExecConfigFileInfo());
}
/**
@@ -161,7 +161,27 @@ public class DrillConfig extends NestedConfig {
* see {@link #create(String)}'s {@code overrideFileResourcePathname}
*/
public static DrillConfig create(String overrideFileResourcePathname, boolean enableServerConfigs) {
- return create(overrideFileResourcePathname, null, enableServerConfigs);
+ return create(overrideFileResourcePathname, null, enableServerConfigs, new DrillExecConfigFileInfo());
+ }
+
+ /**
+ * Merged DrillConfig object for all the RM Configurations provided through various resource files. The order of
+ * precedence is as follows:
+ * <p>
+ * Configuration values are retrieved as follows:
+ * <ul>
+ * <li>Check a single copy of "drill-rm-override.conf". If multiple copies are
+ * on the classpath, which copy is read is indeterminate.</li>
+ * <li>Check a single copy of "drill-rm-distrib.conf". If multiple copies are
+ * on the classpath, which copy is read is indeterminate. </li>
+ * <li>Check a single copy of "{@code drill-rm-default.conf}". If multiple
+ * copies are on the classpath, which copy is read is indeterminate.</li>
+ * </ul>
+ * </p>
+ * @return A merged Config object.
+ */
+ public static DrillConfig createForRM() {
+ return create(null, null, true, new DrillRMConfigFileInfo());
}
/**
@@ -181,35 +201,37 @@ public class DrillConfig extends NestedConfig {
* is assimilated
* @param enableServerConfigs
* whether to enable server-specific configuration options
- * @return
+ * @param configInfo
+ * see {@link ConfigFileInfo}
+ * @return {@link DrillConfig} object with all configs from passed in resource files
*/
private static DrillConfig create(String overrideFileResourcePathname,
final Properties overriderProps,
- final boolean enableServerConfigs) {
+ final boolean enableServerConfigs,
+ ConfigFileInfo configInfo) {
final StringBuilder logString = new StringBuilder();
final Stopwatch watch = Stopwatch.createStarted();
- overrideFileResourcePathname =
- overrideFileResourcePathname == null
- ? CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME
- : overrideFileResourcePathname;
+ overrideFileResourcePathname = overrideFileResourcePathname == null ?
+ configInfo.getOverrideFileName() : overrideFileResourcePathname;
// 1. Load defaults configuration file.
- Config fallback = null;
+ Config fallback = ConfigFactory.empty();
final ClassLoader[] classLoaders = ClasspathHelper.classLoaders();
for (ClassLoader classLoader : classLoaders) {
final URL url =
- classLoader.getResource(CommonConstants.CONFIG_DEFAULT_RESOURCE_PATHNAME);
+ classLoader.getResource(configInfo.getDefaultFileName());
if (null != url) {
logString.append("Base Configuration:\n\t- ").append(url).append("\n");
fallback =
- ConfigFactory.load(classLoader,
- CommonConstants.CONFIG_DEFAULT_RESOURCE_PATHNAME);
+ ConfigFactory.load(classLoader, configInfo.getDefaultFileName());
break;
}
}
// 2. Load per-module configuration files.
- final Collection<URL> urls = ClassPathScanner.getConfigURLs();
+ final String perModuleResourcePathName = configInfo.getModuleFileName();
+ final Collection<URL> urls = (perModuleResourcePathName != null) ?
+ ClassPathScanner.getConfigURLs(perModuleResourcePathName) : new ArrayList<>();
logString.append("\nIntermediate Configuration and Plugin files, in order of precedence:\n");
for (URL url : urls) {
logString.append("\t- ").append(url).append("\n");
@@ -220,12 +242,12 @@ public class DrillConfig extends NestedConfig {
final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
// 3. Load distribution specific configuration file.
- final URL distribConfigFileUrl = classLoader.getResource(CommonConstants.CONFIG_DISTRIBUTION_RESOURCE_PATHNAME);
+ final URL distribConfigFileUrl = classLoader.getResource(configInfo.getDistributionFileName());
if (null != distribConfigFileUrl ) {
logString.append("Distribution Specific Configuration File: ").append(distribConfigFileUrl).append("\n");
}
fallback =
- ConfigFactory.load(CommonConstants.CONFIG_DISTRIBUTION_RESOURCE_PATHNAME).withFallback(fallback);
+ ConfigFactory.load(configInfo.getDistributionFileName()).withFallback(fallback);
// 4. Load any specified overrides configuration file along with any
// overrides from JVM system properties (e.g., {-Dname=value").
@@ -258,7 +280,7 @@ public class DrillConfig extends NestedConfig {
return new DrillConfig(effectiveConfig.resolve());
}
- public <T> Class<T> getClassAt(String location, Class<T> clazz) throws DrillConfigurationException {
+ private <T> Class<T> getClassAt(String location, Class<T> clazz) throws DrillConfigurationException {
final String className = getString(location);
if (className == null) {
throw new DrillConfigurationException(String.format(
@@ -286,8 +308,7 @@ public class DrillConfig extends NestedConfig {
public <T> T getInstanceOf(String location, Class<T> clazz) throws DrillConfigurationException{
final Class<T> c = getClassAt(location, clazz);
try {
- final T t = c.newInstance();
- return t;
+ return c.newInstance();
} catch (Exception ex) {
throw new DrillConfigurationException(String.format("Failure while instantiating class [%s] located at '%s.", clazz.getCanonicalName(), location), ex);
}
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillExecConfigFileInfo.java b/common/src/main/java/org/apache/drill/common/config/DrillExecConfigFileInfo.java
new file mode 100644
index 000000000..c95f38b70
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/config/DrillExecConfigFileInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.config;
+
+public class DrillExecConfigFileInfo implements ConfigFileInfo {
+
+ @Override
+ public String getDefaultFileName() {
+ return ConfigConstants.CONFIG_DEFAULT_RESOURCE_PATHNAME;
+ }
+
+ @Override
+ public String getModuleFileName() {
+ return ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME;
+ }
+
+ @Override
+ public String getDistributionFileName() {
+ return ConfigConstants.CONFIG_DISTRIBUTION_RESOURCE_PATHNAME;
+ }
+
+ @Override
+ public String getOverrideFileName() {
+ return ConfigConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME;
+ }
+}
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
index f7688f569..87b489fbd 100644
--- a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
+++ b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java
@@ -83,6 +83,8 @@ public final class DrillProperties extends Properties {
public static final String TLS_PROVIDER = "TLSProvider";
public static final String USE_SYSTEM_TRUSTSTORE = "useSystemTrustStore";
+ public static final String QUERY_TAGS = "queryTags";
+
// Although all properties from the application are sent to the server (from the client), the following
// sets of properties are used by the client and server respectively. These are reserved words.
@@ -94,14 +96,15 @@ public final class DrillProperties extends Properties {
SERVICE_PRINCIPAL, SERVICE_NAME, SERVICE_HOST, REALM, KEYTAB, KERBEROS_FROM_SUBJECT,
ENABLE_TLS, TLS_PROTOCOL, TRUSTSTORE_TYPE, TRUSTSTORE_PATH, TRUSTSTORE_PASSWORD,
DISABLE_HOST_VERIFICATION, DISABLE_CERT_VERIFICATION, TLS_HANDSHAKE_TIMEOUT, TLS_PROVIDER,
- USE_SYSTEM_TRUSTSTORE
+ USE_SYSTEM_TRUSTSTORE, QUERY_TAGS
);
public static final ImmutableSet<String> ACCEPTED_BY_SERVER = ImmutableSet.of(
USER /** deprecated */, PASSWORD /** deprecated */,
SCHEMA,
IMPERSONATION_TARGET,
- QUOTING_IDENTIFIERS
+ QUOTING_IDENTIFIERS,
+ QUERY_TAGS
);
private DrillProperties() {
diff --git a/common/src/main/java/org/apache/drill/common/config/DrillRMConfigFileInfo.java b/common/src/main/java/org/apache/drill/common/config/DrillRMConfigFileInfo.java
new file mode 100644
index 000000000..1db06ff7d
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/config/DrillRMConfigFileInfo.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.config;
+
+public class DrillRMConfigFileInfo implements ConfigFileInfo {
+
+ @Override
+ public String getDefaultFileName() {
+ return ConfigConstants.RM_CONFIG_DEFAULT_RESOURCE_PATHNAME;
+ }
+
+ @Override
+ public String getModuleFileName() {
+ return null;
+ }
+
+ @Override
+ public String getDistributionFileName() {
+ return ConfigConstants.RM_CONFIG_DISTRIBUTION_RESOURCE_PATHNAME;
+ }
+
+ @Override
+ public String getOverrideFileName() {
+ return ConfigConstants.RM_CONFIG_OVERRIDE_RESOURCE_PATHNAME;
+ }
+}
diff --git a/common/src/main/java/org/apache/drill/common/scanner/BuildTimeScan.java b/common/src/main/java/org/apache/drill/common/scanner/BuildTimeScan.java
index a8b65cc91..1e52d4fc6 100644
--- a/common/src/main/java/org/apache/drill/common/scanner/BuildTimeScan.java
+++ b/common/src/main/java/org/apache/drill/common/scanner/BuildTimeScan.java
@@ -28,6 +28,7 @@ import java.net.URL;
import java.util.List;
import java.util.Set;
+import org.apache.drill.common.config.ConfigConstants;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -136,7 +137,7 @@ public class BuildTimeScan {
basePath = "/" + basePath;
}
URL url = new URL("file:" + basePath);
- Set<URL> markedPaths = ClassPathScanner.getMarkedPaths();
+ Set<URL> markedPaths = ClassPathScanner.getMarkedPaths(ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME);
if (!markedPaths.contains(url)) {
throw new IllegalArgumentException(url + " not in " + markedPaths);
}
diff --git a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
index 353f3af6c..eeec2d69e 100644
--- a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
+++ b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java
@@ -32,7 +32,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.drill.common.config.CommonConstants;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.scanner.persistence.AnnotationDescriptor;
@@ -304,12 +303,12 @@ public final class ClassPathScanner {
/**
* @return paths that have a drill config file in them
*/
- static Set<URL> getMarkedPaths() {
- return forResource(CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, true);
+ static Set<URL> getMarkedPaths(String resourcePathName) {
+ return forResource(resourcePathName, true);
}
- public static Collection<URL> getConfigURLs() {
- return forResource(CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, false);
+ public static Collection<URL> getConfigURLs(String resourcePathName) {
+ return forResource(resourcePathName, false);
}
/**
diff --git a/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java b/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
index 70b9939f9..cc86c6792 100644
--- a/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
+++ b/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Set;
+import org.apache.drill.common.config.ConfigConstants;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -42,7 +43,7 @@ public class RunTimeScan {
* @return getMarkedPaths() sans getPrescannedPaths()
*/
static Collection<URL> getNonPrescannedMarkedPaths() {
- Collection<URL> markedPaths = ClassPathScanner.getMarkedPaths();
+ Collection<URL> markedPaths = ClassPathScanner.getMarkedPaths(ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME);
markedPaths.removeAll(BuildTimeScan.getPrescannedPaths());
return markedPaths;
}
@@ -69,7 +70,7 @@ public class RunTimeScan {
} else {
// scan everything
return ClassPathScanner.scan(
- ClassPathScanner.getMarkedPaths(),
+ ClassPathScanner.getMarkedPaths(ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME),
packagePrefixes,
scannedBaseClasses,
scannedAnnotations,
diff --git a/common/src/test/java/org/apache/drill/categories/ResourceManagerTest.java b/common/src/test/java/org/apache/drill/categories/ResourceManagerTest.java
new file mode 100644
index 000000000..fc05e8be0
--- /dev/null
+++ b/common/src/test/java/org/apache/drill/categories/ResourceManagerTest.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.categories;
+
+public interface ResourceManagerTest {
+}
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java
index 38ecd1c0e..cfc46053c 100644
--- a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java
@@ -25,7 +25,7 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
-import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.ConfigConstants;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.scanner.ClassPathScanner;
import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -283,7 +283,7 @@ public class DrillOnYarnConfig {
private static Config loadDrillConfig() {
drillConfig = DrillConfig
- .create(CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME);
+ .create(ConfigConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME);
return drillConfig.resolve();
}
@@ -394,10 +394,10 @@ public class DrillOnYarnConfig {
classLoader = DrillOnYarnConfig.class.getClassLoader();
}
- URL url = classLoader.getResource(CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME);
+ URL url = classLoader.getResource(ConfigConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME);
if (url == null) {
throw new DoyConfigException(
- "Drill configuration file is missing: " + CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME);
+ "Drill configuration file is missing: " + ConfigConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME);
}
File confFile;
try {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 6fce8b707..93adda17c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -666,6 +666,7 @@ public final class ExecConstants {
public static final String ORDERED_MUX_EXCHANGE = "planner.enable_ordered_mux_exchange";
// Resource management boot-time options.
+ public static final String RM_ENABLED = "drill.exec.rm.enabled";
public static final String MAX_MEMORY_PER_NODE = "drill.exec.rm.memory_per_node";
public static final String MAX_CPUS_PER_NODE = "drill.exec.rm.cpus_per_node";
@@ -685,6 +686,16 @@ public final class ExecConstants {
public static final LongValidator QUEUE_TIMEOUT = new PositiveLongValidator("exec.queue.timeout_millis", Long.MAX_VALUE,
new OptionDescription("Indicates how long a query can wait in queue before the query fails. Range: 0-9223372036854775807"));
+ // New Smart RM boot time configs
+ public static final String RM_QUERY_TAGS_KEY = "exec.rm.queryTags";
+ public static final StringValidator RM_QUERY_TAGS_VALIDATOR = new StringValidator(RM_QUERY_TAGS_KEY,
+ new OptionDescription("Allows user to set coma separated list of tags for all the queries submitted over a session"));
+
+ public static final String RM_QUEUES_WAIT_FOR_PREFERRED_NODES_KEY = "exec.rm.queues.wait_for_preferred_nodes";
+ public static final BooleanValidator RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR = new BooleanValidator
+ (RM_QUEUES_WAIT_FOR_PREFERRED_NODES_KEY, new OptionDescription("Allows user to enable/disable " +
+ "wait_for_preferred_nodes configuration across rm queues for all the queries submitted over a session"));
+
// Ratio of memory for small queries vs. large queries.
// Each small query gets 1 unit, each large query gets QUEUE_MEMORY_RATIO units.
// A lower limit of 1 enforces the intuition that a large query should never get
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index 137969a34..143560529 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -39,7 +39,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.drill.shaded.guava.com.google.common.io.Files;
import com.typesafe.config.ConfigFactory;
import org.apache.commons.io.FileUtils;
-import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.ConfigConstants;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.FunctionCall;
@@ -445,7 +445,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
*/
private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) throws IOException {
Enumeration<URL> markerFileEnumeration = classLoader.getResources(
- CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME);
+ ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME);
while (markerFileEnumeration.hasMoreElements()) {
URL markerFile = markerFileEnumeration.nextElement();
if (markerFile.getPath().contains(path.toUri().getPath())) {
@@ -462,7 +462,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
}
}
throw new JarValidationException(String.format("Marker file %s is missing in %s",
- CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, path.getName()));
+ ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, path.getName()));
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java
new file mode 100644
index 000000000..485702c8f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr;
+
+/**
+ * Provides resources for a node in cluster. Currently it is used to support only 2 kind of resources:
+ * <ul>
+ * <li>Memory</li>
+ * <li>Virtual CPU count</li>
+ * </ul>
+ * It also has a version field to support extensibility in future to add other resources like network, disk, etc
+ */
+public class NodeResources {
+
+ private final int version;
+
+ private final long memoryInBytes;
+
+ private final int numVirtualCpu;
+
+ public NodeResources(long memoryInBytes, int numVirtualCpu) {
+ this.memoryInBytes = memoryInBytes;
+ this.numVirtualCpu = numVirtualCpu;
+ this.version = 1;
+ }
+
+ public NodeResources(long memoryInBytes, int numPhysicalCpu, int vFactor) {
+ this(memoryInBytes, numPhysicalCpu * vFactor);
+ }
+
+ public long getMemoryInBytes() {
+ return memoryInBytes;
+ }
+
+ public long getMemoryInMB() {
+ return Math.round((memoryInBytes / 1024L) / 1024L);
+ }
+
+ public long getMemoryInGB() {
+ return Math.round(getMemoryInMB() / 1024L);
+ }
+
+ public int getNumVirtualCpu() {
+ return numVirtualCpu;
+ }
+
+ @Override
+ public String toString() {
+ return "{ Version: " + version + ", MemoryInBytes: " + memoryInBytes + ", VirtualCPU: " + numVirtualCpu + " }";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java
new file mode 100644
index 000000000..db79ec573
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+/**
+ * Interface which defines an implementation for managing queue configuration of a leaf {@link ResourcePool}
+ */
+public interface QueryQueueConfig {
+ String getQueueId();
+
+ String getQueueName();
+
+ /**
+ * Given number of available nodes in the cluster what is the total memory resource share of this queue cluster wide
+ * @param numClusterNodes number of available cluster nodes
+ * @return Queue's total cluster memory resource share
+ */
+ long getQueueTotalMemoryInMB(int numClusterNodes);
+
+ /**
+ * @return Maximum query memory (in MB) that a query in this queue can consume on a node
+ */
+ long getMaxQueryMemoryInMBPerNode();
+
+ /**
+ * Given number of available nodes in the cluster what is the max memory in MB a query in this queue can be assigned
+ * cluster wide
+ * @param numClusterNodes number of available cluster nodes
+ * @return Maximum query memory (in MB) in this queue
+ */
+ long getMaxQueryTotalMemoryInMB(int numClusterNodes);
+
+ /**
+ * Determines if admitted queries in this queue should wait in the queue if resources on the preferred assigned
+ * nodes of a query determined by planner is unavailable.
+ * @return <tt>true</tt> indicates an admitted query to wait until resources on all the preferred nodes are available or
+ * wait until timeout is reached, <tt>false</tt> indicates an admitted query to not wait and find other nodes with
+ * available resources if resources on a preferred node is unavailable.
+ */
+ boolean waitForPreferredNodes();
+
+ /**
+ * @return Maximum number of queries that can be admitted in the queue
+ */
+ int getMaxAdmissibleQueries();
+
+ /**
+ * @return Maximum number of queries that will be allowed to wait in the queue before failing a query right away
+ */
+ int getMaxWaitingQueries();
+
+ /**
+ * @return Maximum time in milliseconds for which a query can be in waiting state inside a queue
+ */
+ int getWaitTimeoutInMs();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java
new file mode 100644
index 000000000..948d1b943
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.util.MemoryConfigParser;
+
+import java.util.UUID;
+
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.MAX_ADMISSIBLE_QUERY_COUNT;
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.MAX_WAITING_QUERY_COUNT;
+
+/**
+ * Parses and initialize QueueConfiguration for a {@link ResourcePool}. It also generates a unique UUID for each queue
+ * which will be later used by Drillbit to store in Zookeeper along with
+ * {@link org.apache.drill.exec.proto.beans.DrillbitEndpoint}. This UUID is used in the leader election mechanism in
+ * which Drillbit participates for each of the configured rm queue.
+ */
+public class QueryQueueConfigImpl implements QueryQueueConfig {
+
+ // Optional queue configurations
+ private static final String MAX_ADMISSIBLE_KEY = "max_admissible";
+
+ private static final String MAX_WAITING_KEY = "max_waiting";
+
+ private static final String MAX_WAIT_TIMEOUT_KEY = "max_wait_timeout";
+
+ private static final String WAIT_FOR_PREFERRED_NODES_KEY = "wait_for_preferred_nodes";
+
+ private static final String MAX_QUERY_MEMORY_PER_NODE_FORMAT = "([0-9]+)\\s*([kKmMgG]?)\\s*$";
+
+ // Required queue configurations in MAX_QUERY_MEMORY_PER_NODE_FORMAT pattern
+ private static final String MAX_QUERY_MEMORY_PER_NODE_KEY = "max_query_memory_per_node";
+
+ private final String queueUUID;
+
+ private final String queueName;
+
+ private int maxAdmissibleQuery;
+
+ private int maxWaitingQuery;
+
+ private int maxWaitingTimeout;
+
+ private boolean waitForPreferredNodes;
+
+ private final NodeResources queueResourceShare;
+
+ private NodeResources queryPerNodeResourceShare;
+
+ public QueryQueueConfigImpl(Config queueConfig, String poolName,
+ NodeResources queueNodeResource) throws RMConfigException {
+ this.queueUUID = UUID.randomUUID().toString();
+ this.queueName = poolName;
+ this.queueResourceShare = queueNodeResource;
+ parseQueueConfig(queueConfig);
+ }
+
+ /**
+ * Assigns either supplied or default values for the optional queue configuration. For required configuration uses
+ * the supplied value in queueConfig object or throws proper exception if not present
+ * @param queueConfig Config object for ResourcePool queue
+ * @throws RMConfigException in case of error while parsing config
+ */
+ private void parseQueueConfig(Config queueConfig) throws RMConfigException {
+ this.maxAdmissibleQuery = queueConfig.hasPath(MAX_ADMISSIBLE_KEY) ?
+ queueConfig.getInt(MAX_ADMISSIBLE_KEY) : MAX_ADMISSIBLE_QUERY_COUNT;
+ this.maxWaitingQuery = queueConfig.hasPath(MAX_WAITING_KEY) ?
+ queueConfig.getInt(MAX_WAITING_KEY) : MAX_WAITING_QUERY_COUNT;
+ this.maxWaitingTimeout = queueConfig.hasPath(MAX_WAIT_TIMEOUT_KEY) ?
+ queueConfig.getInt(MAX_WAIT_TIMEOUT_KEY) : RMCommonDefaults.MAX_WAIT_TIMEOUT_IN_MS;
+ this.waitForPreferredNodes = queueConfig.hasPath(WAIT_FOR_PREFERRED_NODES_KEY) ?
+ queueConfig.getBoolean(WAIT_FOR_PREFERRED_NODES_KEY) : RMCommonDefaults.WAIT_FOR_PREFERRED_NODES;
+ this.queryPerNodeResourceShare = parseAndGetNodeShare(queueConfig);
+ }
+
+ @Override
+ public String getQueueId() {
+ return queueUUID;
+ }
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
+
+ /**
+ * Total memory share of this queue in the cluster based on per node resource share. It assumes that the cluster is
+ * made up of homogeneous nodes in terms of resources
+ * @param numClusterNodes total number of available cluster nodes which can participate in this queue
+ * @return queue memory share in MB
+ */
+ @Override
+ public long getQueueTotalMemoryInMB(int numClusterNodes) {
+ return queueResourceShare.getMemoryInMB() * numClusterNodes;
+ }
+
+ @Override
+ public long getMaxQueryMemoryInMBPerNode() {
+ return queryPerNodeResourceShare.getMemoryInMB();
+ }
+
+ @Override
+ public long getMaxQueryTotalMemoryInMB(int numClusterNodes) {
+ return queryPerNodeResourceShare.getMemoryInMB() * numClusterNodes;
+ }
+
+ @Override
+ public boolean waitForPreferredNodes() {
+ return waitForPreferredNodes;
+ }
+
+ @Override
+ public int getMaxAdmissibleQueries() {
+ return maxAdmissibleQuery;
+ }
+
+ @Override
+ public int getMaxWaitingQueries() {
+ return maxWaitingQuery;
+ }
+
+ @Override
+ public int getWaitTimeoutInMs() {
+ return maxWaitingTimeout;
+ }
+
+ /**
+ * Parses queues {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} configuration using
+ * {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_FORMAT} pattern to get memory value in bytes using
+ * {@link MemoryConfigParser#parseMemoryConfigString(String, String)} utility.
+ * @param queueConfig Queue Configuration object
+ * @return NodeResources created with value of {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} in bytes
+ * @throws RMConfigException in case the config value is absent or doesn't follow the specified format
+ */
+ private NodeResources parseAndGetNodeShare(Config queueConfig) throws RMConfigException {
+ try {
+ long memoryPerNodeInBytes = MemoryConfigParser.parseMemoryConfigString(
+ queueConfig.getString(MAX_QUERY_MEMORY_PER_NODE_KEY), MAX_QUERY_MEMORY_PER_NODE_FORMAT);
+ return new NodeResources(memoryPerNodeInBytes, Integer.MAX_VALUE);
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("Failed while parsing %s for queue %s", MAX_QUERY_MEMORY_PER_NODE_KEY,
+ queueName), ex);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "{ QueueName: " + queueName + ", QueueId: " + queueUUID + ", QueuePerNodeResource(MB): " +
+ queryPerNodeResourceShare.toString() + ", MaxQueryMemPerNode(MB): " + queryPerNodeResourceShare.toString() +
+ ", MaxAdmissible: " + maxAdmissibleQuery + ", MaxWaiting: " + maxWaitingQuery + ", MaxWaitTimeout: " +
+ maxWaitingTimeout + ", WaitForPreferredNodes: " + waitForPreferredNodes + "}";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java
new file mode 100644
index 000000000..5de2e34fe
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Used to keep track of selected leaf and all rejected {@link ResourcePool} for the provided query.
+ * It is used by {@link ResourcePoolImpl#visitAndSelectPool(QueueAssignmentResult, QueryContext)} to store
+ * information about all the matching and non-matching ResourcePools for a query when ResourcePool selector is
+ * evaluated against query metadata. Later it is used by
+ * {@link ResourcePool#visitAndSelectPool(QueueAssignmentResult, QueryContext)} to apply
+ * {@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy} to select only one queue out of all
+ * the selected queues for a query. It also provides an API to dump all the debug information to know which pools were
+ * selected and rejected for a query.
+ */
+public class QueueAssignmentResult {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueueAssignmentResult.class);
+
+ private final List<ResourcePool> selectedLeafPools = new ArrayList<>();
+
+ private final List<ResourcePool> rejectedPools = new ArrayList<>();
+
+ public void addSelectedPool(ResourcePool pool) {
+ Preconditions.checkState(pool.isLeafPool(), "Selected pool %s is not a leaf pool",
+ pool.getPoolName());
+ selectedLeafPools.add(pool);
+ }
+
+ public void addRejectedPool(ResourcePool pool) {
+ rejectedPools.add(pool);
+ }
+
+ public List<ResourcePool> getSelectedLeafPools() {
+ return selectedLeafPools;
+ }
+
+ public List<ResourcePool> getRejectedPools() {
+ return rejectedPools;
+ }
+
+ public void logAssignmentResult(String queryId) {
+ logger.debug("For query {}. Details[{}]", queryId, toString());
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Selected Leaf Pools: {");
+ for (ResourcePool pool : selectedLeafPools) {
+ sb.append(pool.getPoolName()).append(", ");
+ }
+ sb.append("} and Rejected pools: {");
+ for (ResourcePool pool : rejectedPools) {
+ sb.append(pool.getPoolName()).append(", ");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java
new file mode 100644
index 000000000..dc486fe09
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy.SelectionPolicy;
+
+/**
+ * Defines all the default values used for the optional configurations for ResourceManagement
+ */
+public final class RMCommonDefaults {
+
+ public static final int MAX_ADMISSIBLE_QUERY_COUNT = 10;
+
+ public static final int MAX_WAITING_QUERY_COUNT = 10;
+
+ public static final int MAX_WAIT_TIMEOUT_IN_MS = 30_000;
+
+ public static final boolean WAIT_FOR_PREFERRED_NODES = true;
+
+ public static final double ROOT_POOL_DEFAULT_MEMORY_PERCENT = 0.9;
+
+ public static final SelectionPolicy ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY = SelectionPolicy.BESTFIT;
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java
new file mode 100644
index 000000000..c2f851224
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector;
+
+import java.util.List;
+
+/**
+ * Interface which defines an implementation of ResourcePool configuration for {@link ResourcePoolTree}
+ */
+public interface ResourcePool {
+ String getPoolName();
+
+ boolean isLeafPool();
+
+ boolean isDefaultPool();
+
+ // Only valid for leaf pool since it will have a queue assigned to it with this configuration
+ long getMaxQueryMemoryPerNode();
+
+ /**
+ * Evaluates this pool selector to see if the query can be admitted in this pool. If yes then evaluates all
+ * the child pools selectors as well. During traversal it builds the QueueAssignment result which consists of all
+ * the selected leaf pools and all rejected intermediate pools.
+ * @param assignmentResult
+ * @param queryContext
+ */
+ void visitAndSelectPool(QueueAssignmentResult assignmentResult, QueryContext queryContext);
+
+ /**
+ * @return Percentage of memory share assigned to this pool
+ */
+ double getPoolMemoryShare();
+
+ long getPoolMemoryInMB(int numClusterNodes);
+
+ /**
+ * Only valid for leaf pool.
+ * @return Returns queue configuration assigned to this leaf pool
+ */
+ QueryQueueConfig getQueryQueue();
+
+ /**
+ * @return Full path of the resource pool from root to this pool
+ */
+ String getFullPath();
+
+ ResourcePool getParentPool();
+
+ List<ResourcePool> getChildPools();
+
+ ResourcePoolSelector getSelector();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java
new file mode 100644
index 000000000..79cf6c82f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.config.selectors.DefaultSelector;
+import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector;
+import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelectorFactory;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses and initializes all the provided configuration for a ResourcePool defined in RM configuration. It takes
+ * care of creating all the child ResourcePools belonging to this Resource Pool, {@link ResourcePoolSelector} for this
+ * pool and a {@link QueryQueueConfig} if it's a leaf pool.
+ */
+public class ResourcePoolImpl implements ResourcePool {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolImpl.class);
+
+ public static final String POOL_NAME_KEY = "pool_name";
+
+ public static final String POOL_MEMORY_SHARE_KEY = "memory";
+
+ public static final String POOL_CHILDREN_POOLS_KEY = "child_pools";
+
+ public static final String POOL_SELECTOR_KEY = "selector";
+
+ public static final String POOL_QUEUE_KEY = "queue";
+
+ private String poolName;
+
+ private List<ResourcePool> childPools;
+
+ private final double parentResourceShare;
+
+ private final double poolResourceShare;
+
+ private QueryQueueConfig assignedQueue;
+
+ private final ResourcePoolSelector assignedSelector;
+
+ private NodeResources poolResourcePerNode;
+
+ private final ResourcePool parentPool;
+
+ ResourcePoolImpl(Config poolConfig, double poolAbsResourceShare, double parentResourceShare,
+ NodeResources parentNodeResource, ResourcePool parentPool,
+ Map<String, QueryQueueConfig> leafQueueCollector) throws RMConfigException {
+ try {
+ this.poolName = poolConfig.getString(POOL_NAME_KEY);
+ this.parentResourceShare = parentResourceShare;
+ this.poolResourceShare = poolAbsResourceShare * this.parentResourceShare;
+ this.parentPool = parentPool;
+ assignedSelector = ResourcePoolSelectorFactory.createSelector(poolConfig.hasPath(POOL_SELECTOR_KEY)
+ ? poolConfig.getConfig(POOL_SELECTOR_KEY) : null);
+ parseAndCreateChildPools(poolConfig, parentNodeResource, leafQueueCollector);
+ } catch (RMConfigException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("Failure while parsing configuration for pool: %s. [Details: " +
+ "PoolConfig: %s]", poolName, poolConfig), ex);
+ }
+ }
+
+ @Override
+ public String getPoolName() {
+ return poolName;
+ }
+
+ /**
+ * Determines if this ResourcePool is a leaf pool or not which will have a queue associated with it
+ * @return <tt>true</tt> If a leaf pool, <tt>false</tt> otherwise
+ */
+ @Override
+ public boolean isLeafPool() {
+ return childPools == null && assignedQueue != null;
+ }
+
+ /**
+ * Determines if this ResourcePool is a default pool or not which will act as a sink for all the queries
+ * @return <tt>true</tt> If a Default pool, <tt>false</tt> otherwise
+ */
+ @Override
+ public boolean isDefaultPool() {
+ return (assignedSelector instanceof DefaultSelector);
+ }
+
+ @Override
+ public long getMaxQueryMemoryPerNode() {
+ Preconditions.checkState(isLeafPool() && assignedQueue != null, "max_query_memory_per_node is " +
+ "only valid for leaf level pools which has a queue assigned to it [Details: PoolName: %s]", poolName);
+ return assignedQueue.getMaxQueryMemoryInMBPerNode();
+ }
+
+ /**
+ * Used to determine if a ResourcePool is selected for a given query or not. It uses the assigned selector of this
+ * ResourcePool which takes in query metadata to determine if a query is allowed in this pool.
+ * @param assignmentResult Used to keep track of all selected leaf pools and all rejected pools for given query
+ * @param queryContext Contains query metadata like user, groups, tags, etc used by ResourcePoolSelector
+ */
+ @Override
+ public void visitAndSelectPool(QueueAssignmentResult assignmentResult, QueryContext queryContext) {
+ if (assignedSelector.isQuerySelected(queryContext)) {
+ if (isLeafPool()) {
+ assignmentResult.addSelectedPool(this);
+ } else {
+ // Check for each of the child pools
+ for (ResourcePool childPool : childPools) {
+ childPool.visitAndSelectPool(assignmentResult, queryContext);
+ }
+ }
+ } else {
+ assignmentResult.addRejectedPool(this);
+ }
+ }
+
+ /**
+ * Actual percentage share of memory assigned to this ResourcePool
+ * @return Pool memory share in percentage
+ */
+ @Override
+ public double getPoolMemoryShare() {
+ return poolResourceShare;
+ }
+
+ /**
+ * Total memory share in MB assigned to this ResourcePool
+ * @param numClusterNodes number of available cluster nodes for this pool
+ * @return Pool memory share in MB
+ */
+ @Override
+ public long getPoolMemoryInMB(int numClusterNodes) {
+ return poolResourcePerNode.getMemoryInMB() * numClusterNodes;
+ }
+
+ /**
+ * Parses and creates all the child ResourcePools if this is an intermediate pool or QueryQueueConfig is its a leaf
+ * resource pool
+ * @param poolConfig Config object for this ResourcePool
+ * @param parentResource Parent ResourcePool NodeResources
+ * @param leafQueueCollector Collector which keeps track of all leaf queues
+ * @throws RMConfigException in case of bad configuration
+ */
+ private void parseAndCreateChildPools(Config poolConfig, NodeResources parentResource,
+ Map<String, QueryQueueConfig> leafQueueCollector) throws RMConfigException {
+ this.poolResourcePerNode = new NodeResources(Math.round(parentResource.getMemoryInBytes() * poolResourceShare),
+ parentResource.getNumVirtualCpu());
+ if (poolConfig.hasPath(POOL_CHILDREN_POOLS_KEY)) {
+ childPools = Lists.newArrayList();
+ List<? extends Config> childPoolsConfig = poolConfig.getConfigList(POOL_CHILDREN_POOLS_KEY);
+ logger.debug("Creating {} child pools for parent pool {}", childPoolsConfig.size(), poolName);
+ for (Config childConfig : childPoolsConfig) {
+ try {
+ final ResourcePool childPool = new ResourcePoolImpl(childConfig, childConfig.getDouble(POOL_MEMORY_SHARE_KEY),
+ poolResourceShare, poolResourcePerNode, this, leafQueueCollector);
+ childPools.add(childPool);
+ } catch (RMConfigException ex) {
+ logger.error("Failure while configuring child ResourcePool. [Details: PoolName: {}, ChildPoolConfig with " +
+ "error: {}]", poolName, childConfig);
+ throw ex;
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("Failure while configuring the child ResourcePool. [Details: " +
+ "PoolName: %s, ChildPoolConfig with error: %s]", poolName, childConfig), ex);
+ }
+ }
+
+ if (childPools.isEmpty()) {
+ throw new RMConfigException(String.format("Empty config for child_pools is not allowed. Please configure the " +
+ "child_pools property of pool %s correctly or associate a queue with it with no child_pools", poolName));
+ }
+ } else {
+ logger.info("Resource Pool {} is a leaf level pool with queue assigned to it", poolName);
+
+ if (leafQueueCollector.containsKey(poolName)) {
+ throw new RMConfigException(String.format("Found non-unique leaf pools with name: %s and config: %s. Leaf " +
+ "pool names has to be unique since they represent a queue.", poolName, poolConfig));
+ }
+ assignedQueue = new QueryQueueConfigImpl(poolConfig.getConfig(POOL_QUEUE_KEY), poolName, poolResourcePerNode);
+ leafQueueCollector.put(poolName, assignedQueue);
+ }
+ }
+
+ /**
+ * If this a leaf pool then returns the {@link QueryQueueConfig} for the queue associated with this pool
+ * @return {@link QueryQueueConfig} object for this pool
+ */
+ @Override
+ public QueryQueueConfig getQueryQueue() {
+ Preconditions.checkState(isLeafPool() && assignedQueue != null, "QueryQueue is only " +
+ "valid for leaf level pools.[Details: PoolName: %s]", poolName);
+ return assignedQueue;
+ }
+
+ @Override
+ public ResourcePool getParentPool() {
+ return parentPool;
+ }
+
+ /**
+ * Returns full path in terms of concatenated pool names from root pool to this pool in {@link ResourcePoolTree}
+ * @return String with pool names from root to this pool
+ */
+ @Override
+ public String getFullPath() {
+ StringBuilder pathBuilder = new StringBuilder(poolName);
+ ResourcePool parent = parentPool;
+ while(parent != null) {
+ pathBuilder.append(parent.getPoolName());
+ parent = parent.getParentPool();
+ }
+
+ return pathBuilder.toString();
+ }
+
+ @Override
+ public List<ResourcePool> getChildPools() {
+ Preconditions.checkState(!isLeafPool() && assignedQueue == null,
+ "There are no child pools for a leaf ResourcePool");
+ return childPools;
+ }
+
+ @Override
+ public ResourcePoolSelector getSelector() {
+ return assignedSelector;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{PoolName: ").append(poolName);
+ sb.append(", PoolResourceShare: ").append(poolResourceShare);
+ sb.append(", Selector: ").append(assignedSelector.getSelectorType());
+ if (isLeafPool()) {
+ sb.append(", Queue: [").append(assignedQueue.toString()).append("]");
+ } else {
+ sb.append(", ChildPools: [");
+
+ for (ResourcePool childPool : childPools) {
+ sb.append(childPool.toString());
+ sb.append(", ");
+ }
+ sb.append("]");
+ }
+ sb.append("}");
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java
new file mode 100644
index 000000000..309bb24db
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy;
+
+import java.util.Map;
+
+/**
+ * Interface which defines the implementation of a hierarchical configuration for all the ResourcePool that will be
+ * used for ResourceManagement
+ */
+public interface ResourcePoolTree {
+
+ ResourcePool getRootPool();
+
+ Map<String, QueryQueueConfig> getAllLeafQueues();
+
+ double getResourceShare();
+
+ QueueAssignmentResult selectAllQueues(QueryContext queryContext);
+
+ QueryQueueConfig selectOneQueue(QueryContext queryContext, NodeResources queryMaxNodeResource)
+ throws QueueSelectionException;
+
+ QueueSelectionPolicy getSelectionPolicyInUse();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java
new file mode 100644
index 000000000..cc12a0945
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy.SelectionPolicy;
+import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicyFactory;
+import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_MEMORY_PERCENT;
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY;
+
+/**
+ * Parses and initializes configuration for ResourceManagement in Drill. It takes care of creating all the ResourcePools
+ * recursively maintaining the n-ary tree hierarchy defined in the configuration.
+ */
+public class ResourcePoolTreeImpl implements ResourcePoolTree {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolTreeImpl.class);
+
+ private static final String ROOT_POOL_MEMORY_SHARE_KEY = "memory";
+
+ private static final String ROOT_POOL_QUEUE_SELECTION_POLICY_KEY = "queue_selection_policy";
+
+ public static final String ROOT_POOL_CONFIG_KEY = "drill.exec.rm";
+
+ private final ResourcePool rootPool;
+
+ private final Config rmConfig;
+
+ private final NodeResources totalNodeResources;
+
+ private final double resourceShare;
+
+ private final Map<String, QueryQueueConfig> leafQueues = Maps.newHashMap();
+
+ private final QueueSelectionPolicy selectionPolicy;
+
+ public ResourcePoolTreeImpl(Config rmConfig, long totalNodeMemory,
+ int totalNodePhysicalCpu, int vFactor) throws RMConfigException {
+ this(rmConfig, new NodeResources(totalNodeMemory, totalNodePhysicalCpu, vFactor));
+ }
+
+ private ResourcePoolTreeImpl(Config rmConfig, NodeResources totalNodeResources) throws RMConfigException {
+ try {
+ this.rmConfig = rmConfig;
+ this.totalNodeResources = totalNodeResources;
+ this.resourceShare = this.rmConfig.hasPath(ROOT_POOL_MEMORY_SHARE_KEY) ?
+ this.rmConfig.getDouble(ROOT_POOL_MEMORY_SHARE_KEY) : ROOT_POOL_DEFAULT_MEMORY_PERCENT;
+ this.selectionPolicy = QueueSelectionPolicyFactory.createSelectionPolicy(
+ this.rmConfig.hasPath(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY) ?
+ SelectionPolicy.valueOf(rmConfig.getString(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY)) :
+ ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY);
+ rootPool = new ResourcePoolImpl(this.rmConfig.getConfig(ROOT_POOL_CONFIG_KEY), resourceShare, 1.0,
+ totalNodeResources, null, leafQueues);
+ logger.debug("Dumping RM configuration {}", toString());
+ } catch (RMConfigException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("Failure while parsing root pool configuration. " +
+ "[Details: Config: %s]", rmConfig), ex);
+ }
+ }
+
+ /**
+ * @return root {@link ResourcePool}
+ */
+ @Override
+ public ResourcePool getRootPool() {
+ return rootPool;
+ }
+
+ /**
+ * @return Map containing all the configured leaf queues
+ */
+ @Override
+ public Map<String, QueryQueueConfig> getAllLeafQueues() {
+ return leafQueues;
+ }
+
+ @Override
+ public double getResourceShare() {
+ return resourceShare;
+ }
+
+ /**
+ * Creates {@link QueueAssignmentResult} which contains list of all selected leaf ResourcePools and all the rejected
+ * ResourcePools for the provided query. Performs DFS of the ResourcePoolTree to traverse and find
+ * selected/rejected ResourcePools.
+ * @param queryContext {@link QueryContext} which contains metadata required for the given query
+ * @return {@link QueueAssignmentResult} populated with selected/rejected ResourcePools
+ */
+ @Override
+ public QueueAssignmentResult selectAllQueues(QueryContext queryContext) {
+ QueueAssignmentResult assignmentResult = new QueueAssignmentResult();
+ rootPool.visitAndSelectPool(assignmentResult, queryContext);
+ return assignmentResult;
+ }
+
+ /**
+ * Selects a leaf queue out of all the possibles leaf queues returned by
+ * {@link ResourcePoolTree#selectAllQueues(QueryContext)} for a given query based on the configured
+ * {@link QueueSelectionPolicy}. If none of the queue qualifies then it throws {@link QueueSelectionException}
+ * @param queryContext {@link QueryContext} which contains metadata for given query
+ * @param queryMaxNodeResource Max resources on a node required for given query
+ * @return {@link QueryQueueConfig} for the selected leaf queue
+ * @throws QueueSelectionException If no leaf queue is selected
+ */
+ @Override
+ public QueryQueueConfig selectOneQueue(QueryContext queryContext, NodeResources queryMaxNodeResource)
+ throws QueueSelectionException {
+ final QueueAssignmentResult assignmentResult = selectAllQueues(queryContext);
+ final List<ResourcePool> selectedPools = assignmentResult.getSelectedLeafPools();
+ if (selectedPools.size() == 0) {
+ throw new QueueSelectionException(String.format("No resource pools to choose from for the query: %s",
+ queryContext.getQueryId()));
+ } else if (selectedPools.size() == 1) {
+ return selectedPools.get(0).getQueryQueue();
+ }
+
+ return selectionPolicy.selectQueue(selectedPools, queryContext, queryMaxNodeResource).getQueryQueue();
+ }
+
+ /**
+ * @return {@link QueueSelectionPolicy} which is used to chose one queue out of all the available options for a query
+ */
+ @Override
+ public QueueSelectionPolicy getSelectionPolicyInUse() {
+ return selectionPolicy;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ NodeResources: ").append(totalNodeResources.toString());
+ sb.append(", ResourcePercent: ").append(resourceShare);
+ sb.append(", SelectionPolicy: ").append(selectionPolicy.getSelectionPolicy());
+ sb.append(", RootPool: ").append(rootPool.toString());
+ sb.append("}");
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java
new file mode 100644
index 000000000..8f0275dd2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.exception;
+
+/**
+ * Used in case of error while selecting a queue for a given query
+ */
+public class QueueSelectionException extends Exception {
+ public QueueSelectionException(String msg) {
+ super(msg);
+ }
+
+ public QueueSelectionException(String msg, Exception ex) {
+ super(msg, ex);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java
new file mode 100644
index 000000000..d6648333d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.exception;
+
+/**
+ * Used in cases of any error with the ResourceManagement configuration
+ */
+public class RMConfigException extends Exception {
+ public RMConfigException(String message) {
+ super(message);
+ }
+
+ public RMConfigException(String message, Exception e) {
+ super(message, e);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java
new file mode 100644
index 000000000..6829e452d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package contains the configuration components of ResourceManagement feature in Drill. ResourceManagement will
+ * have it's own configuration file supporting the similar hierarchy of files as supported by Drill's current
+ * configuration and supports HOCON format. All the supported files for ResourceManagement is listed in
+ * {@link org.apache.drill.common.config.ConfigConstants}. However whether the feature is enabled/disabled is still
+ * controlled by a configuration {@link org.apache.drill.exec.ExecConstants#RM_ENABLED} available in the Drill's main
+ * configuration file. The rm config files will be parsed and loaded only when the feature is enabled. The
+ * configuration is a hierarchical tree {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTree} of
+ * {@link org.apache.drill.exec.resourcemgr.config.ResourcePool}. At the top will be the root pool which represents
+ * the entire resources (only memory in version 1) which is available to ResourceManager to use for admitting queries.
+ * It is assumed that all the nodes in the Drill cluster is homogeneous and given same amount of memory resources.
+ * The root pool can be further divided into child ResourcePools to divide the resources among multiple child pools.
+ * Each child pool get's a resource share from it's parent resource pool. In theory there is no limit on the number
+ * of ResourcePools that can be configured to divide the cluster resources.
+ * <p>
+ * In addition to other parameters defined later root ResourcePool also supports a configuration
+ * {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl#ROOT_POOL_QUEUE_SELECTION_POLICY_KEY} which
+ * helps to select exactly one leaf pool out of all the possible options available for a query. For details please
+ * see package-info.java of {@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy}.
+ * {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTree#selectOneQueue(org.apache.drill.exec.ops.QueryContext,
+ * org.apache.drill.exec.resourcemgr.NodeResources)} method is used by parallelizer to get a queue which will be used
+ * to admit a query. The selected queue resource constraints are used by parallelizer to allocate proper resources
+ * to a query so that it remains within the bounds.
+ * </p>
+ * <p>
+ * The ResourcePools falls under 2 category:
+ * <ul>
+ * <li>Intermediate Pool: As the name suggests all the pools between root and leaf pool falls under this
+ * category. It helps to navigate a query through the ResourcePoolTree hierarchy to find leaf pools using selectors.
+ * The intermediate ResourcePool help to subdivide a parent resource pool resource and doesn't have an actual queue
+ * associated with it. A query will only be executed in a queue associated with a ResourcePool not the ResourcePool
+ * itself.
+ * </li>
+ * <li>Leaf Pool: All the ResourcePools which doesn't have any child pools associated with it are leaf
+ * ResourcePools. All the leaf pools should have a unique name associated with it and should always have exactly one
+ * queue configured with it. The queue of a leaf pool is where the queries will be admitted and a resource slice will
+ * be given to it. All the leaf ResourcePools will collectively comprise of all the resource share available to
+ * Drill's ResourceManager to allocate to all the queries.
+ * </li>
+ * </ul>
+ * Configurations Supported by ResourcePool:
+ * <ul>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_MEMORY_SHARE_KEY}: Percentage of
+ * memory share of parent ResourcePool assigned to this pool</li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_SELECTOR_KEY}: A selector assigned
+ * to this pool. For details please see package-info.java of
+ * {@link org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector}
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_QUEUE_KEY}: Queue configuration
+ * associated with this pool. It should always be configured for a leaf pool only. If configured with an
+ * intermediate pool then it will be ignored.
+ * </li>
+ * </ul>
+ * </p>
+ * <p>
+ * A queue always have 1:1 relationship with a leaf pool. Queries are admitted and executed with a resource slice
+ * from the queue. It supports following configurations:
+ * <ul>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_ADMISSIBLE_KEY}: Upper bound on the
+ * total number of queries that can be admitted inside a queue. After this limit is reached all the queries
+ * will be moved to waiting state.</li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_WAITING_KEY}: Limits the
+ * total number of queries that can be in waiting state inside a queue. After this limit is reached all the new
+ * queries will be failed immediately.</li>
+ * <li> {@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY}:
+ * Limits the maximum memory any query in this queue can consume on any node in the cluster. This is to limit a
+ * query from a queue to consume all the resources on a node so that other queues query can also have some
+ * resources available for it. Ideally it's advised that sum of value of this parameter for all queues should not
+ * exceed the total memory on a node.
+ * </li>
+ * <li> {@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#WAIT_FOR_PREFERRED_NODES_KEY}: This
+ * configuration helps to decide if an admitted query in a queue should wait until it has available resources on all
+ * the nodes assigned to it by planner for its execution. By default it's true. When set to false then for the nodes
+ * which doesn't have available resources for a query will be replaced with another node with enough resources.
+ * </li>
+ * </ul>
+ * </p>
+ * Once all the configuration are parsed an in-memory structures are created then for each query planner will select
+ * a queue where a query can be admitted. The queue selection process happens by traversing the ResourcePoolTree. During
+ * traversal process the query metadata is evaluated against assigned selector of a ResourcePool. If the selector
+ * returns true then traversal continues to it's child pools otherwise it stops there and tries another pool. With
+ * the traversal it finds all the leaf pools which are eligible for admitting the query and store that information in
+ * {@link org.apache.drill.exec.resourcemgr.config.QueueAssignmentResult}. Later the selected pools are passed to
+ * configured QueueSelectionPolicy to select one queue for the query. Planner uses that selected queue's max query
+ * memory per node parameter to limit resource assignment to all the fragments of a query on a node. After a query is
+ * planned with resource constraints it is sent to leader of that queue to ask for admission. If admitted the query
+ * required resources are reserved in global state store and query is executed on the cluster. For details please see
+ * the design document and functional spec linked in <a href="https://issues.apache.org/jira/browse/DRILL-7026">
+ * DRILL-7026</a>
+ */
+package org.apache.drill.exec.resourcemgr.config; \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java
new file mode 100644
index 000000000..7e9efe7e5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+
+import java.util.List;
+
+public abstract class AbstractQueueSelectionPolicy implements QueueSelectionPolicy {
+ private final SelectionPolicy selectionPolicy;
+
+ public AbstractQueueSelectionPolicy(SelectionPolicy selectionPolicy) {
+ this.selectionPolicy = selectionPolicy;
+ }
+
+ @Override
+ public SelectionPolicy getSelectionPolicy() {
+ return selectionPolicy;
+ }
+
+ public abstract ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext,
+ NodeResources maxResourcePerNode) throws QueueSelectionException;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java
new file mode 100644
index 000000000..a76f19336
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * Helps to select a queue whose {@link QueryQueueConfig#getMaxQueryMemoryInMBPerNode()} is nearest to the max memory
+ * on a node required by the given query. Nearest is found by following rule:
+ * <ul>
+ * <li>
+ * Queue whose {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} is
+ * equal to max memory per node of given query
+ * </li>
+ * <li>
+ * Queue whose {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} is
+ * just greater than max memory per node of given query. From all queues whose max_query_memory_per_node is
+ * greater than what is needed by the query, the queue with minimum value is chosen.
+ * </li>
+ * <li>
+ * Queue whose {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} is
+ * just less than max memory per node of given query. From all queues whose max_query_memory_per_node is
+ * less than what is needed by the query, the queue with maximum value is chosen.
+ * </li>
+ * </ul>
+ */
+public class BestFitQueueSelection extends AbstractQueueSelectionPolicy {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BestFitQueueSelection.class);
+
+ public BestFitQueueSelection() {
+ super(SelectionPolicy.BESTFIT);
+ }
+
+ /**
+ * Comparator used to sort the leaf ResourcePool lists based on
+ * {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY}
+ */
+ private static class BestFitComparator implements Comparator<ResourcePool> {
+ @Override
+ public int compare(ResourcePool o1, ResourcePool o2) {
+ long pool1Value = o1.getQueryQueue().getMaxQueryMemoryInMBPerNode();
+ long pool2Value = o2.getQueryQueue().getMaxQueryMemoryInMBPerNode();
+ return Long.compare(pool1Value, pool2Value);
+ }
+ }
+
+ @Override
+ public ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext,
+ NodeResources maxResourcePerNode) throws QueueSelectionException {
+ if (allPools.isEmpty()) {
+ throw new QueueSelectionException(String.format("There are no pools to apply %s selection policy pool for the " +
+ "query: %s", getSelectionPolicy().toString(), queryContext.getQueryId()));
+ }
+
+ allPools.sort(new BestFitComparator());
+ final long queryMaxNodeMemory = maxResourcePerNode.getMemoryInMB();
+ ResourcePool selectedPool = allPools.get(0);
+ for (ResourcePool pool : allPools) {
+ selectedPool = pool;
+ long poolMaxNodeMem = pool.getQueryQueue().getMaxQueryMemoryInMBPerNode();
+ if (poolMaxNodeMem >= queryMaxNodeMemory) {
+ break;
+ }
+ }
+ logger.debug("Selected pool {} based on {} policy for query {}", selectedPool.getPoolName(),
+ getSelectionPolicy().toString(),
+ queryContext.getQueryId());
+ return selectedPool;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java
new file mode 100644
index 000000000..f1c03c3f5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+
+import java.util.List;
+
+/**
+ * Helps to select the first default queue in the list of all the provided queues. If there is no default queue
+ * present it throws {@link QueueSelectionException}. Default queue is a queue associated with {@link ResourcePool}
+ * which has {@link org.apache.drill.exec.resourcemgr.config.selectors.DefaultSelector} assigned to it
+ */
+public class DefaultQueueSelection extends AbstractQueueSelectionPolicy {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultQueueSelection.class);
+
+ public DefaultQueueSelection() {
+ super(SelectionPolicy.DEFAULT);
+ }
+
+ @Override
+ public ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext,
+ NodeResources maxResourcePerNode) throws QueueSelectionException {
+ for (ResourcePool pool : allPools) {
+ if (pool.isDefaultPool()) {
+ logger.debug("Selected default pool: {} for the query: {}", pool.getPoolName(), queryContext.getQueryId());
+ return pool;
+ }
+ }
+
+ throw new QueueSelectionException(String.format("There is no default pool to select from list of pools provided " +
+ "for the query: %s", queryContext.getQueryId()));
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java
new file mode 100644
index 000000000..5fab2d44c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+
+import java.util.List;
+
+/**
+ * Interface that defines all the implementation of a QueueSelectionPolicy supported by ResourceManagement
+ */
+public interface QueueSelectionPolicy {
+
+ enum SelectionPolicy {
+ UNKNOWN,
+ DEFAULT,
+ RANDOM,
+ BESTFIT;
+
+ @Override
+ public String toString() {
+ return name().toLowerCase();
+ }
+ }
+
+ SelectionPolicy getSelectionPolicy();
+
+ ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext, NodeResources maxResourcePerNode)
+ throws QueueSelectionException;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java
new file mode 100644
index 000000000..6ae96e7a6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+/**
+ * Factory to return an instance of {@link QueueSelectionPolicy} based on the configured policy name. By default if
+ * the configured policy name doesn't matches any supported policies then it returns {@link BestFitQueueSelection}
+ */
+public class QueueSelectionPolicyFactory {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueueSelectionPolicyFactory.class);
+
+ public static QueueSelectionPolicy createSelectionPolicy(QueueSelectionPolicy.SelectionPolicy policy) {
+ logger.debug("Creating SelectionPolicy of type {}", policy);
+ QueueSelectionPolicy selectionPolicy;
+ switch (policy) {
+ case DEFAULT:
+ selectionPolicy = new DefaultQueueSelection();
+ break;
+ case BESTFIT:
+ selectionPolicy = new BestFitQueueSelection();
+ break;
+ case RANDOM:
+ selectionPolicy = new RandomQueueSelection();
+ break;
+ default:
+ logger.info("QueueSelectionPolicy is not configured so proceeding with the bestfit as default policy");
+ selectionPolicy = new BestFitQueueSelection();
+ break;
+ }
+
+ return selectionPolicy;
+ }
+
+ // prevents from instantiation
+ private QueueSelectionPolicyFactory() {
+ // no-op
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java
new file mode 100644
index 000000000..63c51f29d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Randomly selects a queue from the list of all the provided queues. If no pools are provided then it throws
+ * {@link QueueSelectionException}
+ */
+public class RandomQueueSelection extends AbstractQueueSelectionPolicy {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomQueueSelection.class);
+
+ public RandomQueueSelection() {
+ super(SelectionPolicy.RANDOM);
+ }
+
+ @Override
+ public ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext,
+ NodeResources maxResourcePerNode) throws QueueSelectionException {
+ if (allPools.size() == 0) {
+ throw new QueueSelectionException(String.format("Input pool list is empty to apply %s selection policy",
+ getSelectionPolicy().toString()));
+ }
+ Collections.shuffle(allPools);
+ ResourcePool selectedPool = allPools.get(0);
+ logger.debug("Selected random pool: {} for query: {}", selectedPool.getPoolName(), queryContext.getQueryId());
+ return selectedPool;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java
new file mode 100644
index 000000000..8c1e257c2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines all the selection policy implementation which can be configured with Resource Management. The
+ * configuration which is used to specify a policy is
+ * {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl#ROOT_POOL_QUEUE_SELECTION_POLICY_KEY}. Selection Policy
+ * helps to select a single leaf ResourcePool out of all the eligible pools whose queue can be used to admit this query.
+ * Currently there are 3 types of supported policies. In future more policies can be supported by implementing
+ * {@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy} interface.
+ * <ul>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.DefaultQueueSelection}: Out of all the eligible pools
+ * this policy will choose a default pool in the list. If there are multiple default pools present in the list then
+ * it will return the first default pool. If there is no default pool present then it throws
+ * {@link org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException}
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.RandomQueueSelection}: Out of all the eligible pools
+ * this policy will choose a pool at random. If there are no pools to select from then it throws
+ * {@link org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException}
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.BestFitQueueSelection}: Out of all the eligible pools
+ * this policy will choose a pool whose queue configuration
+ * {@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} value is closest to
+ * the max memory on a node required by the query. It tries to find a pool whose value for MAX_QUERY_MEMORY_PER_NODE
+ * is equal to queries max memory per node requirement. If there is no such pool then find the pool with config value
+ * just greater than queries max memory per node. Otherwise find a pool with config value just less than queries
+ * max memory per node.
+ * </li>
+ * </ul>
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy; \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java
new file mode 100644
index 000000000..6f474e3a0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import org.apache.drill.exec.ops.QueryContext;
+
+public abstract class AbstractResourcePoolSelector implements ResourcePoolSelector {
+
+ protected final SelectorType SELECTOR_TYPE;
+
+ AbstractResourcePoolSelector(SelectorType type) {
+ SELECTOR_TYPE = type;
+ }
+
+ public SelectorType getSelectorType() {
+ return SELECTOR_TYPE;
+ }
+
+ public abstract boolean isQuerySelected(QueryContext queryContext);
+
+ @Override
+ public String toString() {
+ return SELECTOR_TYPE.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java
new file mode 100644
index 000000000..50acb860d
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import avro.shaded.com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.util.ImpersonationUtil;
+import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Evaluates if a query can be admitted to a ResourcePool or not by comparing query user/groups with the
+ * configured users/groups policies for this selector. AclSelector can be configured using both long-form syntax or
+ * short-form syntax as defined below:
+ * <ul>
+ * <li>Long-Form Syntax: Allows to use identifiers to specify allowed and disallowed users/groups. For example:
+ * users: [alice:+, bob:-] means alice is allowed whereas bob is denied access to the pool</li>
+ * <li>Short-Form Syntax: Allows to specify lists of allowed users/groups only. For example: users: [alice, bob]
+ * means only alice and bob are allowed access to this pool</li>
+ * </ul>
+ * The selector also supports * as a wildcard for both long and short form syntax to allow/deny all users/groups.
+ * Example configuration is of form:
+ * <code><pre>
+ * selector: {
+ * acl: {
+ * users: [alice:+, bob:-],
+ * groups: [sales, marketing]
+ * }
+ * }
+ * </pre></code>
+ */
+public class AclSelector extends AbstractResourcePoolSelector {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AclSelector.class);
+
+ private final Set<String> allowedUsers = Sets.newHashSet();
+
+ private final Set<String> allowedGroups = Sets.newHashSet();
+
+ private final Set<String> deniedUsers = Sets.newHashSet();
+
+ private final Set<String> deniedGroups = Sets.newHashSet();
+
+ private final Config aclSelectorValue;
+
+ private static final String ACL_VALUE_GROUPS_KEY = "groups";
+
+ private static final String ACL_VALUE_USERS_KEY = "users";
+
+ private static final String ACL_LONG_SYNTAX_SEPARATOR = ":";
+
+ private static final String ACL_LONG_ALLOWED_IDENTIFIER = "+";
+
+ private static final String ACL_LONG_DISALLOWED_IDENTIFIER = "-";
+
+ private static final String ACL_ALLOW_ALL = "*";
+
+
+ AclSelector(Config configValue) throws RMConfigException {
+ super(SelectorType.ACL);
+ this.aclSelectorValue = configValue;
+ validateAndParseACL(aclSelectorValue);
+ }
+
+ /**
+ * Determines if a given query is selected by this ACL selector of a Resource Pool or not. Following rules are
+ * followed to evaluate the selection. Assumption: There is an assumption made that if a user or group is configured
+ * in both +ve/-ve respective lists then it will be treated to be present in -ve list.
+ *
+ * Rules:
+ * 1) Check if query user is present in -ve users list, If yes then query is not selected else go to 2
+ * 2) Check if query user is present in +ve users list, If yes then query is selected else go to 3
+ * 3) Check if * is present in -ve users list, if yes then query is not selected else go to 4
+ * 4) Check if * is present in +ve users list, if yes then query is selected else go to 5
+ * 5) If here that means query user or * is absent in both +ve and -ve users list so check for groups of query user
+ * in step 6
+ * 6) Check if any of groups of query user is present in -ve groups list, If yes then query is not selected else go
+ * to 7
+ * 7) Check if any of groups of query user is present in +ve groups list, If yes then query selected else go to 8
+ * 8) Check if * is present in -ve groups list, If yes then query is not selected else go to 9
+ * 9) Check if * is present in +ve groups list, If yes then query is selected else go to 10
+ * 10) Query user and groups of it is neither present is +ve/-ve users list not +ve/-ve groups list hence the query
+ * is not selected
+ *
+ * @param queryContext QueryContext to get information about query user
+ * @return true if a query is selected by this selector, false otherwise
+ */
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ final String queryUser = queryContext.getQueryUserName();
+ final UserGroupInformation queryUserUGI = ImpersonationUtil.createProxyUgi(queryUser);
+ final Set<String> queryGroups = Sets.newHashSet(queryUserUGI.getGroupNames());
+ return checkQueryUserGroups(queryUser, queryGroups);
+ }
+
+ @VisibleForTesting
+ public boolean checkQueryUserGroups(String queryUser, Set<String> queryGroups) {
+ // Check for +ve/-ve users information with query user
+ if (deniedUsers.contains(queryUser)) {
+ logger.debug("Query user is present in configured ACL -ve users list");
+ return false;
+ } else if (allowedUsers.contains(queryUser)) {
+ logger.debug("Query user is present in configured ACL +ve users list");
+ return true;
+ } else if (isStarInDisAllowedUsersList()) {
+ logger.debug("Query user is absent in configured ACL +ve/-ve users list but * is in -ve users list");
+ return false;
+ } else if (isStarInAllowedUsersList()) {
+ logger.debug("Query user is absent in configured ACL +ve/-ve users list but * is in +ve users list");
+ return true;
+ }
+
+ // Check for +ve/-ve groups information with groups of query user
+ if (Sets.intersection(queryGroups, deniedGroups).size() > 0) {
+ logger.debug("Groups of Query user is present in configured ACL -ve groups list");
+ return false;
+ } else if (Sets.intersection(queryGroups, allowedGroups).size() > 0) {
+ logger.debug("Groups of Query user is present in configured ACL +ve groups list");
+ return true;
+ } else if (isStarInDisAllowedGroupsList()) {
+ logger.debug("Groups of Query user is absent in configured ACL +ve/-ve groups list but * is in -ve groups list");
+ return false;
+ } else if (isStarInAllowedGroupsList()) {
+ logger.debug("Groups of Query user is absent in configured ACL +ve/-ve groups list but * is in +ve groups list");
+ return true;
+ }
+
+ logger.debug("Neither query user or group is present in configured ACL users/groups list");
+ return false;
+ }
+
+ /**
+ * Parses the acl selector config value for users and groups to populate list of allowed/denied users and groups.
+ * @param aclConfig Acl config to parse
+ * @throws RMConfigException in case of invalid config for either users/groups
+ */
+ private void validateAndParseACL(Config aclConfig) throws RMConfigException {
+
+ // ACL config doesn't have either group or user list
+ if (!aclConfig.hasPath(ACL_VALUE_GROUPS_KEY) && !aclConfig.hasPath(ACL_VALUE_USERS_KEY)) {
+ throw new RMConfigException(String.format("ACL Selector config is missing both group and user list information." +
+ " Please configure either of groups or users list. [Details: aclConfig: %s]", aclConfig));
+ }
+
+ if (aclConfig.hasPath(ACL_VALUE_USERS_KEY)) {
+ final List<String> users = aclSelectorValue.getStringList(ACL_VALUE_USERS_KEY);
+ parseACLInput(users, allowedUsers, deniedUsers);
+ }
+
+ if (aclConfig.hasPath(ACL_VALUE_GROUPS_KEY)) {
+ final List<String> groups = aclSelectorValue.getStringList(ACL_VALUE_GROUPS_KEY);
+ parseACLInput(groups, allowedGroups, deniedGroups);
+ }
+
+ // If no valid configuration is seen for this selector
+ if (allowedGroups.size() == 0 && deniedGroups.size() == 0 &&
+ deniedUsers.size() == 0 && allowedUsers.size() == 0) {
+ throw new RMConfigException("No valid users or groups information is configured for this ACL selector. Either " +
+ "use * or valid users/groups");
+ }
+
+ // Check if there is any intersection between allowed and disallowed users/groups
+ Set<String> wrongConfig = Sets.intersection(allowedUsers, deniedUsers);
+ if (wrongConfig.size() > 0) {
+ logger.warn("These users are configured both in allowed and disallowed list. They will be treated as disallowed" +
+ ". [Details: users: {}]", wrongConfig);
+ allowedUsers.removeAll(wrongConfig);
+ }
+
+ wrongConfig = Sets.intersection(allowedGroups, deniedGroups);
+ if (wrongConfig.size() > 0) {
+ logger.warn("These groups are configured both in allowed and disallowed list. They will be treated as " +
+ "disallowed. [Details: groups: {}]", wrongConfig);
+ allowedGroups.removeAll(wrongConfig);
+ }
+ }
+
+ public Set<String> getAllowedUsers() {
+ return allowedUsers;
+ }
+
+ public Set<String> getAllowedGroups() {
+ return allowedGroups;
+ }
+
+ public Set<String> getDeniedUsers() {
+ return deniedUsers;
+ }
+
+ public Set<String> getDeniedGroups() {
+ return deniedGroups;
+ }
+
+ private boolean isStarInAllowedUsersList() {
+ return allowedUsers.contains(ACL_ALLOW_ALL);
+ }
+
+ private boolean isStarInAllowedGroupsList() {
+ return allowedGroups.contains(ACL_ALLOW_ALL);
+ }
+
+ private boolean isStarInDisAllowedUsersList() {
+ return deniedUsers.contains(ACL_ALLOW_ALL);
+ }
+
+ private boolean isStarInDisAllowedGroupsList() {
+ return deniedGroups.contains(ACL_ALLOW_ALL);
+ }
+
+ private void parseACLInput(List<String> acls, Set<String> allowedIdentity, Set<String> disAllowedIdentity) {
+ for (String aclValue : acls) {
+
+ if (aclValue.isEmpty()) {
+ continue;
+ }
+ // Check if it's long form syntax or shortForm syntax
+ String[] aclValueSplits = aclValue.split(ACL_LONG_SYNTAX_SEPARATOR);
+ if (aclValueSplits.length == 1) {
+ // short form
+ if (!allowedIdentity.add(aclValueSplits[0])) {
+ logger.info("Duplicate acl identity: {} found in configured list will be ignored", aclValueSplits[0]);
+ }
+ } else {
+ // long form
+ final String identifier = aclValueSplits[1];
+ if (identifier.equals(ACL_LONG_ALLOWED_IDENTIFIER)) {
+ if (!allowedIdentity.add(aclValueSplits[0])) {
+ logger.info("Duplicate acl identity: {} found in configured list will be ignored", aclValueSplits[0]);
+ }
+ } else if (identifier.equals(ACL_LONG_DISALLOWED_IDENTIFIER)) {
+ if (!disAllowedIdentity.add(aclValueSplits[0])) {
+ logger.info("Duplicate acl identity: {} found in configured list will be ignored", aclValueSplits[0]);
+ }
+ } else {
+ logger.error("Invalid long form syntax encountered hence ignoring ACL string {} . Details[Allowed " +
+ "identifiers are `+` and `-`. Encountered: {}]", aclValue, identifier);
+ }
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ SelectorType: ").append(super.toString());
+ sb.append(", AllowedUsers: [");
+ for (String positiveUser : allowedUsers) {
+ sb.append(positiveUser).append(", ");
+ }
+ sb.append("], AllowedGroups: [");
+ for (String positiveGroup : allowedGroups) {
+ sb.append(positiveGroup).append(", ");
+ }
+ sb.append("], DisallowedUsers: [");
+ for (String negativeUser : deniedUsers) {
+ sb.append(negativeUser).append(", ");
+ }
+ sb.append("], DisallowedGroups: [");
+ for (String negativeGroup : deniedGroups) {
+ sb.append(negativeGroup).append(", ");
+ }
+ sb.append("]}");
+
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java
new file mode 100644
index 000000000..82b51ee46
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+
+import java.util.List;
+
+/**
+ * Complex selector whose value is list of other Simple or Complex Selectors. There has to be at least 2 other
+ * selectors configured in the value list for this selector. It does AND operation on result of all the child
+ * selectors configured with it to evaluate if a query can be admitted to it's ResourcePool or not.
+ *
+ * Example configuration is of form:
+ * <code><pre>
+ * selector: {
+ * and: [{tag: "BITool"},{tag: "operational"}]
+ * }
+ * </pre></code>
+ */
+public class AndSelector extends ComplexSelectors {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AndSelector.class);
+
+ AndSelector(List<? extends Config> configValue) throws RMConfigException {
+ super(SelectorType.AND, configValue);
+ }
+
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ for (ResourcePoolSelector childSelector : childSelectors) {
+ if (!childSelector.isQuerySelected(queryContext)) {
+ logger.debug("Query {} is not selected by the child selector of type {} in this complex AndSelector",
+ queryContext.getQueryId(), childSelector.getSelectorType().toString());
+ return false;
+ }
+ }
+ return true;
+ }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java
new file mode 100644
index 000000000..fe9ab52ae
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public abstract class ComplexSelectors extends AbstractResourcePoolSelector {
+
+ protected final List<ResourcePoolSelector> childSelectors = new ArrayList<>();
+
+ ComplexSelectors(SelectorType type, List<? extends Config> selectorConfig) throws RMConfigException {
+ super(type);
+ parseAndCreateChildSelectors(selectorConfig);
+ }
+
+ private void parseAndCreateChildSelectors(List<? extends Config> childConfigs) throws RMConfigException {
+ for (Config childConfig : childConfigs) {
+ childSelectors.add(ResourcePoolSelectorFactory.createSelector(childConfig));
+ }
+
+ if (childSelectors.size() < 2) {
+ throw new RMConfigException(String.format("For complex selector OR and AND it is expected to have atleast 2 " +
+ "selectors in the list but found %d", childSelectors.size()));
+ }
+ }
+
+ public abstract boolean isQuerySelected(QueryContext queryContext);
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{ SelectorType: ").append(super.toString());
+ sb.append(", of selectors [");
+ for (ResourcePoolSelector childSelector : childSelectors) {
+ sb.append(childSelector.toString()).append(", ");
+ }
+ sb.append("]}");
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java
new file mode 100644
index 000000000..2336d03e2
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import org.apache.drill.exec.ops.QueryContext;
+
+/**
+ * When selector configuration is absent for a ResourcePool then it is associated with a DefaultSelector. It acts as
+ * a sink for all the queries which means all the queries will be selected by this default selector.
+ */
+public class DefaultSelector extends AbstractResourcePoolSelector {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultSelector.class);
+
+ DefaultSelector() {
+ super(SelectorType.DEFAULT);
+ }
+
+ @Override
+ public SelectorType getSelectorType() {
+ return SELECTOR_TYPE;
+ }
+
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ logger.debug("Query {} is selected by this Default selector", queryContext.getQueryId());
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "{SelectorType: " + super.toString() + "}";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java
new file mode 100644
index 000000000..653c3b538
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import avro.shaded.com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+
+/**
+ * Simple selector whose value is another Simple or Complex Selectors. It does NOT of result of its child selector
+ * configured with it to evaluate if a query can be admitted to it's ResourcePool or not.
+ *
+ * Example configuration is of form:
+ * <code><pre>
+ * selector: {
+ * not_equal: {tag: "BITool"}
+ * }
+ * </pre></code>
+ */
+public class NotEqualSelector extends AbstractResourcePoolSelector {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NotEqualSelector.class);
+
+ private final ResourcePoolSelector poolSelector;
+
+ NotEqualSelector(Config selectorValue) throws RMConfigException {
+ super(SelectorType.NOT_EQUAL);
+ poolSelector = ResourcePoolSelectorFactory.createSelector(selectorValue);
+ }
+
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ logger.debug("Query {} is evaluated for not_equal of selector type {}", queryContext.getQueryId(),
+ poolSelector.getSelectorType().toString());
+ return !poolSelector.isQuerySelected(queryContext);
+ }
+
+ @VisibleForTesting
+ public ResourcePoolSelector getPoolSelector() {
+ return poolSelector;
+ }
+
+ @Override
+ public String toString() {
+ return "{ SelectorType: " + super.toString() + ", of selector " + poolSelector.toString() + " }";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java
new file mode 100644
index 000000000..5e8b2f9e0
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+
+import java.util.List;
+
+/**
+ * Complex selector whose value is list of other Simple or Complex Selectors. There has to be at least 2 other
+ * selectors configured in the value list for this selector. It does OR operation on result of all the child selectors
+ * configured with it to evaluate if a query can be admitted to it's ResourcePool or not.
+ *
+ * Example configuration is of form:
+ * <code></><pre>
+ * selector: {
+ * or: [{tag: "BITool"},{tag: "operational"}]
+ * }
+ * </pre></code>
+ */
+public class OrSelector extends ComplexSelectors {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrSelector.class);
+
+ OrSelector(List<? extends Config> configValue) throws RMConfigException {
+ super(SelectorType.OR, configValue);
+ }
+
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ for (ResourcePoolSelector childSelector : childSelectors) {
+ // If we find any selector evaluating to true then no need to evaluate other selectors in the list
+ if (childSelector.isQuerySelected(queryContext)) {
+ logger.debug("Query {} is selected by the child selector of type {} in this OrSelector",
+ queryContext.getQueryId(), childSelector.getSelectorType().toString());
+ return true;
+ }
+ }
+
+ return false;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java
new file mode 100644
index 000000000..f5dbf0db1
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import org.apache.drill.exec.ops.QueryContext;
+
+/**
+ * Interface that defines implementation for selectors assigned to a ResourcePool. ResourcePoolSelector helps to
+ * evaluate if a given query can be admitted into a ResourcePool or not. Based on the assigned selector type to a
+ * ResourcePool it uses the query metadata with it's own configured values and make a decision for a query. The
+ * SelectorType defines all the supported ResourcePoolSelector which can be assigned to a ResourcePool. The
+ * configuration of a selector is of type:
+ * <code><pre>
+ * selector: {
+ * SelectorType:SelectorValue
+ * }
+ * where SelectorValue can be a string (for SelectorType tag),
+ * object (for SelectorType acl and not_equal) and
+ * list of objects (for SelectorType and, or)
+ * when selector config is absent then a DefaultSelector is associated with the ResourcePool
+ * </pre></code>
+ */
+public interface ResourcePoolSelector {
+
+ enum SelectorType {
+ UNKNOWN,
+ DEFAULT,
+ TAG,
+ ACL,
+ OR,
+ AND,
+ NOT_EQUAL;
+
+ @Override
+ public String toString() {
+ return name().toLowerCase();
+ }
+ }
+
+ SelectorType getSelectorType();
+
+ boolean isQuerySelected(QueryContext queryContext);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java
new file mode 100644
index 000000000..0ad130269
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector.SelectorType;
+
+public class ResourcePoolSelectorFactory {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolSelectorFactory.class);
+
+ public static ResourcePoolSelector createSelector(Config selectorConfig) throws RMConfigException {
+ ResourcePoolSelector poolSelector = null;
+ String selectorType = SelectorType.DEFAULT.toString();
+ try {
+ if (selectorConfig == null) {
+ poolSelector = new DefaultSelector();
+ } else if (selectorConfig.hasPath(SelectorType.TAG.toString())) {
+ selectorType = SelectorType.TAG.toString();
+ poolSelector = new TagSelector(selectorConfig.getString(selectorType));
+ } else if (selectorConfig.hasPath(SelectorType.ACL.toString())) {
+ selectorType = SelectorType.ACL.toString();
+ poolSelector = new AclSelector(selectorConfig.getConfig(selectorType));
+ } else if (selectorConfig.hasPath(SelectorType.OR.toString())) {
+ selectorType = SelectorType.OR.toString();
+ poolSelector = new OrSelector(selectorConfig.getConfigList(selectorType));
+ } else if (selectorConfig.hasPath(SelectorType.AND.toString())) {
+ selectorType = SelectorType.AND.toString();
+ poolSelector = new AndSelector(selectorConfig.getConfigList(selectorType));
+ } else if (selectorConfig.hasPath(SelectorType.NOT_EQUAL.toString())) {
+ selectorType = SelectorType.NOT_EQUAL.toString();
+ poolSelector = new NotEqualSelector(selectorConfig.getConfig(selectorType));
+ }
+ } catch (Exception ex) {
+ throw new RMConfigException(String.format("There is an error with value configuration for selector type %s",
+ selectorType), ex);
+ }
+
+ // if here means either a selector is chosen or wrong configuration
+ if (poolSelector == null) {
+ throw new RMConfigException(String.format("Configured selector is either empty or not supported. [Details: " +
+ "SelectorConfig: %s]", selectorConfig));
+ }
+
+ logger.debug("Created selector of type {}", poolSelector.getSelectorType().toString());
+ return poolSelector;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java
new file mode 100644
index 000000000..79237123c
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+
+/**
+ * Simple selector whose value is a string representing a tag. It tries to match it's configured tag with one of
+ * the tags configured for the query using connection/session parameter. If a query posses at least one tag same as
+ * this selector tag then it will be admitted in the respective ResourcePool.
+ *
+ * Example configuration is of form:
+ * <code><pre>
+ * selector: {
+ * tag: "BITool"
+ * }
+ * </pre></code>
+ */
+public class TagSelector extends AbstractResourcePoolSelector {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TagSelector.class);
+
+ private final String configuredTag;
+
+ TagSelector(String selectorValue) throws RMConfigException {
+ super(SelectorType.TAG);
+
+ if (selectorValue == null || selectorValue.isEmpty()) {
+ throw new RMConfigException("Tag value of this selector is either null or empty. Please configure a valid tag " +
+ "as string.");
+ }
+ configuredTag = selectorValue;
+ }
+
+ @Override
+ public SelectorType getSelectorType() {
+ return SELECTOR_TYPE;
+ }
+
+ @Override
+ public boolean isQuerySelected(QueryContext queryContext) {
+ String[] queryTags = queryContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY).string_val.split(",");
+ for (String queryTag : queryTags) {
+ if (queryTag.equals(configuredTag)) {
+ logger.debug("Query {} tag {} matches the selector tag {}", queryContext.getQueryId(), queryTag, configuredTag);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public String getTagValue() {
+ return configuredTag;
+ }
+
+ @Override
+ public String toString() {
+ return "{ SelectorType: " + super.toString() + ", TagValue: [" + configuredTag + "]}";
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java
new file mode 100644
index 000000000..441c24aed
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines all the Selectors which can be assigned to a ResourcePool in the ResourceManagement configuration. A
+ * selector helps to evaluate that given a query and it's metadata if that query can be admitted inside associated
+ * ResourcePool or not. Selectors are associated with both intermediate and leaf level ResourcePools. The intermediate
+ * pool selectors helps to navigate the ResourcePool hierarchy to reach a leaf level ResourcePool where query will
+ * actually be admitted in the queue associated with a leaf pool. Whereas leaf pool selector will help to choose all
+ * the leaf pools which can be considered to admit a query. A selector can be configured for a ResourcePool
+ * using the {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_SELECTOR_KEY} configuration. If the
+ * selector configuration is missing for a ResourcePool then it is associated with a Default Selector making it
+ * a Default ResourcePool. Selectors are configured as a key value pair where key represents it's type and
+ * value is what it uses to evaluate a query. Currently there are 6 different types of supported selectors. In future
+ * more selectos can be supported by implementing
+ * {@link org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector} interface.
+ * <ul>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.DefaultSelector}: It acts as a sink and will always return
+ * true for all the queries. It is associated with a ResourcePool which is not assigned any selector in the
+ * configuration making it a default pool.
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.TagSelector}: A selector which has a tag (String) value
+ * associated with it. It evaluates all the tags associated with a query to see if there is a match or not. A query is
+ * only selected by this selector if it has a tag same as that configured for this selector.
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.AclSelector}: A selector which has users/groups policies
+ * value associated with it. It evaluates these policies against the users/groups of query session to check if the
+ * query can be selected or not. It supports long/short form syntax to configure the acl policies. It also supports *
+ * as wildcard character to allow/deny all users/groups in its policies. Please see AclSelector class javadoc for more
+ * details.
+ * </li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.OrSelector}: A selector which can have lists of 2 or more
+ * other selectors configured as it's value except for Default selector. It performs || operation on the selection
+ * result of all other configured selectors.</li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.AndSelector}: A selector which can have lists of 2 or more
+ * other selectors configured as it's value except for Default selector. It performs && operation on the selection
+ * result of all other configured selectors.</li>
+ * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.NotEqualSelector}: A selector which can have any other
+ * selector defined above configured as it's value except for Default selector. It will compare the query metadata
+ * against the configured selector and will return ! of that as a selection result. For example: if a TagSelector
+ * with value "sales" is configured for this NotSelector then it will select queries whose doesn't have sales tag
+ * associated with it.</li>
+ * </ul>
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors; \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java
new file mode 100644
index 000000000..0658172f6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * This package will contain all the components of resource manager in Drill.
+ */
+package org.apache.drill.exec.resourcemgr; \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java
new file mode 100644
index 000000000..8d5f5f26f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.util;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Utility class which helps in parsing memory configuration string using the passed in pattern to get memory value in
+ * bytes.
+ */
+public class MemoryConfigParser {
+
+ /**
+ * @param memoryConfig Memory configuration string
+ * @param patternToUse Pattern which will be used to parse the memory configuration string
+ * @return memory value in bytes
+ */
+ public static long parseMemoryConfigString(String memoryConfig, String patternToUse) {
+ Pattern pattern = Pattern.compile(patternToUse);
+ Matcher patternMatcher = pattern.matcher(memoryConfig);
+
+ long memoryPerNodeInBytes = 0;
+ if (patternMatcher.matches()) {
+ memoryPerNodeInBytes = Long.parseLong(patternMatcher.group(1));
+
+ // group 2 can be optional
+ String group2 = patternMatcher.group(2);
+ if (!group2.isEmpty()) {
+ switch (group2.charAt(0)) {
+ case 'G':
+ case 'g':
+ memoryPerNodeInBytes *= 1073741824L;
+ break;
+ case 'K':
+ case 'k':
+ memoryPerNodeInBytes *= 1024L;
+ break;
+ case 'M':
+ case 'm':
+ memoryPerNodeInBytes *= 1048576L;
+ break;
+ default:
+ throw new IllegalArgumentException(String.format("Memory Configuration %s didn't matched any of the" +
+ " supported suffixes. [Details: Supported: kKmMgG, Actual: %s", memoryConfig, group2));
+ }
+ }
+ } else {
+ throw new IllegalArgumentException(String.format("Memory Configuration %s didn't matched supported format. " +
+ "Supported format is %s?", memoryConfig, patternToUse));
+ }
+
+ return memoryPerNodeInBytes;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index dc1e9ccf6..e2fd1e845 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -17,17 +17,16 @@
*/
package org.apache.drill.exec.rpc.user;
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-
-import javax.net.ssl.SSLEngine;
-import javax.security.sasl.SaslException;
-
+import com.google.protobuf.MessageLite;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.common.config.DrillProperties;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.exec.exception.DrillbitStartupException;
@@ -65,17 +64,15 @@ import org.apache.hadoop.security.HadoopKerberosName;
import org.joda.time.DateTime;
import org.slf4j.Logger;
-import com.google.protobuf.MessageLite;
-
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import javax.net.ssl.SSLEngine;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class);
@@ -197,7 +194,11 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
}
/**
- * {@link AbstractRemoteConnection} implementation for user connection. Also implements {@link UserClientConnection}.
+ * It represents a client connection accepted by Foreman Drillbit's UserServer from a DrillClient. This connection
+ * is used to get hold of {@link UserSession} which stores all session related information like session options
+ * changed over the lifetime of this connection. There is a 1:1 mapping between a BitToUserConnection and a
+ * UserSession. This connection object is also used to send query data and result back to the client submitted as part
+ * of the session tied to this connection.
*/
public class BitToUserConnection extends AbstractServerConnection<BitToUserConnection>
implements UserClientConnection {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
index e856ac58d..0798deabb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.base.Strings;
@@ -119,14 +120,34 @@ public class UserSession implements AutoCloseable {
return this;
}
- public UserSession build() {
+ private boolean canApplyUserProperty() {
+ final StringBuilder sb = new StringBuilder();
if (userSession.properties.containsKey(DrillProperties.QUOTING_IDENTIFIERS)) {
- if (userSession.sessionOptions != null) {
+ sb.append(DrillProperties.QUOTING_IDENTIFIERS).append(",");
+ }
+
+ if (userSession.properties.containsKey(DrillProperties.QUERY_TAGS)) {
+ sb.append(DrillProperties.QUERY_TAGS);
+ }
+
+ if (userSession.sessionOptions == null && sb.length() > 0) {
+ logger.warn("User property {} can't be installed as a server option without the session option manager",
+ sb.toString());
+ return false;
+ }
+ return true;
+ }
+
+ public UserSession build() {
+ if (canApplyUserProperty()) {
+ if (userSession.properties.containsKey(DrillProperties.QUOTING_IDENTIFIERS)) {
userSession.setSessionOption(PlannerSettings.QUOTING_IDENTIFIERS_KEY,
- userSession.properties.getProperty(DrillProperties.QUOTING_IDENTIFIERS));
- } else {
- logger.warn("User property {} can't be installed as a server option without the session option manager",
- DrillProperties.QUOTING_IDENTIFIERS);
+ userSession.properties.getProperty(DrillProperties.QUOTING_IDENTIFIERS));
+ }
+
+ if (userSession.properties.containsKey(DrillProperties.QUERY_TAGS)) {
+ userSession.setSessionOption(ExecConstants.RM_QUERY_TAGS_KEY,
+ userSession.properties.getProperty(DrillProperties.QUERY_TAGS));
}
}
UserSession session = userSession;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 73a039b72..89061763b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -17,19 +17,6 @@
*/
package org.apache.drill.exec.server;
-import java.io.IOException;
-import java.nio.file.InvalidPathException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.nio.file.StandardWatchEventKinds;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.tools.ToolProvider;
-
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.StackTrace;
import org.apache.drill.common.concurrent.ExtendedLatch;
@@ -43,6 +30,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle;
import org.apache.drill.exec.coord.zk.ZKClusterCoordinator;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.exec.server.DrillbitStateManager.DrillbitState;
import org.apache.drill.exec.server.options.OptionDefinition;
import org.apache.drill.exec.server.options.OptionValue;
@@ -51,21 +39,31 @@ import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.server.rest.WebServer;
import org.apache.drill.exec.service.ServiceEngine;
import org.apache.drill.exec.store.StoragePluginRegistry;
-import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
-import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider;
+import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider;
import org.apache.drill.exec.util.GuavaPatcher;
import org.apache.drill.exec.util.ProtobufPatcher;
import org.apache.drill.exec.work.WorkManager;
-import org.apache.zookeeper.Environment;
-
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
+import org.apache.zookeeper.Environment;
import org.slf4j.bridge.SLF4JBridgeHandler;
+import javax.tools.ToolProvider;
+import java.io.IOException;
+import java.nio.file.InvalidPathException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
/**
* Starts, tracks and stops all the required services for a Drillbit daemon to work.
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 030e1a346..9cd670e99 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -17,15 +17,6 @@
*/
package org.apache.drill.exec.server.options;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
import org.apache.commons.collections.IteratorUtils;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.config.LogicalPlanPersistence;
@@ -39,10 +30,18 @@ import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider;
import org.apache.drill.exec.util.AssertionUtil;
-
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
/**
* <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}.
* Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and
@@ -277,7 +276,10 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.HLL_ACCURACY_VALIDATOR),
new OptionDefinition(ExecConstants.DETERMINISTIC_SAMPLING_VALIDATOR),
new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_ELEMENTS_VALIDATOR),
- new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_FPOS_PROB_VALIDATOR)
+ new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_FPOS_PROB_VALIDATOR),
+ new OptionDefinition(ExecConstants.RM_QUERY_TAGS_VALIDATOR,
+ new OptionMetaData(OptionValue.AccessibleScopes.SESSION_AND_QUERY, false, false)),
+ new OptionDefinition(ExecConstants.RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR)
};
CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java
index f5fa122fe..3551b6769 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java
@@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.io.Resources;
import com.jasonclawson.jackson.dataformat.hocon.HoconFactory;
-import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.ConfigConstants;
import org.apache.drill.common.config.LogicalPlanPersistence;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.logical.StoragePluginConfig;
@@ -45,10 +45,10 @@ import static org.apache.drill.exec.store.StoragePluginRegistry.ACTION_ON_STORAG
/**
* Drill plugins handler, which allows to update storage plugins configs from the
- * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} conf file
+ * {@link ConfigConstants#STORAGE_PLUGINS_OVERRIDE_CONF} conf file
*
* TODO: DRILL-6564: It can be improved with configs versioning and service of creating
- * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF}
+ * {@link ConfigConstants#STORAGE_PLUGINS_OVERRIDE_CONF}
*/
public class StoragePluginsHandlerService implements StoragePluginsHandler {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginsHandlerService.class);
@@ -124,17 +124,17 @@ public class StoragePluginsHandlerService implements StoragePluginsHandler {
}
/**
- * Get the new storage plugins from the {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} file if it exists,
+ * Get the new storage plugins from the {@link ConfigConstants#STORAGE_PLUGINS_OVERRIDE_CONF} file if it exists,
* null otherwise
*
* @return storage plugins
*/
private StoragePlugins getNewStoragePlugins() {
- Set<URL> urlSet = ClassPathScanner.forResource(CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, false);
+ Set<URL> urlSet = ClassPathScanner.forResource(ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF, false);
if (!urlSet.isEmpty()) {
if (urlSet.size() != 1) {
DrillRuntimeException.format("More than one %s file is placed in Drill's classpath: %s",
- CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, urlSet);
+ ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF, urlSet);
}
pluginsOverrideFileUrl = urlSet.iterator().next();
try {
@@ -142,11 +142,11 @@ public class StoragePluginsHandlerService implements StoragePluginsHandler {
return lpPersistence.getMapper().readValue(newPluginsData, StoragePlugins.class);
} catch (IOException e) {
logger.error("Failures are obtained while loading %s file. Proceed without update",
- CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, e);
+ ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF, e);
}
}
logger.trace("The {} file is absent. Proceed without updating of the storage plugins configs",
- CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF);
+ ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF);
return null;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java
new file mode 100644
index 000000000..af9ba00f4
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.ResourcePoolTree;
+import org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.foreman.Foreman;
+
+public class DistributedResourceManager implements ResourceManager {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedResourceManager.class);
+
+ private final ResourcePoolTree rmPoolTree;
+
+ private final DrillbitContext context;
+
+ private final DrillConfig rmConfig;
+
+ private final ResourceManager delegatedRM;
+
+ public DistributedResourceManager(DrillbitContext context) throws DrillRuntimeException {
+ try {
+ this.context = context;
+ this.rmConfig = DrillConfig.createForRM();
+ rmPoolTree = new ResourcePoolTreeImpl(rmConfig, DrillConfig.getMaxDirectMemory(),
+ Runtime.getRuntime().availableProcessors(), 1);
+ logger.debug("Successfully parsed RM config \n{}", rmConfig.getConfig(ResourcePoolTreeImpl.ROOT_POOL_CONFIG_KEY));
+ this.delegatedRM = new DefaultResourceManager();
+ } catch (RMConfigException ex) {
+ throw new DrillRuntimeException(String.format("Failed while parsing Drill RM Configs. Drillbit won't be started" +
+ " unless config is fixed or RM is disabled by setting %s to false", ExecConstants.RM_ENABLED), ex);
+ }
+ }
+ @Override
+ public long memoryPerNode() {
+ return delegatedRM.memoryPerNode();
+ }
+
+ @Override
+ public int cpusPerNode() {
+ return delegatedRM.cpusPerNode();
+ }
+
+ @Override
+ public QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
+ return delegatedRM.newResourceAllocator(queryContext);
+ }
+
+ @Override
+ public QueryResourceManager newQueryRM(Foreman foreman) {
+ return delegatedRM.newQueryRM(foreman);
+ }
+
+ public ResourcePoolTree getRmPoolTree() {
+ return rmPoolTree;
+ }
+
+ @Override
+ public void close() {
+ delegatedRM.close();
+ }
+}
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 4f6fbb278..7e9415530 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -407,6 +407,8 @@ drill.exec: {
// Resource management
rm : {
+ // Enables Drill RM feature by default. To disable it set the below parameter to false
+ enabled: false
// Memory per node normally comes from the direct memory alloated on the JVM
// command line. This parameter, if other than 0, further limits the amount.
// Primarily for testing.
@@ -675,5 +677,9 @@ drill.exec.options: {
exec.statistics.ndv_accuracy: 20,
exec.statistics.ndv_extrapolation_bf_elements: 1000000,
exec.statistics.ndv_extrapolation_bf_fpprobability: 10,
- exec.statistics.deterministic_sampling: false
+ exec.statistics.deterministic_sampling: false,
+ exec.query.return_result_set_for_ddl: true,
+ # ========= rm related options ===========
+ exec.rm.queryTags: "",
+ exec.rm.queues.wait_for_preferred_nodes: true
}
diff --git a/exec/java-exec/src/main/resources/drill-rm-default.conf b/exec/java-exec/src/main/resources/drill-rm-default.conf
new file mode 100644
index 000000000..520947f56
--- /dev/null
+++ b/exec/java-exec/src/main/resources/drill-rm-default.conf
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http:// www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+drill.exec.rm: {
+ pool_name: "root",
+ memory: 0.9, // 90% of total direct memory allocated to Drill
+ queue_selection_policy: "bestfit", // policy to select queue for a query when multiple queues are eligible
+ selector: {
+ acl: {
+ users: ["*"],
+ groups: ["*"]
+ }
+ }
+ queue: {
+ max_query_memory_per_node: 8G // supported format regex [0-9]*[kKmMgG]?
+ max_waiting: 10, // default
+ max_admissible: 10, // default
+ max_wait_timeout: 30000, // default in ms
+ wait_for_preferred_nodes: true // default
+ }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestRMConfigLoad.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestRMConfigLoad.java
new file mode 100644
index 000000000..16f7f64f0
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestRMConfigLoad.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr;
+
+import org.apache.drill.categories.ResourceManagerTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig;
+import org.apache.drill.exec.resourcemgr.config.RMCommonDefaults;
+import org.apache.drill.exec.resourcemgr.config.ResourcePoolTree;
+import org.apache.drill.exec.resourcemgr.config.selectors.AclSelector;
+import org.apache.drill.exec.work.foreman.rm.DefaultResourceManager;
+import org.apache.drill.exec.work.foreman.rm.DistributedResourceManager;
+import org.apache.drill.exec.work.foreman.rm.ResourceManager;
+import org.apache.drill.test.BaseDirTestWatcher;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.apache.drill.test.DrillTest;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Ignore("These tests will be ignored until integration with new DistributedResourceManager is done")
+@Category(ResourceManagerTest.class)
+public final class TestRMConfigLoad extends DrillTest {
+
+ @Rule
+ public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+
+ @Test
+ public void testDefaultRMConfig() throws Exception {
+ ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher)
+ .configProperty(ExecConstants.RM_ENABLED, true)
+ .configProperty(ExecConstants.DRILL_PORT_HUNT, true)
+ .withLocalZk();
+
+ try (ClusterFixture cluster = fixtureBuilder.build()) {
+ ResourceManager resourceManager = cluster.drillbit().getContext().getResourceManager();
+ assertTrue(resourceManager instanceof DistributedResourceManager);
+
+ ResourcePoolTree poolTree = ((DistributedResourceManager) resourceManager).getRmPoolTree();
+ assertTrue("In drill-rm-default root pool is not leaf pool", poolTree.getRootPool().isLeafPool());
+ assertTrue("selector in drill-rm-default is not acl selector",
+ poolTree.getRootPool().getSelector() instanceof AclSelector);
+ assertEquals("max_query_memory_per_node in drill-rm-default is not configured with expected default value",
+ 8 * 1024L, poolTree.getRootPool().getMaxQueryMemoryPerNode());
+ assertEquals("queue_selection_policy in drill-rm-default is not configured with expected default value",
+ RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY,
+ poolTree.getSelectionPolicyInUse().getSelectionPolicy());
+ assertEquals("memory share of root pool in drill-rm-default is not configured with expected default value",
+ RMCommonDefaults.ROOT_POOL_DEFAULT_MEMORY_PERCENT, poolTree.getResourceShare(), 0);
+
+ final QueryQueueConfig defaultQueue = poolTree.getRootPool().getQueryQueue();
+ assertEquals("max_admissible in drill-rm-default is not configured with expected default value",
+ RMCommonDefaults.MAX_ADMISSIBLE_QUERY_COUNT, defaultQueue.getMaxAdmissibleQueries());
+ assertEquals("max_waiting in drill-rm-default is not configured with expected default value",
+ RMCommonDefaults.MAX_WAITING_QUERY_COUNT, defaultQueue.getMaxWaitingQueries());
+ assertEquals("max_wait_timeout in drill-rm-default is not configured with expected default value",
+ RMCommonDefaults.MAX_WAIT_TIMEOUT_IN_MS, defaultQueue.getWaitTimeoutInMs());
+ assertEquals("wait_for_preferred_nodes in drill-rm-default is not configured with expected default value",
+ RMCommonDefaults.WAIT_FOR_PREFERRED_NODES, defaultQueue.waitForPreferredNodes());
+ }
+ }
+
+ @Test
+ public void testDefaultRMWithLocalCoordinatorAndRMEnabled() throws Exception {
+ ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher)
+ .configProperty(ExecConstants.RM_ENABLED, true);
+
+ try (ClusterFixture cluster = fixtureBuilder.build()) {
+ ResourceManager resourceManager = cluster.drillbit().getContext().getResourceManager();
+ assertTrue(resourceManager instanceof DefaultResourceManager);
+ }
+ }
+
+ @Test
+ public void testDefaultRMWithLocalCoordinatorAndRMDisabled() throws Exception {
+ ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher)
+ .configProperty(ExecConstants.RM_ENABLED, false);
+
+ try (ClusterFixture cluster = fixtureBuilder.build()) {
+ ResourceManager resourceManager = cluster.drillbit().getContext().getResourceManager();
+ assertTrue(resourceManager instanceof DefaultResourceManager);
+ }
+ }
+
+ @Test
+ public void testDefaultRMOnlyRMDisabled() throws Exception {
+ ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher)
+ .configProperty(ExecConstants.RM_ENABLED, false)
+ .configProperty(ExecConstants.DRILL_PORT_HUNT, true)
+ .withLocalZk();
+
+ try (ClusterFixture cluster = fixtureBuilder.build()) {
+ ResourceManager resourceManager = cluster.drillbit().getContext().getResourceManager();
+ assertTrue(resourceManager instanceof DefaultResourceManager);
+ }
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestResourcePoolTree.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestResourcePoolTree.java
new file mode 100644
index 000000000..fea132d41
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestResourcePoolTree.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.ResourceManagerTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.QueueAssignmentResult;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl;
+import org.apache.drill.exec.resourcemgr.config.ResourcePoolTree;
+import org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(ResourceManagerTest.class)
+public final class TestResourcePoolTree {
+
+ private static final Map<String, Object> poolTreeConfig = new HashMap<>();
+
+ private static final Map<String, Object> pool1 = new HashMap<>();
+
+ private static final Map<String, Object> pool2 = new HashMap<>();
+
+ private static final Map<String, Object> queue1 = new HashMap<>();
+
+ private static final List<Object> childResourcePools = new ArrayList<>();
+
+ private static final Map<String, Object> tagSelectorConfig1 = new HashMap<>();
+
+ private static final Map<String, Object> tagSelectorConfig2 = new HashMap<>();
+
+ private static final QueryContext mockContext = mock(QueryContext.class);
+
+ @BeforeClass
+ public static void testSuiteSetup() {
+ pool1.put(ResourcePoolImpl.POOL_NAME_KEY, "dev");
+ pool1.put(ResourcePoolImpl.POOL_MEMORY_SHARE_KEY, 0.80);
+
+ pool2.put(ResourcePoolImpl.POOL_NAME_KEY, "qa");
+ pool2.put(ResourcePoolImpl.POOL_MEMORY_SHARE_KEY, 0.20);
+
+ queue1.put("max_query_memory_per_node", 5534);
+
+ tagSelectorConfig1.put("tag", "small");
+ tagSelectorConfig2.put("tag", "large");
+ }
+
+ @After
+ public void afterTestCleanup() {
+ // cleanup resource tree
+ poolTreeConfig.clear();
+
+ // cleanup pools
+ pool1.remove(ResourcePoolImpl.POOL_QUEUE_KEY);
+ pool1.remove(ResourcePoolImpl.POOL_SELECTOR_KEY);
+
+ pool2.remove(ResourcePoolImpl.POOL_QUEUE_KEY);
+ pool2.remove(ResourcePoolImpl.POOL_SELECTOR_KEY);
+
+ childResourcePools.clear();
+ }
+
+ private ResourcePoolTree getPoolTreeConfig() throws RMConfigException {
+ poolTreeConfig.put(ResourcePoolImpl.POOL_NAME_KEY, "drill");
+ poolTreeConfig.put(ResourcePoolImpl.POOL_CHILDREN_POOLS_KEY, childResourcePools);
+
+ Config rmConfig = ConfigFactory.empty()
+ .withValue("drill.exec.rm", ConfigValueFactory.fromMap(poolTreeConfig));
+ return new ResourcePoolTreeImpl(rmConfig, 10000, 10, 2);
+ }
+
+ private boolean checkExpectedVsActualPools(List<ResourcePool> actual, List<String> expectedNames) {
+ if (actual.size() != expectedNames.size()) {
+ return false;
+ }
+
+ for (ResourcePool pool : actual) {
+ if (!expectedNames.contains(pool.getPoolName())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Test
+ public void testTreeWith2LeafPool() throws Exception {
+ // pool with tag selector
+ pool1.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1);
+ pool1.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig1);
+
+ // pool with default selector
+ pool2.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1);
+
+ childResourcePools.add(pool1);
+ childResourcePools.add(pool2);
+
+ ResourcePoolTree configTree = getPoolTreeConfig();
+
+ // get all leaf queues names
+ Set<String> expectedLeafQueue = new HashSet<>();
+ expectedLeafQueue.add((String)pool1.get("pool_name"));
+ expectedLeafQueue.add((String)pool2.get("pool_name"));
+
+ assertEquals("Root pool is different than expected", "drill", configTree.getRootPool().getPoolName());
+ assertEquals("Expected and actual leaf queue names are different", expectedLeafQueue,
+ configTree.getAllLeafQueues().keySet());
+ assertEquals("Unexpected Selection policy is in use", ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY,
+ configTree.getSelectionPolicyInUse().getSelectionPolicy());
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testDuplicateLeafPool() throws Exception {
+ // leaf pool
+ pool1.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1);
+ childResourcePools.add(pool1);
+ childResourcePools.add(pool1);
+
+ getPoolTreeConfig();
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testMissingQueueAtLeafPool() throws Exception {
+ // leaf pool with queue
+ pool1.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1);
+ pool1.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig1);
+ childResourcePools.add(pool1);
+ childResourcePools.add(pool2);
+
+ getPoolTreeConfig();
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testInvalidQueueAtLeafPool() throws Exception {
+ // leaf pool with invalid queue
+ int initialValue = (Integer)queue1.remove("max_query_memory_per_node");
+
+ try {
+ pool1.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1);
+ pool1.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig1);
+ childResourcePools.add(pool1);
+
+ getPoolTreeConfig();
+ } finally {
+ queue1.put("max_query_memory_per_node", initialValue);
+ }
+ }
+
+ @Test
+ public void testRootPoolAsLeaf() throws Exception {
+ // leaf pool with queue
+ poolTreeConfig.put(ResourcePoolImpl.POOL_NAME_KEY, "drill");
+ poolTreeConfig.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1);
+ poolTreeConfig.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig1);
+
+ Config rmConfig = ConfigFactory.empty()
+ .withValue("drill.exec.rm", ConfigValueFactory.fromMap(poolTreeConfig));
+ ResourcePoolTree poolTree = new ResourcePoolTreeImpl(rmConfig, 10000, 10, 2);
+
+ assertTrue("Root pool is not a leaf pool", poolTree.getRootPool().isLeafPool());
+ assertEquals("Root pool name is not drill", "drill", poolTree.getRootPool().getPoolName());
+ assertTrue("Root pool is not the only leaf pool", poolTree.getAllLeafQueues().size() == 1);
+ assertTrue("Root pool name is not same as leaf pool name", poolTree.getAllLeafQueues().containsKey("drill"));
+ assertFalse("Root pool should not be a default pool", poolTree.getRootPool().isDefaultPool());
+ }
+
+ @Test
+ public void testTreeWithLeafAndIntermediatePool() throws Exception {
+ // left leaf pool1 with tag selector
+ pool1.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1);
+ pool1.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig1);
+
+ // left leaf pool2 with default selector
+ pool2.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1);
+
+ // intermediate left pool1 with 2 leaf pools (pool1, pool2)
+ Map<String, Object> interPool1 = new HashMap<>();
+ List<Object> childPools1 = new ArrayList<>();
+ childPools1.add(pool1);
+ childPools1.add(pool2);
+ interPool1.put(ResourcePoolImpl.POOL_NAME_KEY, "eng");
+ interPool1.put(ResourcePoolImpl.POOL_MEMORY_SHARE_KEY, 0.9);
+ interPool1.put(ResourcePoolImpl.POOL_CHILDREN_POOLS_KEY, childPools1);
+
+ // right leaf pool
+ Map<String, Object> rightLeafPool = new HashMap<>();
+ rightLeafPool.put(ResourcePoolImpl.POOL_NAME_KEY, "marketing");
+ rightLeafPool.put(ResourcePoolImpl.POOL_MEMORY_SHARE_KEY, 0.1);
+ rightLeafPool.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1);
+ rightLeafPool.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig2);
+
+ childResourcePools.add(interPool1);
+ childResourcePools.add(rightLeafPool);
+ ResourcePoolTree configTree = getPoolTreeConfig();
+
+ // Test successful selection of all leaf pools
+ OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants
+ .RM_QUERY_TAGS_KEY, "small,large", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ QueueAssignmentResult assignmentResult = configTree.selectAllQueues(mockContext);
+ List<ResourcePool> selectedPools = assignmentResult.getSelectedLeafPools();
+ List<String> expectedPools = new ArrayList<>();
+ expectedPools.add("dev");
+ expectedPools.add("qa");
+ expectedPools.add("marketing");
+
+ assertTrue("All leaf pools are not selected", selectedPools.size() == 3);
+ assertTrue("Selected leaf pools and expected pools are different",
+ checkExpectedVsActualPools(selectedPools, expectedPools));
+
+ // Test successful selection of multiple leaf pools
+ expectedPools.clear();
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants
+ .RM_QUERY_TAGS_KEY, "small", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assignmentResult = configTree.selectAllQueues(mockContext);
+ selectedPools = assignmentResult.getSelectedLeafPools();
+ expectedPools.add("qa");
+ expectedPools.add("dev");
+ assertTrue("Expected 2 pools to be selected", selectedPools.size() == 2);
+ assertTrue("Selected leaf pools and expected pools are different",
+ checkExpectedVsActualPools(selectedPools, expectedPools));
+
+ // Test successful selection of only left default pool
+ expectedPools.clear();
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants
+ .RM_QUERY_TAGS_KEY, "medium", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assignmentResult = configTree.selectAllQueues(mockContext);
+ selectedPools = assignmentResult.getSelectedLeafPools();
+ expectedPools.add("qa");
+ assertTrue("More than one leaf pool is selected", selectedPools.size() == 1);
+ assertTrue("Selected leaf pools and expected pools are different",
+ checkExpectedVsActualPools(selectedPools, expectedPools));
+
+ // cleanup
+ interPool1.clear();
+ rightLeafPool.clear();
+ expectedPools.clear();
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestBestFitSelectionPolicy.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestBestFitSelectionPolicy.java
new file mode 100644
index 000000000..3dcde00ca
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestBestFitSelectionPolicy.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.categories.ResourceManagerTest;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.resourcemgr.NodeResources;
+import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig;
+import org.apache.drill.exec.resourcemgr.config.RMCommonDefaults;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(ResourceManagerTest.class)
+public final class TestBestFitSelectionPolicy {
+
+ private static QueueSelectionPolicy selectionPolicy;
+
+ private static final QueryContext queryContext = mock(QueryContext.class);
+
+ private static final NodeResources queryMaxResources = new NodeResources(1500*1024L*1024L, 2);
+
+ private static final List<Long> poolMemory = new ArrayList<>();
+
+ @BeforeClass
+ public static void testSetup() {
+ selectionPolicy = QueueSelectionPolicyFactory.createSelectionPolicy
+ (RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY);
+ when(queryContext.getQueryId()).thenReturn(UserBitShared.QueryId.getDefaultInstance());
+ }
+
+ @After
+ public void afterTestSetup() {
+ poolMemory.clear();
+ }
+
+ private void testCommonHelper(long expectedPoolMem) throws Exception {
+ List<ResourcePool> inputPools = new ArrayList<>();
+ ResourcePool expectedPool = null;
+
+ for (Long poolMemory : poolMemory) {
+ final ResourcePool testPool = mock(ResourcePool.class);
+ final QueryQueueConfig testPoolQueue = mock(QueryQueueConfig.class);
+ when(testPool.getQueryQueue()).thenReturn(testPoolQueue);
+ when(testPoolQueue.getMaxQueryMemoryInMBPerNode()).thenReturn(poolMemory);
+ inputPools.add(testPool);
+
+ if (poolMemory == expectedPoolMem) {
+ expectedPool = testPool;
+ }
+ }
+
+ ResourcePool selectedPool = selectionPolicy.selectQueue(inputPools, queryContext, queryMaxResources);
+ assertEquals("Selected Pool and expected pool is different", expectedPool, selectedPool);
+ }
+
+ @Test(expected = QueueSelectionException.class)
+ public void testWithNoPool() throws Exception {
+ testCommonHelper(0);
+ }
+
+ @Test
+ public void testWithSinglePool() throws Exception {
+ poolMemory.add(1000L);
+ testCommonHelper(1000);
+ }
+
+ @Test
+ public void testWithMultiplePoolWithGreaterMaxNodeMemory() throws Exception {
+ poolMemory.add(2500L);
+ poolMemory.add(2000L);
+ testCommonHelper(2000);
+ }
+
+ @Test
+ public void testWithMultiplePoolWithLesserMaxNodeMemory() throws Exception {
+ poolMemory.add(700L);
+ poolMemory.add(500L);
+ testCommonHelper(700);
+ }
+
+ @Test
+ public void testMixOfPoolLess_Greater_MaxNodeMemory() throws Exception {
+ poolMemory.add(1000L);
+ poolMemory.add(2000L);
+ testCommonHelper(2000);
+ }
+
+ @Test
+ public void testMixOfPoolLess_Greater_EqualMaxNodeMemory() throws Exception {
+ poolMemory.add(1000L);
+ poolMemory.add(2000L);
+ poolMemory.add(1500L);
+ testCommonHelper(1500);
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestDefaultSelectionPolicy.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestDefaultSelectionPolicy.java
new file mode 100644
index 000000000..a78d106f3
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestDefaultSelectionPolicy.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
+
+import org.apache.drill.categories.ResourceManagerTest;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.resourcemgr.config.ResourcePool;
+import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(ResourceManagerTest.class)
+public final class TestDefaultSelectionPolicy {
+
+ private static QueueSelectionPolicy selectionPolicy;
+
+ private static final QueryContext queryContext = mock(QueryContext.class);
+
+ @BeforeClass
+ public static void testSetup() {
+ selectionPolicy = QueueSelectionPolicyFactory.createSelectionPolicy(
+ QueueSelectionPolicy.SelectionPolicy.DEFAULT);
+ when(queryContext.getQueryId()).thenReturn(UserBitShared.QueryId.getDefaultInstance());
+ }
+
+ @Test(expected = QueueSelectionException.class)
+ public void testWithNoDefaultPool() throws Exception {
+ List<ResourcePool> inputPools = new ArrayList<>();
+ final ResourcePool testPool1 = mock(ResourcePool.class);
+ when(testPool1.isDefaultPool()).thenReturn(false);
+ final ResourcePool testPool2 = mock(ResourcePool.class);
+ when(testPool2.isDefaultPool()).thenReturn(false);
+
+ inputPools.add(testPool1);
+ inputPools.add(testPool2);
+ selectionPolicy.selectQueue(inputPools, queryContext, null);
+ }
+
+ @Test
+ public void testWithSingleDefaultPool() throws Exception {
+ List<ResourcePool> inputPools = new ArrayList<>();
+ final ResourcePool testPool1 = mock(ResourcePool.class);
+ when(testPool1.isDefaultPool()).thenReturn(true);
+ inputPools.add(testPool1);
+
+ final ResourcePool selectedPool = selectionPolicy.selectQueue(inputPools, queryContext, null);
+ assertEquals("Selected Pool and expected pool is different",testPool1, selectedPool);
+ }
+
+ @Test
+ public void testWithMultipleDefaultPool() throws Exception {
+ List<ResourcePool> inputPools = new ArrayList<>();
+ final ResourcePool testPool1 = mock(ResourcePool.class);
+ when(testPool1.isDefaultPool()).thenReturn(true);
+ final ResourcePool testPool2 = mock(ResourcePool.class);
+ when(testPool2.isDefaultPool()).thenReturn(true);
+
+ inputPools.add(testPool1);
+ inputPools.add(testPool2);
+
+ final ResourcePool selectedPool = selectionPolicy.selectQueue(inputPools, queryContext, null);
+ assertEquals("Selected Pool and expected pool is different",testPool1, selectedPool);
+ }
+
+ @Test
+ public void testMixOfDefaultAndNonDefaultPool() throws Exception {
+ List<ResourcePool> inputPools = new ArrayList<>();
+ final ResourcePool testPool1 = mock(ResourcePool.class);
+ when(testPool1.isDefaultPool()).thenReturn(false);
+ final ResourcePool testPool2 = mock(ResourcePool.class);
+ when(testPool2.isDefaultPool()).thenReturn(true);
+
+ inputPools.add(testPool1);
+ inputPools.add(testPool2);
+
+ final ResourcePool selectedPool = selectionPolicy.selectQueue(inputPools, queryContext, null);
+ assertEquals("Selected Pool and expected pool is different",testPool2, selectedPool);
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestAclSelector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestAclSelector.java
new file mode 100644
index 000000000..fbdb108fd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestAclSelector.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.ResourceManagerTest;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@Category(ResourceManagerTest.class)
+public final class TestAclSelector {
+
+ private static final List<String> groupsValue = new ArrayList<>();
+
+ private static final List<String> usersValue = new ArrayList<>();
+
+ private static final List<String> emptyList = new ArrayList<>();
+
+ private static final Map<String, List<String>> aclConfigValue = new HashMap<>();
+
+ private boolean checkIfSame(Object[] expected, Object[] actual) {
+ Arrays.sort(expected);
+ Arrays.sort(actual);
+ return Arrays.equals(expected, actual);
+ }
+
+ @After
+ public void cleanupAfterTest() {
+ groupsValue.clear();
+ usersValue.clear();
+ aclConfigValue.clear();
+ }
+
+ private ResourcePoolSelector testNegativeHelper() throws RMConfigException {
+ final Config testConfig = ConfigFactory.empty()
+ .withValue("acl", ConfigValueFactory.fromMap(aclConfigValue));
+ return ResourcePoolSelectorFactory.createSelector(testConfig);
+ }
+
+ private ResourcePoolSelector testCommonHelper(List<String> expectedPositiveUsers, List<String> expectedPositiveGroups,
+ List<String> expectedNegativeUsers, List<String> expectedNegativeGroups)
+ throws RMConfigException {
+ ResourcePoolSelector testSelector = testNegativeHelper();
+ assertTrue("TestSelector is not a ACL selector", testSelector instanceof AclSelector);
+ assertTrue("Expected +ve users config mismatched with actual config",
+ checkIfSame(expectedPositiveUsers.toArray(), ((AclSelector) testSelector).getAllowedUsers().toArray()));
+ assertTrue("Expected +ve groups config mismatched with actual config",
+ checkIfSame(expectedPositiveGroups.toArray(), ((AclSelector) testSelector).getAllowedGroups().toArray()));
+ assertTrue("Expected -ve users config mismatched with actual config",
+ checkIfSame(expectedNegativeUsers.toArray(), ((AclSelector) testSelector).getDeniedUsers().toArray()));
+ assertTrue("Expected -ve groups config mismatched with actual config",
+ checkIfSame(expectedNegativeGroups.toArray(), ((AclSelector) testSelector).getDeniedGroups().toArray()));
+ return testSelector;
+ }
+
+ @Test
+ public void testValidACLSelector_shortSyntax() throws Exception {
+ groupsValue.add("sales");
+ groupsValue.add("marketing");
+
+ usersValue.add("user1");
+ usersValue.add("user2");
+
+ aclConfigValue.put("groups", groupsValue);
+ aclConfigValue.put("users", usersValue);
+ AclSelector testSelector = (AclSelector) testCommonHelper(usersValue, groupsValue, emptyList, emptyList);
+
+ // check based on valid/invalid user
+ Set<String> groups = new HashSet<>();
+ assertFalse(testSelector.checkQueryUserGroups("user3", groups));
+ assertTrue(testSelector.checkQueryUserGroups("user1", groups));
+
+ // check based on correct group
+ groups.add("sales");
+ assertTrue(testSelector.checkQueryUserGroups("user3", groups));
+ }
+
+ @Test
+ public void testACLSelector_onlyUsers() throws Exception {
+ usersValue.add("user1");
+ aclConfigValue.put("users", usersValue);
+ testCommonHelper(usersValue, groupsValue, emptyList, emptyList);
+ }
+
+ @Test
+ public void testACLSelector_onlyGroups() throws Exception {
+ groupsValue.add("group1");
+ aclConfigValue.put("groups", groupsValue);
+ testCommonHelper(usersValue, groupsValue, emptyList, emptyList);
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testInValidACLSelector_shortSyntax() throws Exception {
+ aclConfigValue.put("groups", new ArrayList<>());
+ aclConfigValue.put("users", new ArrayList<>());
+ testNegativeHelper();
+ }
+
+ @Test
+ public void testValidACLSelector_longSyntax() throws Exception {
+
+ groupsValue.add("sales:+");
+ groupsValue.add("marketing:-");
+
+ List<String> expectedAllowedGroups = new ArrayList<>();
+ expectedAllowedGroups.add("sales");
+
+ List<String> expectedDisAllowedGroups = new ArrayList<>();
+ expectedDisAllowedGroups.add("marketing");
+
+ usersValue.add("user1:+");
+ usersValue.add("user2:-");
+
+ List<String> expectedAllowedUsers = new ArrayList<>();
+ expectedAllowedUsers.add("user1");
+ List<String> expectedDisAllowedUsers = new ArrayList<>();
+ expectedDisAllowedUsers.add("user2");
+
+ aclConfigValue.put("groups", groupsValue);
+ aclConfigValue.put("users", usersValue);
+ AclSelector testSelector = (AclSelector)testCommonHelper(expectedAllowedUsers, expectedAllowedGroups,
+ expectedDisAllowedUsers, expectedDisAllowedGroups);
+
+ Set<String> queryGroups = new HashSet<>();
+ // Negative user/group
+ queryGroups.add("marketing");
+ assertFalse(testSelector.checkQueryUserGroups("user2", queryGroups));
+
+ // Invalid user -ve group
+ assertFalse(testSelector.checkQueryUserGroups("user3", queryGroups));
+
+ // -ve user +ve group
+ queryGroups.clear();
+ queryGroups.add("sales");
+ assertFalse(testSelector.checkQueryUserGroups("user2", queryGroups));
+
+ // Invalid user +ve group
+ assertTrue(testSelector.checkQueryUserGroups("user3", queryGroups));
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testInvalidLongSyntaxIdentifier() throws Exception {
+ groupsValue.add("sales:|");
+ aclConfigValue.put("groups", groupsValue);
+ testNegativeHelper();
+ }
+
+ @Test
+ public void testMixLongShortAclSyntax() throws Exception {
+ groupsValue.add("groups1");
+ groupsValue.add("groups2:+");
+ groupsValue.add("groups3:-");
+
+ List<String> expectedAllowedGroups = new ArrayList<>();
+ expectedAllowedGroups.add("groups1");
+ expectedAllowedGroups.add("groups2");
+
+ List<String> expectedDisAllowedGroups = new ArrayList<>();
+ expectedDisAllowedGroups.add("groups3");
+
+ usersValue.add("user1");
+ usersValue.add("user2:+");
+ usersValue.add("user3:-");
+
+ List<String> expectedAllowedUsers = new ArrayList<>();
+ expectedAllowedUsers.add("user1");
+ expectedAllowedUsers.add("user2");
+
+ List<String> expectedDisAllowedUsers = new ArrayList<>();
+ expectedDisAllowedUsers.add("user3");
+
+ aclConfigValue.put("groups", groupsValue);
+ aclConfigValue.put("users", usersValue);
+
+ testCommonHelper(expectedAllowedUsers, expectedAllowedGroups, expectedDisAllowedUsers, expectedDisAllowedGroups);
+ }
+
+ @Test
+ public void testSameUserBothInPositiveNegative() throws Exception {
+ usersValue.add("user1:+");
+ usersValue.add("user1:-");
+
+ List<String> expectedDisAllowedUsers = new ArrayList<>();
+ expectedDisAllowedUsers.add("user1");
+
+ aclConfigValue.put("users", usersValue);
+ testCommonHelper(emptyList, emptyList, expectedDisAllowedUsers, emptyList);
+ }
+
+ @Test
+ public void testStarInPositiveUsers() throws Exception {
+ usersValue.add("*:+");
+ usersValue.add("user1:-");
+
+ List<String> expectedAllowedUsers = new ArrayList<>();
+ expectedAllowedUsers.add("*");
+
+ List<String> expectedDisAllowedUsers = new ArrayList<>();
+ expectedDisAllowedUsers.add("user1");
+
+ aclConfigValue.put("users", usersValue);
+ AclSelector testSelector = (AclSelector)testCommonHelper(expectedAllowedUsers, emptyList,
+ expectedDisAllowedUsers, emptyList);
+
+ Set<String> queryGroups = new HashSet<>();
+ // -ve user with Invalid groups
+ assertFalse(testSelector.checkQueryUserGroups("user1", queryGroups));
+
+ // Other user with invalid groups
+ assertTrue(testSelector.checkQueryUserGroups("user2", queryGroups));
+ }
+
+ @Test
+ public void testStarInNegativeUsers() throws Exception {
+ usersValue.add("*:-");
+ usersValue.add("user1:+");
+
+ List<String> expectedAllowedUsers = new ArrayList<>();
+ expectedAllowedUsers.add("user1");
+
+ List<String> expectedDisAllowedUsers = new ArrayList<>();
+ expectedDisAllowedUsers.add("*");
+
+ aclConfigValue.put("users", usersValue);
+ AclSelector testSelector = (AclSelector)testCommonHelper(expectedAllowedUsers, emptyList,
+ expectedDisAllowedUsers, emptyList);
+
+ Set<String> queryGroups = new HashSet<>();
+ // Other user with Invalid groups
+ assertFalse(testSelector.checkQueryUserGroups("user2", queryGroups));
+
+ // +ve user with invalid groups
+ assertTrue(testSelector.checkQueryUserGroups("user1", queryGroups));
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestComplexSelectors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestComplexSelectors.java
new file mode 100644
index 000000000..d381d05a0
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestComplexSelectors.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.ResourceManagerTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(ResourceManagerTest.class)
+public final class TestComplexSelectors {
+
+ private static final Map<String, String> tagSelectorConfig1 = new HashMap<>();
+
+ private static final Map<String, String> tagSelectorConfig2 = new HashMap<>();
+
+ private static final QueryContext mockContext = mock(QueryContext.class);
+
+ private static final List<Object> complexSelectorValue = new ArrayList<>();
+
+ @BeforeClass
+ public static void classLevelSetup() {
+ tagSelectorConfig1.put("tag", "small");
+ tagSelectorConfig2.put("tag", "large");
+ }
+
+ @After
+ public void testLevelSetup() {
+ complexSelectorValue.clear();
+ }
+
+ private ResourcePoolSelector testOrCommonHelper() throws RMConfigException {
+ Config testConfig = ConfigFactory.empty().withValue("or", ConfigValueFactory.fromIterable(complexSelectorValue));
+ final ResourcePoolSelector testSelector = ResourcePoolSelectorFactory.createSelector(testConfig);
+ assertTrue("TestSelector is not a OrSelector", testSelector instanceof OrSelector);
+ return testSelector;
+ }
+
+ private ResourcePoolSelector testAndCommonHelper() throws RMConfigException {
+ Config testConfig = ConfigFactory.empty().withValue("and", ConfigValueFactory.fromIterable(complexSelectorValue));
+ final ResourcePoolSelector testSelector = ResourcePoolSelectorFactory.createSelector(testConfig);
+ assertTrue("TestSelector is not a AndSelector", testSelector instanceof AndSelector);
+ return testSelector;
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testOrSelectorSingleValidSelector() throws Exception {
+ // setup complexSelectorValue
+ complexSelectorValue.add(tagSelectorConfig1);
+ testOrCommonHelper();
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testOrSelectorEmptyValue() throws Exception {
+ testOrCommonHelper();
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testAndSelectorEmptyValue() throws Exception {
+ testAndCommonHelper();
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testOrSelectorStringValue() throws Exception {
+ Config testConfig = ConfigFactory.empty().withValue("or", ConfigValueFactory.fromAnyRef("dummy"));
+ ResourcePoolSelectorFactory.createSelector(testConfig);
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testAndSelectorStringValue() throws Exception {
+ Config testConfig = ConfigFactory.empty().withValue("and", ConfigValueFactory.fromAnyRef("dummy"));
+ ResourcePoolSelectorFactory.createSelector(testConfig);
+ }
+
+ @Test
+ public void testOrSelectorWithTwoValidSelector() throws Exception {
+ // setup complexSelectorValue
+ complexSelectorValue.add(tagSelectorConfig1);
+ complexSelectorValue.add(tagSelectorConfig2);
+
+ // Test for OrSelector
+ ResourcePoolSelector testSelector = testOrCommonHelper();
+
+ // Test successful selection with OrSelector
+ OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "small", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertTrue(testSelector.isQuerySelected(mockContext));
+
+ // Test unsuccessful selection with OrSelector
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "medium", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertFalse(testSelector.isQuerySelected(mockContext));
+ }
+
+ @Test
+ public void testAndSelectorWithTwoValidSelector() throws Exception {
+ // setup complexSelectorValue
+ complexSelectorValue.add(tagSelectorConfig1);
+ complexSelectorValue.add(tagSelectorConfig2);
+
+ // Test for AndSelector
+ ResourcePoolSelector testSelector = testAndCommonHelper();
+
+ // Test successful selection with AndSelector
+ OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "small,large", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertTrue(testSelector.isQuerySelected(mockContext));
+
+ // Test unsuccessful selection with AndSelector
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "small", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertFalse(testSelector.isQuerySelected(mockContext));
+ }
+
+ @Test
+ public void testAndSelectorWithNotEqualSelector() throws Exception {
+ // setup NotEqualSelector config
+ Map<String, Object> notEqualConfig = new HashMap<>();
+ notEqualConfig.put("not_equal", tagSelectorConfig1);
+
+ // setup complex selector value
+ complexSelectorValue.add(notEqualConfig);
+ complexSelectorValue.add(tagSelectorConfig2);
+
+ // Test for AndSelector
+ ResourcePoolSelector testSelector = testAndCommonHelper();
+
+ // Test successful selection with AndSelector
+ OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "large", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertTrue(testSelector.isQuerySelected(mockContext));
+
+ // Test unsuccessful selection with AndSelector
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "small", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertFalse(testSelector.isQuerySelected(mockContext));
+ }
+
+ @Test
+ public void testORSelectorWithNotEqualSelector() throws Exception {
+ // setup NotEqualSelector config
+ Map<String, Object> notEqualConfig = new HashMap<>();
+ notEqualConfig.put("not_equal", tagSelectorConfig1);
+
+ // setup complex selector value
+ complexSelectorValue.add(notEqualConfig);
+ complexSelectorValue.add(tagSelectorConfig2);
+
+ // Test for AndSelector
+ ResourcePoolSelector testSelector = testOrCommonHelper();
+
+ // Test successful selection with AndSelector
+ OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "medium", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertTrue(testSelector.isQuerySelected(mockContext));
+
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "large", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertTrue(testSelector.isQuerySelected(mockContext));
+
+ // Test unsuccessful selection with AndSelector
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "small", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertFalse(testSelector.isQuerySelected(mockContext));
+ }
+
+ @Test
+ public void testAndSelectorWithOrSelector() throws Exception {
+ // setup OrSelector config
+ List<Object> orConfigValue = new ArrayList<>();
+ orConfigValue.add(tagSelectorConfig1);
+ orConfigValue.add(tagSelectorConfig2);
+
+ Map<String, Object> orConfig = new HashMap<>();
+ orConfig.put("or", orConfigValue);
+
+ // get another TagSelector config
+ Map<String, String> tagSelectorConfig3 = new HashMap<>();
+ tagSelectorConfig3.put("tag", "medium");
+
+ // setup complex selector value
+ complexSelectorValue.add(orConfig);
+ complexSelectorValue.add(tagSelectorConfig3);
+
+ // Test for AndSelector
+ ResourcePoolSelector testSelector = testAndCommonHelper();
+
+ // Test successful selection with AndSelector
+ OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "small,medium", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertTrue(testSelector.isQuerySelected(mockContext));
+
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "large,medium", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertTrue(testSelector.isQuerySelected(mockContext));
+
+ // Test unsuccessful selection with AndSelector
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY,
+ ExecConstants.RM_QUERY_TAGS_KEY, "small,verylarge", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertFalse(testSelector.isQuerySelected(mockContext));
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestNotEqualSelector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestNotEqualSelector.java
new file mode 100644
index 000000000..b3053aa22
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestNotEqualSelector.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.ResourceManagerTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(ResourceManagerTest.class)
+public final class TestNotEqualSelector {
+
+ private ResourcePoolSelector testCommonHelper(Map<String, ? extends Object> selectorValue) throws RMConfigException {
+ Config testConfig = ConfigFactory.empty()
+ .withValue("not_equal", ConfigValueFactory.fromMap(selectorValue));
+ final ResourcePoolSelector testSelector = ResourcePoolSelectorFactory.createSelector(testConfig);
+ assertTrue("TestSelector is not a not_equal selector", testSelector instanceof NotEqualSelector);
+ return testSelector;
+ }
+
+ @Test
+ public void testValidNotEqualTagSelector() throws Exception {
+ Map<String, String> tagSelectorConfig = new HashMap<>();
+ tagSelectorConfig.put("tag", "marketing");
+ NotEqualSelector testSelector = (NotEqualSelector)testCommonHelper(tagSelectorConfig);
+ assertTrue("Expected child selector type to be TagSelector",
+ testSelector.getPoolSelector() instanceof TagSelector);
+
+ QueryContext mockContext = mock(QueryContext.class);
+
+ // Test successful selection
+ OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants
+ .RM_QUERY_TAGS_KEY, "small,large", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertTrue(testSelector.isQuerySelected(mockContext));
+
+ // Test unsuccessful selection
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants
+ .RM_QUERY_TAGS_KEY, "marketing", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertFalse(testSelector.isQuerySelected(mockContext));
+ }
+
+ @Test
+ public void testValidNotEqualAclSelector() throws Exception {
+ Map<String, List<String>> aclSelectorValue = new HashMap<>();
+ List<String> usersValue = new ArrayList<>();
+ usersValue.add("user1");
+ usersValue.add("user2");
+
+ List<String> groupsValue = new ArrayList<>();
+ groupsValue.add("group1");
+ groupsValue.add("group2");
+
+ aclSelectorValue.put("users", usersValue);
+ aclSelectorValue.put("groups", groupsValue);
+
+ Map<String, Map<String, List<String>>> aclSelectorConfig = new HashMap<>();
+ aclSelectorConfig.put("acl", aclSelectorValue);
+ NotEqualSelector testSelector = (NotEqualSelector)testCommonHelper(aclSelectorConfig);
+ assertTrue("Expected child selector type to be TagSelector",
+ testSelector.getPoolSelector() instanceof AclSelector);
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testInValidNotEqualAclSelector() throws Exception {
+ Map<String, List<String>> aclSelectorValue = new HashMap<>();
+
+ aclSelectorValue.put("users", new ArrayList<>());
+ aclSelectorValue.put("groups", new ArrayList<>());
+
+ Map<String, Map<String, List<String>>> aclSelectorConfig = new HashMap<>();
+ aclSelectorConfig.put("acl", aclSelectorValue);
+ testCommonHelper(aclSelectorConfig);
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testNotEqualSelectorEmptyValue() throws Exception {
+ Map<String, String> notEqualSelectorValue = new HashMap<>();
+ testCommonHelper(notEqualSelectorValue);
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testNotEqualSelectorStringValue() throws Exception {
+ Config testConfig = ConfigFactory.empty()
+ .withValue("not_equal", ConfigValueFactory.fromAnyRef("null"));
+ ResourcePoolSelectorFactory.createSelector(testConfig);
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestResourcePoolSelectors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestResourcePoolSelectors.java
new file mode 100644
index 000000000..0a2d121c9
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestResourcePoolSelectors.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.drill.categories.ResourceManagerTest;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertTrue;
+
+@Category(ResourceManagerTest.class)
+public class TestResourcePoolSelectors {
+
+ @Test
+ public void testNullSelectorConfig() throws Exception {
+ final ResourcePoolSelector testSelector = ResourcePoolSelectorFactory.createSelector(null);
+ assertTrue("TestSelector with null config is not of type Default Selector",
+ testSelector instanceof DefaultSelector);
+ assertTrue("DefaultSelector type is not default",
+ testSelector.getSelectorType() == ResourcePoolSelector.SelectorType.DEFAULT);
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testEmptySelectorConfig() throws Exception {
+ Config testConfig = ConfigFactory.empty();
+ ResourcePoolSelectorFactory.createSelector(testConfig);
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestTagSelector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestTagSelector.java
new file mode 100644
index 000000000..d93731513
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestTagSelector.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.resourcemgr.config.selectors;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.drill.categories.ResourceManagerTest;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException;
+import org.apache.drill.exec.server.options.OptionValue;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(ResourceManagerTest.class)
+public final class TestTagSelector {
+
+ private ResourcePoolSelector testCommonHelper(Object tagValue) throws RMConfigException {
+ Config testConfig = ConfigFactory.empty()
+ .withValue("tag", ConfigValueFactory.fromAnyRef(tagValue));
+ final ResourcePoolSelector testSelector = ResourcePoolSelectorFactory.createSelector(testConfig);
+ assertTrue("TestSelector is not a tag selector", testSelector instanceof TagSelector);
+ assertEquals("Unexpected value for tag selector", tagValue, ((TagSelector) testSelector).getTagValue());
+ return testSelector;
+ }
+
+ // Tests for TagSelector
+ @Test
+ public void testValidTagSelector() throws Exception {
+ testCommonHelper("marketing");
+ }
+
+ @Test(expected = RMConfigException.class)
+ public void testInValidTagSelector() throws Exception {
+ testCommonHelper("");
+ testCommonHelper(null);
+ }
+
+ @Test
+ public void testTagSelectorWithQueryTags() throws Exception {
+ QueryContext mockContext = mock(QueryContext.class);
+ TagSelector testSelector = (TagSelector) testCommonHelper("small");
+
+ // Test successful selection
+ OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants
+ .RM_QUERY_TAGS_KEY, "small,large", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertTrue(testSelector.isQuerySelected(mockContext));
+
+ // Test empty query tags
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants
+ .RM_QUERY_TAGS_KEY, "", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertFalse(testSelector.isQuerySelected(mockContext));
+
+ // Test different query tags
+ testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants
+ .RM_QUERY_TAGS_KEY, "medium", OptionValue.OptionScope.SESSION);
+ when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption);
+ assertFalse(testSelector.isQuerySelected(mockContext));
+ }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
index 16ac42e27..dd9da2219 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java
@@ -23,7 +23,7 @@ import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.categories.SqlFunctionTest;
-import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.ConfigConstants;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
@@ -205,7 +205,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
.unOrdered()
.baselineColumns("ok", "summary")
.baselineValues(false, String.format(summary,
- CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, jar))
+ ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, jar))
.go();
}