diff options
63 files changed, 4316 insertions, 125 deletions
diff --git a/common/src/main/java/org/apache/drill/common/config/CommonConstants.java b/common/src/main/java/org/apache/drill/common/config/ConfigConstants.java index e203972b8..3283fe07f 100644 --- a/common/src/main/java/org/apache/drill/common/config/CommonConstants.java +++ b/common/src/main/java/org/apache/drill/common/config/ConfigConstants.java @@ -17,21 +17,33 @@ */ package org.apache.drill.common.config; -public interface CommonConstants { +public final class ConfigConstants { /** Default (base) configuration file name. (Classpath resource pathname.) */ - String CONFIG_DEFAULT_RESOURCE_PATHNAME = "drill-default.conf"; + public static final String CONFIG_DEFAULT_RESOURCE_PATHNAME = "drill-default.conf"; /** Module configuration files name. (Classpath resource pathname.) */ - String DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME = "drill-module.conf"; + public static final String DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME = "drill-module.conf"; /** Distribution Specific Override configuration file name. (Classpath resource pathname.) */ - String CONFIG_DISTRIBUTION_RESOURCE_PATHNAME = "drill-distrib.conf"; + public static final String CONFIG_DISTRIBUTION_RESOURCE_PATHNAME = "drill-distrib.conf"; /** Override configuration file name. (Classpath resource pathname.) */ - String CONFIG_OVERRIDE_RESOURCE_PATHNAME = "drill-override.conf"; + public static final String CONFIG_OVERRIDE_RESOURCE_PATHNAME = "drill-override.conf"; /** Override plugins configs file name. (Classpath resource pathname.) */ - String STORAGE_PLUGINS_OVERRIDE_CONF = "storage-plugins-override.conf"; + public static final String STORAGE_PLUGINS_OVERRIDE_CONF = "storage-plugins-override.conf"; + /** Default RM configuration file name. (Classpath resource pathname.) */ + public static final String RM_CONFIG_DEFAULT_RESOURCE_PATHNAME = "drill-rm-default.conf"; + + /** Distribution Specific RM Override configuration file name. (Classpath resource pathname.) */ + public static final String RM_CONFIG_DISTRIBUTION_RESOURCE_PATHNAME = "drill-rm-distrib.conf"; + + /** RM Override configuration file name. (Classpath resource pathname.) */ + public static final String RM_CONFIG_OVERRIDE_RESOURCE_PATHNAME = "drill-rm-override.conf"; + + // suppress default constructor + private ConfigConstants() { + } } diff --git a/common/src/main/java/org/apache/drill/common/config/ConfigFileInfo.java b/common/src/main/java/org/apache/drill/common/config/ConfigFileInfo.java new file mode 100644 index 000000000..1f0fe971b --- /dev/null +++ b/common/src/main/java/org/apache/drill/common/config/ConfigFileInfo.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.common.config; + +/** + * Interface that defines implementation to get all the config files names for default, module specific, distribution + * specific and override files. + */ +public interface ConfigFileInfo { + + String getDefaultFileName(); + + String getModuleFileName(); + + String getDistributionFileName(); + + String getOverrideFileName(); +} diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java index d7d7340a5..6fa09e5a4 100644 --- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java +++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java @@ -17,30 +17,30 @@ */ package org.apache.drill.common.config; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigRenderOptions; +import io.netty.util.internal.PlatformDependent; +import org.apache.drill.common.exceptions.DrillConfigurationException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.common.scanner.ClassPathScanner; +import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; +import org.reflections.util.ClasspathHelper; + import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.lang.reflect.Constructor; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map.Entry; import java.util.Properties; import java.util.concurrent.TimeUnit; -import io.netty.util.internal.PlatformDependent; -import org.apache.drill.common.exceptions.DrillConfigurationException; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.scanner.ClassPathScanner; -import org.reflections.util.ClasspathHelper; - -import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; -import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigRenderOptions; - public class DrillConfig extends NestedConfig { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class); @@ -139,7 +139,7 @@ public class DrillConfig extends NestedConfig { * @param overrideFileResourcePathname * the classpath resource pathname of the file to use for * configuration override purposes; {@code null} specifies to use the - * default pathname ({@link CommonConstants#CONFIG_OVERRIDE_RESOURCE_PATHNAME}) (does + * default pathname ({@link ConfigConstants#CONFIG_OVERRIDE_RESOURCE_PATHNAME}) (does * <strong>not</strong> specify to suppress trying to load an * overrides file) * @return A merged Config object. @@ -153,7 +153,7 @@ public class DrillConfig extends NestedConfig { */ @VisibleForTesting public static DrillConfig create(Properties testConfigurations) { - return create(null, testConfigurations, true); + return create(null, testConfigurations, true, new DrillExecConfigFileInfo()); } /** @@ -161,7 +161,27 @@ public class DrillConfig extends NestedConfig { * see {@link #create(String)}'s {@code overrideFileResourcePathname} */ public static DrillConfig create(String overrideFileResourcePathname, boolean enableServerConfigs) { - return create(overrideFileResourcePathname, null, enableServerConfigs); + return create(overrideFileResourcePathname, null, enableServerConfigs, new DrillExecConfigFileInfo()); + } + + /** + * Merged DrillConfig object for all the RM Configurations provided through various resource files. The order of + * precedence is as follows: + * <p> + * Configuration values are retrieved as follows: + * <ul> + * <li>Check a single copy of "drill-rm-override.conf". If multiple copies are + * on the classpath, which copy is read is indeterminate.</li> + * <li>Check a single copy of "drill-rm-distrib.conf". If multiple copies are + * on the classpath, which copy is read is indeterminate. </li> + * <li>Check a single copy of "{@code drill-rm-default.conf}". If multiple + * copies are on the classpath, which copy is read is indeterminate.</li> + * </ul> + * </p> + * @return A merged Config object. + */ + public static DrillConfig createForRM() { + return create(null, null, true, new DrillRMConfigFileInfo()); } /** @@ -181,35 +201,37 @@ public class DrillConfig extends NestedConfig { * is assimilated * @param enableServerConfigs * whether to enable server-specific configuration options - * @return + * @param configInfo + * see {@link ConfigFileInfo} + * @return {@link DrillConfig} object with all configs from passed in resource files */ private static DrillConfig create(String overrideFileResourcePathname, final Properties overriderProps, - final boolean enableServerConfigs) { + final boolean enableServerConfigs, + ConfigFileInfo configInfo) { final StringBuilder logString = new StringBuilder(); final Stopwatch watch = Stopwatch.createStarted(); - overrideFileResourcePathname = - overrideFileResourcePathname == null - ? CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME - : overrideFileResourcePathname; + overrideFileResourcePathname = overrideFileResourcePathname == null ? + configInfo.getOverrideFileName() : overrideFileResourcePathname; // 1. Load defaults configuration file. - Config fallback = null; + Config fallback = ConfigFactory.empty(); final ClassLoader[] classLoaders = ClasspathHelper.classLoaders(); for (ClassLoader classLoader : classLoaders) { final URL url = - classLoader.getResource(CommonConstants.CONFIG_DEFAULT_RESOURCE_PATHNAME); + classLoader.getResource(configInfo.getDefaultFileName()); if (null != url) { logString.append("Base Configuration:\n\t- ").append(url).append("\n"); fallback = - ConfigFactory.load(classLoader, - CommonConstants.CONFIG_DEFAULT_RESOURCE_PATHNAME); + ConfigFactory.load(classLoader, configInfo.getDefaultFileName()); break; } } // 2. Load per-module configuration files. - final Collection<URL> urls = ClassPathScanner.getConfigURLs(); + final String perModuleResourcePathName = configInfo.getModuleFileName(); + final Collection<URL> urls = (perModuleResourcePathName != null) ? + ClassPathScanner.getConfigURLs(perModuleResourcePathName) : new ArrayList<>(); logString.append("\nIntermediate Configuration and Plugin files, in order of precedence:\n"); for (URL url : urls) { logString.append("\t- ").append(url).append("\n"); @@ -220,12 +242,12 @@ public class DrillConfig extends NestedConfig { final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); // 3. Load distribution specific configuration file. - final URL distribConfigFileUrl = classLoader.getResource(CommonConstants.CONFIG_DISTRIBUTION_RESOURCE_PATHNAME); + final URL distribConfigFileUrl = classLoader.getResource(configInfo.getDistributionFileName()); if (null != distribConfigFileUrl ) { logString.append("Distribution Specific Configuration File: ").append(distribConfigFileUrl).append("\n"); } fallback = - ConfigFactory.load(CommonConstants.CONFIG_DISTRIBUTION_RESOURCE_PATHNAME).withFallback(fallback); + ConfigFactory.load(configInfo.getDistributionFileName()).withFallback(fallback); // 4. Load any specified overrides configuration file along with any // overrides from JVM system properties (e.g., {-Dname=value"). @@ -258,7 +280,7 @@ public class DrillConfig extends NestedConfig { return new DrillConfig(effectiveConfig.resolve()); } - public <T> Class<T> getClassAt(String location, Class<T> clazz) throws DrillConfigurationException { + private <T> Class<T> getClassAt(String location, Class<T> clazz) throws DrillConfigurationException { final String className = getString(location); if (className == null) { throw new DrillConfigurationException(String.format( @@ -286,8 +308,7 @@ public class DrillConfig extends NestedConfig { public <T> T getInstanceOf(String location, Class<T> clazz) throws DrillConfigurationException{ final Class<T> c = getClassAt(location, clazz); try { - final T t = c.newInstance(); - return t; + return c.newInstance(); } catch (Exception ex) { throw new DrillConfigurationException(String.format("Failure while instantiating class [%s] located at '%s.", clazz.getCanonicalName(), location), ex); } diff --git a/common/src/main/java/org/apache/drill/common/config/DrillExecConfigFileInfo.java b/common/src/main/java/org/apache/drill/common/config/DrillExecConfigFileInfo.java new file mode 100644 index 000000000..c95f38b70 --- /dev/null +++ b/common/src/main/java/org/apache/drill/common/config/DrillExecConfigFileInfo.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.common.config; + +public class DrillExecConfigFileInfo implements ConfigFileInfo { + + @Override + public String getDefaultFileName() { + return ConfigConstants.CONFIG_DEFAULT_RESOURCE_PATHNAME; + } + + @Override + public String getModuleFileName() { + return ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME; + } + + @Override + public String getDistributionFileName() { + return ConfigConstants.CONFIG_DISTRIBUTION_RESOURCE_PATHNAME; + } + + @Override + public String getOverrideFileName() { + return ConfigConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME; + } +} diff --git a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java index f7688f569..87b489fbd 100644 --- a/common/src/main/java/org/apache/drill/common/config/DrillProperties.java +++ b/common/src/main/java/org/apache/drill/common/config/DrillProperties.java @@ -83,6 +83,8 @@ public final class DrillProperties extends Properties { public static final String TLS_PROVIDER = "TLSProvider"; public static final String USE_SYSTEM_TRUSTSTORE = "useSystemTrustStore"; + public static final String QUERY_TAGS = "queryTags"; + // Although all properties from the application are sent to the server (from the client), the following // sets of properties are used by the client and server respectively. These are reserved words. @@ -94,14 +96,15 @@ public final class DrillProperties extends Properties { SERVICE_PRINCIPAL, SERVICE_NAME, SERVICE_HOST, REALM, KEYTAB, KERBEROS_FROM_SUBJECT, ENABLE_TLS, TLS_PROTOCOL, TRUSTSTORE_TYPE, TRUSTSTORE_PATH, TRUSTSTORE_PASSWORD, DISABLE_HOST_VERIFICATION, DISABLE_CERT_VERIFICATION, TLS_HANDSHAKE_TIMEOUT, TLS_PROVIDER, - USE_SYSTEM_TRUSTSTORE + USE_SYSTEM_TRUSTSTORE, QUERY_TAGS ); public static final ImmutableSet<String> ACCEPTED_BY_SERVER = ImmutableSet.of( USER /** deprecated */, PASSWORD /** deprecated */, SCHEMA, IMPERSONATION_TARGET, - QUOTING_IDENTIFIERS + QUOTING_IDENTIFIERS, + QUERY_TAGS ); private DrillProperties() { diff --git a/common/src/main/java/org/apache/drill/common/config/DrillRMConfigFileInfo.java b/common/src/main/java/org/apache/drill/common/config/DrillRMConfigFileInfo.java new file mode 100644 index 000000000..1db06ff7d --- /dev/null +++ b/common/src/main/java/org/apache/drill/common/config/DrillRMConfigFileInfo.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.common.config; + +public class DrillRMConfigFileInfo implements ConfigFileInfo { + + @Override + public String getDefaultFileName() { + return ConfigConstants.RM_CONFIG_DEFAULT_RESOURCE_PATHNAME; + } + + @Override + public String getModuleFileName() { + return null; + } + + @Override + public String getDistributionFileName() { + return ConfigConstants.RM_CONFIG_DISTRIBUTION_RESOURCE_PATHNAME; + } + + @Override + public String getOverrideFileName() { + return ConfigConstants.RM_CONFIG_OVERRIDE_RESOURCE_PATHNAME; + } +} diff --git a/common/src/main/java/org/apache/drill/common/scanner/BuildTimeScan.java b/common/src/main/java/org/apache/drill/common/scanner/BuildTimeScan.java index a8b65cc91..1e52d4fc6 100644 --- a/common/src/main/java/org/apache/drill/common/scanner/BuildTimeScan.java +++ b/common/src/main/java/org/apache/drill/common/scanner/BuildTimeScan.java @@ -28,6 +28,7 @@ import java.net.URL; import java.util.List; import java.util.Set; +import org.apache.drill.common.config.ConfigConstants; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.scanner.persistence.ScanResult; @@ -136,7 +137,7 @@ public class BuildTimeScan { basePath = "/" + basePath; } URL url = new URL("file:" + basePath); - Set<URL> markedPaths = ClassPathScanner.getMarkedPaths(); + Set<URL> markedPaths = ClassPathScanner.getMarkedPaths(ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME); if (!markedPaths.contains(url)) { throw new IllegalArgumentException(url + " not in " + markedPaths); } diff --git a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java index 353f3af6c..eeec2d69e 100644 --- a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java +++ b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java @@ -32,7 +32,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.drill.common.config.CommonConstants; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.scanner.persistence.AnnotationDescriptor; @@ -304,12 +303,12 @@ public final class ClassPathScanner { /** * @return paths that have a drill config file in them */ - static Set<URL> getMarkedPaths() { - return forResource(CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, true); + static Set<URL> getMarkedPaths(String resourcePathName) { + return forResource(resourcePathName, true); } - public static Collection<URL> getConfigURLs() { - return forResource(CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, false); + public static Collection<URL> getConfigURLs(String resourcePathName) { + return forResource(resourcePathName, false); } /** diff --git a/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java b/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java index 70b9939f9..cc86c6792 100644 --- a/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java +++ b/common/src/main/java/org/apache/drill/common/scanner/RunTimeScan.java @@ -22,6 +22,7 @@ import java.util.Collection; import java.util.List; import java.util.Set; +import org.apache.drill.common.config.ConfigConstants; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.scanner.persistence.ScanResult; @@ -42,7 +43,7 @@ public class RunTimeScan { * @return getMarkedPaths() sans getPrescannedPaths() */ static Collection<URL> getNonPrescannedMarkedPaths() { - Collection<URL> markedPaths = ClassPathScanner.getMarkedPaths(); + Collection<URL> markedPaths = ClassPathScanner.getMarkedPaths(ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME); markedPaths.removeAll(BuildTimeScan.getPrescannedPaths()); return markedPaths; } @@ -69,7 +70,7 @@ public class RunTimeScan { } else { // scan everything return ClassPathScanner.scan( - ClassPathScanner.getMarkedPaths(), + ClassPathScanner.getMarkedPaths(ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME), packagePrefixes, scannedBaseClasses, scannedAnnotations, diff --git a/common/src/test/java/org/apache/drill/categories/ResourceManagerTest.java b/common/src/test/java/org/apache/drill/categories/ResourceManagerTest.java new file mode 100644 index 000000000..fc05e8be0 --- /dev/null +++ b/common/src/test/java/org/apache/drill/categories/ResourceManagerTest.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.categories; + +public interface ResourceManagerTest { +} diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java index 38ecd1c0e..cfc46053c 100644 --- a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java @@ -25,7 +25,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; -import org.apache.drill.common.config.CommonConstants; +import org.apache.drill.common.config.ConfigConstants; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.scanner.ClassPathScanner; import org.apache.drill.common.scanner.persistence.ScanResult; @@ -283,7 +283,7 @@ public class DrillOnYarnConfig { private static Config loadDrillConfig() { drillConfig = DrillConfig - .create(CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME); + .create(ConfigConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME); return drillConfig.resolve(); } @@ -394,10 +394,10 @@ public class DrillOnYarnConfig { classLoader = DrillOnYarnConfig.class.getClassLoader(); } - URL url = classLoader.getResource(CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME); + URL url = classLoader.getResource(ConfigConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME); if (url == null) { throw new DoyConfigException( - "Drill configuration file is missing: " + CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME); + "Drill configuration file is missing: " + ConfigConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME); } File confFile; try { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 6fce8b707..93adda17c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -666,6 +666,7 @@ public final class ExecConstants { public static final String ORDERED_MUX_EXCHANGE = "planner.enable_ordered_mux_exchange"; // Resource management boot-time options. + public static final String RM_ENABLED = "drill.exec.rm.enabled"; public static final String MAX_MEMORY_PER_NODE = "drill.exec.rm.memory_per_node"; public static final String MAX_CPUS_PER_NODE = "drill.exec.rm.cpus_per_node"; @@ -685,6 +686,16 @@ public final class ExecConstants { public static final LongValidator QUEUE_TIMEOUT = new PositiveLongValidator("exec.queue.timeout_millis", Long.MAX_VALUE, new OptionDescription("Indicates how long a query can wait in queue before the query fails. Range: 0-9223372036854775807")); + // New Smart RM boot time configs + public static final String RM_QUERY_TAGS_KEY = "exec.rm.queryTags"; + public static final StringValidator RM_QUERY_TAGS_VALIDATOR = new StringValidator(RM_QUERY_TAGS_KEY, + new OptionDescription("Allows user to set coma separated list of tags for all the queries submitted over a session")); + + public static final String RM_QUEUES_WAIT_FOR_PREFERRED_NODES_KEY = "exec.rm.queues.wait_for_preferred_nodes"; + public static final BooleanValidator RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR = new BooleanValidator + (RM_QUEUES_WAIT_FOR_PREFERRED_NODES_KEY, new OptionDescription("Allows user to enable/disable " + + "wait_for_preferred_nodes configuration across rm queues for all the queries submitted over a session")); + // Ratio of memory for small queries vs. large queries. // Each small query gets 1 unit, each large query gets QUEUE_MEMORY_RATIO units. // A lower limit of 1 enforces the intuition that a large query should never get diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java index 137969a34..143560529 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java @@ -39,7 +39,7 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Sets; import org.apache.drill.shaded.guava.com.google.common.io.Files; import com.typesafe.config.ConfigFactory; import org.apache.commons.io.FileUtils; -import org.apache.drill.common.config.CommonConstants; +import org.apache.drill.common.config.ConfigConstants; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.expression.FunctionCall; @@ -445,7 +445,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au */ private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) throws IOException { Enumeration<URL> markerFileEnumeration = classLoader.getResources( - CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME); + ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME); while (markerFileEnumeration.hasMoreElements()) { URL markerFile = markerFileEnumeration.nextElement(); if (markerFile.getPath().contains(path.toUri().getPath())) { @@ -462,7 +462,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au } } throw new JarValidationException(String.format("Marker file %s is missing in %s", - CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, path.getName())); + ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, path.getName())); } /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java new file mode 100644 index 000000000..485702c8f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/NodeResources.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr; + +/** + * Provides resources for a node in cluster. Currently it is used to support only 2 kind of resources: + * <ul> + * <li>Memory</li> + * <li>Virtual CPU count</li> + * </ul> + * It also has a version field to support extensibility in future to add other resources like network, disk, etc + */ +public class NodeResources { + + private final int version; + + private final long memoryInBytes; + + private final int numVirtualCpu; + + public NodeResources(long memoryInBytes, int numVirtualCpu) { + this.memoryInBytes = memoryInBytes; + this.numVirtualCpu = numVirtualCpu; + this.version = 1; + } + + public NodeResources(long memoryInBytes, int numPhysicalCpu, int vFactor) { + this(memoryInBytes, numPhysicalCpu * vFactor); + } + + public long getMemoryInBytes() { + return memoryInBytes; + } + + public long getMemoryInMB() { + return Math.round((memoryInBytes / 1024L) / 1024L); + } + + public long getMemoryInGB() { + return Math.round(getMemoryInMB() / 1024L); + } + + public int getNumVirtualCpu() { + return numVirtualCpu; + } + + @Override + public String toString() { + return "{ Version: " + version + ", MemoryInBytes: " + memoryInBytes + ", VirtualCPU: " + numVirtualCpu + " }"; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java new file mode 100644 index 000000000..db79ec573 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfig.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config; + +/** + * Interface which defines an implementation for managing queue configuration of a leaf {@link ResourcePool} + */ +public interface QueryQueueConfig { + String getQueueId(); + + String getQueueName(); + + /** + * Given number of available nodes in the cluster what is the total memory resource share of this queue cluster wide + * @param numClusterNodes number of available cluster nodes + * @return Queue's total cluster memory resource share + */ + long getQueueTotalMemoryInMB(int numClusterNodes); + + /** + * @return Maximum query memory (in MB) that a query in this queue can consume on a node + */ + long getMaxQueryMemoryInMBPerNode(); + + /** + * Given number of available nodes in the cluster what is the max memory in MB a query in this queue can be assigned + * cluster wide + * @param numClusterNodes number of available cluster nodes + * @return Maximum query memory (in MB) in this queue + */ + long getMaxQueryTotalMemoryInMB(int numClusterNodes); + + /** + * Determines if admitted queries in this queue should wait in the queue if resources on the preferred assigned + * nodes of a query determined by planner is unavailable. + * @return <tt>true</tt> indicates an admitted query to wait until resources on all the preferred nodes are available or + * wait until timeout is reached, <tt>false</tt> indicates an admitted query to not wait and find other nodes with + * available resources if resources on a preferred node is unavailable. + */ + boolean waitForPreferredNodes(); + + /** + * @return Maximum number of queries that can be admitted in the queue + */ + int getMaxAdmissibleQueries(); + + /** + * @return Maximum number of queries that will be allowed to wait in the queue before failing a query right away + */ + int getMaxWaitingQueries(); + + /** + * @return Maximum time in milliseconds for which a query can be in waiting state inside a queue + */ + int getWaitTimeoutInMs(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java new file mode 100644 index 000000000..948d1b943 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueryQueueConfigImpl.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config; + +import com.typesafe.config.Config; +import org.apache.drill.exec.resourcemgr.NodeResources; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.apache.drill.exec.resourcemgr.util.MemoryConfigParser; + +import java.util.UUID; + +import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.MAX_ADMISSIBLE_QUERY_COUNT; +import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.MAX_WAITING_QUERY_COUNT; + +/** + * Parses and initialize QueueConfiguration for a {@link ResourcePool}. It also generates a unique UUID for each queue + * which will be later used by Drillbit to store in Zookeeper along with + * {@link org.apache.drill.exec.proto.beans.DrillbitEndpoint}. This UUID is used in the leader election mechanism in + * which Drillbit participates for each of the configured rm queue. + */ +public class QueryQueueConfigImpl implements QueryQueueConfig { + + // Optional queue configurations + private static final String MAX_ADMISSIBLE_KEY = "max_admissible"; + + private static final String MAX_WAITING_KEY = "max_waiting"; + + private static final String MAX_WAIT_TIMEOUT_KEY = "max_wait_timeout"; + + private static final String WAIT_FOR_PREFERRED_NODES_KEY = "wait_for_preferred_nodes"; + + private static final String MAX_QUERY_MEMORY_PER_NODE_FORMAT = "([0-9]+)\\s*([kKmMgG]?)\\s*$"; + + // Required queue configurations in MAX_QUERY_MEMORY_PER_NODE_FORMAT pattern + private static final String MAX_QUERY_MEMORY_PER_NODE_KEY = "max_query_memory_per_node"; + + private final String queueUUID; + + private final String queueName; + + private int maxAdmissibleQuery; + + private int maxWaitingQuery; + + private int maxWaitingTimeout; + + private boolean waitForPreferredNodes; + + private final NodeResources queueResourceShare; + + private NodeResources queryPerNodeResourceShare; + + public QueryQueueConfigImpl(Config queueConfig, String poolName, + NodeResources queueNodeResource) throws RMConfigException { + this.queueUUID = UUID.randomUUID().toString(); + this.queueName = poolName; + this.queueResourceShare = queueNodeResource; + parseQueueConfig(queueConfig); + } + + /** + * Assigns either supplied or default values for the optional queue configuration. For required configuration uses + * the supplied value in queueConfig object or throws proper exception if not present + * @param queueConfig Config object for ResourcePool queue + * @throws RMConfigException in case of error while parsing config + */ + private void parseQueueConfig(Config queueConfig) throws RMConfigException { + this.maxAdmissibleQuery = queueConfig.hasPath(MAX_ADMISSIBLE_KEY) ? + queueConfig.getInt(MAX_ADMISSIBLE_KEY) : MAX_ADMISSIBLE_QUERY_COUNT; + this.maxWaitingQuery = queueConfig.hasPath(MAX_WAITING_KEY) ? + queueConfig.getInt(MAX_WAITING_KEY) : MAX_WAITING_QUERY_COUNT; + this.maxWaitingTimeout = queueConfig.hasPath(MAX_WAIT_TIMEOUT_KEY) ? + queueConfig.getInt(MAX_WAIT_TIMEOUT_KEY) : RMCommonDefaults.MAX_WAIT_TIMEOUT_IN_MS; + this.waitForPreferredNodes = queueConfig.hasPath(WAIT_FOR_PREFERRED_NODES_KEY) ? + queueConfig.getBoolean(WAIT_FOR_PREFERRED_NODES_KEY) : RMCommonDefaults.WAIT_FOR_PREFERRED_NODES; + this.queryPerNodeResourceShare = parseAndGetNodeShare(queueConfig); + } + + @Override + public String getQueueId() { + return queueUUID; + } + + @Override + public String getQueueName() { + return queueName; + } + + /** + * Total memory share of this queue in the cluster based on per node resource share. It assumes that the cluster is + * made up of homogeneous nodes in terms of resources + * @param numClusterNodes total number of available cluster nodes which can participate in this queue + * @return queue memory share in MB + */ + @Override + public long getQueueTotalMemoryInMB(int numClusterNodes) { + return queueResourceShare.getMemoryInMB() * numClusterNodes; + } + + @Override + public long getMaxQueryMemoryInMBPerNode() { + return queryPerNodeResourceShare.getMemoryInMB(); + } + + @Override + public long getMaxQueryTotalMemoryInMB(int numClusterNodes) { + return queryPerNodeResourceShare.getMemoryInMB() * numClusterNodes; + } + + @Override + public boolean waitForPreferredNodes() { + return waitForPreferredNodes; + } + + @Override + public int getMaxAdmissibleQueries() { + return maxAdmissibleQuery; + } + + @Override + public int getMaxWaitingQueries() { + return maxWaitingQuery; + } + + @Override + public int getWaitTimeoutInMs() { + return maxWaitingTimeout; + } + + /** + * Parses queues {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} configuration using + * {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_FORMAT} pattern to get memory value in bytes using + * {@link MemoryConfigParser#parseMemoryConfigString(String, String)} utility. + * @param queueConfig Queue Configuration object + * @return NodeResources created with value of {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} in bytes + * @throws RMConfigException in case the config value is absent or doesn't follow the specified format + */ + private NodeResources parseAndGetNodeShare(Config queueConfig) throws RMConfigException { + try { + long memoryPerNodeInBytes = MemoryConfigParser.parseMemoryConfigString( + queueConfig.getString(MAX_QUERY_MEMORY_PER_NODE_KEY), MAX_QUERY_MEMORY_PER_NODE_FORMAT); + return new NodeResources(memoryPerNodeInBytes, Integer.MAX_VALUE); + } catch (Exception ex) { + throw new RMConfigException(String.format("Failed while parsing %s for queue %s", MAX_QUERY_MEMORY_PER_NODE_KEY, + queueName), ex); + } + } + + @Override + public String toString() { + return "{ QueueName: " + queueName + ", QueueId: " + queueUUID + ", QueuePerNodeResource(MB): " + + queryPerNodeResourceShare.toString() + ", MaxQueryMemPerNode(MB): " + queryPerNodeResourceShare.toString() + + ", MaxAdmissible: " + maxAdmissibleQuery + ", MaxWaiting: " + maxWaitingQuery + ", MaxWaitTimeout: " + + maxWaitingTimeout + ", WaitForPreferredNodes: " + waitForPreferredNodes + "}"; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java new file mode 100644 index 000000000..5de2e34fe --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/QueueAssignmentResult.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.List; + +/** + * Used to keep track of selected leaf and all rejected {@link ResourcePool} for the provided query. + * It is used by {@link ResourcePoolImpl#visitAndSelectPool(QueueAssignmentResult, QueryContext)} to store + * information about all the matching and non-matching ResourcePools for a query when ResourcePool selector is + * evaluated against query metadata. Later it is used by + * {@link ResourcePool#visitAndSelectPool(QueueAssignmentResult, QueryContext)} to apply + * {@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy} to select only one queue out of all + * the selected queues for a query. It also provides an API to dump all the debug information to know which pools were + * selected and rejected for a query. + */ +public class QueueAssignmentResult { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueueAssignmentResult.class); + + private final List<ResourcePool> selectedLeafPools = new ArrayList<>(); + + private final List<ResourcePool> rejectedPools = new ArrayList<>(); + + public void addSelectedPool(ResourcePool pool) { + Preconditions.checkState(pool.isLeafPool(), "Selected pool %s is not a leaf pool", + pool.getPoolName()); + selectedLeafPools.add(pool); + } + + public void addRejectedPool(ResourcePool pool) { + rejectedPools.add(pool); + } + + public List<ResourcePool> getSelectedLeafPools() { + return selectedLeafPools; + } + + public List<ResourcePool> getRejectedPools() { + return rejectedPools; + } + + public void logAssignmentResult(String queryId) { + logger.debug("For query {}. Details[{}]", queryId, toString()); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Selected Leaf Pools: {"); + for (ResourcePool pool : selectedLeafPools) { + sb.append(pool.getPoolName()).append(", "); + } + sb.append("} and Rejected pools: {"); + for (ResourcePool pool : rejectedPools) { + sb.append(pool.getPoolName()).append(", "); + } + sb.append("}"); + return sb.toString(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java new file mode 100644 index 000000000..dc486fe09 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/RMCommonDefaults.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config; + +import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy.SelectionPolicy; + +/** + * Defines all the default values used for the optional configurations for ResourceManagement + */ +public final class RMCommonDefaults { + + public static final int MAX_ADMISSIBLE_QUERY_COUNT = 10; + + public static final int MAX_WAITING_QUERY_COUNT = 10; + + public static final int MAX_WAIT_TIMEOUT_IN_MS = 30_000; + + public static final boolean WAIT_FOR_PREFERRED_NODES = true; + + public static final double ROOT_POOL_DEFAULT_MEMORY_PERCENT = 0.9; + + public static final SelectionPolicy ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY = SelectionPolicy.BESTFIT; + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java new file mode 100644 index 000000000..c2f851224 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePool.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector; + +import java.util.List; + +/** + * Interface which defines an implementation of ResourcePool configuration for {@link ResourcePoolTree} + */ +public interface ResourcePool { + String getPoolName(); + + boolean isLeafPool(); + + boolean isDefaultPool(); + + // Only valid for leaf pool since it will have a queue assigned to it with this configuration + long getMaxQueryMemoryPerNode(); + + /** + * Evaluates this pool selector to see if the query can be admitted in this pool. If yes then evaluates all + * the child pools selectors as well. During traversal it builds the QueueAssignment result which consists of all + * the selected leaf pools and all rejected intermediate pools. + * @param assignmentResult + * @param queryContext + */ + void visitAndSelectPool(QueueAssignmentResult assignmentResult, QueryContext queryContext); + + /** + * @return Percentage of memory share assigned to this pool + */ + double getPoolMemoryShare(); + + long getPoolMemoryInMB(int numClusterNodes); + + /** + * Only valid for leaf pool. + * @return Returns queue configuration assigned to this leaf pool + */ + QueryQueueConfig getQueryQueue(); + + /** + * @return Full path of the resource pool from root to this pool + */ + String getFullPath(); + + ResourcePool getParentPool(); + + List<ResourcePool> getChildPools(); + + ResourcePoolSelector getSelector(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java new file mode 100644 index 000000000..79cf6c82f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolImpl.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config; + +import com.typesafe.config.Config; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.NodeResources; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.apache.drill.exec.resourcemgr.config.selectors.DefaultSelector; +import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector; +import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelectorFactory; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.apache.drill.shaded.guava.com.google.common.collect.Lists; + +import java.util.List; +import java.util.Map; + +/** + * Parses and initializes all the provided configuration for a ResourcePool defined in RM configuration. It takes + * care of creating all the child ResourcePools belonging to this Resource Pool, {@link ResourcePoolSelector} for this + * pool and a {@link QueryQueueConfig} if it's a leaf pool. + */ +public class ResourcePoolImpl implements ResourcePool { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolImpl.class); + + public static final String POOL_NAME_KEY = "pool_name"; + + public static final String POOL_MEMORY_SHARE_KEY = "memory"; + + public static final String POOL_CHILDREN_POOLS_KEY = "child_pools"; + + public static final String POOL_SELECTOR_KEY = "selector"; + + public static final String POOL_QUEUE_KEY = "queue"; + + private String poolName; + + private List<ResourcePool> childPools; + + private final double parentResourceShare; + + private final double poolResourceShare; + + private QueryQueueConfig assignedQueue; + + private final ResourcePoolSelector assignedSelector; + + private NodeResources poolResourcePerNode; + + private final ResourcePool parentPool; + + ResourcePoolImpl(Config poolConfig, double poolAbsResourceShare, double parentResourceShare, + NodeResources parentNodeResource, ResourcePool parentPool, + Map<String, QueryQueueConfig> leafQueueCollector) throws RMConfigException { + try { + this.poolName = poolConfig.getString(POOL_NAME_KEY); + this.parentResourceShare = parentResourceShare; + this.poolResourceShare = poolAbsResourceShare * this.parentResourceShare; + this.parentPool = parentPool; + assignedSelector = ResourcePoolSelectorFactory.createSelector(poolConfig.hasPath(POOL_SELECTOR_KEY) + ? poolConfig.getConfig(POOL_SELECTOR_KEY) : null); + parseAndCreateChildPools(poolConfig, parentNodeResource, leafQueueCollector); + } catch (RMConfigException ex) { + throw ex; + } catch (Exception ex) { + throw new RMConfigException(String.format("Failure while parsing configuration for pool: %s. [Details: " + + "PoolConfig: %s]", poolName, poolConfig), ex); + } + } + + @Override + public String getPoolName() { + return poolName; + } + + /** + * Determines if this ResourcePool is a leaf pool or not which will have a queue associated with it + * @return <tt>true</tt> If a leaf pool, <tt>false</tt> otherwise + */ + @Override + public boolean isLeafPool() { + return childPools == null && assignedQueue != null; + } + + /** + * Determines if this ResourcePool is a default pool or not which will act as a sink for all the queries + * @return <tt>true</tt> If a Default pool, <tt>false</tt> otherwise + */ + @Override + public boolean isDefaultPool() { + return (assignedSelector instanceof DefaultSelector); + } + + @Override + public long getMaxQueryMemoryPerNode() { + Preconditions.checkState(isLeafPool() && assignedQueue != null, "max_query_memory_per_node is " + + "only valid for leaf level pools which has a queue assigned to it [Details: PoolName: %s]", poolName); + return assignedQueue.getMaxQueryMemoryInMBPerNode(); + } + + /** + * Used to determine if a ResourcePool is selected for a given query or not. It uses the assigned selector of this + * ResourcePool which takes in query metadata to determine if a query is allowed in this pool. + * @param assignmentResult Used to keep track of all selected leaf pools and all rejected pools for given query + * @param queryContext Contains query metadata like user, groups, tags, etc used by ResourcePoolSelector + */ + @Override + public void visitAndSelectPool(QueueAssignmentResult assignmentResult, QueryContext queryContext) { + if (assignedSelector.isQuerySelected(queryContext)) { + if (isLeafPool()) { + assignmentResult.addSelectedPool(this); + } else { + // Check for each of the child pools + for (ResourcePool childPool : childPools) { + childPool.visitAndSelectPool(assignmentResult, queryContext); + } + } + } else { + assignmentResult.addRejectedPool(this); + } + } + + /** + * Actual percentage share of memory assigned to this ResourcePool + * @return Pool memory share in percentage + */ + @Override + public double getPoolMemoryShare() { + return poolResourceShare; + } + + /** + * Total memory share in MB assigned to this ResourcePool + * @param numClusterNodes number of available cluster nodes for this pool + * @return Pool memory share in MB + */ + @Override + public long getPoolMemoryInMB(int numClusterNodes) { + return poolResourcePerNode.getMemoryInMB() * numClusterNodes; + } + + /** + * Parses and creates all the child ResourcePools if this is an intermediate pool or QueryQueueConfig is its a leaf + * resource pool + * @param poolConfig Config object for this ResourcePool + * @param parentResource Parent ResourcePool NodeResources + * @param leafQueueCollector Collector which keeps track of all leaf queues + * @throws RMConfigException in case of bad configuration + */ + private void parseAndCreateChildPools(Config poolConfig, NodeResources parentResource, + Map<String, QueryQueueConfig> leafQueueCollector) throws RMConfigException { + this.poolResourcePerNode = new NodeResources(Math.round(parentResource.getMemoryInBytes() * poolResourceShare), + parentResource.getNumVirtualCpu()); + if (poolConfig.hasPath(POOL_CHILDREN_POOLS_KEY)) { + childPools = Lists.newArrayList(); + List<? extends Config> childPoolsConfig = poolConfig.getConfigList(POOL_CHILDREN_POOLS_KEY); + logger.debug("Creating {} child pools for parent pool {}", childPoolsConfig.size(), poolName); + for (Config childConfig : childPoolsConfig) { + try { + final ResourcePool childPool = new ResourcePoolImpl(childConfig, childConfig.getDouble(POOL_MEMORY_SHARE_KEY), + poolResourceShare, poolResourcePerNode, this, leafQueueCollector); + childPools.add(childPool); + } catch (RMConfigException ex) { + logger.error("Failure while configuring child ResourcePool. [Details: PoolName: {}, ChildPoolConfig with " + + "error: {}]", poolName, childConfig); + throw ex; + } catch (Exception ex) { + throw new RMConfigException(String.format("Failure while configuring the child ResourcePool. [Details: " + + "PoolName: %s, ChildPoolConfig with error: %s]", poolName, childConfig), ex); + } + } + + if (childPools.isEmpty()) { + throw new RMConfigException(String.format("Empty config for child_pools is not allowed. Please configure the " + + "child_pools property of pool %s correctly or associate a queue with it with no child_pools", poolName)); + } + } else { + logger.info("Resource Pool {} is a leaf level pool with queue assigned to it", poolName); + + if (leafQueueCollector.containsKey(poolName)) { + throw new RMConfigException(String.format("Found non-unique leaf pools with name: %s and config: %s. Leaf " + + "pool names has to be unique since they represent a queue.", poolName, poolConfig)); + } + assignedQueue = new QueryQueueConfigImpl(poolConfig.getConfig(POOL_QUEUE_KEY), poolName, poolResourcePerNode); + leafQueueCollector.put(poolName, assignedQueue); + } + } + + /** + * If this a leaf pool then returns the {@link QueryQueueConfig} for the queue associated with this pool + * @return {@link QueryQueueConfig} object for this pool + */ + @Override + public QueryQueueConfig getQueryQueue() { + Preconditions.checkState(isLeafPool() && assignedQueue != null, "QueryQueue is only " + + "valid for leaf level pools.[Details: PoolName: %s]", poolName); + return assignedQueue; + } + + @Override + public ResourcePool getParentPool() { + return parentPool; + } + + /** + * Returns full path in terms of concatenated pool names from root pool to this pool in {@link ResourcePoolTree} + * @return String with pool names from root to this pool + */ + @Override + public String getFullPath() { + StringBuilder pathBuilder = new StringBuilder(poolName); + ResourcePool parent = parentPool; + while(parent != null) { + pathBuilder.append(parent.getPoolName()); + parent = parent.getParentPool(); + } + + return pathBuilder.toString(); + } + + @Override + public List<ResourcePool> getChildPools() { + Preconditions.checkState(!isLeafPool() && assignedQueue == null, + "There are no child pools for a leaf ResourcePool"); + return childPools; + } + + @Override + public ResourcePoolSelector getSelector() { + return assignedSelector; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{PoolName: ").append(poolName); + sb.append(", PoolResourceShare: ").append(poolResourceShare); + sb.append(", Selector: ").append(assignedSelector.getSelectorType()); + if (isLeafPool()) { + sb.append(", Queue: [").append(assignedQueue.toString()).append("]"); + } else { + sb.append(", ChildPools: ["); + + for (ResourcePool childPool : childPools) { + sb.append(childPool.toString()); + sb.append(", "); + } + sb.append("]"); + } + sb.append("}"); + return sb.toString(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java new file mode 100644 index 000000000..309bb24db --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTree.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.NodeResources; +import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException; +import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy; + +import java.util.Map; + +/** + * Interface which defines the implementation of a hierarchical configuration for all the ResourcePool that will be + * used for ResourceManagement + */ +public interface ResourcePoolTree { + + ResourcePool getRootPool(); + + Map<String, QueryQueueConfig> getAllLeafQueues(); + + double getResourceShare(); + + QueueAssignmentResult selectAllQueues(QueryContext queryContext); + + QueryQueueConfig selectOneQueue(QueryContext queryContext, NodeResources queryMaxNodeResource) + throws QueueSelectionException; + + QueueSelectionPolicy getSelectionPolicyInUse(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java new file mode 100644 index 000000000..cc12a0945 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/ResourcePoolTreeImpl.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config; + +import com.typesafe.config.Config; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.NodeResources; +import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy; +import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy.SelectionPolicy; +import org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicyFactory; +import org.apache.drill.shaded.guava.com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; + +import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_MEMORY_PERCENT; +import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY; + +/** + * Parses and initializes configuration for ResourceManagement in Drill. It takes care of creating all the ResourcePools + * recursively maintaining the n-ary tree hierarchy defined in the configuration. + */ +public class ResourcePoolTreeImpl implements ResourcePoolTree { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolTreeImpl.class); + + private static final String ROOT_POOL_MEMORY_SHARE_KEY = "memory"; + + private static final String ROOT_POOL_QUEUE_SELECTION_POLICY_KEY = "queue_selection_policy"; + + public static final String ROOT_POOL_CONFIG_KEY = "drill.exec.rm"; + + private final ResourcePool rootPool; + + private final Config rmConfig; + + private final NodeResources totalNodeResources; + + private final double resourceShare; + + private final Map<String, QueryQueueConfig> leafQueues = Maps.newHashMap(); + + private final QueueSelectionPolicy selectionPolicy; + + public ResourcePoolTreeImpl(Config rmConfig, long totalNodeMemory, + int totalNodePhysicalCpu, int vFactor) throws RMConfigException { + this(rmConfig, new NodeResources(totalNodeMemory, totalNodePhysicalCpu, vFactor)); + } + + private ResourcePoolTreeImpl(Config rmConfig, NodeResources totalNodeResources) throws RMConfigException { + try { + this.rmConfig = rmConfig; + this.totalNodeResources = totalNodeResources; + this.resourceShare = this.rmConfig.hasPath(ROOT_POOL_MEMORY_SHARE_KEY) ? + this.rmConfig.getDouble(ROOT_POOL_MEMORY_SHARE_KEY) : ROOT_POOL_DEFAULT_MEMORY_PERCENT; + this.selectionPolicy = QueueSelectionPolicyFactory.createSelectionPolicy( + this.rmConfig.hasPath(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY) ? + SelectionPolicy.valueOf(rmConfig.getString(ROOT_POOL_QUEUE_SELECTION_POLICY_KEY)) : + ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY); + rootPool = new ResourcePoolImpl(this.rmConfig.getConfig(ROOT_POOL_CONFIG_KEY), resourceShare, 1.0, + totalNodeResources, null, leafQueues); + logger.debug("Dumping RM configuration {}", toString()); + } catch (RMConfigException ex) { + throw ex; + } catch (Exception ex) { + throw new RMConfigException(String.format("Failure while parsing root pool configuration. " + + "[Details: Config: %s]", rmConfig), ex); + } + } + + /** + * @return root {@link ResourcePool} + */ + @Override + public ResourcePool getRootPool() { + return rootPool; + } + + /** + * @return Map containing all the configured leaf queues + */ + @Override + public Map<String, QueryQueueConfig> getAllLeafQueues() { + return leafQueues; + } + + @Override + public double getResourceShare() { + return resourceShare; + } + + /** + * Creates {@link QueueAssignmentResult} which contains list of all selected leaf ResourcePools and all the rejected + * ResourcePools for the provided query. Performs DFS of the ResourcePoolTree to traverse and find + * selected/rejected ResourcePools. + * @param queryContext {@link QueryContext} which contains metadata required for the given query + * @return {@link QueueAssignmentResult} populated with selected/rejected ResourcePools + */ + @Override + public QueueAssignmentResult selectAllQueues(QueryContext queryContext) { + QueueAssignmentResult assignmentResult = new QueueAssignmentResult(); + rootPool.visitAndSelectPool(assignmentResult, queryContext); + return assignmentResult; + } + + /** + * Selects a leaf queue out of all the possibles leaf queues returned by + * {@link ResourcePoolTree#selectAllQueues(QueryContext)} for a given query based on the configured + * {@link QueueSelectionPolicy}. If none of the queue qualifies then it throws {@link QueueSelectionException} + * @param queryContext {@link QueryContext} which contains metadata for given query + * @param queryMaxNodeResource Max resources on a node required for given query + * @return {@link QueryQueueConfig} for the selected leaf queue + * @throws QueueSelectionException If no leaf queue is selected + */ + @Override + public QueryQueueConfig selectOneQueue(QueryContext queryContext, NodeResources queryMaxNodeResource) + throws QueueSelectionException { + final QueueAssignmentResult assignmentResult = selectAllQueues(queryContext); + final List<ResourcePool> selectedPools = assignmentResult.getSelectedLeafPools(); + if (selectedPools.size() == 0) { + throw new QueueSelectionException(String.format("No resource pools to choose from for the query: %s", + queryContext.getQueryId())); + } else if (selectedPools.size() == 1) { + return selectedPools.get(0).getQueryQueue(); + } + + return selectionPolicy.selectQueue(selectedPools, queryContext, queryMaxNodeResource).getQueryQueue(); + } + + /** + * @return {@link QueueSelectionPolicy} which is used to chose one queue out of all the available options for a query + */ + @Override + public QueueSelectionPolicy getSelectionPolicyInUse() { + return selectionPolicy; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ NodeResources: ").append(totalNodeResources.toString()); + sb.append(", ResourcePercent: ").append(resourceShare); + sb.append(", SelectionPolicy: ").append(selectionPolicy.getSelectionPolicy()); + sb.append(", RootPool: ").append(rootPool.toString()); + sb.append("}"); + return sb.toString(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java new file mode 100644 index 000000000..8f0275dd2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/QueueSelectionException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.exception; + +/** + * Used in case of error while selecting a queue for a given query + */ +public class QueueSelectionException extends Exception { + public QueueSelectionException(String msg) { + super(msg); + } + + public QueueSelectionException(String msg, Exception ex) { + super(msg, ex); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java new file mode 100644 index 000000000..d6648333d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/exception/RMConfigException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.exception; + +/** + * Used in cases of any error with the ResourceManagement configuration + */ +public class RMConfigException extends Exception { + public RMConfigException(String message) { + super(message); + } + + public RMConfigException(String message, Exception e) { + super(message, e); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java new file mode 100644 index 000000000..6829e452d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/package-info.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package contains the configuration components of ResourceManagement feature in Drill. ResourceManagement will + * have it's own configuration file supporting the similar hierarchy of files as supported by Drill's current + * configuration and supports HOCON format. All the supported files for ResourceManagement is listed in + * {@link org.apache.drill.common.config.ConfigConstants}. However whether the feature is enabled/disabled is still + * controlled by a configuration {@link org.apache.drill.exec.ExecConstants#RM_ENABLED} available in the Drill's main + * configuration file. The rm config files will be parsed and loaded only when the feature is enabled. The + * configuration is a hierarchical tree {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTree} of + * {@link org.apache.drill.exec.resourcemgr.config.ResourcePool}. At the top will be the root pool which represents + * the entire resources (only memory in version 1) which is available to ResourceManager to use for admitting queries. + * It is assumed that all the nodes in the Drill cluster is homogeneous and given same amount of memory resources. + * The root pool can be further divided into child ResourcePools to divide the resources among multiple child pools. + * Each child pool get's a resource share from it's parent resource pool. In theory there is no limit on the number + * of ResourcePools that can be configured to divide the cluster resources. + * <p> + * In addition to other parameters defined later root ResourcePool also supports a configuration + * {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl#ROOT_POOL_QUEUE_SELECTION_POLICY_KEY} which + * helps to select exactly one leaf pool out of all the possible options available for a query. For details please + * see package-info.java of {@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy}. + * {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTree#selectOneQueue(org.apache.drill.exec.ops.QueryContext, + * org.apache.drill.exec.resourcemgr.NodeResources)} method is used by parallelizer to get a queue which will be used + * to admit a query. The selected queue resource constraints are used by parallelizer to allocate proper resources + * to a query so that it remains within the bounds. + * </p> + * <p> + * The ResourcePools falls under 2 category: + * <ul> + * <li>Intermediate Pool: As the name suggests all the pools between root and leaf pool falls under this + * category. It helps to navigate a query through the ResourcePoolTree hierarchy to find leaf pools using selectors. + * The intermediate ResourcePool help to subdivide a parent resource pool resource and doesn't have an actual queue + * associated with it. A query will only be executed in a queue associated with a ResourcePool not the ResourcePool + * itself. + * </li> + * <li>Leaf Pool: All the ResourcePools which doesn't have any child pools associated with it are leaf + * ResourcePools. All the leaf pools should have a unique name associated with it and should always have exactly one + * queue configured with it. The queue of a leaf pool is where the queries will be admitted and a resource slice will + * be given to it. All the leaf ResourcePools will collectively comprise of all the resource share available to + * Drill's ResourceManager to allocate to all the queries. + * </li> + * </ul> + * Configurations Supported by ResourcePool: + * <ul> + * <li>{@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_MEMORY_SHARE_KEY}: Percentage of + * memory share of parent ResourcePool assigned to this pool</li> + * <li>{@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_SELECTOR_KEY}: A selector assigned + * to this pool. For details please see package-info.java of + * {@link org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector} + * </li> + * <li>{@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_QUEUE_KEY}: Queue configuration + * associated with this pool. It should always be configured for a leaf pool only. If configured with an + * intermediate pool then it will be ignored. + * </li> + * </ul> + * </p> + * <p> + * A queue always have 1:1 relationship with a leaf pool. Queries are admitted and executed with a resource slice + * from the queue. It supports following configurations: + * <ul> + * <li>{@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_ADMISSIBLE_KEY}: Upper bound on the + * total number of queries that can be admitted inside a queue. After this limit is reached all the queries + * will be moved to waiting state.</li> + * <li>{@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_WAITING_KEY}: Limits the + * total number of queries that can be in waiting state inside a queue. After this limit is reached all the new + * queries will be failed immediately.</li> + * <li> {@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY}: + * Limits the maximum memory any query in this queue can consume on any node in the cluster. This is to limit a + * query from a queue to consume all the resources on a node so that other queues query can also have some + * resources available for it. Ideally it's advised that sum of value of this parameter for all queues should not + * exceed the total memory on a node. + * </li> + * <li> {@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#WAIT_FOR_PREFERRED_NODES_KEY}: This + * configuration helps to decide if an admitted query in a queue should wait until it has available resources on all + * the nodes assigned to it by planner for its execution. By default it's true. When set to false then for the nodes + * which doesn't have available resources for a query will be replaced with another node with enough resources. + * </li> + * </ul> + * </p> + * Once all the configuration are parsed an in-memory structures are created then for each query planner will select + * a queue where a query can be admitted. The queue selection process happens by traversing the ResourcePoolTree. During + * traversal process the query metadata is evaluated against assigned selector of a ResourcePool. If the selector + * returns true then traversal continues to it's child pools otherwise it stops there and tries another pool. With + * the traversal it finds all the leaf pools which are eligible for admitting the query and store that information in + * {@link org.apache.drill.exec.resourcemgr.config.QueueAssignmentResult}. Later the selected pools are passed to + * configured QueueSelectionPolicy to select one queue for the query. Planner uses that selected queue's max query + * memory per node parameter to limit resource assignment to all the fragments of a query on a node. After a query is + * planned with resource constraints it is sent to leader of that queue to ask for admission. If admitted the query + * required resources are reserved in global state store and query is executed on the cluster. For details please see + * the design document and functional spec linked in <a href="https://issues.apache.org/jira/browse/DRILL-7026"> + * DRILL-7026</a> + */ +package org.apache.drill.exec.resourcemgr.config;
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java new file mode 100644 index 000000000..7e9efe7e5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/AbstractQueueSelectionPolicy.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectionpolicy; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.NodeResources; +import org.apache.drill.exec.resourcemgr.config.ResourcePool; +import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException; + +import java.util.List; + +public abstract class AbstractQueueSelectionPolicy implements QueueSelectionPolicy { + private final SelectionPolicy selectionPolicy; + + public AbstractQueueSelectionPolicy(SelectionPolicy selectionPolicy) { + this.selectionPolicy = selectionPolicy; + } + + @Override + public SelectionPolicy getSelectionPolicy() { + return selectionPolicy; + } + + public abstract ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext, + NodeResources maxResourcePerNode) throws QueueSelectionException; +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java new file mode 100644 index 000000000..a76f19336 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/BestFitQueueSelection.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectionpolicy; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.NodeResources; +import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig; +import org.apache.drill.exec.resourcemgr.config.ResourcePool; +import org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl; +import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException; + +import java.util.Comparator; +import java.util.List; + +/** + * Helps to select a queue whose {@link QueryQueueConfig#getMaxQueryMemoryInMBPerNode()} is nearest to the max memory + * on a node required by the given query. Nearest is found by following rule: + * <ul> + * <li> + * Queue whose {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} is + * equal to max memory per node of given query + * </li> + * <li> + * Queue whose {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} is + * just greater than max memory per node of given query. From all queues whose max_query_memory_per_node is + * greater than what is needed by the query, the queue with minimum value is chosen. + * </li> + * <li> + * Queue whose {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} is + * just less than max memory per node of given query. From all queues whose max_query_memory_per_node is + * less than what is needed by the query, the queue with maximum value is chosen. + * </li> + * </ul> + */ +public class BestFitQueueSelection extends AbstractQueueSelectionPolicy { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BestFitQueueSelection.class); + + public BestFitQueueSelection() { + super(SelectionPolicy.BESTFIT); + } + + /** + * Comparator used to sort the leaf ResourcePool lists based on + * {@link QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} + */ + private static class BestFitComparator implements Comparator<ResourcePool> { + @Override + public int compare(ResourcePool o1, ResourcePool o2) { + long pool1Value = o1.getQueryQueue().getMaxQueryMemoryInMBPerNode(); + long pool2Value = o2.getQueryQueue().getMaxQueryMemoryInMBPerNode(); + return Long.compare(pool1Value, pool2Value); + } + } + + @Override + public ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext, + NodeResources maxResourcePerNode) throws QueueSelectionException { + if (allPools.isEmpty()) { + throw new QueueSelectionException(String.format("There are no pools to apply %s selection policy pool for the " + + "query: %s", getSelectionPolicy().toString(), queryContext.getQueryId())); + } + + allPools.sort(new BestFitComparator()); + final long queryMaxNodeMemory = maxResourcePerNode.getMemoryInMB(); + ResourcePool selectedPool = allPools.get(0); + for (ResourcePool pool : allPools) { + selectedPool = pool; + long poolMaxNodeMem = pool.getQueryQueue().getMaxQueryMemoryInMBPerNode(); + if (poolMaxNodeMem >= queryMaxNodeMemory) { + break; + } + } + logger.debug("Selected pool {} based on {} policy for query {}", selectedPool.getPoolName(), + getSelectionPolicy().toString(), + queryContext.getQueryId()); + return selectedPool; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java new file mode 100644 index 000000000..f1c03c3f5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/DefaultQueueSelection.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectionpolicy; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.NodeResources; +import org.apache.drill.exec.resourcemgr.config.ResourcePool; +import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException; + +import java.util.List; + +/** + * Helps to select the first default queue in the list of all the provided queues. If there is no default queue + * present it throws {@link QueueSelectionException}. Default queue is a queue associated with {@link ResourcePool} + * which has {@link org.apache.drill.exec.resourcemgr.config.selectors.DefaultSelector} assigned to it + */ +public class DefaultQueueSelection extends AbstractQueueSelectionPolicy { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultQueueSelection.class); + + public DefaultQueueSelection() { + super(SelectionPolicy.DEFAULT); + } + + @Override + public ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext, + NodeResources maxResourcePerNode) throws QueueSelectionException { + for (ResourcePool pool : allPools) { + if (pool.isDefaultPool()) { + logger.debug("Selected default pool: {} for the query: {}", pool.getPoolName(), queryContext.getQueryId()); + return pool; + } + } + + throw new QueueSelectionException(String.format("There is no default pool to select from list of pools provided " + + "for the query: %s", queryContext.getQueryId())); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java new file mode 100644 index 000000000..5fab2d44c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicy.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectionpolicy; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.NodeResources; +import org.apache.drill.exec.resourcemgr.config.ResourcePool; +import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException; + +import java.util.List; + +/** + * Interface that defines all the implementation of a QueueSelectionPolicy supported by ResourceManagement + */ +public interface QueueSelectionPolicy { + + enum SelectionPolicy { + UNKNOWN, + DEFAULT, + RANDOM, + BESTFIT; + + @Override + public String toString() { + return name().toLowerCase(); + } + } + + SelectionPolicy getSelectionPolicy(); + + ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext, NodeResources maxResourcePerNode) + throws QueueSelectionException; +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java new file mode 100644 index 000000000..6ae96e7a6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/QueueSelectionPolicyFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectionpolicy; + +/** + * Factory to return an instance of {@link QueueSelectionPolicy} based on the configured policy name. By default if + * the configured policy name doesn't matches any supported policies then it returns {@link BestFitQueueSelection} + */ +public class QueueSelectionPolicyFactory { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueueSelectionPolicyFactory.class); + + public static QueueSelectionPolicy createSelectionPolicy(QueueSelectionPolicy.SelectionPolicy policy) { + logger.debug("Creating SelectionPolicy of type {}", policy); + QueueSelectionPolicy selectionPolicy; + switch (policy) { + case DEFAULT: + selectionPolicy = new DefaultQueueSelection(); + break; + case BESTFIT: + selectionPolicy = new BestFitQueueSelection(); + break; + case RANDOM: + selectionPolicy = new RandomQueueSelection(); + break; + default: + logger.info("QueueSelectionPolicy is not configured so proceeding with the bestfit as default policy"); + selectionPolicy = new BestFitQueueSelection(); + break; + } + + return selectionPolicy; + } + + // prevents from instantiation + private QueueSelectionPolicyFactory() { + // no-op + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java new file mode 100644 index 000000000..63c51f29d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/RandomQueueSelection.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectionpolicy; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.NodeResources; +import org.apache.drill.exec.resourcemgr.config.ResourcePool; +import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException; + +import java.util.Collections; +import java.util.List; + +/** + * Randomly selects a queue from the list of all the provided queues. If no pools are provided then it throws + * {@link QueueSelectionException} + */ +public class RandomQueueSelection extends AbstractQueueSelectionPolicy { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RandomQueueSelection.class); + + public RandomQueueSelection() { + super(SelectionPolicy.RANDOM); + } + + @Override + public ResourcePool selectQueue(List<ResourcePool> allPools, QueryContext queryContext, + NodeResources maxResourcePerNode) throws QueueSelectionException { + if (allPools.size() == 0) { + throw new QueueSelectionException(String.format("Input pool list is empty to apply %s selection policy", + getSelectionPolicy().toString())); + } + Collections.shuffle(allPools); + ResourcePool selectedPool = allPools.get(0); + logger.debug("Selected random pool: {} for query: {}", selectedPool.getPoolName(), queryContext.getQueryId()); + return selectedPool; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java new file mode 100644 index 000000000..8c1e257c2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/package-info.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines all the selection policy implementation which can be configured with Resource Management. The + * configuration which is used to specify a policy is + * {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl#ROOT_POOL_QUEUE_SELECTION_POLICY_KEY}. Selection Policy + * helps to select a single leaf ResourcePool out of all the eligible pools whose queue can be used to admit this query. + * Currently there are 3 types of supported policies. In future more policies can be supported by implementing + * {@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.QueueSelectionPolicy} interface. + * <ul> + * <li>{@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.DefaultQueueSelection}: Out of all the eligible pools + * this policy will choose a default pool in the list. If there are multiple default pools present in the list then + * it will return the first default pool. If there is no default pool present then it throws + * {@link org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException} + * </li> + * <li>{@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.RandomQueueSelection}: Out of all the eligible pools + * this policy will choose a pool at random. If there are no pools to select from then it throws + * {@link org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException} + * </li> + * <li>{@link org.apache.drill.exec.resourcemgr.config.selectionpolicy.BestFitQueueSelection}: Out of all the eligible pools + * this policy will choose a pool whose queue configuration + * {@link org.apache.drill.exec.resourcemgr.config.QueryQueueConfigImpl#MAX_QUERY_MEMORY_PER_NODE_KEY} value is closest to + * the max memory on a node required by the query. It tries to find a pool whose value for MAX_QUERY_MEMORY_PER_NODE + * is equal to queries max memory per node requirement. If there is no such pool then find the pool with config value + * just greater than queries max memory per node. Otherwise find a pool with config value just less than queries + * max memory per node. + * </li> + * </ul> + */ +package org.apache.drill.exec.resourcemgr.config.selectionpolicy;
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java new file mode 100644 index 000000000..6f474e3a0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AbstractResourcePoolSelector.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import org.apache.drill.exec.ops.QueryContext; + +public abstract class AbstractResourcePoolSelector implements ResourcePoolSelector { + + protected final SelectorType SELECTOR_TYPE; + + AbstractResourcePoolSelector(SelectorType type) { + SELECTOR_TYPE = type; + } + + public SelectorType getSelectorType() { + return SELECTOR_TYPE; + } + + public abstract boolean isQuerySelected(QueryContext queryContext); + + @Override + public String toString() { + return SELECTOR_TYPE.toString(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java new file mode 100644 index 000000000..50acb860d --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AclSelector.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import avro.shaded.com.google.common.annotations.VisibleForTesting; +import com.typesafe.config.Config; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.apache.drill.exec.util.ImpersonationUtil; +import org.apache.drill.shaded.guava.com.google.common.collect.Sets; +import org.apache.hadoop.security.UserGroupInformation; + +import java.util.List; +import java.util.Set; + +/** + * Evaluates if a query can be admitted to a ResourcePool or not by comparing query user/groups with the + * configured users/groups policies for this selector. AclSelector can be configured using both long-form syntax or + * short-form syntax as defined below: + * <ul> + * <li>Long-Form Syntax: Allows to use identifiers to specify allowed and disallowed users/groups. For example: + * users: [alice:+, bob:-] means alice is allowed whereas bob is denied access to the pool</li> + * <li>Short-Form Syntax: Allows to specify lists of allowed users/groups only. For example: users: [alice, bob] + * means only alice and bob are allowed access to this pool</li> + * </ul> + * The selector also supports * as a wildcard for both long and short form syntax to allow/deny all users/groups. + * Example configuration is of form: + * <code><pre> + * selector: { + * acl: { + * users: [alice:+, bob:-], + * groups: [sales, marketing] + * } + * } + * </pre></code> + */ +public class AclSelector extends AbstractResourcePoolSelector { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AclSelector.class); + + private final Set<String> allowedUsers = Sets.newHashSet(); + + private final Set<String> allowedGroups = Sets.newHashSet(); + + private final Set<String> deniedUsers = Sets.newHashSet(); + + private final Set<String> deniedGroups = Sets.newHashSet(); + + private final Config aclSelectorValue; + + private static final String ACL_VALUE_GROUPS_KEY = "groups"; + + private static final String ACL_VALUE_USERS_KEY = "users"; + + private static final String ACL_LONG_SYNTAX_SEPARATOR = ":"; + + private static final String ACL_LONG_ALLOWED_IDENTIFIER = "+"; + + private static final String ACL_LONG_DISALLOWED_IDENTIFIER = "-"; + + private static final String ACL_ALLOW_ALL = "*"; + + + AclSelector(Config configValue) throws RMConfigException { + super(SelectorType.ACL); + this.aclSelectorValue = configValue; + validateAndParseACL(aclSelectorValue); + } + + /** + * Determines if a given query is selected by this ACL selector of a Resource Pool or not. Following rules are + * followed to evaluate the selection. Assumption: There is an assumption made that if a user or group is configured + * in both +ve/-ve respective lists then it will be treated to be present in -ve list. + * + * Rules: + * 1) Check if query user is present in -ve users list, If yes then query is not selected else go to 2 + * 2) Check if query user is present in +ve users list, If yes then query is selected else go to 3 + * 3) Check if * is present in -ve users list, if yes then query is not selected else go to 4 + * 4) Check if * is present in +ve users list, if yes then query is selected else go to 5 + * 5) If here that means query user or * is absent in both +ve and -ve users list so check for groups of query user + * in step 6 + * 6) Check if any of groups of query user is present in -ve groups list, If yes then query is not selected else go + * to 7 + * 7) Check if any of groups of query user is present in +ve groups list, If yes then query selected else go to 8 + * 8) Check if * is present in -ve groups list, If yes then query is not selected else go to 9 + * 9) Check if * is present in +ve groups list, If yes then query is selected else go to 10 + * 10) Query user and groups of it is neither present is +ve/-ve users list not +ve/-ve groups list hence the query + * is not selected + * + * @param queryContext QueryContext to get information about query user + * @return true if a query is selected by this selector, false otherwise + */ + @Override + public boolean isQuerySelected(QueryContext queryContext) { + final String queryUser = queryContext.getQueryUserName(); + final UserGroupInformation queryUserUGI = ImpersonationUtil.createProxyUgi(queryUser); + final Set<String> queryGroups = Sets.newHashSet(queryUserUGI.getGroupNames()); + return checkQueryUserGroups(queryUser, queryGroups); + } + + @VisibleForTesting + public boolean checkQueryUserGroups(String queryUser, Set<String> queryGroups) { + // Check for +ve/-ve users information with query user + if (deniedUsers.contains(queryUser)) { + logger.debug("Query user is present in configured ACL -ve users list"); + return false; + } else if (allowedUsers.contains(queryUser)) { + logger.debug("Query user is present in configured ACL +ve users list"); + return true; + } else if (isStarInDisAllowedUsersList()) { + logger.debug("Query user is absent in configured ACL +ve/-ve users list but * is in -ve users list"); + return false; + } else if (isStarInAllowedUsersList()) { + logger.debug("Query user is absent in configured ACL +ve/-ve users list but * is in +ve users list"); + return true; + } + + // Check for +ve/-ve groups information with groups of query user + if (Sets.intersection(queryGroups, deniedGroups).size() > 0) { + logger.debug("Groups of Query user is present in configured ACL -ve groups list"); + return false; + } else if (Sets.intersection(queryGroups, allowedGroups).size() > 0) { + logger.debug("Groups of Query user is present in configured ACL +ve groups list"); + return true; + } else if (isStarInDisAllowedGroupsList()) { + logger.debug("Groups of Query user is absent in configured ACL +ve/-ve groups list but * is in -ve groups list"); + return false; + } else if (isStarInAllowedGroupsList()) { + logger.debug("Groups of Query user is absent in configured ACL +ve/-ve groups list but * is in +ve groups list"); + return true; + } + + logger.debug("Neither query user or group is present in configured ACL users/groups list"); + return false; + } + + /** + * Parses the acl selector config value for users and groups to populate list of allowed/denied users and groups. + * @param aclConfig Acl config to parse + * @throws RMConfigException in case of invalid config for either users/groups + */ + private void validateAndParseACL(Config aclConfig) throws RMConfigException { + + // ACL config doesn't have either group or user list + if (!aclConfig.hasPath(ACL_VALUE_GROUPS_KEY) && !aclConfig.hasPath(ACL_VALUE_USERS_KEY)) { + throw new RMConfigException(String.format("ACL Selector config is missing both group and user list information." + + " Please configure either of groups or users list. [Details: aclConfig: %s]", aclConfig)); + } + + if (aclConfig.hasPath(ACL_VALUE_USERS_KEY)) { + final List<String> users = aclSelectorValue.getStringList(ACL_VALUE_USERS_KEY); + parseACLInput(users, allowedUsers, deniedUsers); + } + + if (aclConfig.hasPath(ACL_VALUE_GROUPS_KEY)) { + final List<String> groups = aclSelectorValue.getStringList(ACL_VALUE_GROUPS_KEY); + parseACLInput(groups, allowedGroups, deniedGroups); + } + + // If no valid configuration is seen for this selector + if (allowedGroups.size() == 0 && deniedGroups.size() == 0 && + deniedUsers.size() == 0 && allowedUsers.size() == 0) { + throw new RMConfigException("No valid users or groups information is configured for this ACL selector. Either " + + "use * or valid users/groups"); + } + + // Check if there is any intersection between allowed and disallowed users/groups + Set<String> wrongConfig = Sets.intersection(allowedUsers, deniedUsers); + if (wrongConfig.size() > 0) { + logger.warn("These users are configured both in allowed and disallowed list. They will be treated as disallowed" + + ". [Details: users: {}]", wrongConfig); + allowedUsers.removeAll(wrongConfig); + } + + wrongConfig = Sets.intersection(allowedGroups, deniedGroups); + if (wrongConfig.size() > 0) { + logger.warn("These groups are configured both in allowed and disallowed list. They will be treated as " + + "disallowed. [Details: groups: {}]", wrongConfig); + allowedGroups.removeAll(wrongConfig); + } + } + + public Set<String> getAllowedUsers() { + return allowedUsers; + } + + public Set<String> getAllowedGroups() { + return allowedGroups; + } + + public Set<String> getDeniedUsers() { + return deniedUsers; + } + + public Set<String> getDeniedGroups() { + return deniedGroups; + } + + private boolean isStarInAllowedUsersList() { + return allowedUsers.contains(ACL_ALLOW_ALL); + } + + private boolean isStarInAllowedGroupsList() { + return allowedGroups.contains(ACL_ALLOW_ALL); + } + + private boolean isStarInDisAllowedUsersList() { + return deniedUsers.contains(ACL_ALLOW_ALL); + } + + private boolean isStarInDisAllowedGroupsList() { + return deniedGroups.contains(ACL_ALLOW_ALL); + } + + private void parseACLInput(List<String> acls, Set<String> allowedIdentity, Set<String> disAllowedIdentity) { + for (String aclValue : acls) { + + if (aclValue.isEmpty()) { + continue; + } + // Check if it's long form syntax or shortForm syntax + String[] aclValueSplits = aclValue.split(ACL_LONG_SYNTAX_SEPARATOR); + if (aclValueSplits.length == 1) { + // short form + if (!allowedIdentity.add(aclValueSplits[0])) { + logger.info("Duplicate acl identity: {} found in configured list will be ignored", aclValueSplits[0]); + } + } else { + // long form + final String identifier = aclValueSplits[1]; + if (identifier.equals(ACL_LONG_ALLOWED_IDENTIFIER)) { + if (!allowedIdentity.add(aclValueSplits[0])) { + logger.info("Duplicate acl identity: {} found in configured list will be ignored", aclValueSplits[0]); + } + } else if (identifier.equals(ACL_LONG_DISALLOWED_IDENTIFIER)) { + if (!disAllowedIdentity.add(aclValueSplits[0])) { + logger.info("Duplicate acl identity: {} found in configured list will be ignored", aclValueSplits[0]); + } + } else { + logger.error("Invalid long form syntax encountered hence ignoring ACL string {} . Details[Allowed " + + "identifiers are `+` and `-`. Encountered: {}]", aclValue, identifier); + } + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ SelectorType: ").append(super.toString()); + sb.append(", AllowedUsers: ["); + for (String positiveUser : allowedUsers) { + sb.append(positiveUser).append(", "); + } + sb.append("], AllowedGroups: ["); + for (String positiveGroup : allowedGroups) { + sb.append(positiveGroup).append(", "); + } + sb.append("], DisallowedUsers: ["); + for (String negativeUser : deniedUsers) { + sb.append(negativeUser).append(", "); + } + sb.append("], DisallowedGroups: ["); + for (String negativeGroup : deniedGroups) { + sb.append(negativeGroup).append(", "); + } + sb.append("]}"); + + return sb.toString(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java new file mode 100644 index 000000000..82b51ee46 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/AndSelector.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import com.typesafe.config.Config; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; + +import java.util.List; + +/** + * Complex selector whose value is list of other Simple or Complex Selectors. There has to be at least 2 other + * selectors configured in the value list for this selector. It does AND operation on result of all the child + * selectors configured with it to evaluate if a query can be admitted to it's ResourcePool or not. + * + * Example configuration is of form: + * <code><pre> + * selector: { + * and: [{tag: "BITool"},{tag: "operational"}] + * } + * </pre></code> + */ +public class AndSelector extends ComplexSelectors { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AndSelector.class); + + AndSelector(List<? extends Config> configValue) throws RMConfigException { + super(SelectorType.AND, configValue); + } + + @Override + public boolean isQuerySelected(QueryContext queryContext) { + for (ResourcePoolSelector childSelector : childSelectors) { + if (!childSelector.isQuerySelected(queryContext)) { + logger.debug("Query {} is not selected by the child selector of type {} in this complex AndSelector", + queryContext.getQueryId(), childSelector.getSelectorType().toString()); + return false; + } + } + return true; + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java new file mode 100644 index 000000000..fe9ab52ae --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ComplexSelectors.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import com.typesafe.config.Config; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; + +import java.util.ArrayList; +import java.util.List; + +public abstract class ComplexSelectors extends AbstractResourcePoolSelector { + + protected final List<ResourcePoolSelector> childSelectors = new ArrayList<>(); + + ComplexSelectors(SelectorType type, List<? extends Config> selectorConfig) throws RMConfigException { + super(type); + parseAndCreateChildSelectors(selectorConfig); + } + + private void parseAndCreateChildSelectors(List<? extends Config> childConfigs) throws RMConfigException { + for (Config childConfig : childConfigs) { + childSelectors.add(ResourcePoolSelectorFactory.createSelector(childConfig)); + } + + if (childSelectors.size() < 2) { + throw new RMConfigException(String.format("For complex selector OR and AND it is expected to have atleast 2 " + + "selectors in the list but found %d", childSelectors.size())); + } + } + + public abstract boolean isQuerySelected(QueryContext queryContext); + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ SelectorType: ").append(super.toString()); + sb.append(", of selectors ["); + for (ResourcePoolSelector childSelector : childSelectors) { + sb.append(childSelector.toString()).append(", "); + } + sb.append("]}"); + return sb.toString(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java new file mode 100644 index 000000000..2336d03e2 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/DefaultSelector.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import org.apache.drill.exec.ops.QueryContext; + +/** + * When selector configuration is absent for a ResourcePool then it is associated with a DefaultSelector. It acts as + * a sink for all the queries which means all the queries will be selected by this default selector. + */ +public class DefaultSelector extends AbstractResourcePoolSelector { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DefaultSelector.class); + + DefaultSelector() { + super(SelectorType.DEFAULT); + } + + @Override + public SelectorType getSelectorType() { + return SELECTOR_TYPE; + } + + @Override + public boolean isQuerySelected(QueryContext queryContext) { + logger.debug("Query {} is selected by this Default selector", queryContext.getQueryId()); + return true; + } + + @Override + public String toString() { + return "{SelectorType: " + super.toString() + "}"; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java new file mode 100644 index 000000000..653c3b538 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/NotEqualSelector.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import avro.shaded.com.google.common.annotations.VisibleForTesting; +import com.typesafe.config.Config; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; + +/** + * Simple selector whose value is another Simple or Complex Selectors. It does NOT of result of its child selector + * configured with it to evaluate if a query can be admitted to it's ResourcePool or not. + * + * Example configuration is of form: + * <code><pre> + * selector: { + * not_equal: {tag: "BITool"} + * } + * </pre></code> + */ +public class NotEqualSelector extends AbstractResourcePoolSelector { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NotEqualSelector.class); + + private final ResourcePoolSelector poolSelector; + + NotEqualSelector(Config selectorValue) throws RMConfigException { + super(SelectorType.NOT_EQUAL); + poolSelector = ResourcePoolSelectorFactory.createSelector(selectorValue); + } + + @Override + public boolean isQuerySelected(QueryContext queryContext) { + logger.debug("Query {} is evaluated for not_equal of selector type {}", queryContext.getQueryId(), + poolSelector.getSelectorType().toString()); + return !poolSelector.isQuerySelected(queryContext); + } + + @VisibleForTesting + public ResourcePoolSelector getPoolSelector() { + return poolSelector; + } + + @Override + public String toString() { + return "{ SelectorType: " + super.toString() + ", of selector " + poolSelector.toString() + " }"; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java new file mode 100644 index 000000000..5e8b2f9e0 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/OrSelector.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import com.typesafe.config.Config; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; + +import java.util.List; + +/** + * Complex selector whose value is list of other Simple or Complex Selectors. There has to be at least 2 other + * selectors configured in the value list for this selector. It does OR operation on result of all the child selectors + * configured with it to evaluate if a query can be admitted to it's ResourcePool or not. + * + * Example configuration is of form: + * <code></><pre> + * selector: { + * or: [{tag: "BITool"},{tag: "operational"}] + * } + * </pre></code> + */ +public class OrSelector extends ComplexSelectors { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OrSelector.class); + + OrSelector(List<? extends Config> configValue) throws RMConfigException { + super(SelectorType.OR, configValue); + } + + @Override + public boolean isQuerySelected(QueryContext queryContext) { + for (ResourcePoolSelector childSelector : childSelectors) { + // If we find any selector evaluating to true then no need to evaluate other selectors in the list + if (childSelector.isQuerySelected(queryContext)) { + logger.debug("Query {} is selected by the child selector of type {} in this OrSelector", + queryContext.getQueryId(), childSelector.getSelectorType().toString()); + return true; + } + } + + return false; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java new file mode 100644 index 000000000..f5dbf0db1 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelector.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import org.apache.drill.exec.ops.QueryContext; + +/** + * Interface that defines implementation for selectors assigned to a ResourcePool. ResourcePoolSelector helps to + * evaluate if a given query can be admitted into a ResourcePool or not. Based on the assigned selector type to a + * ResourcePool it uses the query metadata with it's own configured values and make a decision for a query. The + * SelectorType defines all the supported ResourcePoolSelector which can be assigned to a ResourcePool. The + * configuration of a selector is of type: + * <code><pre> + * selector: { + * SelectorType:SelectorValue + * } + * where SelectorValue can be a string (for SelectorType tag), + * object (for SelectorType acl and not_equal) and + * list of objects (for SelectorType and, or) + * when selector config is absent then a DefaultSelector is associated with the ResourcePool + * </pre></code> + */ +public interface ResourcePoolSelector { + + enum SelectorType { + UNKNOWN, + DEFAULT, + TAG, + ACL, + OR, + AND, + NOT_EQUAL; + + @Override + public String toString() { + return name().toLowerCase(); + } + } + + SelectorType getSelectorType(); + + boolean isQuerySelected(QueryContext queryContext); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java new file mode 100644 index 000000000..0ad130269 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/ResourcePoolSelectorFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import com.typesafe.config.Config; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector.SelectorType; + +public class ResourcePoolSelectorFactory { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourcePoolSelectorFactory.class); + + public static ResourcePoolSelector createSelector(Config selectorConfig) throws RMConfigException { + ResourcePoolSelector poolSelector = null; + String selectorType = SelectorType.DEFAULT.toString(); + try { + if (selectorConfig == null) { + poolSelector = new DefaultSelector(); + } else if (selectorConfig.hasPath(SelectorType.TAG.toString())) { + selectorType = SelectorType.TAG.toString(); + poolSelector = new TagSelector(selectorConfig.getString(selectorType)); + } else if (selectorConfig.hasPath(SelectorType.ACL.toString())) { + selectorType = SelectorType.ACL.toString(); + poolSelector = new AclSelector(selectorConfig.getConfig(selectorType)); + } else if (selectorConfig.hasPath(SelectorType.OR.toString())) { + selectorType = SelectorType.OR.toString(); + poolSelector = new OrSelector(selectorConfig.getConfigList(selectorType)); + } else if (selectorConfig.hasPath(SelectorType.AND.toString())) { + selectorType = SelectorType.AND.toString(); + poolSelector = new AndSelector(selectorConfig.getConfigList(selectorType)); + } else if (selectorConfig.hasPath(SelectorType.NOT_EQUAL.toString())) { + selectorType = SelectorType.NOT_EQUAL.toString(); + poolSelector = new NotEqualSelector(selectorConfig.getConfig(selectorType)); + } + } catch (Exception ex) { + throw new RMConfigException(String.format("There is an error with value configuration for selector type %s", + selectorType), ex); + } + + // if here means either a selector is chosen or wrong configuration + if (poolSelector == null) { + throw new RMConfigException(String.format("Configured selector is either empty or not supported. [Details: " + + "SelectorConfig: %s]", selectorConfig)); + } + + logger.debug("Created selector of type {}", poolSelector.getSelectorType().toString()); + return poolSelector; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java new file mode 100644 index 000000000..79237123c --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/TagSelector.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; + +/** + * Simple selector whose value is a string representing a tag. It tries to match it's configured tag with one of + * the tags configured for the query using connection/session parameter. If a query posses at least one tag same as + * this selector tag then it will be admitted in the respective ResourcePool. + * + * Example configuration is of form: + * <code><pre> + * selector: { + * tag: "BITool" + * } + * </pre></code> + */ +public class TagSelector extends AbstractResourcePoolSelector { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TagSelector.class); + + private final String configuredTag; + + TagSelector(String selectorValue) throws RMConfigException { + super(SelectorType.TAG); + + if (selectorValue == null || selectorValue.isEmpty()) { + throw new RMConfigException("Tag value of this selector is either null or empty. Please configure a valid tag " + + "as string."); + } + configuredTag = selectorValue; + } + + @Override + public SelectorType getSelectorType() { + return SELECTOR_TYPE; + } + + @Override + public boolean isQuerySelected(QueryContext queryContext) { + String[] queryTags = queryContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY).string_val.split(","); + for (String queryTag : queryTags) { + if (queryTag.equals(configuredTag)) { + logger.debug("Query {} tag {} matches the selector tag {}", queryContext.getQueryId(), queryTag, configuredTag); + return true; + } + } + return false; + } + + public String getTagValue() { + return configuredTag; + } + + @Override + public String toString() { + return "{ SelectorType: " + super.toString() + ", TagValue: [" + configuredTag + "]}"; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java new file mode 100644 index 000000000..441c24aed --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/config/selectors/package-info.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines all the Selectors which can be assigned to a ResourcePool in the ResourceManagement configuration. A + * selector helps to evaluate that given a query and it's metadata if that query can be admitted inside associated + * ResourcePool or not. Selectors are associated with both intermediate and leaf level ResourcePools. The intermediate + * pool selectors helps to navigate the ResourcePool hierarchy to reach a leaf level ResourcePool where query will + * actually be admitted in the queue associated with a leaf pool. Whereas leaf pool selector will help to choose all + * the leaf pools which can be considered to admit a query. A selector can be configured for a ResourcePool + * using the {@link org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl#POOL_SELECTOR_KEY} configuration. If the + * selector configuration is missing for a ResourcePool then it is associated with a Default Selector making it + * a Default ResourcePool. Selectors are configured as a key value pair where key represents it's type and + * value is what it uses to evaluate a query. Currently there are 6 different types of supported selectors. In future + * more selectos can be supported by implementing + * {@link org.apache.drill.exec.resourcemgr.config.selectors.ResourcePoolSelector} interface. + * <ul> + * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.DefaultSelector}: It acts as a sink and will always return + * true for all the queries. It is associated with a ResourcePool which is not assigned any selector in the + * configuration making it a default pool. + * </li> + * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.TagSelector}: A selector which has a tag (String) value + * associated with it. It evaluates all the tags associated with a query to see if there is a match or not. A query is + * only selected by this selector if it has a tag same as that configured for this selector. + * </li> + * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.AclSelector}: A selector which has users/groups policies + * value associated with it. It evaluates these policies against the users/groups of query session to check if the + * query can be selected or not. It supports long/short form syntax to configure the acl policies. It also supports * + * as wildcard character to allow/deny all users/groups in its policies. Please see AclSelector class javadoc for more + * details. + * </li> + * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.OrSelector}: A selector which can have lists of 2 or more + * other selectors configured as it's value except for Default selector. It performs || operation on the selection + * result of all other configured selectors.</li> + * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.AndSelector}: A selector which can have lists of 2 or more + * other selectors configured as it's value except for Default selector. It performs && operation on the selection + * result of all other configured selectors.</li> + * <li>{@link org.apache.drill.exec.resourcemgr.config.selectors.NotEqualSelector}: A selector which can have any other + * selector defined above configured as it's value except for Default selector. It will compare the query metadata + * against the configured selector and will return ! of that as a selection result. For example: if a TagSelector + * with value "sales" is configured for this NotSelector then it will select queries whose doesn't have sales tag + * associated with it.</li> + * </ul> + */ +package org.apache.drill.exec.resourcemgr.config.selectors;
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java new file mode 100644 index 000000000..0658172f6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/package-info.java @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package will contain all the components of resource manager in Drill. + */ +package org.apache.drill.exec.resourcemgr;
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java new file mode 100644 index 000000000..8d5f5f26f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resourcemgr/util/MemoryConfigParser.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.util; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Utility class which helps in parsing memory configuration string using the passed in pattern to get memory value in + * bytes. + */ +public class MemoryConfigParser { + + /** + * @param memoryConfig Memory configuration string + * @param patternToUse Pattern which will be used to parse the memory configuration string + * @return memory value in bytes + */ + public static long parseMemoryConfigString(String memoryConfig, String patternToUse) { + Pattern pattern = Pattern.compile(patternToUse); + Matcher patternMatcher = pattern.matcher(memoryConfig); + + long memoryPerNodeInBytes = 0; + if (patternMatcher.matches()) { + memoryPerNodeInBytes = Long.parseLong(patternMatcher.group(1)); + + // group 2 can be optional + String group2 = patternMatcher.group(2); + if (!group2.isEmpty()) { + switch (group2.charAt(0)) { + case 'G': + case 'g': + memoryPerNodeInBytes *= 1073741824L; + break; + case 'K': + case 'k': + memoryPerNodeInBytes *= 1024L; + break; + case 'M': + case 'm': + memoryPerNodeInBytes *= 1048576L; + break; + default: + throw new IllegalArgumentException(String.format("Memory Configuration %s didn't matched any of the" + + " supported suffixes. [Details: Supported: kKmMgG, Actual: %s", memoryConfig, group2)); + } + } + } else { + throw new IllegalArgumentException(String.format("Memory Configuration %s didn't matched supported format. " + + "Supported format is %s?", memoryConfig, patternToUse)); + } + + return memoryPerNodeInBytes; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index dc1e9ccf6..e2fd1e845 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -17,17 +17,16 @@ */ package org.apache.drill.exec.rpc.user; -import java.io.IOException; -import java.net.SocketAddress; -import java.util.List; -import java.util.Map.Entry; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import javax.net.ssl.SSLEngine; -import javax.security.sasl.SaslException; - +import com.google.protobuf.MessageLite; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import org.apache.drill.common.config.DrillProperties; import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.exception.DrillbitStartupException; @@ -65,17 +64,15 @@ import org.apache.hadoop.security.HadoopKerberosName; import org.joda.time.DateTime; import org.slf4j.Logger; -import com.google.protobuf.MessageLite; - -import io.netty.channel.Channel; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelPipeline; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.socket.SocketChannel; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; +import javax.net.ssl.SSLEngine; +import javax.security.sasl.SaslException; +import java.io.IOException; +import java.net.SocketAddress; +import java.util.List; +import java.util.Map.Entry; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; public class UserServer extends BasicServer<RpcType, BitToUserConnection> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserServer.class); @@ -197,7 +194,11 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { } /** - * {@link AbstractRemoteConnection} implementation for user connection. Also implements {@link UserClientConnection}. + * It represents a client connection accepted by Foreman Drillbit's UserServer from a DrillClient. This connection + * is used to get hold of {@link UserSession} which stores all session related information like session options + * changed over the lifetime of this connection. There is a 1:1 mapping between a BitToUserConnection and a + * UserSession. This connection object is also used to send query data and result back to the client submitted as part + * of the session tied to this connection. */ public class BitToUserConnection extends AbstractServerConnection<BitToUserConnection> implements UserClientConnection { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java index e856ac58d..0798deabb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserSession.java @@ -24,6 +24,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.base.Strings; @@ -119,14 +120,34 @@ public class UserSession implements AutoCloseable { return this; } - public UserSession build() { + private boolean canApplyUserProperty() { + final StringBuilder sb = new StringBuilder(); if (userSession.properties.containsKey(DrillProperties.QUOTING_IDENTIFIERS)) { - if (userSession.sessionOptions != null) { + sb.append(DrillProperties.QUOTING_IDENTIFIERS).append(","); + } + + if (userSession.properties.containsKey(DrillProperties.QUERY_TAGS)) { + sb.append(DrillProperties.QUERY_TAGS); + } + + if (userSession.sessionOptions == null && sb.length() > 0) { + logger.warn("User property {} can't be installed as a server option without the session option manager", + sb.toString()); + return false; + } + return true; + } + + public UserSession build() { + if (canApplyUserProperty()) { + if (userSession.properties.containsKey(DrillProperties.QUOTING_IDENTIFIERS)) { userSession.setSessionOption(PlannerSettings.QUOTING_IDENTIFIERS_KEY, - userSession.properties.getProperty(DrillProperties.QUOTING_IDENTIFIERS)); - } else { - logger.warn("User property {} can't be installed as a server option without the session option manager", - DrillProperties.QUOTING_IDENTIFIERS); + userSession.properties.getProperty(DrillProperties.QUOTING_IDENTIFIERS)); + } + + if (userSession.properties.containsKey(DrillProperties.QUERY_TAGS)) { + userSession.setSessionOption(ExecConstants.RM_QUERY_TAGS_KEY, + userSession.properties.getProperty(DrillProperties.QUERY_TAGS)); } } UserSession session = userSession; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 73a039b72..89061763b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -17,19 +17,6 @@ */ package org.apache.drill.exec.server; -import java.io.IOException; -import java.nio.file.InvalidPathException; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.StandardWatchEventKinds; -import java.nio.file.WatchEvent; -import java.nio.file.WatchKey; -import java.nio.file.WatchService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import javax.tools.ToolProvider; - import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.StackTrace; import org.apache.drill.common.concurrent.ExtendedLatch; @@ -43,6 +30,7 @@ import org.apache.drill.exec.coord.ClusterCoordinator.RegistrationHandle; import org.apache.drill.exec.coord.zk.ZKClusterCoordinator; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; import org.apache.drill.exec.server.DrillbitStateManager.DrillbitState; import org.apache.drill.exec.server.options.OptionDefinition; import org.apache.drill.exec.server.options.OptionValue; @@ -51,21 +39,31 @@ import org.apache.drill.exec.server.options.SystemOptionManager; import org.apache.drill.exec.server.rest.WebServer; import org.apache.drill.exec.service.ServiceEngine; import org.apache.drill.exec.store.StoragePluginRegistry; -import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider; -import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider; import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.store.sys.PersistentStoreRegistry; +import org.apache.drill.exec.store.sys.store.provider.CachingPersistentStoreProvider; +import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider; import org.apache.drill.exec.store.sys.store.provider.LocalPersistentStoreProvider; import org.apache.drill.exec.util.GuavaPatcher; import org.apache.drill.exec.util.ProtobufPatcher; import org.apache.drill.exec.work.WorkManager; -import org.apache.zookeeper.Environment; - -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint.State; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; +import org.apache.zookeeper.Environment; import org.slf4j.bridge.SLF4JBridgeHandler; +import javax.tools.ToolProvider; +import java.io.IOException; +import java.nio.file.InvalidPathException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + /** * Starts, tracks and stops all the required services for a Drillbit daemon to work. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 030e1a346..9cd670e99 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -17,15 +17,6 @@ */ package org.apache.drill.exec.server.options; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.Map.Entry; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - import org.apache.commons.collections.IteratorUtils; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.config.LogicalPlanPersistence; @@ -39,10 +30,18 @@ import org.apache.drill.exec.store.sys.PersistentStoreConfig; import org.apache.drill.exec.store.sys.PersistentStoreProvider; import org.apache.drill.exec.store.sys.store.provider.InMemoryStoreProvider; import org.apache.drill.exec.util.AssertionUtil; - import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + /** * <p> {@link OptionManager} that holds options within {@link org.apache.drill.exec.server.DrillbitContext}. * Only one instance of this class exists per drillbit. Options set at the system level affect the entire system and @@ -277,7 +276,10 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.HLL_ACCURACY_VALIDATOR), new OptionDefinition(ExecConstants.DETERMINISTIC_SAMPLING_VALIDATOR), new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_ELEMENTS_VALIDATOR), - new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_FPOS_PROB_VALIDATOR) + new OptionDefinition(ExecConstants.NDV_BLOOM_FILTER_FPOS_PROB_VALIDATOR), + new OptionDefinition(ExecConstants.RM_QUERY_TAGS_VALIDATOR, + new OptionMetaData(OptionValue.AccessibleScopes.SESSION_AND_QUERY, false, false)), + new OptionDefinition(ExecConstants.RM_QUEUES_WAIT_FOR_PREFERRED_NODES_VALIDATOR) }; CaseInsensitiveMap<OptionDefinition> map = Arrays.stream(definitions) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java index f5fa122fe..3551b6769 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java @@ -21,7 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.drill.shaded.guava.com.google.common.base.Charsets; import org.apache.drill.shaded.guava.com.google.common.io.Resources; import com.jasonclawson.jackson.dataformat.hocon.HoconFactory; -import org.apache.drill.common.config.CommonConstants; +import org.apache.drill.common.config.ConfigConstants; import org.apache.drill.common.config.LogicalPlanPersistence; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.logical.StoragePluginConfig; @@ -45,10 +45,10 @@ import static org.apache.drill.exec.store.StoragePluginRegistry.ACTION_ON_STORAG /** * Drill plugins handler, which allows to update storage plugins configs from the - * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} conf file + * {@link ConfigConstants#STORAGE_PLUGINS_OVERRIDE_CONF} conf file * * TODO: DRILL-6564: It can be improved with configs versioning and service of creating - * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} + * {@link ConfigConstants#STORAGE_PLUGINS_OVERRIDE_CONF} */ public class StoragePluginsHandlerService implements StoragePluginsHandler { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginsHandlerService.class); @@ -124,17 +124,17 @@ public class StoragePluginsHandlerService implements StoragePluginsHandler { } /** - * Get the new storage plugins from the {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} file if it exists, + * Get the new storage plugins from the {@link ConfigConstants#STORAGE_PLUGINS_OVERRIDE_CONF} file if it exists, * null otherwise * * @return storage plugins */ private StoragePlugins getNewStoragePlugins() { - Set<URL> urlSet = ClassPathScanner.forResource(CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, false); + Set<URL> urlSet = ClassPathScanner.forResource(ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF, false); if (!urlSet.isEmpty()) { if (urlSet.size() != 1) { DrillRuntimeException.format("More than one %s file is placed in Drill's classpath: %s", - CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, urlSet); + ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF, urlSet); } pluginsOverrideFileUrl = urlSet.iterator().next(); try { @@ -142,11 +142,11 @@ public class StoragePluginsHandlerService implements StoragePluginsHandler { return lpPersistence.getMapper().readValue(newPluginsData, StoragePlugins.class); } catch (IOException e) { logger.error("Failures are obtained while loading %s file. Proceed without update", - CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, e); + ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF, e); } } logger.trace("The {} file is absent. Proceed without updating of the storage plugins configs", - CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF); + ConfigConstants.STORAGE_PLUGINS_OVERRIDE_CONF); return null; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java new file mode 100644 index 000000000..af9ba00f4 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedResourceManager.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.ResourcePoolTree; +import org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.work.foreman.Foreman; + +public class DistributedResourceManager implements ResourceManager { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedResourceManager.class); + + private final ResourcePoolTree rmPoolTree; + + private final DrillbitContext context; + + private final DrillConfig rmConfig; + + private final ResourceManager delegatedRM; + + public DistributedResourceManager(DrillbitContext context) throws DrillRuntimeException { + try { + this.context = context; + this.rmConfig = DrillConfig.createForRM(); + rmPoolTree = new ResourcePoolTreeImpl(rmConfig, DrillConfig.getMaxDirectMemory(), + Runtime.getRuntime().availableProcessors(), 1); + logger.debug("Successfully parsed RM config \n{}", rmConfig.getConfig(ResourcePoolTreeImpl.ROOT_POOL_CONFIG_KEY)); + this.delegatedRM = new DefaultResourceManager(); + } catch (RMConfigException ex) { + throw new DrillRuntimeException(String.format("Failed while parsing Drill RM Configs. Drillbit won't be started" + + " unless config is fixed or RM is disabled by setting %s to false", ExecConstants.RM_ENABLED), ex); + } + } + @Override + public long memoryPerNode() { + return delegatedRM.memoryPerNode(); + } + + @Override + public int cpusPerNode() { + return delegatedRM.cpusPerNode(); + } + + @Override + public QueryResourceAllocator newResourceAllocator(QueryContext queryContext) { + return delegatedRM.newResourceAllocator(queryContext); + } + + @Override + public QueryResourceManager newQueryRM(Foreman foreman) { + return delegatedRM.newQueryRM(foreman); + } + + public ResourcePoolTree getRmPoolTree() { + return rmPoolTree; + } + + @Override + public void close() { + delegatedRM.close(); + } +} diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 4f6fbb278..7e9415530 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -407,6 +407,8 @@ drill.exec: { // Resource management rm : { + // Enables Drill RM feature by default. To disable it set the below parameter to false + enabled: false // Memory per node normally comes from the direct memory alloated on the JVM // command line. This parameter, if other than 0, further limits the amount. // Primarily for testing. @@ -675,5 +677,9 @@ drill.exec.options: { exec.statistics.ndv_accuracy: 20, exec.statistics.ndv_extrapolation_bf_elements: 1000000, exec.statistics.ndv_extrapolation_bf_fpprobability: 10, - exec.statistics.deterministic_sampling: false + exec.statistics.deterministic_sampling: false, + exec.query.return_result_set_for_ddl: true, + # ========= rm related options =========== + exec.rm.queryTags: "", + exec.rm.queues.wait_for_preferred_nodes: true } diff --git a/exec/java-exec/src/main/resources/drill-rm-default.conf b/exec/java-exec/src/main/resources/drill-rm-default.conf new file mode 100644 index 000000000..520947f56 --- /dev/null +++ b/exec/java-exec/src/main/resources/drill-rm-default.conf @@ -0,0 +1,34 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http:// www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +drill.exec.rm: { + pool_name: "root", + memory: 0.9, // 90% of total direct memory allocated to Drill + queue_selection_policy: "bestfit", // policy to select queue for a query when multiple queues are eligible + selector: { + acl: { + users: ["*"], + groups: ["*"] + } + } + queue: { + max_query_memory_per_node: 8G // supported format regex [0-9]*[kKmMgG]? + max_waiting: 10, // default + max_admissible: 10, // default + max_wait_timeout: 30000, // default in ms + wait_for_preferred_nodes: true // default + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestRMConfigLoad.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestRMConfigLoad.java new file mode 100644 index 000000000..16f7f64f0 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestRMConfigLoad.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr; + +import org.apache.drill.categories.ResourceManagerTest; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig; +import org.apache.drill.exec.resourcemgr.config.RMCommonDefaults; +import org.apache.drill.exec.resourcemgr.config.ResourcePoolTree; +import org.apache.drill.exec.resourcemgr.config.selectors.AclSelector; +import org.apache.drill.exec.work.foreman.rm.DefaultResourceManager; +import org.apache.drill.exec.work.foreman.rm.DistributedResourceManager; +import org.apache.drill.exec.work.foreman.rm.ResourceManager; +import org.apache.drill.test.BaseDirTestWatcher; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.apache.drill.test.DrillTest; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +@Ignore("These tests will be ignored until integration with new DistributedResourceManager is done") +@Category(ResourceManagerTest.class) +public final class TestRMConfigLoad extends DrillTest { + + @Rule + public final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher(); + + @Test + public void testDefaultRMConfig() throws Exception { + ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher) + .configProperty(ExecConstants.RM_ENABLED, true) + .configProperty(ExecConstants.DRILL_PORT_HUNT, true) + .withLocalZk(); + + try (ClusterFixture cluster = fixtureBuilder.build()) { + ResourceManager resourceManager = cluster.drillbit().getContext().getResourceManager(); + assertTrue(resourceManager instanceof DistributedResourceManager); + + ResourcePoolTree poolTree = ((DistributedResourceManager) resourceManager).getRmPoolTree(); + assertTrue("In drill-rm-default root pool is not leaf pool", poolTree.getRootPool().isLeafPool()); + assertTrue("selector in drill-rm-default is not acl selector", + poolTree.getRootPool().getSelector() instanceof AclSelector); + assertEquals("max_query_memory_per_node in drill-rm-default is not configured with expected default value", + 8 * 1024L, poolTree.getRootPool().getMaxQueryMemoryPerNode()); + assertEquals("queue_selection_policy in drill-rm-default is not configured with expected default value", + RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY, + poolTree.getSelectionPolicyInUse().getSelectionPolicy()); + assertEquals("memory share of root pool in drill-rm-default is not configured with expected default value", + RMCommonDefaults.ROOT_POOL_DEFAULT_MEMORY_PERCENT, poolTree.getResourceShare(), 0); + + final QueryQueueConfig defaultQueue = poolTree.getRootPool().getQueryQueue(); + assertEquals("max_admissible in drill-rm-default is not configured with expected default value", + RMCommonDefaults.MAX_ADMISSIBLE_QUERY_COUNT, defaultQueue.getMaxAdmissibleQueries()); + assertEquals("max_waiting in drill-rm-default is not configured with expected default value", + RMCommonDefaults.MAX_WAITING_QUERY_COUNT, defaultQueue.getMaxWaitingQueries()); + assertEquals("max_wait_timeout in drill-rm-default is not configured with expected default value", + RMCommonDefaults.MAX_WAIT_TIMEOUT_IN_MS, defaultQueue.getWaitTimeoutInMs()); + assertEquals("wait_for_preferred_nodes in drill-rm-default is not configured with expected default value", + RMCommonDefaults.WAIT_FOR_PREFERRED_NODES, defaultQueue.waitForPreferredNodes()); + } + } + + @Test + public void testDefaultRMWithLocalCoordinatorAndRMEnabled() throws Exception { + ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher) + .configProperty(ExecConstants.RM_ENABLED, true); + + try (ClusterFixture cluster = fixtureBuilder.build()) { + ResourceManager resourceManager = cluster.drillbit().getContext().getResourceManager(); + assertTrue(resourceManager instanceof DefaultResourceManager); + } + } + + @Test + public void testDefaultRMWithLocalCoordinatorAndRMDisabled() throws Exception { + ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher) + .configProperty(ExecConstants.RM_ENABLED, false); + + try (ClusterFixture cluster = fixtureBuilder.build()) { + ResourceManager resourceManager = cluster.drillbit().getContext().getResourceManager(); + assertTrue(resourceManager instanceof DefaultResourceManager); + } + } + + @Test + public void testDefaultRMOnlyRMDisabled() throws Exception { + ClusterFixtureBuilder fixtureBuilder = ClusterFixture.builder(dirTestWatcher) + .configProperty(ExecConstants.RM_ENABLED, false) + .configProperty(ExecConstants.DRILL_PORT_HUNT, true) + .withLocalZk(); + + try (ClusterFixture cluster = fixtureBuilder.build()) { + ResourceManager resourceManager = cluster.drillbit().getContext().getResourceManager(); + assertTrue(resourceManager instanceof DefaultResourceManager); + } + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestResourcePoolTree.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestResourcePoolTree.java new file mode 100644 index 000000000..fea132d41 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/TestResourcePoolTree.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import org.apache.drill.categories.ResourceManagerTest; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.QueueAssignmentResult; +import org.apache.drill.exec.resourcemgr.config.ResourcePool; +import org.apache.drill.exec.resourcemgr.config.ResourcePoolImpl; +import org.apache.drill.exec.resourcemgr.config.ResourcePoolTree; +import org.apache.drill.exec.resourcemgr.config.ResourcePoolTreeImpl; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.apache.drill.exec.server.options.OptionValue; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.drill.exec.resourcemgr.config.RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category(ResourceManagerTest.class) +public final class TestResourcePoolTree { + + private static final Map<String, Object> poolTreeConfig = new HashMap<>(); + + private static final Map<String, Object> pool1 = new HashMap<>(); + + private static final Map<String, Object> pool2 = new HashMap<>(); + + private static final Map<String, Object> queue1 = new HashMap<>(); + + private static final List<Object> childResourcePools = new ArrayList<>(); + + private static final Map<String, Object> tagSelectorConfig1 = new HashMap<>(); + + private static final Map<String, Object> tagSelectorConfig2 = new HashMap<>(); + + private static final QueryContext mockContext = mock(QueryContext.class); + + @BeforeClass + public static void testSuiteSetup() { + pool1.put(ResourcePoolImpl.POOL_NAME_KEY, "dev"); + pool1.put(ResourcePoolImpl.POOL_MEMORY_SHARE_KEY, 0.80); + + pool2.put(ResourcePoolImpl.POOL_NAME_KEY, "qa"); + pool2.put(ResourcePoolImpl.POOL_MEMORY_SHARE_KEY, 0.20); + + queue1.put("max_query_memory_per_node", 5534); + + tagSelectorConfig1.put("tag", "small"); + tagSelectorConfig2.put("tag", "large"); + } + + @After + public void afterTestCleanup() { + // cleanup resource tree + poolTreeConfig.clear(); + + // cleanup pools + pool1.remove(ResourcePoolImpl.POOL_QUEUE_KEY); + pool1.remove(ResourcePoolImpl.POOL_SELECTOR_KEY); + + pool2.remove(ResourcePoolImpl.POOL_QUEUE_KEY); + pool2.remove(ResourcePoolImpl.POOL_SELECTOR_KEY); + + childResourcePools.clear(); + } + + private ResourcePoolTree getPoolTreeConfig() throws RMConfigException { + poolTreeConfig.put(ResourcePoolImpl.POOL_NAME_KEY, "drill"); + poolTreeConfig.put(ResourcePoolImpl.POOL_CHILDREN_POOLS_KEY, childResourcePools); + + Config rmConfig = ConfigFactory.empty() + .withValue("drill.exec.rm", ConfigValueFactory.fromMap(poolTreeConfig)); + return new ResourcePoolTreeImpl(rmConfig, 10000, 10, 2); + } + + private boolean checkExpectedVsActualPools(List<ResourcePool> actual, List<String> expectedNames) { + if (actual.size() != expectedNames.size()) { + return false; + } + + for (ResourcePool pool : actual) { + if (!expectedNames.contains(pool.getPoolName())) { + return false; + } + } + return true; + } + + @Test + public void testTreeWith2LeafPool() throws Exception { + // pool with tag selector + pool1.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1); + pool1.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig1); + + // pool with default selector + pool2.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1); + + childResourcePools.add(pool1); + childResourcePools.add(pool2); + + ResourcePoolTree configTree = getPoolTreeConfig(); + + // get all leaf queues names + Set<String> expectedLeafQueue = new HashSet<>(); + expectedLeafQueue.add((String)pool1.get("pool_name")); + expectedLeafQueue.add((String)pool2.get("pool_name")); + + assertEquals("Root pool is different than expected", "drill", configTree.getRootPool().getPoolName()); + assertEquals("Expected and actual leaf queue names are different", expectedLeafQueue, + configTree.getAllLeafQueues().keySet()); + assertEquals("Unexpected Selection policy is in use", ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY, + configTree.getSelectionPolicyInUse().getSelectionPolicy()); + } + + @Test(expected = RMConfigException.class) + public void testDuplicateLeafPool() throws Exception { + // leaf pool + pool1.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1); + childResourcePools.add(pool1); + childResourcePools.add(pool1); + + getPoolTreeConfig(); + } + + @Test(expected = RMConfigException.class) + public void testMissingQueueAtLeafPool() throws Exception { + // leaf pool with queue + pool1.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1); + pool1.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig1); + childResourcePools.add(pool1); + childResourcePools.add(pool2); + + getPoolTreeConfig(); + } + + @Test(expected = RMConfigException.class) + public void testInvalidQueueAtLeafPool() throws Exception { + // leaf pool with invalid queue + int initialValue = (Integer)queue1.remove("max_query_memory_per_node"); + + try { + pool1.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1); + pool1.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig1); + childResourcePools.add(pool1); + + getPoolTreeConfig(); + } finally { + queue1.put("max_query_memory_per_node", initialValue); + } + } + + @Test + public void testRootPoolAsLeaf() throws Exception { + // leaf pool with queue + poolTreeConfig.put(ResourcePoolImpl.POOL_NAME_KEY, "drill"); + poolTreeConfig.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1); + poolTreeConfig.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig1); + + Config rmConfig = ConfigFactory.empty() + .withValue("drill.exec.rm", ConfigValueFactory.fromMap(poolTreeConfig)); + ResourcePoolTree poolTree = new ResourcePoolTreeImpl(rmConfig, 10000, 10, 2); + + assertTrue("Root pool is not a leaf pool", poolTree.getRootPool().isLeafPool()); + assertEquals("Root pool name is not drill", "drill", poolTree.getRootPool().getPoolName()); + assertTrue("Root pool is not the only leaf pool", poolTree.getAllLeafQueues().size() == 1); + assertTrue("Root pool name is not same as leaf pool name", poolTree.getAllLeafQueues().containsKey("drill")); + assertFalse("Root pool should not be a default pool", poolTree.getRootPool().isDefaultPool()); + } + + @Test + public void testTreeWithLeafAndIntermediatePool() throws Exception { + // left leaf pool1 with tag selector + pool1.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1); + pool1.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig1); + + // left leaf pool2 with default selector + pool2.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1); + + // intermediate left pool1 with 2 leaf pools (pool1, pool2) + Map<String, Object> interPool1 = new HashMap<>(); + List<Object> childPools1 = new ArrayList<>(); + childPools1.add(pool1); + childPools1.add(pool2); + interPool1.put(ResourcePoolImpl.POOL_NAME_KEY, "eng"); + interPool1.put(ResourcePoolImpl.POOL_MEMORY_SHARE_KEY, 0.9); + interPool1.put(ResourcePoolImpl.POOL_CHILDREN_POOLS_KEY, childPools1); + + // right leaf pool + Map<String, Object> rightLeafPool = new HashMap<>(); + rightLeafPool.put(ResourcePoolImpl.POOL_NAME_KEY, "marketing"); + rightLeafPool.put(ResourcePoolImpl.POOL_MEMORY_SHARE_KEY, 0.1); + rightLeafPool.put(ResourcePoolImpl.POOL_QUEUE_KEY, queue1); + rightLeafPool.put(ResourcePoolImpl.POOL_SELECTOR_KEY, tagSelectorConfig2); + + childResourcePools.add(interPool1); + childResourcePools.add(rightLeafPool); + ResourcePoolTree configTree = getPoolTreeConfig(); + + // Test successful selection of all leaf pools + OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants + .RM_QUERY_TAGS_KEY, "small,large", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + QueueAssignmentResult assignmentResult = configTree.selectAllQueues(mockContext); + List<ResourcePool> selectedPools = assignmentResult.getSelectedLeafPools(); + List<String> expectedPools = new ArrayList<>(); + expectedPools.add("dev"); + expectedPools.add("qa"); + expectedPools.add("marketing"); + + assertTrue("All leaf pools are not selected", selectedPools.size() == 3); + assertTrue("Selected leaf pools and expected pools are different", + checkExpectedVsActualPools(selectedPools, expectedPools)); + + // Test successful selection of multiple leaf pools + expectedPools.clear(); + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants + .RM_QUERY_TAGS_KEY, "small", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assignmentResult = configTree.selectAllQueues(mockContext); + selectedPools = assignmentResult.getSelectedLeafPools(); + expectedPools.add("qa"); + expectedPools.add("dev"); + assertTrue("Expected 2 pools to be selected", selectedPools.size() == 2); + assertTrue("Selected leaf pools and expected pools are different", + checkExpectedVsActualPools(selectedPools, expectedPools)); + + // Test successful selection of only left default pool + expectedPools.clear(); + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants + .RM_QUERY_TAGS_KEY, "medium", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assignmentResult = configTree.selectAllQueues(mockContext); + selectedPools = assignmentResult.getSelectedLeafPools(); + expectedPools.add("qa"); + assertTrue("More than one leaf pool is selected", selectedPools.size() == 1); + assertTrue("Selected leaf pools and expected pools are different", + checkExpectedVsActualPools(selectedPools, expectedPools)); + + // cleanup + interPool1.clear(); + rightLeafPool.clear(); + expectedPools.clear(); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestBestFitSelectionPolicy.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestBestFitSelectionPolicy.java new file mode 100644 index 000000000..3dcde00ca --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestBestFitSelectionPolicy.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectionpolicy; + +import org.apache.drill.categories.ResourceManagerTest; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.resourcemgr.NodeResources; +import org.apache.drill.exec.resourcemgr.config.QueryQueueConfig; +import org.apache.drill.exec.resourcemgr.config.RMCommonDefaults; +import org.apache.drill.exec.resourcemgr.config.ResourcePool; +import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category(ResourceManagerTest.class) +public final class TestBestFitSelectionPolicy { + + private static QueueSelectionPolicy selectionPolicy; + + private static final QueryContext queryContext = mock(QueryContext.class); + + private static final NodeResources queryMaxResources = new NodeResources(1500*1024L*1024L, 2); + + private static final List<Long> poolMemory = new ArrayList<>(); + + @BeforeClass + public static void testSetup() { + selectionPolicy = QueueSelectionPolicyFactory.createSelectionPolicy + (RMCommonDefaults.ROOT_POOL_DEFAULT_QUEUE_SELECTION_POLICY); + when(queryContext.getQueryId()).thenReturn(UserBitShared.QueryId.getDefaultInstance()); + } + + @After + public void afterTestSetup() { + poolMemory.clear(); + } + + private void testCommonHelper(long expectedPoolMem) throws Exception { + List<ResourcePool> inputPools = new ArrayList<>(); + ResourcePool expectedPool = null; + + for (Long poolMemory : poolMemory) { + final ResourcePool testPool = mock(ResourcePool.class); + final QueryQueueConfig testPoolQueue = mock(QueryQueueConfig.class); + when(testPool.getQueryQueue()).thenReturn(testPoolQueue); + when(testPoolQueue.getMaxQueryMemoryInMBPerNode()).thenReturn(poolMemory); + inputPools.add(testPool); + + if (poolMemory == expectedPoolMem) { + expectedPool = testPool; + } + } + + ResourcePool selectedPool = selectionPolicy.selectQueue(inputPools, queryContext, queryMaxResources); + assertEquals("Selected Pool and expected pool is different", expectedPool, selectedPool); + } + + @Test(expected = QueueSelectionException.class) + public void testWithNoPool() throws Exception { + testCommonHelper(0); + } + + @Test + public void testWithSinglePool() throws Exception { + poolMemory.add(1000L); + testCommonHelper(1000); + } + + @Test + public void testWithMultiplePoolWithGreaterMaxNodeMemory() throws Exception { + poolMemory.add(2500L); + poolMemory.add(2000L); + testCommonHelper(2000); + } + + @Test + public void testWithMultiplePoolWithLesserMaxNodeMemory() throws Exception { + poolMemory.add(700L); + poolMemory.add(500L); + testCommonHelper(700); + } + + @Test + public void testMixOfPoolLess_Greater_MaxNodeMemory() throws Exception { + poolMemory.add(1000L); + poolMemory.add(2000L); + testCommonHelper(2000); + } + + @Test + public void testMixOfPoolLess_Greater_EqualMaxNodeMemory() throws Exception { + poolMemory.add(1000L); + poolMemory.add(2000L); + poolMemory.add(1500L); + testCommonHelper(1500); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestDefaultSelectionPolicy.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestDefaultSelectionPolicy.java new file mode 100644 index 000000000..a78d106f3 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectionpolicy/TestDefaultSelectionPolicy.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectionpolicy; + +import org.apache.drill.categories.ResourceManagerTest; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.resourcemgr.config.ResourcePool; +import org.apache.drill.exec.resourcemgr.config.exception.QueueSelectionException; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category(ResourceManagerTest.class) +public final class TestDefaultSelectionPolicy { + + private static QueueSelectionPolicy selectionPolicy; + + private static final QueryContext queryContext = mock(QueryContext.class); + + @BeforeClass + public static void testSetup() { + selectionPolicy = QueueSelectionPolicyFactory.createSelectionPolicy( + QueueSelectionPolicy.SelectionPolicy.DEFAULT); + when(queryContext.getQueryId()).thenReturn(UserBitShared.QueryId.getDefaultInstance()); + } + + @Test(expected = QueueSelectionException.class) + public void testWithNoDefaultPool() throws Exception { + List<ResourcePool> inputPools = new ArrayList<>(); + final ResourcePool testPool1 = mock(ResourcePool.class); + when(testPool1.isDefaultPool()).thenReturn(false); + final ResourcePool testPool2 = mock(ResourcePool.class); + when(testPool2.isDefaultPool()).thenReturn(false); + + inputPools.add(testPool1); + inputPools.add(testPool2); + selectionPolicy.selectQueue(inputPools, queryContext, null); + } + + @Test + public void testWithSingleDefaultPool() throws Exception { + List<ResourcePool> inputPools = new ArrayList<>(); + final ResourcePool testPool1 = mock(ResourcePool.class); + when(testPool1.isDefaultPool()).thenReturn(true); + inputPools.add(testPool1); + + final ResourcePool selectedPool = selectionPolicy.selectQueue(inputPools, queryContext, null); + assertEquals("Selected Pool and expected pool is different",testPool1, selectedPool); + } + + @Test + public void testWithMultipleDefaultPool() throws Exception { + List<ResourcePool> inputPools = new ArrayList<>(); + final ResourcePool testPool1 = mock(ResourcePool.class); + when(testPool1.isDefaultPool()).thenReturn(true); + final ResourcePool testPool2 = mock(ResourcePool.class); + when(testPool2.isDefaultPool()).thenReturn(true); + + inputPools.add(testPool1); + inputPools.add(testPool2); + + final ResourcePool selectedPool = selectionPolicy.selectQueue(inputPools, queryContext, null); + assertEquals("Selected Pool and expected pool is different",testPool1, selectedPool); + } + + @Test + public void testMixOfDefaultAndNonDefaultPool() throws Exception { + List<ResourcePool> inputPools = new ArrayList<>(); + final ResourcePool testPool1 = mock(ResourcePool.class); + when(testPool1.isDefaultPool()).thenReturn(false); + final ResourcePool testPool2 = mock(ResourcePool.class); + when(testPool2.isDefaultPool()).thenReturn(true); + + inputPools.add(testPool1); + inputPools.add(testPool2); + + final ResourcePool selectedPool = selectionPolicy.selectQueue(inputPools, queryContext, null); + assertEquals("Selected Pool and expected pool is different",testPool2, selectedPool); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestAclSelector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestAclSelector.java new file mode 100644 index 000000000..fbdb108fd --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestAclSelector.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import org.apache.drill.categories.ResourceManagerTest; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.junit.After; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +@Category(ResourceManagerTest.class) +public final class TestAclSelector { + + private static final List<String> groupsValue = new ArrayList<>(); + + private static final List<String> usersValue = new ArrayList<>(); + + private static final List<String> emptyList = new ArrayList<>(); + + private static final Map<String, List<String>> aclConfigValue = new HashMap<>(); + + private boolean checkIfSame(Object[] expected, Object[] actual) { + Arrays.sort(expected); + Arrays.sort(actual); + return Arrays.equals(expected, actual); + } + + @After + public void cleanupAfterTest() { + groupsValue.clear(); + usersValue.clear(); + aclConfigValue.clear(); + } + + private ResourcePoolSelector testNegativeHelper() throws RMConfigException { + final Config testConfig = ConfigFactory.empty() + .withValue("acl", ConfigValueFactory.fromMap(aclConfigValue)); + return ResourcePoolSelectorFactory.createSelector(testConfig); + } + + private ResourcePoolSelector testCommonHelper(List<String> expectedPositiveUsers, List<String> expectedPositiveGroups, + List<String> expectedNegativeUsers, List<String> expectedNegativeGroups) + throws RMConfigException { + ResourcePoolSelector testSelector = testNegativeHelper(); + assertTrue("TestSelector is not a ACL selector", testSelector instanceof AclSelector); + assertTrue("Expected +ve users config mismatched with actual config", + checkIfSame(expectedPositiveUsers.toArray(), ((AclSelector) testSelector).getAllowedUsers().toArray())); + assertTrue("Expected +ve groups config mismatched with actual config", + checkIfSame(expectedPositiveGroups.toArray(), ((AclSelector) testSelector).getAllowedGroups().toArray())); + assertTrue("Expected -ve users config mismatched with actual config", + checkIfSame(expectedNegativeUsers.toArray(), ((AclSelector) testSelector).getDeniedUsers().toArray())); + assertTrue("Expected -ve groups config mismatched with actual config", + checkIfSame(expectedNegativeGroups.toArray(), ((AclSelector) testSelector).getDeniedGroups().toArray())); + return testSelector; + } + + @Test + public void testValidACLSelector_shortSyntax() throws Exception { + groupsValue.add("sales"); + groupsValue.add("marketing"); + + usersValue.add("user1"); + usersValue.add("user2"); + + aclConfigValue.put("groups", groupsValue); + aclConfigValue.put("users", usersValue); + AclSelector testSelector = (AclSelector) testCommonHelper(usersValue, groupsValue, emptyList, emptyList); + + // check based on valid/invalid user + Set<String> groups = new HashSet<>(); + assertFalse(testSelector.checkQueryUserGroups("user3", groups)); + assertTrue(testSelector.checkQueryUserGroups("user1", groups)); + + // check based on correct group + groups.add("sales"); + assertTrue(testSelector.checkQueryUserGroups("user3", groups)); + } + + @Test + public void testACLSelector_onlyUsers() throws Exception { + usersValue.add("user1"); + aclConfigValue.put("users", usersValue); + testCommonHelper(usersValue, groupsValue, emptyList, emptyList); + } + + @Test + public void testACLSelector_onlyGroups() throws Exception { + groupsValue.add("group1"); + aclConfigValue.put("groups", groupsValue); + testCommonHelper(usersValue, groupsValue, emptyList, emptyList); + } + + @Test(expected = RMConfigException.class) + public void testInValidACLSelector_shortSyntax() throws Exception { + aclConfigValue.put("groups", new ArrayList<>()); + aclConfigValue.put("users", new ArrayList<>()); + testNegativeHelper(); + } + + @Test + public void testValidACLSelector_longSyntax() throws Exception { + + groupsValue.add("sales:+"); + groupsValue.add("marketing:-"); + + List<String> expectedAllowedGroups = new ArrayList<>(); + expectedAllowedGroups.add("sales"); + + List<String> expectedDisAllowedGroups = new ArrayList<>(); + expectedDisAllowedGroups.add("marketing"); + + usersValue.add("user1:+"); + usersValue.add("user2:-"); + + List<String> expectedAllowedUsers = new ArrayList<>(); + expectedAllowedUsers.add("user1"); + List<String> expectedDisAllowedUsers = new ArrayList<>(); + expectedDisAllowedUsers.add("user2"); + + aclConfigValue.put("groups", groupsValue); + aclConfigValue.put("users", usersValue); + AclSelector testSelector = (AclSelector)testCommonHelper(expectedAllowedUsers, expectedAllowedGroups, + expectedDisAllowedUsers, expectedDisAllowedGroups); + + Set<String> queryGroups = new HashSet<>(); + // Negative user/group + queryGroups.add("marketing"); + assertFalse(testSelector.checkQueryUserGroups("user2", queryGroups)); + + // Invalid user -ve group + assertFalse(testSelector.checkQueryUserGroups("user3", queryGroups)); + + // -ve user +ve group + queryGroups.clear(); + queryGroups.add("sales"); + assertFalse(testSelector.checkQueryUserGroups("user2", queryGroups)); + + // Invalid user +ve group + assertTrue(testSelector.checkQueryUserGroups("user3", queryGroups)); + } + + @Test(expected = RMConfigException.class) + public void testInvalidLongSyntaxIdentifier() throws Exception { + groupsValue.add("sales:|"); + aclConfigValue.put("groups", groupsValue); + testNegativeHelper(); + } + + @Test + public void testMixLongShortAclSyntax() throws Exception { + groupsValue.add("groups1"); + groupsValue.add("groups2:+"); + groupsValue.add("groups3:-"); + + List<String> expectedAllowedGroups = new ArrayList<>(); + expectedAllowedGroups.add("groups1"); + expectedAllowedGroups.add("groups2"); + + List<String> expectedDisAllowedGroups = new ArrayList<>(); + expectedDisAllowedGroups.add("groups3"); + + usersValue.add("user1"); + usersValue.add("user2:+"); + usersValue.add("user3:-"); + + List<String> expectedAllowedUsers = new ArrayList<>(); + expectedAllowedUsers.add("user1"); + expectedAllowedUsers.add("user2"); + + List<String> expectedDisAllowedUsers = new ArrayList<>(); + expectedDisAllowedUsers.add("user3"); + + aclConfigValue.put("groups", groupsValue); + aclConfigValue.put("users", usersValue); + + testCommonHelper(expectedAllowedUsers, expectedAllowedGroups, expectedDisAllowedUsers, expectedDisAllowedGroups); + } + + @Test + public void testSameUserBothInPositiveNegative() throws Exception { + usersValue.add("user1:+"); + usersValue.add("user1:-"); + + List<String> expectedDisAllowedUsers = new ArrayList<>(); + expectedDisAllowedUsers.add("user1"); + + aclConfigValue.put("users", usersValue); + testCommonHelper(emptyList, emptyList, expectedDisAllowedUsers, emptyList); + } + + @Test + public void testStarInPositiveUsers() throws Exception { + usersValue.add("*:+"); + usersValue.add("user1:-"); + + List<String> expectedAllowedUsers = new ArrayList<>(); + expectedAllowedUsers.add("*"); + + List<String> expectedDisAllowedUsers = new ArrayList<>(); + expectedDisAllowedUsers.add("user1"); + + aclConfigValue.put("users", usersValue); + AclSelector testSelector = (AclSelector)testCommonHelper(expectedAllowedUsers, emptyList, + expectedDisAllowedUsers, emptyList); + + Set<String> queryGroups = new HashSet<>(); + // -ve user with Invalid groups + assertFalse(testSelector.checkQueryUserGroups("user1", queryGroups)); + + // Other user with invalid groups + assertTrue(testSelector.checkQueryUserGroups("user2", queryGroups)); + } + + @Test + public void testStarInNegativeUsers() throws Exception { + usersValue.add("*:-"); + usersValue.add("user1:+"); + + List<String> expectedAllowedUsers = new ArrayList<>(); + expectedAllowedUsers.add("user1"); + + List<String> expectedDisAllowedUsers = new ArrayList<>(); + expectedDisAllowedUsers.add("*"); + + aclConfigValue.put("users", usersValue); + AclSelector testSelector = (AclSelector)testCommonHelper(expectedAllowedUsers, emptyList, + expectedDisAllowedUsers, emptyList); + + Set<String> queryGroups = new HashSet<>(); + // Other user with Invalid groups + assertFalse(testSelector.checkQueryUserGroups("user2", queryGroups)); + + // +ve user with invalid groups + assertTrue(testSelector.checkQueryUserGroups("user1", queryGroups)); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestComplexSelectors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestComplexSelectors.java new file mode 100644 index 000000000..d381d05a0 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestComplexSelectors.java @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import org.apache.drill.categories.ResourceManagerTest; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.apache.drill.exec.server.options.OptionValue; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category(ResourceManagerTest.class) +public final class TestComplexSelectors { + + private static final Map<String, String> tagSelectorConfig1 = new HashMap<>(); + + private static final Map<String, String> tagSelectorConfig2 = new HashMap<>(); + + private static final QueryContext mockContext = mock(QueryContext.class); + + private static final List<Object> complexSelectorValue = new ArrayList<>(); + + @BeforeClass + public static void classLevelSetup() { + tagSelectorConfig1.put("tag", "small"); + tagSelectorConfig2.put("tag", "large"); + } + + @After + public void testLevelSetup() { + complexSelectorValue.clear(); + } + + private ResourcePoolSelector testOrCommonHelper() throws RMConfigException { + Config testConfig = ConfigFactory.empty().withValue("or", ConfigValueFactory.fromIterable(complexSelectorValue)); + final ResourcePoolSelector testSelector = ResourcePoolSelectorFactory.createSelector(testConfig); + assertTrue("TestSelector is not a OrSelector", testSelector instanceof OrSelector); + return testSelector; + } + + private ResourcePoolSelector testAndCommonHelper() throws RMConfigException { + Config testConfig = ConfigFactory.empty().withValue("and", ConfigValueFactory.fromIterable(complexSelectorValue)); + final ResourcePoolSelector testSelector = ResourcePoolSelectorFactory.createSelector(testConfig); + assertTrue("TestSelector is not a AndSelector", testSelector instanceof AndSelector); + return testSelector; + } + + @Test(expected = RMConfigException.class) + public void testOrSelectorSingleValidSelector() throws Exception { + // setup complexSelectorValue + complexSelectorValue.add(tagSelectorConfig1); + testOrCommonHelper(); + } + + @Test(expected = RMConfigException.class) + public void testOrSelectorEmptyValue() throws Exception { + testOrCommonHelper(); + } + + @Test(expected = RMConfigException.class) + public void testAndSelectorEmptyValue() throws Exception { + testAndCommonHelper(); + } + + @Test(expected = RMConfigException.class) + public void testOrSelectorStringValue() throws Exception { + Config testConfig = ConfigFactory.empty().withValue("or", ConfigValueFactory.fromAnyRef("dummy")); + ResourcePoolSelectorFactory.createSelector(testConfig); + } + + @Test(expected = RMConfigException.class) + public void testAndSelectorStringValue() throws Exception { + Config testConfig = ConfigFactory.empty().withValue("and", ConfigValueFactory.fromAnyRef("dummy")); + ResourcePoolSelectorFactory.createSelector(testConfig); + } + + @Test + public void testOrSelectorWithTwoValidSelector() throws Exception { + // setup complexSelectorValue + complexSelectorValue.add(tagSelectorConfig1); + complexSelectorValue.add(tagSelectorConfig2); + + // Test for OrSelector + ResourcePoolSelector testSelector = testOrCommonHelper(); + + // Test successful selection with OrSelector + OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "small", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertTrue(testSelector.isQuerySelected(mockContext)); + + // Test unsuccessful selection with OrSelector + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "medium", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertFalse(testSelector.isQuerySelected(mockContext)); + } + + @Test + public void testAndSelectorWithTwoValidSelector() throws Exception { + // setup complexSelectorValue + complexSelectorValue.add(tagSelectorConfig1); + complexSelectorValue.add(tagSelectorConfig2); + + // Test for AndSelector + ResourcePoolSelector testSelector = testAndCommonHelper(); + + // Test successful selection with AndSelector + OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "small,large", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertTrue(testSelector.isQuerySelected(mockContext)); + + // Test unsuccessful selection with AndSelector + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "small", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertFalse(testSelector.isQuerySelected(mockContext)); + } + + @Test + public void testAndSelectorWithNotEqualSelector() throws Exception { + // setup NotEqualSelector config + Map<String, Object> notEqualConfig = new HashMap<>(); + notEqualConfig.put("not_equal", tagSelectorConfig1); + + // setup complex selector value + complexSelectorValue.add(notEqualConfig); + complexSelectorValue.add(tagSelectorConfig2); + + // Test for AndSelector + ResourcePoolSelector testSelector = testAndCommonHelper(); + + // Test successful selection with AndSelector + OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "large", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertTrue(testSelector.isQuerySelected(mockContext)); + + // Test unsuccessful selection with AndSelector + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "small", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertFalse(testSelector.isQuerySelected(mockContext)); + } + + @Test + public void testORSelectorWithNotEqualSelector() throws Exception { + // setup NotEqualSelector config + Map<String, Object> notEqualConfig = new HashMap<>(); + notEqualConfig.put("not_equal", tagSelectorConfig1); + + // setup complex selector value + complexSelectorValue.add(notEqualConfig); + complexSelectorValue.add(tagSelectorConfig2); + + // Test for AndSelector + ResourcePoolSelector testSelector = testOrCommonHelper(); + + // Test successful selection with AndSelector + OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "medium", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertTrue(testSelector.isQuerySelected(mockContext)); + + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "large", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertTrue(testSelector.isQuerySelected(mockContext)); + + // Test unsuccessful selection with AndSelector + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "small", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertFalse(testSelector.isQuerySelected(mockContext)); + } + + @Test + public void testAndSelectorWithOrSelector() throws Exception { + // setup OrSelector config + List<Object> orConfigValue = new ArrayList<>(); + orConfigValue.add(tagSelectorConfig1); + orConfigValue.add(tagSelectorConfig2); + + Map<String, Object> orConfig = new HashMap<>(); + orConfig.put("or", orConfigValue); + + // get another TagSelector config + Map<String, String> tagSelectorConfig3 = new HashMap<>(); + tagSelectorConfig3.put("tag", "medium"); + + // setup complex selector value + complexSelectorValue.add(orConfig); + complexSelectorValue.add(tagSelectorConfig3); + + // Test for AndSelector + ResourcePoolSelector testSelector = testAndCommonHelper(); + + // Test successful selection with AndSelector + OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "small,medium", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertTrue(testSelector.isQuerySelected(mockContext)); + + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "large,medium", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertTrue(testSelector.isQuerySelected(mockContext)); + + // Test unsuccessful selection with AndSelector + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, + ExecConstants.RM_QUERY_TAGS_KEY, "small,verylarge", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertFalse(testSelector.isQuerySelected(mockContext)); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestNotEqualSelector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestNotEqualSelector.java new file mode 100644 index 000000000..b3053aa22 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestNotEqualSelector.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import org.apache.drill.categories.ResourceManagerTest; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.apache.drill.exec.server.options.OptionValue; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category(ResourceManagerTest.class) +public final class TestNotEqualSelector { + + private ResourcePoolSelector testCommonHelper(Map<String, ? extends Object> selectorValue) throws RMConfigException { + Config testConfig = ConfigFactory.empty() + .withValue("not_equal", ConfigValueFactory.fromMap(selectorValue)); + final ResourcePoolSelector testSelector = ResourcePoolSelectorFactory.createSelector(testConfig); + assertTrue("TestSelector is not a not_equal selector", testSelector instanceof NotEqualSelector); + return testSelector; + } + + @Test + public void testValidNotEqualTagSelector() throws Exception { + Map<String, String> tagSelectorConfig = new HashMap<>(); + tagSelectorConfig.put("tag", "marketing"); + NotEqualSelector testSelector = (NotEqualSelector)testCommonHelper(tagSelectorConfig); + assertTrue("Expected child selector type to be TagSelector", + testSelector.getPoolSelector() instanceof TagSelector); + + QueryContext mockContext = mock(QueryContext.class); + + // Test successful selection + OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants + .RM_QUERY_TAGS_KEY, "small,large", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertTrue(testSelector.isQuerySelected(mockContext)); + + // Test unsuccessful selection + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants + .RM_QUERY_TAGS_KEY, "marketing", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertFalse(testSelector.isQuerySelected(mockContext)); + } + + @Test + public void testValidNotEqualAclSelector() throws Exception { + Map<String, List<String>> aclSelectorValue = new HashMap<>(); + List<String> usersValue = new ArrayList<>(); + usersValue.add("user1"); + usersValue.add("user2"); + + List<String> groupsValue = new ArrayList<>(); + groupsValue.add("group1"); + groupsValue.add("group2"); + + aclSelectorValue.put("users", usersValue); + aclSelectorValue.put("groups", groupsValue); + + Map<String, Map<String, List<String>>> aclSelectorConfig = new HashMap<>(); + aclSelectorConfig.put("acl", aclSelectorValue); + NotEqualSelector testSelector = (NotEqualSelector)testCommonHelper(aclSelectorConfig); + assertTrue("Expected child selector type to be TagSelector", + testSelector.getPoolSelector() instanceof AclSelector); + } + + @Test(expected = RMConfigException.class) + public void testInValidNotEqualAclSelector() throws Exception { + Map<String, List<String>> aclSelectorValue = new HashMap<>(); + + aclSelectorValue.put("users", new ArrayList<>()); + aclSelectorValue.put("groups", new ArrayList<>()); + + Map<String, Map<String, List<String>>> aclSelectorConfig = new HashMap<>(); + aclSelectorConfig.put("acl", aclSelectorValue); + testCommonHelper(aclSelectorConfig); + } + + @Test(expected = RMConfigException.class) + public void testNotEqualSelectorEmptyValue() throws Exception { + Map<String, String> notEqualSelectorValue = new HashMap<>(); + testCommonHelper(notEqualSelectorValue); + } + + @Test(expected = RMConfigException.class) + public void testNotEqualSelectorStringValue() throws Exception { + Config testConfig = ConfigFactory.empty() + .withValue("not_equal", ConfigValueFactory.fromAnyRef("null")); + ResourcePoolSelectorFactory.createSelector(testConfig); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestResourcePoolSelectors.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestResourcePoolSelectors.java new file mode 100644 index 000000000..0a2d121c9 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestResourcePoolSelectors.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import org.apache.drill.categories.ResourceManagerTest; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertTrue; + +@Category(ResourceManagerTest.class) +public class TestResourcePoolSelectors { + + @Test + public void testNullSelectorConfig() throws Exception { + final ResourcePoolSelector testSelector = ResourcePoolSelectorFactory.createSelector(null); + assertTrue("TestSelector with null config is not of type Default Selector", + testSelector instanceof DefaultSelector); + assertTrue("DefaultSelector type is not default", + testSelector.getSelectorType() == ResourcePoolSelector.SelectorType.DEFAULT); + } + + @Test(expected = RMConfigException.class) + public void testEmptySelectorConfig() throws Exception { + Config testConfig = ConfigFactory.empty(); + ResourcePoolSelectorFactory.createSelector(testConfig); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestTagSelector.java b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestTagSelector.java new file mode 100644 index 000000000..d93731513 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/resourcemgr/config/selectors/TestTagSelector.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.resourcemgr.config.selectors; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import org.apache.drill.categories.ResourceManagerTest; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.resourcemgr.config.exception.RMConfigException; +import org.apache.drill.exec.server.options.OptionValue; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@Category(ResourceManagerTest.class) +public final class TestTagSelector { + + private ResourcePoolSelector testCommonHelper(Object tagValue) throws RMConfigException { + Config testConfig = ConfigFactory.empty() + .withValue("tag", ConfigValueFactory.fromAnyRef(tagValue)); + final ResourcePoolSelector testSelector = ResourcePoolSelectorFactory.createSelector(testConfig); + assertTrue("TestSelector is not a tag selector", testSelector instanceof TagSelector); + assertEquals("Unexpected value for tag selector", tagValue, ((TagSelector) testSelector).getTagValue()); + return testSelector; + } + + // Tests for TagSelector + @Test + public void testValidTagSelector() throws Exception { + testCommonHelper("marketing"); + } + + @Test(expected = RMConfigException.class) + public void testInValidTagSelector() throws Exception { + testCommonHelper(""); + testCommonHelper(null); + } + + @Test + public void testTagSelectorWithQueryTags() throws Exception { + QueryContext mockContext = mock(QueryContext.class); + TagSelector testSelector = (TagSelector) testCommonHelper("small"); + + // Test successful selection + OptionValue testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants + .RM_QUERY_TAGS_KEY, "small,large", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertTrue(testSelector.isQuerySelected(mockContext)); + + // Test empty query tags + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants + .RM_QUERY_TAGS_KEY, "", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertFalse(testSelector.isQuerySelected(mockContext)); + + // Test different query tags + testOption = OptionValue.create(OptionValue.AccessibleScopes.SESSION_AND_QUERY, ExecConstants + .RM_QUERY_TAGS_KEY, "medium", OptionValue.OptionScope.SESSION); + when(mockContext.getOption(ExecConstants.RM_QUERY_TAGS_KEY)).thenReturn(testOption); + assertFalse(testSelector.isQuerySelected(mockContext)); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java index 16ac42e27..dd9da2219 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/udf/dynamic/TestDynamicUDFSupport.java @@ -23,7 +23,7 @@ import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.drill.categories.SlowTest; import org.apache.drill.categories.SqlFunctionTest; -import org.apache.drill.common.config.CommonConstants; +import org.apache.drill.common.config.ConfigConstants; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserRemoteException; import org.apache.drill.exec.ExecConstants; @@ -205,7 +205,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery { .unOrdered() .baselineColumns("ok", "summary") .baselineValues(false, String.format(summary, - CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, jar)) + ConfigConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, jar)) .go(); } |