diff options
author | Arina Ielchiieva <arina.yelchiyeva@gmail.com> | 2016-12-20 16:57:15 +0000 |
---|---|---|
committer | Jinfeng Ni <jni@apache.org> | 2017-03-01 23:46:19 -0800 |
commit | dcbcb94fd2695edd4bbca63b2759292e99695d47 (patch) | |
tree | 7bbfc6493c42caa02a64d5478f65d32932417f8a | |
parent | 79811db5aa8c7f2cdbe6f74c0a40124bea9fb1fd (diff) |
DRILL-4963: Fix issues with dynamically loaded overloaded functions
close #701
27 files changed, 639 insertions, 349 deletions
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java index 2d329a852..ef6bbfea1 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -67,6 +67,20 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> { } @Override + public boolean contains(String key) { + try { + Get get = new Get(row(key)); + get.addColumn(FAMILY, QUALIFIER); + return hbaseTable.exists(get); + } catch (IOException e) { + throw UserException + .dataReadError(e) + .message("Caught error while checking row existence '%s' for table '%s'", key, hbaseTableName) + .build(logger); + } + } + + @Override public V get(String key) { return get(key, FAMILY); } diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java index 6b73283cf..f2783593f 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,6 +18,8 @@ package org.apache.drill.hbase; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.util.Map.Entry; @@ -57,6 +59,9 @@ public class TestHBaseTableProvider extends BaseHBaseTest { assertEquals("v0", hbaseStore.get("")); assertEquals("testValue", hbaseStore.get(".test")); + assertTrue(hbaseStore.contains("")); + assertFalse(hbaseStore.contains("unknown_key")); + int rowCount = 0; for (Entry<String, String> entry : Lists.newArrayList(hbaseStore.getAll())) { rowCount++; diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java index b5cc3eefa..73ff31de5 100644 --- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java +++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -61,6 +61,18 @@ public class MongoPersistentStore<V> extends BasePersistentStore<V> { } @Override + public boolean contains(String key) { + try { + Bson query = Filters.eq(DrillMongoConstants.ID, key); + Document document = collection.find(query).first(); + return document != null && document.containsKey(pKey); + } catch (Exception e) { + logger.error(e.getMessage(), e); + throw new DrillRuntimeException(e.getMessage(), e); + } + } + + @Override public V get(String key) { try { Bson query = Filters.eq(DrillMongoConstants.ID, key); diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf index 43f9942ba..b9d09a8fb 100644 --- a/distribution/src/resources/drill-override-example.conf +++ b/distribution/src/resources/drill-override-example.conf @@ -171,8 +171,7 @@ drill.exec: { decode_threadpool_size: 1 }, debug.error_on_leak: true, - # Settings for Dynamic UDFs. - # See https://gist.github.com/arina-ielchiieva/a1c4cfa3890145c5ecb1b70a39cbff55#file-dynamicudfssupport-md. + # Settings for Dynamic UDFs (see https://issues.apache.org/jira/browse/DRILL-4726 for details). udf: { # number of retry attempts to update remote function registry # if registry version was changed during update diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java index 610a2b9e9..17cb6cbbd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -85,32 +85,53 @@ public class ZookeeperClient implements AutoCloseable { /** * Returns true if path exists in the cache, false otherwise. - * * Note that calls to this method are eventually consistent. * - * @param path path to check + * @param path path to check + * @return true if path exists, false otherwise */ public boolean hasPath(final String path) { - return hasPath(path, false); + return hasPath(path, false, null); + } + + /** + * Returns true if path exists, false otherwise. + * If consistent flag is set to true, check is done directly is made against Zookeeper directly, + * else check is done against local cache. + * + * @param path path to check + * @param consistent whether the check should be consistent + * @return true if path exists, false otherwise + */ + public boolean hasPath(final String path, final boolean consistent) { + return hasPath(path, consistent, null); } /** * Checks if the given path exists. + * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. + * Otherwise, the check is eventually consistent. * - * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. Otherwise, - * the check is eventually consistent. + * If consistency flag is set to true and version holder is not null, passes version holder to get data change version. + * Data change version is retrieved from {@link Stat} object, it increases each time znode data change is performed. + * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes * - * @param path path to check - * @param consistent whether the check should be consistent - * @return + * @param path path to check + * @param consistent whether the check should be consistent + * @param version version holder + * @return true if path exists, false otherwise */ - public boolean hasPath(final String path, final boolean consistent) { + public boolean hasPath(final String path, final boolean consistent, final DataChangeVersion version) { Preconditions.checkNotNull(path, "path is required"); final String target = PathUtils.join(root, path); try { if (consistent) { - return curator.checkExists().forPath(target) != null; + Stat stat = curator.checkExists().forPath(target); + if (version != null && stat != null) { + version.setVersion(stat.getVersion()); + } + return stat != null; } else { return getCache().getCurrentData(target) != null; } @@ -153,7 +174,7 @@ public class ZookeeperClient implements AutoCloseable { * @param path target path * @param version version holder */ - public byte[] get(final String path, DataChangeVersion version) { + public byte[] get(final String path, final DataChangeVersion version) { return get(path, true, version); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java deleted file mode 100644 index 0d59cc8d7..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.exception; - -import org.apache.drill.common.exceptions.DrillRuntimeException; - -public class FunctionNotFoundException extends DrillRuntimeException { - - public FunctionNotFoundException(String message, Throwable cause) { - super(message, cause); - } -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java index ce0d68b64..5c7bfb433 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -58,11 +58,13 @@ import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry; import org.apache.drill.exec.planner.sql.DrillOperatorTable; import org.apache.drill.exec.proto.UserBitShared.Jar; import org.apache.drill.exec.resolver.FunctionResolver; +import org.apache.drill.exec.resolver.FunctionResolverFactory; import org.apache.drill.exec.server.options.OptionManager; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import org.apache.drill.exec.store.sys.store.DataChangeVersion; import org.apache.drill.exec.util.JarUtil; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -128,7 +130,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au /** * Register functions in given operator table. - * @param operatorTable + * @param operatorTable operator table */ public void register(DrillOperatorTable operatorTable) { // Register Drill functions first and move to pluggable function registries. @@ -140,27 +142,39 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au } /** - * Using the given <code>functionResolver</code> - * finds Drill function implementation for given <code>functionCall</code>. - * If function implementation was not found, - * loads all missing remote functions and tries to find Drill implementation one more time. + * First attempts to finds the Drill function implementation that matches the name, arg types and return type. + * If exact function implementation was not found, + * syncs local function registry with remote function registry if needed + * and tries to find function implementation one more time + * but this time using given <code>functionResolver</code>. + * + * @param functionResolver function resolver + * @param functionCall function call + * @return best matching function holder */ @Override public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall) { - return findDrillFunction(functionResolver, functionCall, true); - } - - private DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall, boolean retry) { AtomicLong version = new AtomicLong(); - DrillFuncHolder holder = functionResolver.getBestMatch( - localFunctionRegistry.getMethods(functionReplacement(functionCall), version), functionCall); - if (holder == null && retry && loadRemoteFunctions(version.get())) { - return findDrillFunction(functionResolver, functionCall, false); + String newFunctionName = functionReplacement(functionCall); + List<DrillFuncHolder> functions = localFunctionRegistry.getMethods(newFunctionName, version); + FunctionResolver exactResolver = FunctionResolverFactory.getExactResolver(functionCall); + DrillFuncHolder holder = exactResolver.getBestMatch(functions, functionCall); + + if (holder == null) { + syncWithRemoteRegistry(version.get()); + List<DrillFuncHolder> updatedFunctions = localFunctionRegistry.getMethods(newFunctionName, version); + holder = functionResolver.getBestMatch(updatedFunctions, functionCall); } + return holder; } - // Check if this Function Replacement is needed; if yes, return a new name. otherwise, return the original name + /** + * Checks if this function replacement is needed. + * + * @param functionCall function call + * @return new function name is replacement took place, otherwise original function name + */ private String functionReplacement(FunctionCall functionCall) { String funcName = functionCall.getName(); if (functionCall.args.size() > 0) { @@ -178,22 +192,41 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au } /** - * Find the Drill function implementation that matches the name, arg types and return type. - * If exact function implementation was not found, - * loads all missing remote functions and tries to find Drill implementation one more time. + * Finds the Drill function implementation that matches the name, arg types and return type. + * + * @param name function name + * @param argTypes input parameters types + * @param returnType function return type + * @return exactly matching function holder */ public DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType) { return findExactMatchingDrillFunction(name, argTypes, returnType, true); } - private DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType, boolean retry) { + /** + * Finds the Drill function implementation that matches the name, arg types and return type. + * If exact function implementation was not found, + * checks if local function registry is in sync with remote function registry. + * If not syncs them and tries to find exact function implementation one more time + * but with retry flag set to false. + * + * @param name function name + * @param argTypes input parameters types + * @param returnType function return type + * @param retry retry on failure flag + * @return exactly matching function holder + */ + private DrillFuncHolder findExactMatchingDrillFunction(String name, + List<MajorType> argTypes, + MajorType returnType, + boolean retry) { AtomicLong version = new AtomicLong(); for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) { if (h.matches(returnType, argTypes)) { return h; } } - if (retry && loadRemoteFunctions(version.get())) { + if (retry && syncWithRemoteRegistry(version.get())) { return findExactMatchingDrillFunction(name, argTypes, returnType, false); } return null; @@ -206,8 +239,8 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au * Note: Order of searching is same as order of {@link org.apache.drill.exec.expr.fn.PluggableFunctionRegistry} * implementations found on classpath. * - * @param functionCall - * @return + * @param functionCall function call + * @return drill function holder */ @Override public AbstractFuncHolder findNonDrillFunction(FunctionCall functionCall) { @@ -260,76 +293,101 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au } /** - * Attempts to load and register functions from remote function registry. - * First checks if there is no missing jars. - * If yes, enters synchronized block to prevent other loading the same jars. - * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock). - * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar. - * Jar registration timestamp represented in milliseconds is used as suffix. - * Then registers all jars at the same time. Returns true when finished. - * In case if any errors during jars coping or registration, logs errors and proceeds. + * Purpose of this method is to synchronize remote and local function registries if needed + * and to inform if function registry was changed after given version. + * + * To make synchronization as much light-weigh as possible, first only versions of both registries are checked + * without any locking. If synchronization is needed, enters synchronized block to prevent others loading the same jars. + * The need of synchronization is checked again (double-check lock) before comparing jars. + * If any missing jars are found, they are downloaded to local udf area, each is wrapped into {@link JarScan}. + * Once jar download is finished, all missing jars are registered in one batch. + * In case if any errors during jars download / registration, these errors are logged. * - * If no missing jars are found, checks current local registry version. - * Returns false if versions match, true otherwise. + * During registration local function registry is updated with remote function registry version it is synced with. + * When at least one jar of the missing jars failed to download / register, + * local function registry version are not updated but jars that where successfully downloaded / registered + * are added to local function registry. * - * @param version local function registry version - * @return true if new jars were registered or local function registry version is different, false otherwise + * If synchronization between remote and local function registry was not needed, + * checks if given registry version matches latest sync version + * to inform if function registry was changed after given version. + * + * @param version remote function registry local function registry was based on + * @return true if remote and local function registries were synchronized after given version */ - public boolean loadRemoteFunctions(long version) { - List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry); - if (!missingJars.isEmpty()) { + public boolean syncWithRemoteRegistry(long version) { + if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) { synchronized (this) { - missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry); - if (!missingJars.isEmpty()) { - logger.info("Starting dynamic UDFs lazy-init process.\n" + - "The following jars are going to be downloaded and registered locally: " + missingJars); + long localRegistryVersion = localFunctionRegistry.getVersion(); + if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion)) { + DataChangeVersion remoteVersion = new DataChangeVersion(); + List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion); List<JarScan> jars = Lists.newArrayList(); - for (String jarName : missingJars) { - Path binary = null; - Path source = null; - URLClassLoader classLoader = null; - try { - binary = copyJarToLocal(jarName, remoteFunctionRegistry); - source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry); - URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()}; - classLoader = new URLClassLoader(urls); - ScanResult scanResult = scan(classLoader, binary, urls); - localFunctionRegistry.validate(jarName, scanResult); - jars.add(new JarScan(jarName, scanResult, classLoader)); - } catch (Exception e) { - deleteQuietlyLocalJar(binary); - deleteQuietlyLocalJar(source); - if (classLoader != null) { - try { - classLoader.close(); - } catch (Exception ex) { - logger.warn("Problem during closing class loader for {}", jarName, e); + if (!missingJars.isEmpty()) { + logger.info("Starting dynamic UDFs lazy-init process.\n" + + "The following jars are going to be downloaded and registered locally: " + missingJars); + for (String jarName : missingJars) { + Path binary = null; + Path source = null; + URLClassLoader classLoader = null; + try { + binary = copyJarToLocal(jarName, this.remoteFunctionRegistry); + source = copyJarToLocal(JarUtil.getSourceName(jarName), this.remoteFunctionRegistry); + URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()}; + classLoader = new URLClassLoader(urls); + ScanResult scanResult = scan(classLoader, binary, urls); + localFunctionRegistry.validate(jarName, scanResult); + jars.add(new JarScan(jarName, scanResult, classLoader)); + } catch (Exception e) { + deleteQuietlyLocalJar(binary); + deleteQuietlyLocalJar(source); + if (classLoader != null) { + try { + classLoader.close(); + } catch (Exception ex) { + logger.warn("Problem during closing class loader for {}", jarName, e); + } } + logger.error("Problem during remote functions load from {}", jarName, e); } - logger.error("Problem during remote functions load from {}", jarName, e); } } - if (!jars.isEmpty()) { - localFunctionRegistry.register(jars); - return true; - } + long latestRegistryVersion = jars.size() != missingJars.size() ? + localRegistryVersion : remoteVersion.getVersion(); + localFunctionRegistry.register(jars, latestRegistryVersion); + return true; } } } + return version != localFunctionRegistry.getVersion(); } /** - * First finds path to marker file url, otherwise throws {@link JarValidationException}. - * Then scans jar classes according to list indicated in marker files. - * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}. - * This is extremely important for Windows users where system doesn't allow to delete file if it's being used. + * Checks if local function registry should be synchronized with remote function registry. + * If remote function registry version is -1, it means that remote function registry is unreachable + * or is not configured thus we skip synchronization and return false. + * In all other cases synchronization is needed if remote and local function registries versions do not match. * - * @param classLoader unique class loader for jar - * @param path local path to jar - * @param urls urls associated with the jar (ex: binary and source) - * @return scan result of packages, classes, annotations found in jar + * @param remoteVersion remote function registry version + * @param localVersion local function registry version + * @return true is local registry should be refreshed, false otherwise */ + private boolean isRegistrySyncNeeded(long remoteVersion, long localVersion) { + return remoteVersion != -1 && remoteVersion != localVersion; + } + + /** + * First finds path to marker file url, otherwise throws {@link JarValidationException}. + * Then scans jar classes according to list indicated in marker files. + * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}. + * This is extremely important for Windows users where system doesn't allow to delete file if it's being used. + * + * @param classLoader unique class loader for jar + * @param path local path to jar + * @param urls urls associated with the jar (ex: binary and source) + * @return scan result of packages, classes, annotations found in jar + */ private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) throws IOException { Enumeration<URL> markerFileEnumeration = classLoader.getResources( CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME); @@ -355,14 +413,17 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au /** * Return list of jars that are missing in local function registry * but present in remote function registry. + * Also updates version holder with remote function registry version. * * @param remoteFunctionRegistry remote function registry * @param localFunctionRegistry local function registry + * @param version holder for remote function registry version * @return list of missing jars */ private List<String> getMissingJars(RemoteFunctionRegistry remoteFunctionRegistry, - LocalFunctionRegistry localFunctionRegistry) { - List<Jar> remoteJars = remoteFunctionRegistry.getRegistry().getJarList(); + LocalFunctionRegistry localFunctionRegistry, + DataChangeVersion version) { + List<Jar> remoteJars = remoteFunctionRegistry.getRegistry(version).getJarList(); List<String> localJars = localFunctionRegistry.getAllJarNames(); List<String> missingJars = Lists.newArrayList(); for (Jar jar : remoteJars) { @@ -384,8 +445,10 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au private Path getLocalUdfDir(DrillConfig config) { tmpDir = getTmpDir(config); File udfDir = new File(tmpDir, config.getString(ExecConstants.UDF_DIRECTORY_LOCAL)); - udfDir.mkdirs(); String udfPath = udfDir.getPath(); + if (udfDir.mkdirs()) { + logger.debug("Local udf directory [{}] was created", udfPath); + } Preconditions.checkState(udfDir.exists(), "Local udf directory [%s] must exist", udfPath); Preconditions.checkState(udfDir.isDirectory(), "Local udf directory [%s] must be a directory", udfPath); Preconditions.checkState(udfDir.canWrite(), "Local udf directory [%s] must be writable for application user", udfPath); @@ -404,6 +467,8 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au * If value is still missing, generates directory using {@link Files#createTempDir()}. * If temporary directory was generated, sets {@link #deleteTmpDir} to true * to delete directory on drillbit exit. + * + * @param config drill config * @return drill temporary directory path */ private File getTmpDir(DrillConfig config) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java index 005c4e58c..3124539cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -42,8 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; * since we expect infrequent registry changes. * Holder is designed to allow concurrent reads and single writes to keep data consistent. * This is achieved by {@link ReadWriteLock} implementation usage. - * Holder has number version which changes every time new jars are added or removed. Initial version number is 0. - * Also version is used when user needs data from registry with version it is based on. + * Holder has number version which indicates remote function registry version number it is in sync with. * * Structure example: * @@ -86,7 +85,8 @@ public class FunctionRegistryHolder { private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock()); - private long version = 0; + // remote function registry number, it is in sync with + private long version; // jar name, Map<function name, Queue<function signature> private final Map<String, Map<String, Queue<String>>> jars; @@ -114,13 +114,13 @@ public class FunctionRegistryHolder { * If jar with the same name already exists, it and its functions will be removed. * Then jar will be added to {@link #jars} * and each function will be added using {@link #addFunctions(Map, List)}. - * Function version registry will be incremented by 1 if at least one jar was added but not for each jar. + * Registry version is updated with passed version if all jars were added successfully. * This is write operation, so one user at a time can call perform such action, * others will wait till first user completes his action. * * @param newJars jars and list of their function holders, each contains function name, signature and holder */ - public void addJars(Map<String, List<FunctionHolder>> newJars) { + public void addJars(Map<String, List<FunctionHolder>> newJars, long version) { try (AutoCloseableLock lock = writeLock.open()) { for (Map.Entry<String, List<FunctionHolder>> newJar : newJars.entrySet()) { String jarName = newJar.getKey(); @@ -129,15 +129,12 @@ public class FunctionRegistryHolder { jars.put(jarName, jar); addFunctions(jar, newJar.getValue()); } - if (!newJars.isEmpty()) { - version++; - } + this.version = version; } } /** * Removes jar from {@link #jars} and all associated with jar functions from {@link #functions} - * If jar was removed, function registry version will be incremented by 1. * This is write operation, so one user at a time can call perform such action, * others will wait till first user completes his action. * @@ -145,9 +142,7 @@ public class FunctionRegistryHolder { */ public void removeJar(String jarName) { try (AutoCloseableLock lock = writeLock.open()) { - if (removeAllByJar(jarName)) { - version++; - } + removeAllByJar(jarName); } } @@ -341,12 +336,11 @@ public class FunctionRegistryHolder { * All jar functions have the same class loader, so we need to close only one time. * * @param jarName jar name to be removed - * @return true if jar was removed, false otherwise */ - private boolean removeAllByJar(String jarName) { + private void removeAllByJar(String jarName) { Map<String, Queue<String>> jar = jars.remove(jarName); if (jar == null) { - return false; + return; } for (Map.Entry<String, Queue<String>> functionEntry : jar.entrySet()) { @@ -372,6 +366,5 @@ public class FunctionRegistryHolder { functions.remove(function); } } - return true; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java index 2a3f167c8..1318f72dc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -73,12 +73,14 @@ public class LocalFunctionRegistry { private final FunctionRegistryHolder registryHolder; - /** Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in. - * Built-in functions are not allowed to be unregistered. */ + /** + * Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in. + * Built-in functions are not allowed to be unregistered. Initially sync registry version will be set to 0. + */ public LocalFunctionRegistry(ScanResult classpathScan) { registryHolder = new FunctionRegistryHolder(); validate(BUILT_IN, classpathScan); - register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader()))); + register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader())), 0); if (logger.isTraceEnabled()) { StringBuilder allFunctions = new StringBuilder(); for (DrillFuncHolder method: registryHolder.getAllFunctionsWithHolders().values()) { @@ -89,7 +91,7 @@ public class LocalFunctionRegistry { } /** - * @return local function registry version number + * @return remote function registry version number with which local function registry is synced */ public long getVersion() { return registryHolder.getVersion(); @@ -147,14 +149,15 @@ public class LocalFunctionRegistry { } /** - * Registers all functions present in jar. + * Registers all functions present in jar and updates registry version. * If jar name is already registered, all jar related functions will be overridden. * To prevent classpath collisions during loading and unloading jars, * each jar is shipped with its own class loader. * * @param jars list of jars to be registered + * @param version remote function registry version number with which local function registry is synced */ - public void register(List<JarScan> jars) { + public void register(List<JarScan> jars, long version) { Map<String, List<FunctionHolder>> newJars = Maps.newHashMap(); for (JarScan jarScan : jars) { FunctionConverter converter = new FunctionConverter(); @@ -174,7 +177,7 @@ public class LocalFunctionRegistry { } } } - registryHolder.addJars(newJars); + registryHolder.addJars(newJars, version); } /** @@ -217,25 +220,31 @@ public class LocalFunctionRegistry { return registryHolder.getHoldersByFunctionName(name.toLowerCase(), version); } + /** + * @param name function name + * @return all function holders associated with the function name. Function name is case insensitive. + */ public List<DrillFuncHolder> getMethods(String name) { return registryHolder.getHoldersByFunctionName(name.toLowerCase()); } /** * Registers all functions present in {@link DrillOperatorTable}, - * also sets local registry version used at the moment of registering. + * also sets sync registry version used at the moment of function registration. * * @param operatorTable drill operator table */ public void register(DrillOperatorTable operatorTable) { AtomicLong versionHolder = new AtomicLong(); - final Map<String, Collection<DrillFuncHolder>> registeredFunctions = registryHolder.getAllFunctionsWithHolders(versionHolder).asMap(); + final Map<String, Collection<DrillFuncHolder>> registeredFunctions = + registryHolder.getAllFunctionsWithHolders(versionHolder).asMap(); operatorTable.setFunctionRegistryVersion(versionHolder.get()); registerOperatorsWithInference(operatorTable, registeredFunctions); registerOperatorsWithoutInference(operatorTable, registeredFunctions); } - private void registerOperatorsWithInference(DrillOperatorTable operatorTable, Map<String, Collection<DrillFuncHolder>> registeredFunctions) { + private void registerOperatorsWithInference(DrillOperatorTable operatorTable, Map<String, + Collection<DrillFuncHolder>> registeredFunctions) { final Map<String, DrillSqlOperator.DrillSqlOperatorBuilder> map = Maps.newHashMap(); final Map<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> mapAgg = Maps.newHashMap(); for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.entrySet()) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java index fe7958374..2e5eda209 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -106,8 +106,26 @@ public class RemoteFunctionRegistry implements AutoCloseable { this.retryAttempts = config.getInt(ExecConstants.UDF_RETRY_ATTEMPTS); } - public Registry getRegistry() { - return registry.get(registry_path, null); + /** + * Returns current remote function registry version. + * If remote function registry is not found or unreachable, logs error and returns -1. + * + * @return remote function registry version if any, -1 otherwise + */ + public long getRegistryVersion() { + DataChangeVersion version = new DataChangeVersion(); + boolean contains = false; + try { + contains = registry.contains(registry_path, version); + } catch (Exception e) { + logger.error("Problem during trying to access remote function registry [{}]", registry_path, e); + } + if (contains) { + return version.getVersion(); + } else { + logger.error("Remote function registry [{}] is unreachable", registry_path); + return -1; + } } public Registry getRegistry(DataChangeVersion version) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index 264af299c..707815a70 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -66,7 +66,6 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem private final UserSession session; private final OptionManager queryOptions; private final PlannerSettings plannerSettings; - private final DrillOperatorTable table; private final ExecutionControls executionControls; private final BufferAllocator allocator; @@ -83,6 +82,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem * time this is set to true and the close method becomes a no-op. */ private boolean closed = false; + private DrillOperatorTable table; public QueryContext(final UserSession session, final DrillbitContext drillbitContext, QueryId queryId) { this.drillbitContext = drillbitContext; @@ -229,6 +229,15 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem return table; } + /** + * Re-creates drill operator table to refresh functions list from local function registry. + */ + public void reloadDrillOperatorTable() { + table = new DrillOperatorTable( + drillbitContext.getFunctionImplementationRegistry(), + drillbitContext.getOptionManager()); + } + public QueryContextInformation getQueryContextInfo() { return queryContextInfo; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java index 6e5c72b8d..5102ae8b0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -36,7 +36,7 @@ import org.apache.drill.exec.server.options.OptionManager; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; + /** * Implementation of {@link SqlOperatorTable} that contains standard operators and functions provided through @@ -52,8 +52,8 @@ public class DrillOperatorTable extends SqlStdOperatorTable { private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithoutInferenceMap = ArrayListMultimap.create(); private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithInferenceMap = ArrayListMultimap.create(); - // indicates local function registry version based on which drill operator were loaded - // is used to define if we need to reload operator table in case when function signature was not found + // indicates remote function registry version based on which drill operator were loaded + // is used to define if we need to reload operator table in case remote function registry version has changed private long functionRegistryVersion; private final OptionManager systemOptionManager; @@ -65,19 +65,18 @@ public class DrillOperatorTable extends SqlStdOperatorTable { this.systemOptionManager = systemOptionManager; } - /** Cleans up all operator holders and reloads operators */ - public void reloadOperators(FunctionImplementationRegistry registry) { - drillOperatorsWithoutInference.clear(); - drillOperatorsWithInference.clear(); - drillOperatorsWithoutInferenceMap.clear(); - drillOperatorsWithInferenceMap.clear(); - registry.register(this); - } - - public long setFunctionRegistryVersion(long version) { - return functionRegistryVersion = version; + /** + * Set function registry version based on which operator table was loaded. + * + * @param version registry version + */ + public void setFunctionRegistryVersion(long version) { + functionRegistryVersion = version; } + /** + * @return function registry version based on which operator table was loaded + */ public long getFunctionRegistryVersion() { return functionRegistryVersion; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java index 0ad39440d..3bc09229e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java @@ -24,10 +24,7 @@ import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.tools.RelConversionException; import org.apache.calcite.tools.ValidationException; import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.exec.exception.FunctionNotFoundException; -import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.ops.QueryContext; -import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler; import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler; @@ -50,13 +47,56 @@ public class DrillSqlWorker { private DrillSqlWorker() { } + /** + * Converts sql query string into query physical plan. + * + * @param context query context + * @param sql sql query + * @return query physical plan + */ public static PhysicalPlan getPlan(QueryContext context, String sql) throws SqlParseException, ValidationException, ForemanSetupException { return getPlan(context, sql, null); } + /** + * Converts sql query string into query physical plan. + * In case of any errors (that might occur due to missing function implementation), + * checks if local function registry should be synchronized with remote function registry. + * If sync took place, reloads drill operator table + * (since functions were added to / removed from local function registry) + * and attempts to converts sql query string into query physical plan one more time. + * + * @param context query context + * @param sql sql query + * @param textPlan text plan + * @return query physical plan + */ public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan) throws ForemanSetupException { + Pointer<String> textPlanCopy = textPlan == null ? null : new Pointer<>(textPlan.value); + try { + return getQueryPlan(context, sql, textPlan); + } catch (Exception e) { + if (context.getFunctionRegistry().syncWithRemoteRegistry( + context.getDrillOperatorTable().getFunctionRegistryVersion())) { + context.reloadDrillOperatorTable(); + return getQueryPlan(context, sql, textPlanCopy); + } + throw e; + } + } + + /** + * Converts sql query string into query physical plan. + * + * @param context query context + * @param sql sql query + * @param textPlan text plan + * @return query physical plan + */ + private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer<String> textPlan) + throws ForemanSetupException { final SqlConverter parser = new SqlConverter(context); @@ -88,7 +128,7 @@ public class DrillSqlWorker { } try { - return getPhysicalPlan(handler, sqlNode, context); + return handler.getPlan(sqlNode); } catch(ValidationException e) { String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage(); throw UserException.validationError(e) @@ -104,26 +144,4 @@ public class DrillSqlWorker { throw new QueryInputException("Failure handling SQL.", e); } } - - /** - * Returns query physical plan. - * In case of {@link FunctionNotFoundException} attempts to load remote functions. - * If at least one function was loaded or local function function registry version has changed, - * makes one more attempt to get query physical plan. - */ - private static PhysicalPlan getPhysicalPlan(AbstractSqlHandler handler, SqlNode sqlNode, QueryContext context) - throws RelConversionException, IOException, ForemanSetupException, ValidationException { - try { - return handler.getPlan(sqlNode); - } catch (FunctionNotFoundException e) { - DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable(); - FunctionImplementationRegistry functionRegistry = context.getFunctionRegistry(); - if (functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion())) { - drillOperatorTable.reloadOperators(functionRegistry); - return handler.getPlan(sqlNode); - } - throw e; - } - } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java index e9085f7c4..845848c2e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java @@ -49,17 +49,13 @@ import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.util.ChainedSqlOperatorTable; import org.apache.calcite.sql.validate.SqlConformance; import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; -import org.apache.calcite.sql.validate.SqlValidatorException; import org.apache.calcite.sql.validate.SqlValidatorImpl; import org.apache.calcite.sql.validate.SqlValidatorScope; import org.apache.calcite.sql2rel.RelDecorrelator; import org.apache.calcite.sql2rel.SqlToRelConverter; -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.exception.FunctionNotFoundException; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.ops.UdfUtilities; @@ -173,11 +169,6 @@ public class SqlConverter { SqlNode validatedNode = validator.validate(parsedNode); return validatedNode; } catch (RuntimeException e) { - final Throwable rootCause = ExceptionUtils.getRootCause(e); - if (rootCause instanceof SqlValidatorException - && StringUtils.contains(rootCause.getMessage(), "No match found for function signature")) { - throw new FunctionNotFoundException(rootCause.getMessage(), e); - } UserException.Builder builder = UserException .validationError(e) .addContext("SQL Query", sql); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java index 48bfd8b00..0902fb7d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -186,6 +186,7 @@ public class CreateFunctionHandler extends DefaultSqlHandler { remoteRegistry.updateRegistry(updatedRegistry, version); return; } catch (VersionMismatchException ex) { + logger.debug("Failed to update function registry during registration, version mismatch was detected.", ex); retryAttempts--; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java index 6e2801a8b..b5d0b23bc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -142,6 +142,7 @@ public class DropFunctionHandler extends DefaultSqlHandler { remoteFunctionRegistry.updateRegistry(updatedRegistry, version); return jarToBeDeleted; } catch (VersionMismatchException ex) { + logger.debug("Failed to update function registry during unregistration, version mismatch was detected.", ex); retryAttempts--; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java index ea382782a..064040732 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -29,6 +29,12 @@ public abstract class BasePersistentStore<V> implements PersistentStore<V> { return getRange(0, Integer.MAX_VALUE); } + /** By default contains with version will behave the same way as without version. + * Override this method to add version support. */ + public boolean contains(String key, DataChangeVersion version) { + return contains(key); + } + /** By default get with version will behave the same way as without version. * Override this method to add version support. */ @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java index bb2375244..206642a51 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -34,6 +34,24 @@ public interface PersistentStore<V> extends AutoCloseable { PersistentStoreMode getMode(); /** + * Checks if lookup key is present in store. + * + * @param key lookup key + * @return true if store contains lookup key, false otherwise + */ + boolean contains(String key); + + /** + * Checks if lookup key is present in store. + * Sets data change version number. + * + * @param key lookup key + * @param version version holder + * @return true if store contains lookup key, false otherwise + */ + boolean contains(String key, DataChangeVersion version); + + /** * Returns the value for the given key if exists, null otherwise. * @param key lookup key */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java index b9a4b59ea..ef855e268 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,12 +28,16 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nullable; import org.apache.commons.io.IOUtils; import org.apache.drill.common.collections.ImmutableEntry; +import org.apache.drill.common.concurrent.AutoCloseableLock; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.exception.VersionMismatchException; import org.apache.drill.exec.store.dfs.DrillFileSystem; import org.apache.drill.exec.store.sys.BasePersistentStore; import org.apache.drill.exec.store.sys.PersistentStoreConfig; @@ -47,13 +51,20 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LocalPersistentStore<V> extends BasePersistentStore<V> { -// private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class); + private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class); + + private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); + private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock()); private final Path basePath; private final PersistentStoreConfig<V> config; private final DrillFileSystem fs; + private int version = -1; public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig<V> config) { super(); @@ -62,7 +73,9 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { this.fs = fs; try { - mkdirs(basePath); + if (!fs.mkdirs(basePath)) { + version++; + } } catch (IOException e) { throw new RuntimeException("Failure setting pstore configuration path."); } @@ -73,11 +86,7 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { return PersistentStoreMode.PERSISTENT; } - private void mkdirs(Path path) throws IOException{ - fs.mkdirs(path); - } - - public static Path getLogDir(){ + public static Path getLogDir() { String drillLogDir = System.getenv("DRILL_LOG_DIR"); if (drillLogDir == null) { drillLogDir = "/var/log/drill"; @@ -85,10 +94,10 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { return new Path(new File(drillLogDir).getAbsoluteFile().toURI()); } - public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException{ + public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException { Path blobRoot = root == null ? getLogDir() : root; Configuration fsConf = new Configuration(); - if(blobRoot.toUri().getScheme() != null){ + if (blobRoot.toUri().getScheme() != null) { fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString()); } @@ -100,93 +109,142 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> { @Override public Iterator<Map.Entry<String, V>> getRange(int skip, int take) { - try{ - List<FileStatus> f = fs.list(false, basePath); - if (f == null || f.isEmpty()) { - return Collections.emptyIterator(); - } - List<String> files = Lists.newArrayList(); - - for (FileStatus stat : f) { - String s = stat.getPath().getName(); - if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) { - files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); + try (AutoCloseableLock lock = readLock.open()) { + try { + List<FileStatus> f = fs.list(false, basePath); + if (f == null || f.isEmpty()) { + return Collections.emptyIterator(); } - } + List<String> files = Lists.newArrayList(); - Collections.sort(files); - return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() { - @Nullable - @Override - public Entry<String, V> apply(String key) { - return new ImmutableEntry<>(key, get(key)); + for (FileStatus stat : f) { + String s = stat.getPath().getName(); + if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) { + files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length())); + } } - }).iterator(); - }catch(IOException e){ - throw new RuntimeException(e); + + Collections.sort(files); + return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() { + @Nullable + @Override + public Entry<String, V> apply(String key) { + return new ImmutableEntry<>(key, get(key)); + } + }).iterator(); + } catch (IOException e) { + throw new RuntimeException(e); + } } } private Path makePath(String name) { Preconditions.checkArgument( !name.contains("/") && - !name.contains(":") && - !name.contains("..")); + !name.contains(":") && + !name.contains("..")); + return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); + } - final Path path = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); - // do this to check file name. - return path; + @Override + public boolean contains(String key) { + return contains(key, null); } @Override - public V get(String key) { - try{ - Path path = makePath(key); - if(!fs.exists(path)){ - return null; + public boolean contains(String key, DataChangeVersion dataChangeVersion) { + try (AutoCloseableLock lock = readLock.open()) { + try { + Path path = makePath(key); + boolean exists = fs.exists(path); + if (exists && dataChangeVersion != null) { + dataChangeVersion.setVersion(version); + } + return exists; + } catch (IOException e) { + throw new RuntimeException(e); } - }catch(IOException e){ - throw new RuntimeException(e); } + } - final Path path = makePath(key); - try (InputStream is = fs.open(path)) { - return config.getSerializer().deserialize(IOUtils.toByteArray(is)); - } catch (IOException e) { - throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); + @Override + public V get(String key) { + return get(key, null); + } + + @Override + public V get(String key, DataChangeVersion dataChangeVersion) { + try (AutoCloseableLock lock = readLock.open()) { + try { + if (dataChangeVersion != null) { + dataChangeVersion.setVersion(version); + } + Path path = makePath(key); + if (!fs.exists(path)) { + return null; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + final Path path = makePath(key); + try (InputStream is = fs.open(path)) { + return config.getSerializer().deserialize(IOUtils.toByteArray(is)); + } catch (IOException e) { + throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); + } } } @Override public void put(String key, V value) { - try (OutputStream os = fs.create(makePath(key))) { - IOUtils.write(config.getSerializer().serialize(value), os); - } catch (IOException e) { - throw new RuntimeException(e); + put(key, value, null); + } + + @Override + public void put(String key, V value, DataChangeVersion dataChangeVersion) { + try (AutoCloseableLock lock = writeLock.open()) { + if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) { + throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion()); + } + try (OutputStream os = fs.create(makePath(key))) { + IOUtils.write(config.getSerializer().serialize(value), os); + version++; + } catch (IOException e) { + throw new RuntimeException(e); + } } } @Override public boolean putIfAbsent(String key, V value) { - try { - Path p = makePath(key); - if (fs.exists(p)) { - return false; - } else { - put(key, value); - return true; + try (AutoCloseableLock lock = writeLock.open()) { + try { + Path p = makePath(key); + if (fs.exists(p)) { + return false; + } else { + try (OutputStream os = fs.create(makePath(key))) { + IOUtils.write(config.getSerializer().serialize(value), os); + version++; + } + return true; + } + } catch (IOException e) { + throw new RuntimeException(e); } - } catch (IOException e) { - throw new RuntimeException(e); } } @Override public void delete(String key) { - try { - fs.delete(makePath(key), false); - } catch (IOException e) { - throw new RuntimeException(e); + try (AutoCloseableLock lock = writeLock.open()) { + try { + fs.delete(makePath(key), false); + version++; + } catch (IOException e) { + logger.error("Unable to delete data from storage.", e); + throw new RuntimeException(e); + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java index 55f72c90b..a3ee58eb5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -62,16 +62,26 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> { } @Override + public boolean contains(final String key) { + return contains(key, null); + } + + @Override + public boolean contains(final String key, final DataChangeVersion version) { + return client.hasPath(key, true, version); + } + + @Override public V get(final String key) { return get(key, false, null); } @Override - public V get(final String key, DataChangeVersion version) { + public V get(final String key, final DataChangeVersion version) { return get(key, true, version); } - public V get(final String key, boolean consistencyFlag, DataChangeVersion version) { + public V get(final String key, final boolean consistencyFlag, final DataChangeVersion version) { byte[] bytes = client.get(key, consistencyFlag, version); if (bytes == null) { @@ -90,7 +100,7 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> { } @Override - public void put(final String key, final V value, DataChangeVersion version) { + public void put(final String key, final V value, final DataChangeVersion version) { final InstanceSerializer<V> serializer = config.getSerializer(); try { final byte[] bytes = serializer.serialize(value); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java index 58ec3eab6..e36dc83f5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,8 +20,6 @@ package org.apache.drill.exec.testing.store; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -38,13 +36,13 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock()); private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock()); private final ConcurrentMap<String, V> store = Maps.newConcurrentMap(); - private final AtomicInteger version = new AtomicInteger(); + private int version = -1; @Override public void delete(final String key) { try (AutoCloseableLock lock = writeLock.open()) { store.remove(key); - version.incrementAndGet(); + version++; } } @@ -54,6 +52,21 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { } @Override + public boolean contains(final String key) { + return contains(key, null); + } + + @Override + public boolean contains(final String key, final DataChangeVersion dataChangeVersion) { + try (AutoCloseableLock lock = readLock.open()) { + if (dataChangeVersion != null) { + dataChangeVersion.setVersion(version); + } + return store.containsKey(key); + } + } + + @Override public V get(final String key) { return get(key, null); } @@ -62,7 +75,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { public V get(final String key, final DataChangeVersion dataChangeVersion) { try (AutoCloseableLock lock = readLock.open()) { if (dataChangeVersion != null) { - dataChangeVersion.setVersion(version.get()); + dataChangeVersion.setVersion(version); } return store.get(key); } @@ -76,11 +89,11 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { @Override public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) { try (AutoCloseableLock lock = writeLock.open()) { - if (dataChangeVersion != null && dataChangeVersion.getVersion() != version.get()) { + if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) { throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion()); } store.put(key, value); - version.incrementAndGet(); + version++; } } @@ -89,7 +102,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { try (AutoCloseableLock lock = writeLock.open()) { final V old = store.putIfAbsent(key, value); if (old == null) { - version.incrementAndGet(); + version++; return true; } return false; @@ -107,7 +120,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> { public void close() throws Exception { try (AutoCloseableLock lock = writeLock.open()) { store.clear(); - version.set(0); + version = -1; } } } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java b/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java index 10a03b7a8..25c01b86b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -134,7 +134,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery { @Test public void testAbsentSourceInStaging() throws Exception { Path staging = getDrillbitContext().getRemoteFunctionRegistry().getStagingArea(); - copyJar(getDrillbitContext().getRemoteFunctionRegistry().getFs(), new Path(jars.toURI()), staging, default_binary_name); + copyJar(getDrillbitContext().getRemoteFunctionRegistry().getFs(), new Path(jars.toURI()), + staging, default_binary_name); String summary = String.format("File %s does not exist", new Path(staging, default_source_name).toUri().getPath()); @@ -157,7 +158,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery { .sqlQuery("create function using jar '%s'", jarWithNoMarkerFile) .unOrdered() .baselineColumns("ok", "summary") - .baselineValues(false, String.format(summary, CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, jarWithNoMarkerFile)) + .baselineValues(false, String.format(summary, + CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, jarWithNoMarkerFile)) .go(); } @@ -201,7 +203,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery { assertTrue("Source should be present in registry area", fs.exists(new Path(remoteFunctionRegistry.getRegistryArea(), default_source_name))); - Registry registry = remoteFunctionRegistry.getRegistry(); + Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion()); assertEquals("Registry should contain one jar", registry.getJarList().size(), 1); assertEquals(registry.getJar(0).getName(), default_binary_name); } @@ -304,7 +306,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery { assertTrue("Source should be present in registry area", fs.exists(new Path(remoteFunctionRegistry.getRegistryArea(), default_source_name))); - Registry registry = remoteFunctionRegistry.getRegistry(); + Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion()); assertEquals("Registry should contain one jar", registry.getJarList().size(), 1); assertEquals(registry.getJar(0).getName(), default_binary_name); } @@ -337,7 +339,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery { FileSystem fs = remoteFunctionRegistry.getFs(); assertFalse("Registry area should be empty", fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext()); - assertEquals("Registry should be empty", remoteFunctionRegistry.getRegistry().getJarList().size(), 0); + assertEquals("Registry should be empty", + remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0); } @Test @@ -367,10 +370,13 @@ public class TestDynamicUDFSupport extends BaseTestQuery { assertTrue("Source should be present in staging area", fs.exists(new Path(remoteFunctionRegistry.getStagingArea(), default_source_name))); - assertFalse("Registry area should be empty", fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext()); - assertFalse("Temporary area should be empty", fs.listFiles(remoteFunctionRegistry.getTmpArea(), false).hasNext()); + assertFalse("Registry area should be empty", + fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext()); + assertFalse("Temporary area should be empty", + fs.listFiles(remoteFunctionRegistry.getTmpArea(), false).hasNext()); - assertEquals("Registry should be empty", remoteFunctionRegistry.getRegistry().getJarList().size(), 0); + assertEquals("Registry should be empty", + remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0); } @Test @@ -402,7 +408,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery { assertTrue("Source should be present in registry area", fs.exists(new Path(remoteFunctionRegistry.getRegistryArea(), default_source_name))); - Registry registry = remoteFunctionRegistry.getRegistry(); + Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion()); assertEquals("Registry should contain one jar", registry.getJarList().size(), 1); assertEquals(registry.getJar(0).getName(), default_binary_name); } @@ -424,7 +430,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery { .baselineValues("a") .go(); - Path localUdfDirPath = Deencapsulation.getField(getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir"); + Path localUdfDirPath = Deencapsulation.getField( + getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir"); File localUdfDir = new File(localUdfDirPath.toUri().getPath()); assertTrue("Binary should exist in local udf directory", new File(localUdfDir, default_binary_name).exists()); @@ -455,6 +462,33 @@ public class TestDynamicUDFSupport extends BaseTestQuery { } } + @Test + public void testOverloadedFunctionPlanningStage() throws Exception { + String jarName = "DrillUDF-overloading-1.0.jar"; + copyJarsToStagingArea(jarName, JarUtil.getSourceName(jarName)); + test("create function using jar '%s'", jarName); + + testBuilder() + .sqlQuery("select abs('A', 'A') as res from (values(1))") + .unOrdered() + .baselineColumns("res") + .baselineValues("ABS was overloaded. Input: A, A") + .go(); + } + + @Test + public void testOverloadedFunctionExecutionStage() throws Exception { + String jarName = "DrillUDF-overloading-1.0.jar"; + copyJarsToStagingArea(jarName, JarUtil.getSourceName(jarName)); + test("create function using jar '%s'", jarName); + + testBuilder() + .sqlQuery("select log('A') as res from (values(1))") + .unOrdered() + .baselineColumns("res") + .baselineValues("LOG was overloaded. Input: A") + .go(); + } @Test public void testDropFunction() throws Exception { @@ -462,7 +496,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery { test("create function using jar '%s'", default_binary_name); test("select custom_lower('A') from (values(1))"); - Path localUdfDirPath = Deencapsulation.getField(getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir"); + Path localUdfDirPath = Deencapsulation.getField( + getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir"); File localUdfDir = new File(localUdfDirPath.toUri().getPath()); assertTrue("Binary should exist in local udf directory", new File(localUdfDir, default_binary_name).exists()); @@ -485,7 +520,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery { } RemoteFunctionRegistry remoteFunctionRegistry = getDrillbitContext().getRemoteFunctionRegistry(); - assertEquals("Remote registry should be empty", remoteFunctionRegistry.getRegistry().getJarList().size(), 0); + assertEquals("Remote registry should be empty", + remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0); FileSystem fs = remoteFunctionRegistry.getFs(); assertFalse("Binary should not be present in registry area", @@ -561,8 +597,10 @@ public class TestDynamicUDFSupport extends BaseTestQuery { .baselineValues(false, errorMessage) .go(); - assertFalse("Registry area should be empty", fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext()); - assertFalse("Temporary area should be empty", fs.listFiles(remoteFunctionRegistry.getTmpArea(), false).hasNext()); + assertFalse("Registry area should be empty", + fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext()); + assertFalse("Temporary area should be empty", + fs.listFiles(remoteFunctionRegistry.getTmpArea(), false).hasNext()); assertTrue("Binary should be present in staging area", fs.exists(new Path(remoteFunctionRegistry.getStagingArea(), default_binary_name))); @@ -684,7 +722,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery { DataChangeVersion version = new DataChangeVersion(); Registry registry = remoteFunctionRegistry.getRegistry(version); - assertEquals("Remote registry version should match", 2, version.getVersion()); + assertEquals("Remote registry version should match", 1, version.getVersion()); List<Jar> jarList = registry.getJarList(); assertEquals("Only one jar should be registered", 1, jarList.size()); assertEquals("Jar name should match", jarName1, jarList.get(0).getName()); @@ -748,7 +786,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery { DataChangeVersion version = new DataChangeVersion(); Registry registry = remoteFunctionRegistry.getRegistry(version); - assertEquals("Remote registry version should match", 3, version.getVersion()); + assertEquals("Remote registry version should match", 2, version.getVersion()); List<Jar> actualJars = registry.getJarList(); List<String> expectedJars = Lists.newArrayList(jarName1, jarName2); @@ -777,7 +815,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery { public Boolean answer(InvocationOnMock invocation) throws Throwable { latch1.await(); boolean result = (boolean) invocation.callRealMethod(); - assertTrue("loadRemoteFunctions() should return true", result); + assertTrue("syncWithRemoteRegistry() should return true", result); latch2.countDown(); return true; } @@ -788,11 +826,11 @@ public class TestDynamicUDFSupport extends BaseTestQuery { latch1.countDown(); latch2.await(); boolean result = (boolean) invocation.callRealMethod(); - assertTrue("loadRemoteFunctions() should return true", result); + assertTrue("syncWithRemoteRegistry() should return true", result); return true; } }) - .when(functionImplementationRegistry).loadRemoteFunctions(anyLong()); + .when(functionImplementationRegistry).syncWithRemoteRegistry(anyLong()); SimpleQueryRunner simpleQueryRunner = new SimpleQueryRunner(query); Thread thread1 = new Thread(simpleQueryRunner); @@ -804,9 +842,10 @@ public class TestDynamicUDFSupport extends BaseTestQuery { thread1.join(); thread2.join(); - verify(functionImplementationRegistry, times(2)).loadRemoteFunctions(anyLong()); - LocalFunctionRegistry localFunctionRegistry = Deencapsulation.getField(functionImplementationRegistry, "localFunctionRegistry"); - assertEquals("Local functionRegistry version should match", 2L, localFunctionRegistry.getVersion()); + verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyLong()); + LocalFunctionRegistry localFunctionRegistry = Deencapsulation.getField( + functionImplementationRegistry, "localFunctionRegistry"); + assertEquals("Sync function registry version should match", 1L, localFunctionRegistry.getVersion()); } @Test @@ -819,7 +858,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery { @Override public Boolean answer(InvocationOnMock invocation) throws Throwable { boolean result = (boolean) invocation.callRealMethod(); - assertTrue("loadRemoteFunctions() should return true", result); + assertTrue("syncWithRemoteRegistry() should return true", result); return true; } }) @@ -827,11 +866,11 @@ public class TestDynamicUDFSupport extends BaseTestQuery { @Override public Boolean answer(InvocationOnMock invocation) throws Throwable { boolean result = (boolean) invocation.callRealMethod(); - assertFalse("loadRemoteFunctions() should return false", result); + assertFalse("syncWithRemoteRegistry() should return false", result); return false; } }) - .when(functionImplementationRegistry).loadRemoteFunctions(anyLong()); + .when(functionImplementationRegistry).syncWithRemoteRegistry(anyLong()); test("select custom_lower('A') from (values(1))"); @@ -841,9 +880,10 @@ public class TestDynamicUDFSupport extends BaseTestQuery { assertThat(e.getMessage(), containsString("No match found for function signature unknown_lower(<CHARACTER>)")); } - verify(functionImplementationRegistry, times(2)).loadRemoteFunctions(anyLong()); - LocalFunctionRegistry localFunctionRegistry = Deencapsulation.getField(functionImplementationRegistry, "localFunctionRegistry"); - assertEquals("Local functionRegistry version should match", 2L, localFunctionRegistry.getVersion()); + verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyLong()); + LocalFunctionRegistry localFunctionRegistry = Deencapsulation.getField( + functionImplementationRegistry, "localFunctionRegistry"); + assertEquals("Sync function registry version should match", 1L, localFunctionRegistry.getVersion()); } private void copyDefaultJarsToStagingArea() throws IOException { @@ -866,7 +906,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery { } private RemoteFunctionRegistry spyRemoteFunctionRegistry() { - FunctionImplementationRegistry functionImplementationRegistry = getDrillbitContext().getFunctionImplementationRegistry(); + FunctionImplementationRegistry functionImplementationRegistry = + getDrillbitContext().getFunctionImplementationRegistry(); RemoteFunctionRegistry remoteFunctionRegistry = functionImplementationRegistry.getRemoteFunctionRegistry(); RemoteFunctionRegistry spy = spy(remoteFunctionRegistry); Deencapsulation.setField(functionImplementationRegistry, "remoteFunctionRegistry", spy); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java index ab886c4e3..88f1fcb03 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,19 +22,13 @@ import java.util.List; import java.util.Map; import com.google.common.collect.Lists; -import org.apache.curator.CuratorZookeeperClient; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable; -import org.apache.curator.framework.api.CreateBuilder; -import org.apache.curator.framework.api.DeleteBuilder; -import org.apache.curator.framework.api.SetDataBuilder; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.test.TestingServer; -import org.apache.curator.utils.EnsurePath; import org.apache.drill.common.collections.ImmutableEntry; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.exception.VersionMismatchException; @@ -47,7 +41,9 @@ import org.junit.Test; import org.mockito.Mockito; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; public class TestZookeeperClient { private final static String root = "/test"; @@ -132,6 +128,26 @@ public class TestZookeeperClient { } @Test + public void testHasPathTrueWithVersion() { + client.put(path, data); + DataChangeVersion version0 = new DataChangeVersion(); + assertTrue(client.hasPath(path, true, version0)); + assertEquals("Versions should match", 0, version0.getVersion()); + client.put(path, data); + DataChangeVersion version1 = new DataChangeVersion(); + assertTrue(client.hasPath(path, true, version1)); + assertEquals("Versions should match", 1, version1.getVersion()); + } + + @Test + public void testHasPathFalseWithVersion() { + DataChangeVersion version0 = new DataChangeVersion(); + version0.setVersion(-1); + assertFalse(client.hasPath("unknown_path", true, version0)); + assertEquals("Versions should not have changed", -1, version0.getVersion()); + } + + @Test public void testPutAndGetWorks() { client.put(path, data); final byte[] actual = client.get(path, true); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java index 61fa4e5b2..cd4dd9950 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -60,25 +60,24 @@ public class FunctionRegistryHolderTest { @Before public void setup() { resetRegistry(); - fillInRegistry(); + fillInRegistry(1); } @Test public void testVersion() { resetRegistry(); - assertEquals("Initial version should be 0", 0, registryHolder.getVersion()); - registryHolder.addJars(Maps.<String, List<FunctionHolder>>newHashMap()); - assertEquals("Version should not change if no jars were added.", 0, registryHolder.getVersion()); - registryHolder.removeJar("unknown.jar"); - assertEquals("Version should not change if no jars were removed.", 0, registryHolder.getVersion()); - fillInRegistry(); - assertEquals("Version should have incremented by 1", 1, registryHolder.getVersion()); + long expectedVersion = 0; + assertEquals("Initial version should be 0", expectedVersion, registryHolder.getVersion()); + registryHolder.addJars(Maps.<String, List<FunctionHolder>>newHashMap(), ++expectedVersion); + assertEquals("Version can change if no jars were added.", expectedVersion, registryHolder.getVersion()); + fillInRegistry(++expectedVersion); + assertEquals("Version should have incremented by 1", expectedVersion, registryHolder.getVersion()); registryHolder.removeJar(built_in); - assertEquals("Version should have incremented by 1", 2, registryHolder.getVersion()); - fillInRegistry(); - assertEquals("Version should have incremented by 1", 3, registryHolder.getVersion()); - fillInRegistry(); - assertEquals("Version should have incremented by 1", 4, registryHolder.getVersion()); + assertEquals("Version should have incremented by 1", expectedVersion, registryHolder.getVersion()); + fillInRegistry(++expectedVersion); + assertEquals("Version should have incremented by 1", expectedVersion, registryHolder.getVersion()); + fillInRegistry(++expectedVersion); + assertEquals("Version should have incremented by 1", expectedVersion, registryHolder.getVersion()); } @Test @@ -97,8 +96,9 @@ public class FunctionRegistryHolderTest { } } - registryHolder.addJars(newJars); - assertEquals("Version number should match", 1, registryHolder.getVersion()); + long expectedVersion = 0; + registryHolder.addJars(newJars, ++expectedVersion); + assertEquals("Version number should match", expectedVersion, registryHolder.getVersion()); compareTwoLists(jars, registryHolder.getAllJarNames()); assertEquals(functionsSize, registryHolder.functionsSize()); compareListMultimaps(functionsWithHolders, registryHolder.getAllFunctionsWithHolders()); @@ -120,16 +120,17 @@ public class FunctionRegistryHolderTest { functionsSize++; } } - registryHolder.addJars(newJars); - assertEquals("Version number should match", 1, registryHolder.getVersion()); + long expectedVersion = 0; + registryHolder.addJars(newJars, ++expectedVersion); + assertEquals("Version number should match", expectedVersion, registryHolder.getVersion()); compareTwoLists(jars, registryHolder.getAllJarNames()); assertEquals(functionsSize, registryHolder.functionsSize()); compareListMultimaps(functionsWithHolders, registryHolder.getAllFunctionsWithHolders()); compareListMultimaps(functionsWithSignatures, registryHolder.getAllFunctionsWithSignatures()); // adding the same jars should not cause adding duplicates, should override existing jars only - registryHolder.addJars(newJars); - assertEquals("Version number should match", 2, registryHolder.getVersion()); + registryHolder.addJars(newJars, ++expectedVersion); + assertEquals("Version number should match", expectedVersion, registryHolder.getVersion()); compareTwoLists(jars, registryHolder.getAllJarNames()); assertEquals(functionsSize, registryHolder.functionsSize()); compareListMultimaps(functionsWithHolders, registryHolder.getAllFunctionsWithHolders()); @@ -252,8 +253,8 @@ public class FunctionRegistryHolderTest { registryHolder = new FunctionRegistryHolder(); } - private void fillInRegistry() { - registryHolder.addJars(newJars); + private void fillInRegistry(long version) { + registryHolder.addJars(newJars, version); } private <T> void compareListMultimaps(ListMultimap<String, T> lm1, ListMultimap<String, T> lm2) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java index 8b338af8c..284769625 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -45,7 +45,6 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ExpressionTreeMaterializer; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry; -import org.apache.drill.exec.proto.UserBitShared.Registry; import org.junit.Test; import com.google.common.collect.ImmutableList; @@ -202,8 +201,8 @@ public class ExpressionTreeMaterializerTest extends ExecTest { new MockUp<RemoteFunctionRegistry>() { @Mock - Registry getRegistry() { - return Registry.getDefaultInstance(); + long getRegistryVersion() { + return 0L; } }; diff --git a/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0-sources.jar b/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0-sources.jar Binary files differnew file mode 100644 index 000000000..f6b250ec0 --- /dev/null +++ b/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0-sources.jar diff --git a/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0.jar b/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0.jar Binary files differnew file mode 100644 index 000000000..4b5ef8bc4 --- /dev/null +++ b/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0.jar |