diff options
author | Vitalii Diravka <vitalii.diravka@gmail.com> | 2018-07-03 20:23:03 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-07-03 20:23:03 +0300 |
commit | 0ae7035de390c699f574d1ec25d45cd8b20a8b94 (patch) | |
tree | 3012eafa62d188030069d235649bce2c7282c674 | |
parent | 069c3049f1a500e5ae0b47caeebc5856ab182b73 (diff) |
DRILL-6494: Drill Plugins Handler
- Storage Plugins Handler service is used op the Drill start-up stage and it updates storage plugins configs from
storage-plugins-override.conf file. If plugins configs are present in the persistence store - they are updated,
otherwise bootstrap plugins are updated and the result configs are loaded to persistence store. If the enabled
status is absent in the storage-plugins-override.conf file, the last plugin config enabled status persists.
- 'drill.exec.storage.action_on_plugins_override_file' Boot option is added. This is the action, which should be
performed on the storage-plugins-override.conf file after successful updating storage plugins configs.
Possible values are: "none" (default), "rename" and "remove".
- The "NULL" issue with updating Hive plugin config by REST is solved. But clients are still being instantiated for disabled
plugins - DRILL-6412.
- "org.honton.chas.hocon:jackson-dataformat-hocon" library is added for the proper deserializing HOCON conf file
- additional refactoring: "com.typesafe:config" and "org.apache.commons:commons-lang3" are placed into DependencyManagement
block with proper versions; correct properties for metrics in "drill-override-example.conf" are specified
closes #1345
35 files changed, 680 insertions, 201 deletions
diff --git a/common/pom.xml b/common/pom.xml index a7fba2bf1..a8cab1075 100644 --- a/common/pom.xml +++ b/common/pom.xml @@ -53,13 +53,11 @@ <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> - <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> - <version>3.1</version> </dependency> <dependency> diff --git a/common/src/main/java/org/apache/drill/common/config/CommonConstants.java b/common/src/main/java/org/apache/drill/common/config/CommonConstants.java index 1b5fb29c0..e203972b8 100644 --- a/common/src/main/java/org/apache/drill/common/config/CommonConstants.java +++ b/common/src/main/java/org/apache/drill/common/config/CommonConstants.java @@ -31,4 +31,7 @@ public interface CommonConstants { /** Override configuration file name. (Classpath resource pathname.) */ String CONFIG_OVERRIDE_RESOURCE_PATHNAME = "drill-override.conf"; + /** Override plugins configs file name. (Classpath resource pathname.) */ + String STORAGE_PLUGINS_OVERRIDE_CONF = "storage-plugins-override.conf"; + } diff --git a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java index 66058643d..7211f1936 100644 --- a/common/src/main/java/org/apache/drill/common/config/DrillConfig.java +++ b/common/src/main/java/org/apache/drill/common/config/DrillConfig.java @@ -261,7 +261,7 @@ public class DrillConfig extends NestedConfig { final String className = getString(location); if (className == null) { throw new DrillConfigurationException(String.format( - "No class defined at location '%s'. Expected a definition of the class []", + "No class defined at location '%s'. Expected a definition of the class [%s]", location, clazz.getCanonicalName())); } diff --git a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java index 13a5eade9..909e8110d 100644 --- a/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java +++ b/common/src/main/java/org/apache/drill/common/scanner/ClassPathScanner.java @@ -51,7 +51,6 @@ import org.reflections.util.FilterBuilder; import com.google.common.base.Stopwatch; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; import javassist.bytecode.AccessFlag; import javassist.bytecode.AnnotationsAttribute; @@ -320,15 +319,12 @@ public final class ClassPathScanner { * to scan for (relative to specified class loaders' classpath roots) * @param returnRootPathname whether to collect classpath root portion of * URL for each resource instead of full URL of each resource - * @param classLoaders set of class loaders in which to look up resource; - * none (empty array) to specify to use current thread's context - * class loader and {@link Reflections}'s class loader * @returns ...; empty set if none */ public static Set<URL> forResource(final String resourcePathname, final boolean returnRootPathname) { logger.debug("Scanning classpath for resources with pathname \"{}\".", resourcePathname); - final Set<URL> resultUrlSet = Sets.newHashSet(); + final Set<URL> resultUrlSet = new HashSet<>(); final ClassLoader classLoader = ClassPathScanner.class.getClassLoader(); try { final Enumeration<URL> resourceUrls = classLoader.getResources(resourcePathname); diff --git a/common/src/main/java/org/apache/drill/exec/util/ActionOnFile.java b/common/src/main/java/org/apache/drill/exec/util/ActionOnFile.java new file mode 100644 index 000000000..cca1e771d --- /dev/null +++ b/common/src/main/java/org/apache/drill/exec/util/ActionOnFile.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.util; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * It defines possible actions on the file and performs the necessary action + */ +public enum ActionOnFile { + + /** + * No action will be performed + */ + NONE { + @Override + public void action(URL url) { } + }, + + /** + * Rename the file by adding current timestamp value with "yyyyMMdd_HHmmss" format before last dot of original file name<p> + * Example:<br> + * Original file name: "storage-plugins-override.conf"<br> + * New file name: "storage-plugins-override-20180703_033354.conf" + */ + RENAME { + @Override + public void action(URL url) { + String fileName = url.getFile(); + File file = new File(url.getPath()); + String currentDateTime = new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date()); + String newFileName = new StringBuilder(fileName) + .insert(fileName.lastIndexOf("."), "-" + currentDateTime) + .toString(); + Path filePath = file.toPath(); + try { + Files.move(filePath, filePath.resolveSibling(newFileName)); + } catch (IOException e) { + logger.error("There was an error during file {} rename.", fileName, e); + } + } + }, + + /** + * It removes the file + */ + REMOVE { + @Override + public void action(URL url) { + File file = new File(url.getPath()); + try { + Files.delete(file.toPath()); + } catch (IOException e) { + logger.error("There was an error during file {} removing.", url.getFile(), e); + } + } + }; + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ActionOnFile.class); + + /** + * This is an action which should be performed on the file + * @param url the file URL + */ + public abstract void action(URL url); +} diff --git a/contrib/storage-hbase/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-hbase/src/main/resources/bootstrap-storage-plugins.json index 3e0e8c041..530a407c8 100644 --- a/contrib/storage-hbase/src/main/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-hbase/src/main/resources/bootstrap-storage-plugins.json @@ -2,11 +2,11 @@ "storage":{ hbase : { type:"hbase", - enabled: false, config : { "hbase.zookeeper.quorum" : "localhost", "hbase.zookeeper.property.clientPort" : 2181 - } + }, + enabled: false } } } diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java index d3115b8a6..e3cb3a2dd 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/schema/HiveSchemaFactory.java @@ -27,7 +27,6 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -72,6 +71,7 @@ public class HiveSchemaFactory implements SchemaFactory { isDrillImpersonationEnabled = plugin.getContext().getConfig().getBoolean(ExecConstants.IMPERSONATION_ENABLED); try { + // TODO: DRILL-6412. Clients for plugin should be instantiated only for the case, when plugin is enabled processUserMetastoreClient = DrillHiveMetaStoreClient.createCloseableClientWithCaching(hiveConf); } catch (MetaException e) { @@ -82,12 +82,9 @@ public class HiveSchemaFactory implements SchemaFactory { .newBuilder() .expireAfterAccess(10, TimeUnit.MINUTES) .maximumSize(5) // Up to 5 clients for impersonation-enabled. - .removalListener(new RemovalListener<String, DrillHiveMetaStoreClient>() { - @Override - public void onRemoval(RemovalNotification<String, DrillHiveMetaStoreClient> notification) { - DrillHiveMetaStoreClient client = notification.getValue(); - client.close(); - } + .removalListener((RemovalListener<String, DrillHiveMetaStoreClient>) notification -> { + DrillHiveMetaStoreClient client = notification.getValue(); + client.close(); }) .build(new CacheLoader<String, DrillHiveMetaStoreClient>() { @Override diff --git a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json index 5c7174e25..d06220fe7 100644 --- a/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-hive/core/src/main/resources/bootstrap-storage-plugins.json @@ -2,14 +2,16 @@ "storage":{ hive : { type:"hive", - enabled: false, config : { "hive.metastore.uris" : "", "javax.jdo.option.ConnectionURL" : "jdbc:derby:;databaseName=../sample-data/drill_hive_db;create=true", "hive.metastore.warehouse.dir" : "/tmp/drill_hive_wh", "fs.default.name" : "file:///", - "hive.metastore.sasl.enabled" : "false" - } + "hive.metastore.sasl.enabled" : "false", + "hive.metastore.schema.verification": "false", + "datanucleus.schema.autoCreateAll": "true" + }, + enabled: false } } } diff --git a/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json b/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json index add980847..4018d9247 100755 --- a/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-jdbc/src/test/resources/bootstrap-storage-plugins.json @@ -2,15 +2,16 @@ "storage" : { derby : { type : "jdbc", - enabled : true, driver : "org.apache.derby.jdbc.ClientDriver", - url : "jdbc:derby://localhost:${derby.reserved.port}/memory:${derby.database.name};user=root;password=root" + url : "jdbc:derby://localhost:${derby.reserved.port}/memory:${derby.database.name};user=root;password=root", + enabled : true }, mysql : { type : "jdbc", enabled : true, driver : "com.mysql.jdbc.Driver", - url : "jdbc:mysql://localhost:${mysql.reserved.port}/${mysql.database.name}?user=root&password=root&useJDBCCompliantTimezoneShift=true" + url : "jdbc:mysql://localhost:${mysql.reserved.port}/${mysql.database.name}?user=root&password=root&useJDBCCompliantTimezoneShift=true", + enabled : true } } } diff --git a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json index 406c03060..18a1df564 100644 --- a/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-kafka/src/main/resources/bootstrap-storage-plugins.json @@ -2,8 +2,8 @@ "storage":{ kafka : { type:"kafka", - enabled: false, - kafkaConsumerProps: {"bootstrap.servers":"localhost:9092", "group.id" : "drill-consumer"} + kafkaConsumerProps: {"bootstrap.servers":"localhost:9092", "group.id" : "drill-consumer"}, + enabled: false } } } diff --git a/contrib/storage-mongo/src/main/resources/bootstrap-storage-plugins.json b/contrib/storage-mongo/src/main/resources/bootstrap-storage-plugins.json index b7d34f2fb..9983596d3 100644 --- a/contrib/storage-mongo/src/main/resources/bootstrap-storage-plugins.json +++ b/contrib/storage-mongo/src/main/resources/bootstrap-storage-plugins.json @@ -2,8 +2,8 @@ "storage":{ mongo : { type:"mongo", - enabled: false, - connection:"mongodb://localhost:27017/" + connection:"mongodb://localhost:27017/", + enabled: false } } } diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml index e044801f9..712f3ec59 100644 --- a/distribution/src/assemble/bin.xml +++ b/distribution/src/assemble/bin.xml @@ -420,6 +420,11 @@ <source>src/resources/drill-on-yarn-example.conf</source> <outputDirectory>conf</outputDirectory> <fileMode>0640</fileMode> - </file> + </file> + <file> + <source>src/resources/storage-plugins-override-example.conf</source> + <outputDirectory>conf</outputDirectory> + <fileMode>0640</fileMode> + </file> </files> </assembly> diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf index fa2e39555..296cd8b60 100644 --- a/distribution/src/resources/drill-override-example.conf +++ b/distribution/src/resources/drill-override-example.conf @@ -58,17 +58,10 @@ drill.exec: { batch.size: 4000 }, partition.column.label: "dir" - } - }, - metrics : { - context: "drillbit", - jmx: { - enabled : true }, - log: { - enabled : false, - interval : 60 - } + # The action on the storage-plugins-override.conf after it's use. + # Possible values are "none" (default), "rename", "remove" + action_on_plugins_override_file: "none" }, zk: { connect: "localhost:2181", @@ -252,6 +245,15 @@ drill.exec: { #ssl provider. May be "JDK" or "OPENSSL". Default is "JDK" provider: "JDK" } -} - +}, +drill.metrics : { + context: "drillbit", + jmx: { + enabled : true + }, + log: { + enabled : false, + interval : 60 + } +} diff --git a/distribution/src/resources/storage-plugins-override-example.conf b/distribution/src/resources/storage-plugins-override-example.conf new file mode 100644 index 000000000..360ba2c0a --- /dev/null +++ b/distribution/src/resources/storage-plugins-override-example.conf @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This file involves storage plugins configs, which can be updated on the Drill start-up. +# This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md for more information. + + "storage":{ + cp: { + type: "file", + connection: "classpath:///", + formats: { + "csv" : { + type: "text", + extensions: [ "csv" ], + delimiter: "," + } + } + } + } + "storage":{ + dfs: { + type: "file", + connection: "hdfs:///", + workspaces: { + "root": { + "location": "/", + "writable": false, + "defaultInputFormat": null, + "allowAccessOutsideWorkspace": false + } + }, + formats: { + "parquet": { + "type": "parquet" + } + }, + enabled: false + } + } + "storage":{ + mongo : { + type:"mongo", + connection:"mongodb://test_host:27017/", + enabled: true + } + } + "storage": { + openTSDB: { + type: "openTSDB", + connection: "http://localhost:8888", + enabled: true + } + } diff --git a/drill-yarn/pom.xml b/drill-yarn/pom.xml index 6bad97a3d..b6ada46b2 100644 --- a/drill-yarn/pom.xml +++ b/drill-yarn/pom.xml @@ -106,7 +106,6 @@ <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> - <version>1.0.0</version> </dependency> <!-- Logging --> diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index 7701e7616..94c345953 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -206,6 +206,10 @@ <version>${jackson.version}</version> </dependency> <dependency> + <groupId>org.honton.chas.hocon</groupId> + <artifactId>jackson-dataformat-hocon</artifactId> + </dependency> + <dependency> <groupId>org.glassfish.jersey.ext</groupId> <artifactId>jersey-mvc-freemarker</artifactId> <version>2.8</version> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java index d23485422..1493a923c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/StoragePlugins.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.config.LogicalPlanPersistence; @@ -35,7 +36,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Charsets; import com.google.common.io.Resources; -public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginConfig>>{ +public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginConfig>> { private Map<String, StoragePluginConfig> storage; @@ -95,4 +96,45 @@ public class StoragePlugins implements Iterable<Map.Entry<String, StoragePluginC return storage.equals(((StoragePlugins) obj).getStorage()); } + /** + * Put one plugin into current storage plugins map + * + * @param name storage plugin name + * @param config storage plugin config + */ + public void put(String name, StoragePluginConfig config) { + storage.put(name, config); + } + + /** + * Put other storage plugins into current storage plugins map + * + * @param plugins storage plugins + */ + public void putAll(StoragePlugins plugins) { + Optional.ofNullable(plugins) + .ifPresent(p -> storage.putAll(p.getStorage())); + } + + /** + * Put one plugin into current storage plugins map, if it was absent + * + * @param name storage plugin name + * @param config storage plugin config + * @return the previous storage plugin config, null if it was absent or it had null value + */ + public StoragePluginConfig putIfAbsent(String name, StoragePluginConfig config) { + return storage.putIfAbsent(name, config); + } + + /** + * Return storage plugin config for certain plugin name + * + * @param pluginName storage plugin name + * @return storage plugin config + */ + public StoragePluginConfig getConfig(String pluginName) { + return storage.get(pluginName); + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java index b6f839ba8..ee55d9ed5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java @@ -100,6 +100,7 @@ public class StorageResources { @Produces(MediaType.APPLICATION_JSON) public PluginConfigWrapper getStoragePluginJSON(@PathParam("name") String name) { try { + // TODO: DRILL-6412: No need to get StoragePlugin. It is enough to have plugin name and config here StoragePlugin plugin = storage.getPlugin(name); if (plugin != null) { return new PluginConfigWrapper(name, plugin.getConfig()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java index 9b2c92871..41db97b3e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/NamedStoragePluginConfig.java @@ -17,22 +17,53 @@ */ package org.apache.drill.exec.store; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.drill.common.logical.StoragePluginConfig; import com.fasterxml.jackson.annotation.JsonTypeName; -@JsonTypeName("named") +@JsonTypeName(NamedStoragePluginConfig.NAME) public class NamedStoragePluginConfig extends StoragePluginConfig { - public String name; + + public static final String NAME = "named"; + + private final String name; + + @JsonCreator + public NamedStoragePluginConfig(@JsonProperty("name") String name) { + this.name = name; + } + + public String getName() { + return name; + } @Override - public boolean equals(Object o) { - return this == o; + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + NamedStoragePluginConfig other = (NamedStoragePluginConfig) obj; + if (name == null) { + return other.name == null; + } else { + return name.equals(other.name); + } } @Override public int hashCode() { - return name.hashCode(); + final int prime = 31; + int result = 1; + result = prime * result + (name == null ? 0 : name.hashCode()); + return result; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java index 28775c85c..582791e73 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginMap.java @@ -21,13 +21,14 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.logical.StoragePluginConfig; import com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; @@ -40,7 +41,7 @@ import com.google.common.collect.Multimaps; class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>>, AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginMap.class); - private final ConcurrentMap<String, StoragePlugin> nameMap = Maps.newConcurrentMap(); + private final ConcurrentMap<String, StoragePlugin> nameMap = new ConcurrentHashMap<>(); @SuppressWarnings("unchecked") private final Multimap<StoragePluginConfig, StoragePlugin> configMap = @@ -111,7 +112,12 @@ class StoragePluginMap implements Iterable<Entry<String, StoragePlugin>>, AutoCl return nameMap.entrySet().iterator(); } - public Iterable<String> names() { + /** + * Returns set of plugin names of this {@link StoragePluginMap} + * + * @return plugin names + */ + public Set<String> getNames() { return nameMap.keySet(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java index 82f18f8d5..313d3b985 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistry.java @@ -27,85 +27,84 @@ import org.apache.drill.exec.store.dfs.FormatPlugin; import org.apache.drill.exec.store.sys.PersistentStore; public interface StoragePluginRegistry extends Iterable<Map.Entry<String, StoragePlugin>>, AutoCloseable { - final String SYS_PLUGIN = "sys"; - final String INFORMATION_SCHEMA_PLUGIN = "INFORMATION_SCHEMA"; - final String STORAGE_PLUGIN_REGISTRY_IMPL = "drill.exec.storage.registry"; - final String PSTORE_NAME = "sys.storage_plugins"; + String SYS_PLUGIN = "sys"; + String INFORMATION_SCHEMA_PLUGIN = "INFORMATION_SCHEMA"; + String STORAGE_PLUGIN_REGISTRY_IMPL = "drill.exec.storage.registry"; + String ACTION_ON_STORAGE_PLUGINS_OVERRIDE_FILE = "drill.exec.storage.action_on_plugins_override_file"; + String PSTORE_NAME = "sys.storage_plugins"; /** * Initialize the storage plugin registry. Must be called before the registry is used. * - * @throws DrillbitStartupException + * @throws DrillbitStartupException if drillbit startup fails */ void init() throws DrillbitStartupException; /** * Delete a plugin by name - * @param name - * The name of the storage plugin to delete. + * + * @param name The name of the storage plugin to delete. */ void deletePlugin(String name); /** * Create a plugin by name and configuration. If the plugin already exists, update the plugin - * @param name - * The name of the plugin - * @param config - * The plugin configuration - * @param persist - * Whether to persist the plugin for later use or treat it as ephemeral. + * + * @param name The name of the plugin + * @param config The plugin configuration + * @param persist Whether to persist the plugin for later use or treat it as ephemeral. * @return The StoragePlugin instance. - * @throws ExecutionSetupException + * @throws ExecutionSetupException if plugin cannot be created */ StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException; /** * Get a plugin by name. Create it based on the PStore saved definition if it doesn't exist. - * @param name - * The name of the plugin + * + * @param name The name of the plugin * @return The StoragePlugin instance. - * @throws ExecutionSetupException + * @throws ExecutionSetupException if plugin cannot be obtained */ StoragePlugin getPlugin(String name) throws ExecutionSetupException; /** * Get a plugin by configuration. If it doesn't exist, create it. - * @param config - * The configuration for the plugin. + * + * @param config The configuration for the plugin. * @return The StoragePlugin instance. - * @throws ExecutionSetupException + * @throws ExecutionSetupException if plugin cannot be obtained */ StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException; /** * Add a plugin to the registry using the provided name. * - * @param name - * @param plugin + * @param name The name of the plugin + * @param plugin The StoragePlugin instance */ - void addPlugin(String name, StoragePlugin plugin); + void addEnabledPlugin(String name, StoragePlugin plugin); /** * Get the Format plugin for the FileSystemPlugin associated with the provided storage config and format config. * - * @param storageConfig - * The storage config for the associated FileSystemPlugin - * @param formatConfig - * The format config for the associated FormatPlugin - * @return A FormatPlugin - * @throws ExecutionSetupException + * @param storageConfig The storage config for the associated FileSystemPlugin + * @param formatConfig The format config for the associated FormatPlugin + * @return A FormatPlugin instance + * @throws ExecutionSetupException if plugin cannot be obtained */ FormatPlugin getFormatPlugin(StoragePluginConfig storageConfig, FormatPluginConfig formatConfig) throws ExecutionSetupException; /** * Get the PStore for this StoragePluginRegistry. (Used in the management layer.) + * * @return PStore for StoragePlugin configuration objects. */ PersistentStore<StoragePluginConfig> getStore(); /** * Get the Schema factory associated with this storage plugin registry. + * * @return A SchemaFactory that can register the schemas associated with this plugin registry. */ SchemaFactory getSchemaFactory(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java index f2edf5e4b..8e5fba449 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginRegistryImpl.java @@ -23,9 +23,11 @@ import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -34,6 +36,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import com.google.common.annotations.VisibleForTesting; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.LogicalPlanPersistence; import org.apache.drill.common.exceptions.DrillRuntimeException; @@ -43,7 +46,6 @@ import org.apache.drill.common.logical.StoragePluginConfig; import org.apache.drill.common.scanner.ClassPathScanner; import org.apache.drill.common.scanner.persistence.ScanResult; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.exception.StoreException; import org.apache.drill.exec.planner.logical.StoragePlugins; import org.apache.drill.exec.server.DrillbitContext; @@ -63,17 +65,13 @@ import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import com.google.common.io.Resources; public class StoragePluginRegistryImpl implements StoragePluginRegistry { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginRegistryImpl.class); private Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = Collections.emptyMap(); - private final StoragePluginMap plugins = new StoragePluginMap(); + private final StoragePluginMap enabledPlugins = new StoragePluginMap(); private DrillbitContext context; private final DrillSchemaFactory schemaFactory = new DrillSchemaFactory(); @@ -87,11 +85,11 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { this.lpPersistence = checkNotNull(context.getLpPersistence()); this.classpathScan = checkNotNull(context.getClasspathScan()); try { - this.pluginSystemTable = context // - .getStoreProvider() // - .getOrCreateStore(PersistentStoreConfig // - .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class) // - .name(PSTORE_NAME) // + this.pluginSystemTable = context + .getStoreProvider() + .getOrCreateStore(PersistentStoreConfig + .newJacksonBuilder(lpPersistence.getMapper(), StoragePluginConfig.class) + .name(PSTORE_NAME) .build()); } catch (StoreException | RuntimeException e) { logger.error("Failure while loading storage plugin registry.", e); @@ -101,12 +99,8 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { ephemeralPlugins = CacheBuilder.newBuilder() .expireAfterAccess(24, TimeUnit.HOURS) .maximumSize(250) - .removalListener(new RemovalListener<StoragePluginConfig, StoragePlugin>() { - @Override - public void onRemoval(RemovalNotification<StoragePluginConfig, StoragePlugin> notification) { - closePlugin(notification.getValue()); - } - }) + .removalListener( + (RemovalListener<StoragePluginConfig, StoragePlugin>) notification -> closePlugin(notification.getValue())) .build(new CacheLoader<StoragePluginConfig, StoragePlugin>() { @Override public StoragePlugin load(StoragePluginConfig config) throws Exception { @@ -121,69 +115,82 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { } @Override - public void init() throws DrillbitStartupException { + public void init() { availablePlugins = findAvailablePlugins(classpathScan); + try { + StoragePlugins bootstrapPlugins = pluginSystemTable.getAll().hasNext() ? null : loadBootstrapPlugins(); + + StoragePluginsHandler storagePluginsHandler = new StoragePluginsHandlerService(context); + storagePluginsHandler.loadPlugins(pluginSystemTable, bootstrapPlugins); - // create registered plugins defined in "storage-plugins.json" - plugins.putAll(createPlugins()); + defineEnabledPlugins(); + } catch (IOException e) { + logger.error("Failure setting up storage enabledPlugins. Drillbit exiting.", e); + throw new IllegalStateException(e); + } } - @SuppressWarnings("resource") - private Map<String, StoragePlugin> createPlugins() throws DrillbitStartupException { - try { - /* - * Check if the storage plugins system table has any entries. If not, load the boostrap-storage-plugin file into - * the system table. - */ - if (!pluginSystemTable.getAll().hasNext()) { - // bootstrap load the config since no plugins are stored. - logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration."); - Collection<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false); - if (urls != null && !urls.isEmpty()) { - logger.info("Loading the storage plugin configs from URLs {}.", urls); - Map<String, URL> pluginURLMap = Maps.newHashMap(); - for (URL url : urls) { - String pluginsData = Resources.toString(url, Charsets.UTF_8); - StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class); - for (Map.Entry<String, StoragePluginConfig> config : plugins) { - if (!definePluginConfig(config.getKey(), config.getValue())) { - logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.", - config.getKey(), pluginURLMap.get(config.getKey()), url); - continue; - } - pluginURLMap.put(config.getKey(), url); - } + /** + * Read bootstrap storage plugins {@link ExecConstants#BOOTSTRAP_STORAGE_PLUGINS_FILE} files for the first fresh + * instantiating of Drill + * + * @return bootstrap storage plugins + * @throws IOException if a read error occurs + */ + private StoragePlugins loadBootstrapPlugins() throws IOException { + // bootstrap load the config since no plugins are stored. + logger.info("No storage plugin instances configured in persistent store, loading bootstrap configuration."); + Set<URL> urls = ClassPathScanner.forResource(ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE, false); + if (urls != null && !urls.isEmpty()) { + logger.info("Loading the storage plugin configs from URLs {}.", urls); + StoragePlugins bootstrapPlugins = new StoragePlugins(new HashMap<>()); + Map<String, URL> pluginURLMap = new HashMap<>(); + for (URL url : urls) { + String pluginsData = Resources.toString(url, Charsets.UTF_8); + StoragePlugins plugins = lpPersistence.getMapper().readValue(pluginsData, StoragePlugins.class); + for (Entry<String, StoragePluginConfig> plugin : plugins) { + StoragePluginConfig oldPluginConfig = bootstrapPlugins.putIfAbsent(plugin.getKey(), plugin.getValue()); + if (oldPluginConfig != null) { + logger.warn("Duplicate plugin instance '{}' defined in [{}, {}], ignoring the later one.", + plugin.getKey(), pluginURLMap.get(plugin.getKey()), url); + } else { + pluginURLMap.put(plugin.getKey(), url); } - } else { - throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE); } } + return bootstrapPlugins; + } else { + throw new IOException("Failure finding " + ExecConstants.BOOTSTRAP_STORAGE_PLUGINS_FILE); + } + } - Map<String, StoragePlugin> activePlugins = new HashMap<String, StoragePlugin>(); - for (Map.Entry<String, StoragePluginConfig> entry : Lists.newArrayList(pluginSystemTable.getAll())) { - String name = entry.getKey(); - StoragePluginConfig config = entry.getValue(); - if (config.isEnabled()) { - try { - StoragePlugin plugin = create(name, config); - activePlugins.put(name, plugin); - } catch (ExecutionSetupException e) { - logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e); - config.setEnabled(false); - pluginSystemTable.put(name, config); - } + /** + * It initializes {@link #enabledPlugins} with currently enabled plugins + */ + private void defineEnabledPlugins() { + Map<String, StoragePlugin> activePlugins = new HashMap<>(); + Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginSystemTable.getAll(); + while (allPlugins.hasNext()) { + Entry<String, StoragePluginConfig> plugin = allPlugins.next(); + String name = plugin.getKey(); + StoragePluginConfig config = plugin.getValue(); + if (config.isEnabled()) { + try { + StoragePlugin storagePlugin = create(name, config); + activePlugins.put(name, storagePlugin); + } catch (ExecutionSetupException e) { + logger.error("Failure while setting up StoragePlugin with name: '{}', disabling.", name, e); + config.setEnabled(false); + pluginSystemTable.put(name, config); } } + } - activePlugins.put(INFORMATION_SCHEMA_PLUGIN, new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, - INFORMATION_SCHEMA_PLUGIN)); - activePlugins.put(SYS_PLUGIN, new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, SYS_PLUGIN)); + activePlugins.put(INFORMATION_SCHEMA_PLUGIN, new InfoSchemaStoragePlugin(new InfoSchemaConfig(), context, + INFORMATION_SCHEMA_PLUGIN)); + activePlugins.put(SYS_PLUGIN, new SystemTablePlugin(SystemTablePluginConfig.INSTANCE, context, SYS_PLUGIN)); - return activePlugins; - } catch (IOException e) { - logger.error("Failure setting up storage plugins. Drillbit exiting.", e); - throw new IllegalStateException(e); - } + enabledPlugins.putAll(activePlugins); } /** @@ -194,25 +201,21 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { * @param config plugin config * @param plugin plugin implementation */ - - public void definePlugin(String name, StoragePluginConfig config, StoragePlugin plugin) { - addPlugin(name, plugin); - definePluginConfig(name, config); - } - - private boolean definePluginConfig(String name, StoragePluginConfig config) { - return pluginSystemTable.putIfAbsent(name, config); + @VisibleForTesting + public void addPluginToPersistentStoreIfAbsent(String name, StoragePluginConfig config, StoragePlugin plugin) { + addEnabledPlugin(name, plugin); + pluginSystemTable.putIfAbsent(name, config); } @Override - public void addPlugin(String name, StoragePlugin plugin) { - plugins.put(name, plugin); + public void addEnabledPlugin(String name, StoragePlugin plugin) { + enabledPlugins.put(name, plugin); } @Override public void deletePlugin(String name) { @SuppressWarnings("resource") - StoragePlugin plugin = plugins.remove(name); + StoragePlugin plugin = enabledPlugins.remove(name); closePlugin(plugin); pluginSystemTable.delete(name); } @@ -234,21 +237,21 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { public StoragePlugin createOrUpdate(String name, StoragePluginConfig config, boolean persist) throws ExecutionSetupException { for (;;) { - final StoragePlugin oldPlugin = plugins.get(name); + final StoragePlugin oldPlugin = enabledPlugins.get(name); final StoragePlugin newPlugin = create(name, config); boolean done = false; try { if (oldPlugin != null) { if (config.isEnabled()) { - done = plugins.replace(name, oldPlugin, newPlugin); + done = enabledPlugins.replace(name, oldPlugin, newPlugin); } else { - done = plugins.remove(name, oldPlugin); + done = enabledPlugins.remove(name, oldPlugin); } if (done) { closePlugin(oldPlugin); } } else if (config.isEnabled()) { - done = (null == plugins.putIfAbsent(name, newPlugin)); + done = (null == enabledPlugins.putIfAbsent(name, newPlugin)); } else { done = true; } @@ -270,7 +273,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { @Override public StoragePlugin getPlugin(String name) throws ExecutionSetupException { - StoragePlugin plugin = plugins.get(name); + StoragePlugin plugin = enabledPlugins.get(name); if (name.equals(SYS_PLUGIN) || name.equals(INFORMATION_SCHEMA_PLUGIN)) { return plugin; } @@ -279,7 +282,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { StoragePluginConfig config = this.pluginSystemTable.get(name); if (config == null) { if (plugin != null) { - plugins.remove(name); + enabledPlugins.remove(name); } return null; } else { @@ -296,10 +299,10 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { @Override public StoragePlugin getPlugin(StoragePluginConfig config) throws ExecutionSetupException { if (config instanceof NamedStoragePluginConfig) { - return getPlugin(((NamedStoragePluginConfig) config).name); + return getPlugin(((NamedStoragePluginConfig) config).getName()); } else { // try to lookup plugin by configuration - StoragePlugin plugin = plugins.get(config); + StoragePlugin plugin = enabledPlugins.get(config); if (plugin != null) { return plugin; } @@ -335,7 +338,9 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { } private StoragePlugin create(String name, StoragePluginConfig pluginConfig) throws ExecutionSetupException { - StoragePlugin plugin = null; + // TODO: DRILL-6412: clients for storage plugins shouldn't be created, if storage plugin is disabled + // Creating of the StoragePlugin leads to instantiating storage clients + StoragePlugin plugin; Constructor<? extends StoragePlugin> c = availablePlugins.get(pluginConfig.getClass()); if (c == null) { throw new ExecutionSetupException(String.format("Failure finding StoragePlugin constructor for config %s", @@ -358,7 +363,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { @Override public Iterator<Entry<String, StoragePlugin>> iterator() { - return plugins.iterator(); + return enabledPlugins.iterator(); } @Override @@ -374,13 +379,15 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { Stopwatch watch = Stopwatch.createStarted(); try { - Set<String> currentPluginNames = Sets.newHashSet(plugins.names()); - // iterate through the plugin instances in the persistence store adding + Set<String> currentPluginNames = new HashSet<>(enabledPlugins.getNames()); + // iterate through the plugin instances in the persistent store adding // any new ones and refreshing those whose configuration has changed - for (Map.Entry<String, StoragePluginConfig> config : Lists.newArrayList(pluginSystemTable.getAll())) { - if (config.getValue().isEnabled()) { - getPlugin(config.getKey()); - currentPluginNames.remove(config.getKey()); + Iterator<Entry<String, StoragePluginConfig>> allPlugins = pluginSystemTable.getAll(); + while (allPlugins.hasNext()) { + Entry<String, StoragePluginConfig> plugin = allPlugins.next(); + if (plugin.getValue().isEnabled()) { + getPlugin(plugin.getKey()); + currentPluginNames.remove(plugin.getKey()); } } // remove those which are no longer in the registry @@ -388,11 +395,11 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { if (pluginName.equals(SYS_PLUGIN) || pluginName.equals(INFORMATION_SCHEMA_PLUGIN)) { continue; } - plugins.remove(pluginName); + enabledPlugins.remove(pluginName); } // finally register schemas with the refreshed plugins - for (StoragePlugin plugin : plugins.plugins()) { + for (StoragePlugin plugin : enabledPlugins.plugins()) { plugin.registerSchemas(schemaConfig, parent); } } catch (ExecutionSetupException e) { @@ -424,7 +431,7 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { // -- "hivedb1" // -- "hive.default" // -- "hive.hivedb1" - List<SchemaPlus> secondLevelSchemas = Lists.newArrayList(); + List<SchemaPlus> secondLevelSchemas = new ArrayList<>(); for (String firstLevelSchemaName : parent.getSubSchemaNames()) { SchemaPlus firstLevelSchema = parent.getSubSchema(firstLevelSchemaName); for (String secondLevelSchemaName : firstLevelSchema.getSubSchemaNames()) { @@ -451,19 +458,18 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { @Override public synchronized void close() throws Exception { ephemeralPlugins.invalidateAll(); - plugins.close(); + enabledPlugins.close(); pluginSystemTable.close(); } /** * Get a list of all available storage plugin class constructors. - * @param classpathScan - * A classpath scan to use. + * @param classpathScan A classpath scan to use. * @return A Map of StoragePluginConfig => StoragePlugin.<init>() constructors. */ @SuppressWarnings("unchecked") public static Map<Object, Constructor<? extends StoragePlugin>> findAvailablePlugins(final ScanResult classpathScan) { - Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<Object, Constructor<? extends StoragePlugin>>(); + Map<Object, Constructor<? extends StoragePlugin>> availablePlugins = new HashMap<>(); final Collection<Class<? extends StoragePlugin>> pluginClasses = classpathScan.getImplementations(StoragePlugin.class); final String lineBrokenList = @@ -494,6 +500,4 @@ public class StoragePluginRegistryImpl implements StoragePluginRegistry { return availablePlugins; } - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandler.java new file mode 100644 index 000000000..25b813c6b --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandler.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store; + +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.exec.planner.logical.StoragePlugins; +import org.apache.drill.exec.store.sys.PersistentStore; + + +/** + * Storage plugins handler is an additional service for updating storage plugins configs from the file + */ +public interface StoragePluginsHandler { + + /** + * Update incoming storage plugins configs from persistence store if present, otherwise bootstrap plugins configs. + * + * @param persistentStore the last storage plugins configs from persistence store + * @param bootstrapPlugins bootstrap storage plugins, which are used in case of first Drill start up + * @return all storage plugins, which should be loaded into persistence store + */ + void loadPlugins(PersistentStore<StoragePluginConfig> persistentStore, StoragePlugins bootstrapPlugins); + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java new file mode 100644 index 000000000..599f4a345 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/StoragePluginsHandlerService.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Charsets; +import com.google.common.io.Resources; +import com.jasonclawson.jackson.dataformat.hocon.HoconFactory; +import org.apache.drill.common.config.CommonConstants; +import org.apache.drill.common.config.LogicalPlanPersistence; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.logical.StoragePluginConfig; +import org.apache.drill.common.scanner.ClassPathScanner; +import org.apache.drill.exec.planner.logical.StoragePlugins; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.sys.PersistentStore; +import org.apache.drill.exec.util.ActionOnFile; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import java.io.IOException; +import java.net.URL; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static org.apache.drill.exec.store.StoragePluginRegistry.ACTION_ON_STORAGE_PLUGINS_OVERRIDE_FILE; + +/** + * Drill plugins handler, which allows to update storage plugins configs from the + * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} conf file + * + * TODO: DRILL-6564: It can be improved with configs versioning and service of creating + * {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} + */ +public class StoragePluginsHandlerService implements StoragePluginsHandler { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StoragePluginsHandlerService.class); + + private final LogicalPlanPersistence lpPersistence; + private final DrillbitContext context; + private URL pluginsOverrideFileUrl; + + public StoragePluginsHandlerService(DrillbitContext context) { + this.context = context; + this.lpPersistence = new LogicalPlanPersistence(context.getConfig(), context.getClasspathScan(), + new ObjectMapper(new HoconFactory())); + } + + @Override + public void loadPlugins(@NotNull PersistentStore<StoragePluginConfig> persistentStore, + @Nullable StoragePlugins bootstrapPlugins) { + // if bootstrapPlugins is not null -- fresh Drill set up + StoragePlugins pluginsForPersistentStore; + + StoragePlugins newPlugins = getNewStoragePlugins(); + + if (newPlugins != null) { + pluginsForPersistentStore = new StoragePlugins(new HashMap<>()); + Optional.ofNullable(bootstrapPlugins) + .ifPresent(pluginsForPersistentStore::putAll); + + for (Map.Entry<String, StoragePluginConfig> newPlugin : newPlugins) { + String pluginName = newPlugin.getKey(); + StoragePluginConfig oldPluginConfig = Optional.ofNullable(bootstrapPlugins) + .map(plugins -> plugins.getConfig(pluginName)) + .orElse(persistentStore.get(pluginName)); + StoragePluginConfig updatedStatusPluginConfig = updatePluginStatus(oldPluginConfig, newPlugin.getValue()); + pluginsForPersistentStore.put(pluginName, updatedStatusPluginConfig); + } + } else { + pluginsForPersistentStore = bootstrapPlugins; + } + + // load pluginsForPersistentStore to Persistent Store + Optional.ofNullable(pluginsForPersistentStore) + .ifPresent(plugins -> plugins.forEach(plugin -> persistentStore.put(plugin.getKey(), plugin.getValue()))); + + if (newPlugins != null) { + String fileAction = context.getConfig().getString(ACTION_ON_STORAGE_PLUGINS_OVERRIDE_FILE); + Optional<ActionOnFile> actionOnFile = Arrays.stream(ActionOnFile.values()) + .filter(action -> action.name().equalsIgnoreCase(fileAction)) + .findFirst(); + actionOnFile.ifPresent(action -> action.action(pluginsOverrideFileUrl)); + // TODO: replace with ifPresentOrElse() once the project will be on Java9 + if (!actionOnFile.isPresent()) { + logger.error("Unknown value {} for {} boot option. Nothing will be done with file.", + fileAction, ACTION_ON_STORAGE_PLUGINS_OVERRIDE_FILE); + } + } + } + + /** + * Helper method to identify the enabled status for new storage plugins config. If this status is absent in the updater + * file, the status is kept from the configs, which are going to be updated + * + * @param oldPluginConfig current storage plugin config from Persistent Store or bootstrap config file + * @param newPluginConfig new storage plugin config + * @return new storage plugin config with updated enabled status + */ + private StoragePluginConfig updatePluginStatus(@Nullable StoragePluginConfig oldPluginConfig, + @NotNull StoragePluginConfig newPluginConfig) { + if (!newPluginConfig.isEnabledStatusPresent()) { + boolean newStatus = oldPluginConfig != null && oldPluginConfig.isEnabled(); + newPluginConfig.setEnabled(newStatus); + } + return newPluginConfig; + } + + /** + * Get the new storage plugins from the {@link CommonConstants#STORAGE_PLUGINS_OVERRIDE_CONF} file if it exists, + * null otherwise + * + * @return storage plugins + */ + private StoragePlugins getNewStoragePlugins() { + Set<URL> urlSet = ClassPathScanner.forResource(CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, false); + if (!urlSet.isEmpty()) { + if (urlSet.size() != 1) { + DrillRuntimeException.format("More than one %s file is placed in Drill's classpath: %s", + CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, urlSet); + } + pluginsOverrideFileUrl = urlSet.iterator().next(); + try { + String newPluginsData = Resources.toString(pluginsOverrideFileUrl, Charsets.UTF_8); + return lpPersistence.getMapper().readValue(newPluginsData, StoragePlugins.class); + } catch (IOException e) { + logger.error("Failures are obtained while loading %s file. Proceed without update", + CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF, e); + } + } + logger.trace("The {} file is absent. Proceed without updating of the storage plugins configs", + CommonConstants.STORAGE_PLUGINS_OVERRIDE_CONF); + return null; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java index 16836c295..f6a7a375c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/StoragePluginTestUtils.java @@ -90,6 +90,7 @@ public class StoragePluginTestUtils { pluginConfig.getConfig(), newWorkspaces, pluginConfig.getFormats()); + newPluginConfig.setEnabled(pluginConfig.isEnabled()); pluginRegistry.createOrUpdate(pluginName, newPluginConfig, true); } @@ -137,6 +138,7 @@ public class StoragePluginTestUtils { fileSystemConfig.getConfig(), fileSystemConfig.getWorkspaces(), newFormats); + newFileSystemConfig.setEnabled(fileSystemConfig.isEnabled()); pluginRegistry.createOrUpdate(storagePlugin, newFileSystemConfig, true); } diff --git a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json index 417635a0b..42cddd865 100644 --- a/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json +++ b/exec/java-exec/src/main/resources/bootstrap-storage-plugins.json @@ -68,12 +68,13 @@ "crw", "cr2", "nef", "orf", "raf", "rw2", "rwl", "srw", "x3f" ] } - } + }, + enabled : true }, + s3: { type: "file", connection: "s3a://my.bucket.location.com", - enabled : false, config : { "fs.s3a.access.key": "ID", "fs.s3a.secret.key": "SECRET" @@ -124,7 +125,8 @@ delimiter: ",", extractHeader: true } - } + }, + enabled : false }, cp: { @@ -166,7 +168,8 @@ "crw", "cr2", "nef", "orf", "raf", "rw2", "rwl", "srw", "x3f" ] } - } + }, + enabled : true } } } diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index 2e8c2e783..6889a2f5c 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -103,7 +103,10 @@ drill.exec: { buffer.size: 262144, batch.size: 4000 } - } + }, + # The action on the storage-plugins-override.conf after it's use. + # Possible values are "none" (default), "rename", "remove" + action_on_plugins_override_file: "none" }, zk: { connect: "localhost:2181", diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java index 6a4452e63..2b112e2cb 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/impersonation/BaseTestImpersonation.java @@ -131,6 +131,7 @@ public class BaseTestImpersonation extends PlanTestBase { createAndAddWorkspace("tmp", "/tmp", (short) 0777, processUser, processUser, workspaces); FileSystemConfig miniDfsPluginConfig = new FileSystemConfig(connection, null, workspaces, lfsPluginConfig.getFormats()); + miniDfsPluginConfig.setEnabled(true); pluginRegistry.createOrUpdate(MINIDFS_STORAGE_PLUGIN_NAME, miniDfsPluginConfig, true); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java index ebf2cdd0b..c4ababf1c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestCTTAS.java @@ -66,6 +66,7 @@ public class TestCTTAS extends BaseTestQuery { pluginConfig.getConfig(), newWorkspaces, pluginConfig.getFormats()); + newPluginConfig.setEnabled(pluginConfig.isEnabled()); pluginRegistry.createOrUpdate(DFS_PLUGIN_NAME, newPluginConfig, true); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java index 096c8cdb7..77df0097b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterFixture.java @@ -277,7 +277,7 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { MockStorageEngine plugin = new MockStorageEngine( MockStorageEngineConfig.INSTANCE, bit.getContext(), MockStorageEngineConfig.NAME); - ((StoragePluginRegistryImpl) pluginRegistry).definePlugin(MockStorageEngineConfig.NAME, config, plugin); + ((StoragePluginRegistryImpl) pluginRegistry).addPluginToPersistentStoreIfAbsent(MockStorageEngineConfig.NAME, config, plugin); } private void applyOptions() throws Exception { @@ -515,6 +515,8 @@ public class ClusterFixture extends BaseFixture implements AutoCloseable { pluginConfig.getConfig(), newWorkspaces, newFormats); + newPluginConfig.setEnabled(pluginConfig.isEnabled()); + pluginRegistry.createOrUpdate(pluginName, newPluginConfig, true); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java index 54d7bf0c5..03b0828d9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java +++ b/exec/java-exec/src/test/java/org/apache/drill/test/ClusterMockStorageFixture.java @@ -42,7 +42,7 @@ public class ClusterMockStorageFixture extends ClusterFixture { @SuppressWarnings("resource") MockBreakageStorage plugin = new MockBreakageStorage( MockStorageEngineConfig.INSTANCE, bit.getContext(), name); - ((StoragePluginRegistryImpl) pluginRegistry).definePlugin(name, config, plugin); + ((StoragePluginRegistryImpl) pluginRegistry).addPluginToPersistentStoreIfAbsent(name, config, plugin); plugin.setBreakRegister(breakRegisterSchema); } diff --git a/logical/pom.xml b/logical/pom.xml index 07a942b19..fbc653654 100644 --- a/logical/pom.xml +++ b/logical/pom.xml @@ -66,13 +66,11 @@ <dependency> <groupId>com.typesafe</groupId> <artifactId>config</artifactId> - <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> - <version>3.1</version> </dependency> <dependency> diff --git a/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java b/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java index 6a3df3a63..e04f3adc7 100644 --- a/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java +++ b/logical/src/main/java/org/apache/drill/common/config/LogicalPlanPersistence.java @@ -35,14 +35,14 @@ import com.fasterxml.jackson.databind.module.SimpleModule; public class LogicalPlanPersistence { - private ObjectMapper mapper; + private final ObjectMapper mapper; - public ObjectMapper getMapper() { - return mapper; + public LogicalPlanPersistence(DrillConfig conf, ScanResult scanResult) { + this(conf, scanResult, new ObjectMapper()); } - public LogicalPlanPersistence(DrillConfig conf, ScanResult scanResult) { - mapper = new ObjectMapper(); + public LogicalPlanPersistence(DrillConfig conf, ScanResult scanResult, ObjectMapper mapper) { + this.mapper = mapper; SimpleModule deserModule = new SimpleModule("LogicalExpressionDeserializationModule") .addDeserializer(LogicalExpression.class, new LogicalExpression.De(conf)) @@ -59,6 +59,10 @@ public class LogicalPlanPersistence { registerSubtypes(FormatPluginConfigBase.getSubTypes(scanResult)); } + public ObjectMapper getMapper() { + return mapper; + } + private <T> void registerSubtypes(Set<Class<? extends T>> types) { for (Class<? extends T> type : types) { mapper.registerSubtypes(type); diff --git a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java index 96c4036a2..49335f665 100644 --- a/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java +++ b/logical/src/main/java/org/apache/drill/common/logical/StoragePluginConfig.java @@ -18,22 +18,39 @@ package org.apache.drill.common.logical; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonTypeInfo; -@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property="type") +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") public abstract class StoragePluginConfig{ - private boolean enabled = true; + private Boolean enabled; + /** + * Check for enabled status of the plugin + * + * @return true, when enabled. False, when disabled or status is absent + */ public boolean isEnabled() { - return enabled; + return enabled != null && enabled; } - public void setEnabled(boolean enabled) { + + public void setEnabled(Boolean enabled) { this.enabled = enabled; } + /** + * Allows to check whether the enabled status is present in config + * + * @return true if enabled status is present, false otherwise + */ + @JsonIgnore + public boolean isEnabledStatusPresent() { + return enabled != null; + } + @Override public abstract boolean equals(Object o); @@ -1364,6 +1364,11 @@ <version>${jackson.version}</version> </dependency> <dependency> + <groupId>org.honton.chas.hocon</groupId> + <artifactId>jackson-dataformat-hocon</artifactId> + <version>1.1.1</version> + </dependency> + <dependency> <groupId>com.mapr.db</groupId> <artifactId>maprdb</artifactId> <version>${mapr.release.version}</version> @@ -1540,6 +1545,16 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.2</version> + </dependency> + <dependency> + <groupId>com.typesafe</groupId> + <artifactId>config</artifactId> + <version>1.0.0</version> + </dependency> </dependencies> </dependencyManagement> |