aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArina Ielchiieva <arina.yelchiyeva@gmail.com>2016-12-20 16:57:15 +0000
committerJinfeng Ni <jni@apache.org>2017-03-01 23:46:19 -0800
commitdcbcb94fd2695edd4bbca63b2759292e99695d47 (patch)
tree7bbfc6493c42caa02a64d5478f65d32932417f8a
parent79811db5aa8c7f2cdbe6f74c0a40124bea9fb1fd (diff)
DRILL-4963: Fix issues with dynamically loaded overloaded functions
close #701
-rw-r--r--contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java16
-rw-r--r--contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java7
-rw-r--r--contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java14
-rw-r--r--distribution/src/resources/drill-override-example.conf3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java45
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java27
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java219
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java27
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java31
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java24
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java29
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java70
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java8
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java188
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java18
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java33
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java101
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java30
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java45
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java7
-rw-r--r--exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0-sources.jarbin0 -> 3473 bytes
-rw-r--r--exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0.jarbin0 -> 5779 bytes
27 files changed, 639 insertions, 349 deletions
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
index 2d329a852..ef6bbfea1 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/config/HBasePersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -67,6 +67,20 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> {
}
@Override
+ public boolean contains(String key) {
+ try {
+ Get get = new Get(row(key));
+ get.addColumn(FAMILY, QUALIFIER);
+ return hbaseTable.exists(get);
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Caught error while checking row existence '%s' for table '%s'", key, hbaseTableName)
+ .build(logger);
+ }
+ }
+
+ @Override
public V get(String key) {
return get(key, FAMILY);
}
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
index 6b73283cf..f2783593f 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseTableProvider.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,8 @@
package org.apache.drill.hbase;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.util.Map.Entry;
@@ -57,6 +59,9 @@ public class TestHBaseTableProvider extends BaseHBaseTest {
assertEquals("v0", hbaseStore.get(""));
assertEquals("testValue", hbaseStore.get(".test"));
+ assertTrue(hbaseStore.contains(""));
+ assertFalse(hbaseStore.contains("unknown_key"));
+
int rowCount = 0;
for (Entry<String, String> entry : Lists.newArrayList(hbaseStore.getAll())) {
rowCount++;
diff --git a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
index b5cc3eefa..73ff31de5 100644
--- a/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
+++ b/contrib/storage-mongo/src/main/java/org/apache/drill/exec/store/mongo/config/MongoPersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -61,6 +61,18 @@ public class MongoPersistentStore<V> extends BasePersistentStore<V> {
}
@Override
+ public boolean contains(String key) {
+ try {
+ Bson query = Filters.eq(DrillMongoConstants.ID, key);
+ Document document = collection.find(query).first();
+ return document != null && document.containsKey(pKey);
+ } catch (Exception e) {
+ logger.error(e.getMessage(), e);
+ throw new DrillRuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @Override
public V get(String key) {
try {
Bson query = Filters.eq(DrillMongoConstants.ID, key);
diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index 43f9942ba..b9d09a8fb 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -171,8 +171,7 @@ drill.exec: {
decode_threadpool_size: 1
},
debug.error_on_leak: true,
- # Settings for Dynamic UDFs.
- # See https://gist.github.com/arina-ielchiieva/a1c4cfa3890145c5ecb1b70a39cbff55#file-dynamicudfssupport-md.
+ # Settings for Dynamic UDFs (see https://issues.apache.org/jira/browse/DRILL-4726 for details).
udf: {
# number of retry attempts to update remote function registry
# if registry version was changed during update
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
index 610a2b9e9..17cb6cbbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/zk/ZookeeperClient.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -85,32 +85,53 @@ public class ZookeeperClient implements AutoCloseable {
/**
* Returns true if path exists in the cache, false otherwise.
- *
* Note that calls to this method are eventually consistent.
*
- * @param path path to check
+ * @param path path to check
+ * @return true if path exists, false otherwise
*/
public boolean hasPath(final String path) {
- return hasPath(path, false);
+ return hasPath(path, false, null);
+ }
+
+ /**
+ * Returns true if path exists, false otherwise.
+ * If consistent flag is set to true, check is done directly is made against Zookeeper directly,
+ * else check is done against local cache.
+ *
+ * @param path path to check
+ * @param consistent whether the check should be consistent
+ * @return true if path exists, false otherwise
+ */
+ public boolean hasPath(final String path, final boolean consistent) {
+ return hasPath(path, consistent, null);
}
/**
* Checks if the given path exists.
+ * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly.
+ * Otherwise, the check is eventually consistent.
*
- * If the flag consistent is set, the check is consistent as it is made against Zookeeper directly. Otherwise,
- * the check is eventually consistent.
+ * If consistency flag is set to true and version holder is not null, passes version holder to get data change version.
+ * Data change version is retrieved from {@link Stat} object, it increases each time znode data change is performed.
+ * Link to Zookeeper documentation - https://zookeeper.apache.org/doc/r3.2.2/zookeeperProgrammers.html#sc_zkDataModel_znodes
*
- * @param path path to check
- * @param consistent whether the check should be consistent
- * @return
+ * @param path path to check
+ * @param consistent whether the check should be consistent
+ * @param version version holder
+ * @return true if path exists, false otherwise
*/
- public boolean hasPath(final String path, final boolean consistent) {
+ public boolean hasPath(final String path, final boolean consistent, final DataChangeVersion version) {
Preconditions.checkNotNull(path, "path is required");
final String target = PathUtils.join(root, path);
try {
if (consistent) {
- return curator.checkExists().forPath(target) != null;
+ Stat stat = curator.checkExists().forPath(target);
+ if (version != null && stat != null) {
+ version.setVersion(stat.getVersion());
+ }
+ return stat != null;
} else {
return getCache().getCurrentData(target) != null;
}
@@ -153,7 +174,7 @@ public class ZookeeperClient implements AutoCloseable {
* @param path target path
* @param version version holder
*/
- public byte[] get(final String path, DataChangeVersion version) {
+ public byte[] get(final String path, final DataChangeVersion version) {
return get(path, true, version);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java b/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
deleted file mode 100644
index 0d59cc8d7..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/exception/FunctionNotFoundException.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.exception;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-
-public class FunctionNotFoundException extends DrillRuntimeException {
-
- public FunctionNotFoundException(String message, Throwable cause) {
- super(message, cause);
- }
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index ce0d68b64..5c7bfb433 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -58,11 +58,13 @@ import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
import org.apache.drill.exec.planner.sql.DrillOperatorTable;
import org.apache.drill.exec.proto.UserBitShared.Jar;
import org.apache.drill.exec.resolver.FunctionResolver;
+import org.apache.drill.exec.resolver.FunctionResolverFactory;
import org.apache.drill.exec.server.options.OptionManager;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.exec.util.JarUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -128,7 +130,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
/**
* Register functions in given operator table.
- * @param operatorTable
+ * @param operatorTable operator table
*/
public void register(DrillOperatorTable operatorTable) {
// Register Drill functions first and move to pluggable function registries.
@@ -140,27 +142,39 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
}
/**
- * Using the given <code>functionResolver</code>
- * finds Drill function implementation for given <code>functionCall</code>.
- * If function implementation was not found,
- * loads all missing remote functions and tries to find Drill implementation one more time.
+ * First attempts to finds the Drill function implementation that matches the name, arg types and return type.
+ * If exact function implementation was not found,
+ * syncs local function registry with remote function registry if needed
+ * and tries to find function implementation one more time
+ * but this time using given <code>functionResolver</code>.
+ *
+ * @param functionResolver function resolver
+ * @param functionCall function call
+ * @return best matching function holder
*/
@Override
public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall) {
- return findDrillFunction(functionResolver, functionCall, true);
- }
-
- private DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall, boolean retry) {
AtomicLong version = new AtomicLong();
- DrillFuncHolder holder = functionResolver.getBestMatch(
- localFunctionRegistry.getMethods(functionReplacement(functionCall), version), functionCall);
- if (holder == null && retry && loadRemoteFunctions(version.get())) {
- return findDrillFunction(functionResolver, functionCall, false);
+ String newFunctionName = functionReplacement(functionCall);
+ List<DrillFuncHolder> functions = localFunctionRegistry.getMethods(newFunctionName, version);
+ FunctionResolver exactResolver = FunctionResolverFactory.getExactResolver(functionCall);
+ DrillFuncHolder holder = exactResolver.getBestMatch(functions, functionCall);
+
+ if (holder == null) {
+ syncWithRemoteRegistry(version.get());
+ List<DrillFuncHolder> updatedFunctions = localFunctionRegistry.getMethods(newFunctionName, version);
+ holder = functionResolver.getBestMatch(updatedFunctions, functionCall);
}
+
return holder;
}
- // Check if this Function Replacement is needed; if yes, return a new name. otherwise, return the original name
+ /**
+ * Checks if this function replacement is needed.
+ *
+ * @param functionCall function call
+ * @return new function name is replacement took place, otherwise original function name
+ */
private String functionReplacement(FunctionCall functionCall) {
String funcName = functionCall.getName();
if (functionCall.args.size() > 0) {
@@ -178,22 +192,41 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
}
/**
- * Find the Drill function implementation that matches the name, arg types and return type.
- * If exact function implementation was not found,
- * loads all missing remote functions and tries to find Drill implementation one more time.
+ * Finds the Drill function implementation that matches the name, arg types and return type.
+ *
+ * @param name function name
+ * @param argTypes input parameters types
+ * @param returnType function return type
+ * @return exactly matching function holder
*/
public DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType) {
return findExactMatchingDrillFunction(name, argTypes, returnType, true);
}
- private DrillFuncHolder findExactMatchingDrillFunction(String name, List<MajorType> argTypes, MajorType returnType, boolean retry) {
+ /**
+ * Finds the Drill function implementation that matches the name, arg types and return type.
+ * If exact function implementation was not found,
+ * checks if local function registry is in sync with remote function registry.
+ * If not syncs them and tries to find exact function implementation one more time
+ * but with retry flag set to false.
+ *
+ * @param name function name
+ * @param argTypes input parameters types
+ * @param returnType function return type
+ * @param retry retry on failure flag
+ * @return exactly matching function holder
+ */
+ private DrillFuncHolder findExactMatchingDrillFunction(String name,
+ List<MajorType> argTypes,
+ MajorType returnType,
+ boolean retry) {
AtomicLong version = new AtomicLong();
for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) {
if (h.matches(returnType, argTypes)) {
return h;
}
}
- if (retry && loadRemoteFunctions(version.get())) {
+ if (retry && syncWithRemoteRegistry(version.get())) {
return findExactMatchingDrillFunction(name, argTypes, returnType, false);
}
return null;
@@ -206,8 +239,8 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
* Note: Order of searching is same as order of {@link org.apache.drill.exec.expr.fn.PluggableFunctionRegistry}
* implementations found on classpath.
*
- * @param functionCall
- * @return
+ * @param functionCall function call
+ * @return drill function holder
*/
@Override
public AbstractFuncHolder findNonDrillFunction(FunctionCall functionCall) {
@@ -260,76 +293,101 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
}
/**
- * Attempts to load and register functions from remote function registry.
- * First checks if there is no missing jars.
- * If yes, enters synchronized block to prevent other loading the same jars.
- * Again re-checks if there are no missing jars in case someone has already loaded them (double-check lock).
- * If there are still missing jars, first copies jars to local udf area and prepares {@link JarScan} for each jar.
- * Jar registration timestamp represented in milliseconds is used as suffix.
- * Then registers all jars at the same time. Returns true when finished.
- * In case if any errors during jars coping or registration, logs errors and proceeds.
+ * Purpose of this method is to synchronize remote and local function registries if needed
+ * and to inform if function registry was changed after given version.
+ *
+ * To make synchronization as much light-weigh as possible, first only versions of both registries are checked
+ * without any locking. If synchronization is needed, enters synchronized block to prevent others loading the same jars.
+ * The need of synchronization is checked again (double-check lock) before comparing jars.
+ * If any missing jars are found, they are downloaded to local udf area, each is wrapped into {@link JarScan}.
+ * Once jar download is finished, all missing jars are registered in one batch.
+ * In case if any errors during jars download / registration, these errors are logged.
*
- * If no missing jars are found, checks current local registry version.
- * Returns false if versions match, true otherwise.
+ * During registration local function registry is updated with remote function registry version it is synced with.
+ * When at least one jar of the missing jars failed to download / register,
+ * local function registry version are not updated but jars that where successfully downloaded / registered
+ * are added to local function registry.
*
- * @param version local function registry version
- * @return true if new jars were registered or local function registry version is different, false otherwise
+ * If synchronization between remote and local function registry was not needed,
+ * checks if given registry version matches latest sync version
+ * to inform if function registry was changed after given version.
+ *
+ * @param version remote function registry local function registry was based on
+ * @return true if remote and local function registries were synchronized after given version
*/
- public boolean loadRemoteFunctions(long version) {
- List<String> missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
- if (!missingJars.isEmpty()) {
+ public boolean syncWithRemoteRegistry(long version) {
+ if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion())) {
synchronized (this) {
- missingJars = getMissingJars(remoteFunctionRegistry, localFunctionRegistry);
- if (!missingJars.isEmpty()) {
- logger.info("Starting dynamic UDFs lazy-init process.\n" +
- "The following jars are going to be downloaded and registered locally: " + missingJars);
+ long localRegistryVersion = localFunctionRegistry.getVersion();
+ if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion)) {
+ DataChangeVersion remoteVersion = new DataChangeVersion();
+ List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion);
List<JarScan> jars = Lists.newArrayList();
- for (String jarName : missingJars) {
- Path binary = null;
- Path source = null;
- URLClassLoader classLoader = null;
- try {
- binary = copyJarToLocal(jarName, remoteFunctionRegistry);
- source = copyJarToLocal(JarUtil.getSourceName(jarName), remoteFunctionRegistry);
- URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
- classLoader = new URLClassLoader(urls);
- ScanResult scanResult = scan(classLoader, binary, urls);
- localFunctionRegistry.validate(jarName, scanResult);
- jars.add(new JarScan(jarName, scanResult, classLoader));
- } catch (Exception e) {
- deleteQuietlyLocalJar(binary);
- deleteQuietlyLocalJar(source);
- if (classLoader != null) {
- try {
- classLoader.close();
- } catch (Exception ex) {
- logger.warn("Problem during closing class loader for {}", jarName, e);
+ if (!missingJars.isEmpty()) {
+ logger.info("Starting dynamic UDFs lazy-init process.\n" +
+ "The following jars are going to be downloaded and registered locally: " + missingJars);
+ for (String jarName : missingJars) {
+ Path binary = null;
+ Path source = null;
+ URLClassLoader classLoader = null;
+ try {
+ binary = copyJarToLocal(jarName, this.remoteFunctionRegistry);
+ source = copyJarToLocal(JarUtil.getSourceName(jarName), this.remoteFunctionRegistry);
+ URL[] urls = {binary.toUri().toURL(), source.toUri().toURL()};
+ classLoader = new URLClassLoader(urls);
+ ScanResult scanResult = scan(classLoader, binary, urls);
+ localFunctionRegistry.validate(jarName, scanResult);
+ jars.add(new JarScan(jarName, scanResult, classLoader));
+ } catch (Exception e) {
+ deleteQuietlyLocalJar(binary);
+ deleteQuietlyLocalJar(source);
+ if (classLoader != null) {
+ try {
+ classLoader.close();
+ } catch (Exception ex) {
+ logger.warn("Problem during closing class loader for {}", jarName, e);
+ }
}
+ logger.error("Problem during remote functions load from {}", jarName, e);
}
- logger.error("Problem during remote functions load from {}", jarName, e);
}
}
- if (!jars.isEmpty()) {
- localFunctionRegistry.register(jars);
- return true;
- }
+ long latestRegistryVersion = jars.size() != missingJars.size() ?
+ localRegistryVersion : remoteVersion.getVersion();
+ localFunctionRegistry.register(jars, latestRegistryVersion);
+ return true;
}
}
}
+
return version != localFunctionRegistry.getVersion();
}
/**
- * First finds path to marker file url, otherwise throws {@link JarValidationException}.
- * Then scans jar classes according to list indicated in marker files.
- * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}.
- * This is extremely important for Windows users where system doesn't allow to delete file if it's being used.
+ * Checks if local function registry should be synchronized with remote function registry.
+ * If remote function registry version is -1, it means that remote function registry is unreachable
+ * or is not configured thus we skip synchronization and return false.
+ * In all other cases synchronization is needed if remote and local function registries versions do not match.
*
- * @param classLoader unique class loader for jar
- * @param path local path to jar
- * @param urls urls associated with the jar (ex: binary and source)
- * @return scan result of packages, classes, annotations found in jar
+ * @param remoteVersion remote function registry version
+ * @param localVersion local function registry version
+ * @return true is local registry should be refreshed, false otherwise
*/
+ private boolean isRegistrySyncNeeded(long remoteVersion, long localVersion) {
+ return remoteVersion != -1 && remoteVersion != localVersion;
+ }
+
+ /**
+ * First finds path to marker file url, otherwise throws {@link JarValidationException}.
+ * Then scans jar classes according to list indicated in marker files.
+ * Additional logic is added to close {@link URL} after {@link ConfigFactory#parseURL(URL)}.
+ * This is extremely important for Windows users where system doesn't allow to delete file if it's being used.
+ *
+ * @param classLoader unique class loader for jar
+ * @param path local path to jar
+ * @param urls urls associated with the jar (ex: binary and source)
+ * @return scan result of packages, classes, annotations found in jar
+ */
private ScanResult scan(ClassLoader classLoader, Path path, URL[] urls) throws IOException {
Enumeration<URL> markerFileEnumeration = classLoader.getResources(
CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME);
@@ -355,14 +413,17 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
/**
* Return list of jars that are missing in local function registry
* but present in remote function registry.
+ * Also updates version holder with remote function registry version.
*
* @param remoteFunctionRegistry remote function registry
* @param localFunctionRegistry local function registry
+ * @param version holder for remote function registry version
* @return list of missing jars
*/
private List<String> getMissingJars(RemoteFunctionRegistry remoteFunctionRegistry,
- LocalFunctionRegistry localFunctionRegistry) {
- List<Jar> remoteJars = remoteFunctionRegistry.getRegistry().getJarList();
+ LocalFunctionRegistry localFunctionRegistry,
+ DataChangeVersion version) {
+ List<Jar> remoteJars = remoteFunctionRegistry.getRegistry(version).getJarList();
List<String> localJars = localFunctionRegistry.getAllJarNames();
List<String> missingJars = Lists.newArrayList();
for (Jar jar : remoteJars) {
@@ -384,8 +445,10 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
private Path getLocalUdfDir(DrillConfig config) {
tmpDir = getTmpDir(config);
File udfDir = new File(tmpDir, config.getString(ExecConstants.UDF_DIRECTORY_LOCAL));
- udfDir.mkdirs();
String udfPath = udfDir.getPath();
+ if (udfDir.mkdirs()) {
+ logger.debug("Local udf directory [{}] was created", udfPath);
+ }
Preconditions.checkState(udfDir.exists(), "Local udf directory [%s] must exist", udfPath);
Preconditions.checkState(udfDir.isDirectory(), "Local udf directory [%s] must be a directory", udfPath);
Preconditions.checkState(udfDir.canWrite(), "Local udf directory [%s] must be writable for application user", udfPath);
@@ -404,6 +467,8 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
* If value is still missing, generates directory using {@link Files#createTempDir()}.
* If temporary directory was generated, sets {@link #deleteTmpDir} to true
* to delete directory on drillbit exit.
+ *
+ * @param config drill config
* @return drill temporary directory path
*/
private File getTmpDir(DrillConfig config) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
index 005c4e58c..3124539cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -42,8 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* since we expect infrequent registry changes.
* Holder is designed to allow concurrent reads and single writes to keep data consistent.
* This is achieved by {@link ReadWriteLock} implementation usage.
- * Holder has number version which changes every time new jars are added or removed. Initial version number is 0.
- * Also version is used when user needs data from registry with version it is based on.
+ * Holder has number version which indicates remote function registry version number it is in sync with.
*
* Structure example:
*
@@ -86,7 +85,8 @@ public class FunctionRegistryHolder {
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
- private long version = 0;
+ // remote function registry number, it is in sync with
+ private long version;
// jar name, Map<function name, Queue<function signature>
private final Map<String, Map<String, Queue<String>>> jars;
@@ -114,13 +114,13 @@ public class FunctionRegistryHolder {
* If jar with the same name already exists, it and its functions will be removed.
* Then jar will be added to {@link #jars}
* and each function will be added using {@link #addFunctions(Map, List)}.
- * Function version registry will be incremented by 1 if at least one jar was added but not for each jar.
+ * Registry version is updated with passed version if all jars were added successfully.
* This is write operation, so one user at a time can call perform such action,
* others will wait till first user completes his action.
*
* @param newJars jars and list of their function holders, each contains function name, signature and holder
*/
- public void addJars(Map<String, List<FunctionHolder>> newJars) {
+ public void addJars(Map<String, List<FunctionHolder>> newJars, long version) {
try (AutoCloseableLock lock = writeLock.open()) {
for (Map.Entry<String, List<FunctionHolder>> newJar : newJars.entrySet()) {
String jarName = newJar.getKey();
@@ -129,15 +129,12 @@ public class FunctionRegistryHolder {
jars.put(jarName, jar);
addFunctions(jar, newJar.getValue());
}
- if (!newJars.isEmpty()) {
- version++;
- }
+ this.version = version;
}
}
/**
* Removes jar from {@link #jars} and all associated with jar functions from {@link #functions}
- * If jar was removed, function registry version will be incremented by 1.
* This is write operation, so one user at a time can call perform such action,
* others will wait till first user completes his action.
*
@@ -145,9 +142,7 @@ public class FunctionRegistryHolder {
*/
public void removeJar(String jarName) {
try (AutoCloseableLock lock = writeLock.open()) {
- if (removeAllByJar(jarName)) {
- version++;
- }
+ removeAllByJar(jarName);
}
}
@@ -341,12 +336,11 @@ public class FunctionRegistryHolder {
* All jar functions have the same class loader, so we need to close only one time.
*
* @param jarName jar name to be removed
- * @return true if jar was removed, false otherwise
*/
- private boolean removeAllByJar(String jarName) {
+ private void removeAllByJar(String jarName) {
Map<String, Queue<String>> jar = jars.remove(jarName);
if (jar == null) {
- return false;
+ return;
}
for (Map.Entry<String, Queue<String>> functionEntry : jar.entrySet()) {
@@ -372,6 +366,5 @@ public class FunctionRegistryHolder {
functions.remove(function);
}
}
- return true;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
index 2a3f167c8..1318f72dc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -73,12 +73,14 @@ public class LocalFunctionRegistry {
private final FunctionRegistryHolder registryHolder;
- /** Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in.
- * Built-in functions are not allowed to be unregistered. */
+ /**
+ * Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in.
+ * Built-in functions are not allowed to be unregistered. Initially sync registry version will be set to 0.
+ */
public LocalFunctionRegistry(ScanResult classpathScan) {
registryHolder = new FunctionRegistryHolder();
validate(BUILT_IN, classpathScan);
- register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader())));
+ register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader())), 0);
if (logger.isTraceEnabled()) {
StringBuilder allFunctions = new StringBuilder();
for (DrillFuncHolder method: registryHolder.getAllFunctionsWithHolders().values()) {
@@ -89,7 +91,7 @@ public class LocalFunctionRegistry {
}
/**
- * @return local function registry version number
+ * @return remote function registry version number with which local function registry is synced
*/
public long getVersion() {
return registryHolder.getVersion();
@@ -147,14 +149,15 @@ public class LocalFunctionRegistry {
}
/**
- * Registers all functions present in jar.
+ * Registers all functions present in jar and updates registry version.
* If jar name is already registered, all jar related functions will be overridden.
* To prevent classpath collisions during loading and unloading jars,
* each jar is shipped with its own class loader.
*
* @param jars list of jars to be registered
+ * @param version remote function registry version number with which local function registry is synced
*/
- public void register(List<JarScan> jars) {
+ public void register(List<JarScan> jars, long version) {
Map<String, List<FunctionHolder>> newJars = Maps.newHashMap();
for (JarScan jarScan : jars) {
FunctionConverter converter = new FunctionConverter();
@@ -174,7 +177,7 @@ public class LocalFunctionRegistry {
}
}
}
- registryHolder.addJars(newJars);
+ registryHolder.addJars(newJars, version);
}
/**
@@ -217,25 +220,31 @@ public class LocalFunctionRegistry {
return registryHolder.getHoldersByFunctionName(name.toLowerCase(), version);
}
+ /**
+ * @param name function name
+ * @return all function holders associated with the function name. Function name is case insensitive.
+ */
public List<DrillFuncHolder> getMethods(String name) {
return registryHolder.getHoldersByFunctionName(name.toLowerCase());
}
/**
* Registers all functions present in {@link DrillOperatorTable},
- * also sets local registry version used at the moment of registering.
+ * also sets sync registry version used at the moment of function registration.
*
* @param operatorTable drill operator table
*/
public void register(DrillOperatorTable operatorTable) {
AtomicLong versionHolder = new AtomicLong();
- final Map<String, Collection<DrillFuncHolder>> registeredFunctions = registryHolder.getAllFunctionsWithHolders(versionHolder).asMap();
+ final Map<String, Collection<DrillFuncHolder>> registeredFunctions =
+ registryHolder.getAllFunctionsWithHolders(versionHolder).asMap();
operatorTable.setFunctionRegistryVersion(versionHolder.get());
registerOperatorsWithInference(operatorTable, registeredFunctions);
registerOperatorsWithoutInference(operatorTable, registeredFunctions);
}
- private void registerOperatorsWithInference(DrillOperatorTable operatorTable, Map<String, Collection<DrillFuncHolder>> registeredFunctions) {
+ private void registerOperatorsWithInference(DrillOperatorTable operatorTable, Map<String,
+ Collection<DrillFuncHolder>> registeredFunctions) {
final Map<String, DrillSqlOperator.DrillSqlOperatorBuilder> map = Maps.newHashMap();
final Map<String, DrillSqlAggOperator.DrillSqlAggOperatorBuilder> mapAgg = Maps.newHashMap();
for (Entry<String, Collection<DrillFuncHolder>> function : registeredFunctions.entrySet()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
index fe7958374..2e5eda209 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -106,8 +106,26 @@ public class RemoteFunctionRegistry implements AutoCloseable {
this.retryAttempts = config.getInt(ExecConstants.UDF_RETRY_ATTEMPTS);
}
- public Registry getRegistry() {
- return registry.get(registry_path, null);
+ /**
+ * Returns current remote function registry version.
+ * If remote function registry is not found or unreachable, logs error and returns -1.
+ *
+ * @return remote function registry version if any, -1 otherwise
+ */
+ public long getRegistryVersion() {
+ DataChangeVersion version = new DataChangeVersion();
+ boolean contains = false;
+ try {
+ contains = registry.contains(registry_path, version);
+ } catch (Exception e) {
+ logger.error("Problem during trying to access remote function registry [{}]", registry_path, e);
+ }
+ if (contains) {
+ return version.getVersion();
+ } else {
+ logger.error("Remote function registry [{}] is unreachable", registry_path);
+ return -1;
+ }
}
public Registry getRegistry(DataChangeVersion version) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 264af299c..707815a70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -66,7 +66,6 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
private final UserSession session;
private final OptionManager queryOptions;
private final PlannerSettings plannerSettings;
- private final DrillOperatorTable table;
private final ExecutionControls executionControls;
private final BufferAllocator allocator;
@@ -83,6 +82,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
* time this is set to true and the close method becomes a no-op.
*/
private boolean closed = false;
+ private DrillOperatorTable table;
public QueryContext(final UserSession session, final DrillbitContext drillbitContext, QueryId queryId) {
this.drillbitContext = drillbitContext;
@@ -229,6 +229,15 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
return table;
}
+ /**
+ * Re-creates drill operator table to refresh functions list from local function registry.
+ */
+ public void reloadDrillOperatorTable() {
+ table = new DrillOperatorTable(
+ drillbitContext.getFunctionImplementationRegistry(),
+ drillbitContext.getOptionManager());
+ }
+
public QueryContextInformation getQueryContextInfo() {
return queryContextInfo;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index 6e5c72b8d..5102ae8b0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -36,7 +36,7 @@ import org.apache.drill.exec.server.options.OptionManager;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+
/**
* Implementation of {@link SqlOperatorTable} that contains standard operators and functions provided through
@@ -52,8 +52,8 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithoutInferenceMap = ArrayListMultimap.create();
private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithInferenceMap = ArrayListMultimap.create();
- // indicates local function registry version based on which drill operator were loaded
- // is used to define if we need to reload operator table in case when function signature was not found
+ // indicates remote function registry version based on which drill operator were loaded
+ // is used to define if we need to reload operator table in case remote function registry version has changed
private long functionRegistryVersion;
private final OptionManager systemOptionManager;
@@ -65,19 +65,18 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
this.systemOptionManager = systemOptionManager;
}
- /** Cleans up all operator holders and reloads operators */
- public void reloadOperators(FunctionImplementationRegistry registry) {
- drillOperatorsWithoutInference.clear();
- drillOperatorsWithInference.clear();
- drillOperatorsWithoutInferenceMap.clear();
- drillOperatorsWithInferenceMap.clear();
- registry.register(this);
- }
-
- public long setFunctionRegistryVersion(long version) {
- return functionRegistryVersion = version;
+ /**
+ * Set function registry version based on which operator table was loaded.
+ *
+ * @param version registry version
+ */
+ public void setFunctionRegistryVersion(long version) {
+ functionRegistryVersion = version;
}
+ /**
+ * @return function registry version based on which operator table was loaded
+ */
public long getFunctionRegistryVersion() {
return functionRegistryVersion;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index 0ad39440d..3bc09229e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -24,10 +24,7 @@ import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.exec.exception.FunctionNotFoundException;
-import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.handlers.AbstractSqlHandler;
import org.apache.drill.exec.planner.sql.handlers.DefaultSqlHandler;
@@ -50,13 +47,56 @@ public class DrillSqlWorker {
private DrillSqlWorker() {
}
+ /**
+ * Converts sql query string into query physical plan.
+ *
+ * @param context query context
+ * @param sql sql query
+ * @return query physical plan
+ */
public static PhysicalPlan getPlan(QueryContext context, String sql) throws SqlParseException, ValidationException,
ForemanSetupException {
return getPlan(context, sql, null);
}
+ /**
+ * Converts sql query string into query physical plan.
+ * In case of any errors (that might occur due to missing function implementation),
+ * checks if local function registry should be synchronized with remote function registry.
+ * If sync took place, reloads drill operator table
+ * (since functions were added to / removed from local function registry)
+ * and attempts to converts sql query string into query physical plan one more time.
+ *
+ * @param context query context
+ * @param sql sql query
+ * @param textPlan text plan
+ * @return query physical plan
+ */
public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan)
throws ForemanSetupException {
+ Pointer<String> textPlanCopy = textPlan == null ? null : new Pointer<>(textPlan.value);
+ try {
+ return getQueryPlan(context, sql, textPlan);
+ } catch (Exception e) {
+ if (context.getFunctionRegistry().syncWithRemoteRegistry(
+ context.getDrillOperatorTable().getFunctionRegistryVersion())) {
+ context.reloadDrillOperatorTable();
+ return getQueryPlan(context, sql, textPlanCopy);
+ }
+ throw e;
+ }
+ }
+
+ /**
+ * Converts sql query string into query physical plan.
+ *
+ * @param context query context
+ * @param sql sql query
+ * @param textPlan text plan
+ * @return query physical plan
+ */
+ private static PhysicalPlan getQueryPlan(QueryContext context, String sql, Pointer<String> textPlan)
+ throws ForemanSetupException {
final SqlConverter parser = new SqlConverter(context);
@@ -88,7 +128,7 @@ public class DrillSqlWorker {
}
try {
- return getPhysicalPlan(handler, sqlNode, context);
+ return handler.getPlan(sqlNode);
} catch(ValidationException e) {
String errorMessage = e.getCause() != null ? e.getCause().getMessage() : e.getMessage();
throw UserException.validationError(e)
@@ -104,26 +144,4 @@ public class DrillSqlWorker {
throw new QueryInputException("Failure handling SQL.", e);
}
}
-
- /**
- * Returns query physical plan.
- * In case of {@link FunctionNotFoundException} attempts to load remote functions.
- * If at least one function was loaded or local function function registry version has changed,
- * makes one more attempt to get query physical plan.
- */
- private static PhysicalPlan getPhysicalPlan(AbstractSqlHandler handler, SqlNode sqlNode, QueryContext context)
- throws RelConversionException, IOException, ForemanSetupException, ValidationException {
- try {
- return handler.getPlan(sqlNode);
- } catch (FunctionNotFoundException e) {
- DrillOperatorTable drillOperatorTable = context.getDrillOperatorTable();
- FunctionImplementationRegistry functionRegistry = context.getFunctionRegistry();
- if (functionRegistry.loadRemoteFunctions(drillOperatorTable.getFunctionRegistryVersion())) {
- drillOperatorTable.reloadOperators(functionRegistry);
- return handler.getPlan(sqlNode);
- }
- throw e;
- }
- }
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
index e9085f7c4..845848c2e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/SqlConverter.java
@@ -49,17 +49,13 @@ import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.util.ChainedSqlOperatorTable;
import org.apache.calcite.sql.validate.SqlConformance;
import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
-import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.sql.validate.SqlValidatorImpl;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.sql2rel.RelDecorrelator;
import org.apache.calcite.sql2rel.SqlToRelConverter;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.FunctionNotFoundException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.ops.UdfUtilities;
@@ -173,11 +169,6 @@ public class SqlConverter {
SqlNode validatedNode = validator.validate(parsedNode);
return validatedNode;
} catch (RuntimeException e) {
- final Throwable rootCause = ExceptionUtils.getRootCause(e);
- if (rootCause instanceof SqlValidatorException
- && StringUtils.contains(rootCause.getMessage(), "No match found for function signature")) {
- throw new FunctionNotFoundException(rootCause.getMessage(), e);
- }
UserException.Builder builder = UserException
.validationError(e)
.addContext("SQL Query", sql);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
index 48bfd8b00..0902fb7d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/CreateFunctionHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -186,6 +186,7 @@ public class CreateFunctionHandler extends DefaultSqlHandler {
remoteRegistry.updateRegistry(updatedRegistry, version);
return;
} catch (VersionMismatchException ex) {
+ logger.debug("Failed to update function registry during registration, version mismatch was detected.", ex);
retryAttempts--;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
index 6e2801a8b..b5d0b23bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DropFunctionHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -142,6 +142,7 @@ public class DropFunctionHandler extends DefaultSqlHandler {
remoteFunctionRegistry.updateRegistry(updatedRegistry, version);
return jarToBeDeleted;
} catch (VersionMismatchException ex) {
+ logger.debug("Failed to update function registry during unregistration, version mismatch was detected.", ex);
retryAttempts--;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
index ea382782a..064040732 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/BasePersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -29,6 +29,12 @@ public abstract class BasePersistentStore<V> implements PersistentStore<V> {
return getRange(0, Integer.MAX_VALUE);
}
+ /** By default contains with version will behave the same way as without version.
+ * Override this method to add version support. */
+ public boolean contains(String key, DataChangeVersion version) {
+ return contains(key);
+ }
+
/** By default get with version will behave the same way as without version.
* Override this method to add version support. */
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
index bb2375244..206642a51 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -34,6 +34,24 @@ public interface PersistentStore<V> extends AutoCloseable {
PersistentStoreMode getMode();
/**
+ * Checks if lookup key is present in store.
+ *
+ * @param key lookup key
+ * @return true if store contains lookup key, false otherwise
+ */
+ boolean contains(String key);
+
+ /**
+ * Checks if lookup key is present in store.
+ * Sets data change version number.
+ *
+ * @param key lookup key
+ * @param version version holder
+ * @return true if store contains lookup key, false otherwise
+ */
+ boolean contains(String key, DataChangeVersion version);
+
+ /**
* Returns the value for the given key if exists, null otherwise.
* @param key lookup key
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
index b9a4b59ea..ef855e268 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/LocalPersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,12 +28,16 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.drill.common.collections.ImmutableEntry;
+import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.sys.BasePersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
@@ -47,13 +51,20 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class LocalPersistentStore<V> extends BasePersistentStore<V> {
-// private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class);
+ private static final Logger logger = LoggerFactory.getLogger(LocalPersistentStore.class);
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
+ private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
private final Path basePath;
private final PersistentStoreConfig<V> config;
private final DrillFileSystem fs;
+ private int version = -1;
public LocalPersistentStore(DrillFileSystem fs, Path base, PersistentStoreConfig<V> config) {
super();
@@ -62,7 +73,9 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
this.fs = fs;
try {
- mkdirs(basePath);
+ if (!fs.mkdirs(basePath)) {
+ version++;
+ }
} catch (IOException e) {
throw new RuntimeException("Failure setting pstore configuration path.");
}
@@ -73,11 +86,7 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
return PersistentStoreMode.PERSISTENT;
}
- private void mkdirs(Path path) throws IOException{
- fs.mkdirs(path);
- }
-
- public static Path getLogDir(){
+ public static Path getLogDir() {
String drillLogDir = System.getenv("DRILL_LOG_DIR");
if (drillLogDir == null) {
drillLogDir = "/var/log/drill";
@@ -85,10 +94,10 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
return new Path(new File(drillLogDir).getAbsoluteFile().toURI());
}
- public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException{
+ public static DrillFileSystem getFileSystem(DrillConfig config, Path root) throws IOException {
Path blobRoot = root == null ? getLogDir() : root;
Configuration fsConf = new Configuration();
- if(blobRoot.toUri().getScheme() != null){
+ if (blobRoot.toUri().getScheme() != null) {
fsConf.set(FileSystem.FS_DEFAULT_NAME_KEY, blobRoot.toUri().toString());
}
@@ -100,93 +109,142 @@ public class LocalPersistentStore<V> extends BasePersistentStore<V> {
@Override
public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
- try{
- List<FileStatus> f = fs.list(false, basePath);
- if (f == null || f.isEmpty()) {
- return Collections.emptyIterator();
- }
- List<String> files = Lists.newArrayList();
-
- for (FileStatus stat : f) {
- String s = stat.getPath().getName();
- if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
- files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
+ try (AutoCloseableLock lock = readLock.open()) {
+ try {
+ List<FileStatus> f = fs.list(false, basePath);
+ if (f == null || f.isEmpty()) {
+ return Collections.emptyIterator();
}
- }
+ List<String> files = Lists.newArrayList();
- Collections.sort(files);
- return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() {
- @Nullable
- @Override
- public Entry<String, V> apply(String key) {
- return new ImmutableEntry<>(key, get(key));
+ for (FileStatus stat : f) {
+ String s = stat.getPath().getName();
+ if (s.endsWith(DRILL_SYS_FILE_SUFFIX)) {
+ files.add(s.substring(0, s.length() - DRILL_SYS_FILE_SUFFIX.length()));
+ }
}
- }).iterator();
- }catch(IOException e){
- throw new RuntimeException(e);
+
+ Collections.sort(files);
+ return Iterables.transform(Iterables.limit(Iterables.skip(files, skip), take), new Function<String, Entry<String, V>>() {
+ @Nullable
+ @Override
+ public Entry<String, V> apply(String key) {
+ return new ImmutableEntry<>(key, get(key));
+ }
+ }).iterator();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
private Path makePath(String name) {
Preconditions.checkArgument(
!name.contains("/") &&
- !name.contains(":") &&
- !name.contains(".."));
+ !name.contains(":") &&
+ !name.contains(".."));
+ return new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
+ }
- final Path path = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
- // do this to check file name.
- return path;
+ @Override
+ public boolean contains(String key) {
+ return contains(key, null);
}
@Override
- public V get(String key) {
- try{
- Path path = makePath(key);
- if(!fs.exists(path)){
- return null;
+ public boolean contains(String key, DataChangeVersion dataChangeVersion) {
+ try (AutoCloseableLock lock = readLock.open()) {
+ try {
+ Path path = makePath(key);
+ boolean exists = fs.exists(path);
+ if (exists && dataChangeVersion != null) {
+ dataChangeVersion.setVersion(version);
+ }
+ return exists;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- }catch(IOException e){
- throw new RuntimeException(e);
}
+ }
- final Path path = makePath(key);
- try (InputStream is = fs.open(path)) {
- return config.getSerializer().deserialize(IOUtils.toByteArray(is));
- } catch (IOException e) {
- throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
+ @Override
+ public V get(String key) {
+ return get(key, null);
+ }
+
+ @Override
+ public V get(String key, DataChangeVersion dataChangeVersion) {
+ try (AutoCloseableLock lock = readLock.open()) {
+ try {
+ if (dataChangeVersion != null) {
+ dataChangeVersion.setVersion(version);
+ }
+ Path path = makePath(key);
+ if (!fs.exists(path)) {
+ return null;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ final Path path = makePath(key);
+ try (InputStream is = fs.open(path)) {
+ return config.getSerializer().deserialize(IOUtils.toByteArray(is));
+ } catch (IOException e) {
+ throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
+ }
}
}
@Override
public void put(String key, V value) {
- try (OutputStream os = fs.create(makePath(key))) {
- IOUtils.write(config.getSerializer().serialize(value), os);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ put(key, value, null);
+ }
+
+ @Override
+ public void put(String key, V value, DataChangeVersion dataChangeVersion) {
+ try (AutoCloseableLock lock = writeLock.open()) {
+ if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
+ throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
+ }
+ try (OutputStream os = fs.create(makePath(key))) {
+ IOUtils.write(config.getSerializer().serialize(value), os);
+ version++;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
}
}
@Override
public boolean putIfAbsent(String key, V value) {
- try {
- Path p = makePath(key);
- if (fs.exists(p)) {
- return false;
- } else {
- put(key, value);
- return true;
+ try (AutoCloseableLock lock = writeLock.open()) {
+ try {
+ Path p = makePath(key);
+ if (fs.exists(p)) {
+ return false;
+ } else {
+ try (OutputStream os = fs.create(makePath(key))) {
+ IOUtils.write(config.getSerializer().serialize(value), os);
+ version++;
+ }
+ return true;
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- } catch (IOException e) {
- throw new RuntimeException(e);
}
}
@Override
public void delete(String key) {
- try {
- fs.delete(makePath(key), false);
- } catch (IOException e) {
- throw new RuntimeException(e);
+ try (AutoCloseableLock lock = writeLock.open()) {
+ try {
+ fs.delete(makePath(key), false);
+ version++;
+ } catch (IOException e) {
+ logger.error("Unable to delete data from storage.", e);
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
index 55f72c90b..a3ee58eb5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/ZookeeperPersistentStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -62,16 +62,26 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
}
@Override
+ public boolean contains(final String key) {
+ return contains(key, null);
+ }
+
+ @Override
+ public boolean contains(final String key, final DataChangeVersion version) {
+ return client.hasPath(key, true, version);
+ }
+
+ @Override
public V get(final String key) {
return get(key, false, null);
}
@Override
- public V get(final String key, DataChangeVersion version) {
+ public V get(final String key, final DataChangeVersion version) {
return get(key, true, version);
}
- public V get(final String key, boolean consistencyFlag, DataChangeVersion version) {
+ public V get(final String key, final boolean consistencyFlag, final DataChangeVersion version) {
byte[] bytes = client.get(key, consistencyFlag, version);
if (bytes == null) {
@@ -90,7 +100,7 @@ public class ZookeeperPersistentStore<V> extends BasePersistentStore<V> {
}
@Override
- public void put(final String key, final V value, DataChangeVersion version) {
+ public void put(final String key, final V value, final DataChangeVersion version) {
final InstanceSerializer<V> serializer = config.getSerializer();
try {
final byte[] bytes = serializer.serialize(value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
index 58ec3eab6..e36dc83f5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/store/NoWriteLocalStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,8 +20,6 @@ package org.apache.drill.exec.testing.store;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -38,13 +36,13 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
private final ConcurrentMap<String, V> store = Maps.newConcurrentMap();
- private final AtomicInteger version = new AtomicInteger();
+ private int version = -1;
@Override
public void delete(final String key) {
try (AutoCloseableLock lock = writeLock.open()) {
store.remove(key);
- version.incrementAndGet();
+ version++;
}
}
@@ -54,6 +52,21 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
}
@Override
+ public boolean contains(final String key) {
+ return contains(key, null);
+ }
+
+ @Override
+ public boolean contains(final String key, final DataChangeVersion dataChangeVersion) {
+ try (AutoCloseableLock lock = readLock.open()) {
+ if (dataChangeVersion != null) {
+ dataChangeVersion.setVersion(version);
+ }
+ return store.containsKey(key);
+ }
+ }
+
+ @Override
public V get(final String key) {
return get(key, null);
}
@@ -62,7 +75,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
public V get(final String key, final DataChangeVersion dataChangeVersion) {
try (AutoCloseableLock lock = readLock.open()) {
if (dataChangeVersion != null) {
- dataChangeVersion.setVersion(version.get());
+ dataChangeVersion.setVersion(version);
}
return store.get(key);
}
@@ -76,11 +89,11 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
@Override
public void put(final String key, final V value, final DataChangeVersion dataChangeVersion) {
try (AutoCloseableLock lock = writeLock.open()) {
- if (dataChangeVersion != null && dataChangeVersion.getVersion() != version.get()) {
+ if (dataChangeVersion != null && dataChangeVersion.getVersion() != version) {
throw new VersionMismatchException("Version mismatch detected", dataChangeVersion.getVersion());
}
store.put(key, value);
- version.incrementAndGet();
+ version++;
}
}
@@ -89,7 +102,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
try (AutoCloseableLock lock = writeLock.open()) {
final V old = store.putIfAbsent(key, value);
if (old == null) {
- version.incrementAndGet();
+ version++;
return true;
}
return false;
@@ -107,7 +120,7 @@ public class NoWriteLocalStore<V> extends BasePersistentStore<V> {
public void close() throws Exception {
try (AutoCloseableLock lock = writeLock.open()) {
store.clear();
- version.set(0);
+ version = -1;
}
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java b/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
index 10a03b7a8..25c01b86b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestDynamicUDFSupport.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -134,7 +134,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
@Test
public void testAbsentSourceInStaging() throws Exception {
Path staging = getDrillbitContext().getRemoteFunctionRegistry().getStagingArea();
- copyJar(getDrillbitContext().getRemoteFunctionRegistry().getFs(), new Path(jars.toURI()), staging, default_binary_name);
+ copyJar(getDrillbitContext().getRemoteFunctionRegistry().getFs(), new Path(jars.toURI()),
+ staging, default_binary_name);
String summary = String.format("File %s does not exist", new Path(staging, default_source_name).toUri().getPath());
@@ -157,7 +158,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
.sqlQuery("create function using jar '%s'", jarWithNoMarkerFile)
.unOrdered()
.baselineColumns("ok", "summary")
- .baselineValues(false, String.format(summary, CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, jarWithNoMarkerFile))
+ .baselineValues(false, String.format(summary,
+ CommonConstants.DRILL_JAR_MARKER_FILE_RESOURCE_PATHNAME, jarWithNoMarkerFile))
.go();
}
@@ -201,7 +203,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
assertTrue("Source should be present in registry area",
fs.exists(new Path(remoteFunctionRegistry.getRegistryArea(), default_source_name)));
- Registry registry = remoteFunctionRegistry.getRegistry();
+ Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion());
assertEquals("Registry should contain one jar", registry.getJarList().size(), 1);
assertEquals(registry.getJar(0).getName(), default_binary_name);
}
@@ -304,7 +306,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
assertTrue("Source should be present in registry area",
fs.exists(new Path(remoteFunctionRegistry.getRegistryArea(), default_source_name)));
- Registry registry = remoteFunctionRegistry.getRegistry();
+ Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion());
assertEquals("Registry should contain one jar", registry.getJarList().size(), 1);
assertEquals(registry.getJar(0).getName(), default_binary_name);
}
@@ -337,7 +339,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
FileSystem fs = remoteFunctionRegistry.getFs();
assertFalse("Registry area should be empty", fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext());
- assertEquals("Registry should be empty", remoteFunctionRegistry.getRegistry().getJarList().size(), 0);
+ assertEquals("Registry should be empty",
+ remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0);
}
@Test
@@ -367,10 +370,13 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
assertTrue("Source should be present in staging area",
fs.exists(new Path(remoteFunctionRegistry.getStagingArea(), default_source_name)));
- assertFalse("Registry area should be empty", fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext());
- assertFalse("Temporary area should be empty", fs.listFiles(remoteFunctionRegistry.getTmpArea(), false).hasNext());
+ assertFalse("Registry area should be empty",
+ fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext());
+ assertFalse("Temporary area should be empty",
+ fs.listFiles(remoteFunctionRegistry.getTmpArea(), false).hasNext());
- assertEquals("Registry should be empty", remoteFunctionRegistry.getRegistry().getJarList().size(), 0);
+ assertEquals("Registry should be empty",
+ remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0);
}
@Test
@@ -402,7 +408,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
assertTrue("Source should be present in registry area",
fs.exists(new Path(remoteFunctionRegistry.getRegistryArea(), default_source_name)));
- Registry registry = remoteFunctionRegistry.getRegistry();
+ Registry registry = remoteFunctionRegistry.getRegistry(new DataChangeVersion());
assertEquals("Registry should contain one jar", registry.getJarList().size(), 1);
assertEquals(registry.getJar(0).getName(), default_binary_name);
}
@@ -424,7 +430,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
.baselineValues("a")
.go();
- Path localUdfDirPath = Deencapsulation.getField(getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir");
+ Path localUdfDirPath = Deencapsulation.getField(
+ getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir");
File localUdfDir = new File(localUdfDirPath.toUri().getPath());
assertTrue("Binary should exist in local udf directory", new File(localUdfDir, default_binary_name).exists());
@@ -455,6 +462,33 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
}
}
+ @Test
+ public void testOverloadedFunctionPlanningStage() throws Exception {
+ String jarName = "DrillUDF-overloading-1.0.jar";
+ copyJarsToStagingArea(jarName, JarUtil.getSourceName(jarName));
+ test("create function using jar '%s'", jarName);
+
+ testBuilder()
+ .sqlQuery("select abs('A', 'A') as res from (values(1))")
+ .unOrdered()
+ .baselineColumns("res")
+ .baselineValues("ABS was overloaded. Input: A, A")
+ .go();
+ }
+
+ @Test
+ public void testOverloadedFunctionExecutionStage() throws Exception {
+ String jarName = "DrillUDF-overloading-1.0.jar";
+ copyJarsToStagingArea(jarName, JarUtil.getSourceName(jarName));
+ test("create function using jar '%s'", jarName);
+
+ testBuilder()
+ .sqlQuery("select log('A') as res from (values(1))")
+ .unOrdered()
+ .baselineColumns("res")
+ .baselineValues("LOG was overloaded. Input: A")
+ .go();
+ }
@Test
public void testDropFunction() throws Exception {
@@ -462,7 +496,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
test("create function using jar '%s'", default_binary_name);
test("select custom_lower('A') from (values(1))");
- Path localUdfDirPath = Deencapsulation.getField(getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir");
+ Path localUdfDirPath = Deencapsulation.getField(
+ getDrillbitContext().getFunctionImplementationRegistry(), "localUdfDir");
File localUdfDir = new File(localUdfDirPath.toUri().getPath());
assertTrue("Binary should exist in local udf directory", new File(localUdfDir, default_binary_name).exists());
@@ -485,7 +520,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
}
RemoteFunctionRegistry remoteFunctionRegistry = getDrillbitContext().getRemoteFunctionRegistry();
- assertEquals("Remote registry should be empty", remoteFunctionRegistry.getRegistry().getJarList().size(), 0);
+ assertEquals("Remote registry should be empty",
+ remoteFunctionRegistry.getRegistry(new DataChangeVersion()).getJarList().size(), 0);
FileSystem fs = remoteFunctionRegistry.getFs();
assertFalse("Binary should not be present in registry area",
@@ -561,8 +597,10 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
.baselineValues(false, errorMessage)
.go();
- assertFalse("Registry area should be empty", fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext());
- assertFalse("Temporary area should be empty", fs.listFiles(remoteFunctionRegistry.getTmpArea(), false).hasNext());
+ assertFalse("Registry area should be empty",
+ fs.listFiles(remoteFunctionRegistry.getRegistryArea(), false).hasNext());
+ assertFalse("Temporary area should be empty",
+ fs.listFiles(remoteFunctionRegistry.getTmpArea(), false).hasNext());
assertTrue("Binary should be present in staging area",
fs.exists(new Path(remoteFunctionRegistry.getStagingArea(), default_binary_name)));
@@ -684,7 +722,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
DataChangeVersion version = new DataChangeVersion();
Registry registry = remoteFunctionRegistry.getRegistry(version);
- assertEquals("Remote registry version should match", 2, version.getVersion());
+ assertEquals("Remote registry version should match", 1, version.getVersion());
List<Jar> jarList = registry.getJarList();
assertEquals("Only one jar should be registered", 1, jarList.size());
assertEquals("Jar name should match", jarName1, jarList.get(0).getName());
@@ -748,7 +786,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
DataChangeVersion version = new DataChangeVersion();
Registry registry = remoteFunctionRegistry.getRegistry(version);
- assertEquals("Remote registry version should match", 3, version.getVersion());
+ assertEquals("Remote registry version should match", 2, version.getVersion());
List<Jar> actualJars = registry.getJarList();
List<String> expectedJars = Lists.newArrayList(jarName1, jarName2);
@@ -777,7 +815,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
public Boolean answer(InvocationOnMock invocation) throws Throwable {
latch1.await();
boolean result = (boolean) invocation.callRealMethod();
- assertTrue("loadRemoteFunctions() should return true", result);
+ assertTrue("syncWithRemoteRegistry() should return true", result);
latch2.countDown();
return true;
}
@@ -788,11 +826,11 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
latch1.countDown();
latch2.await();
boolean result = (boolean) invocation.callRealMethod();
- assertTrue("loadRemoteFunctions() should return true", result);
+ assertTrue("syncWithRemoteRegistry() should return true", result);
return true;
}
})
- .when(functionImplementationRegistry).loadRemoteFunctions(anyLong());
+ .when(functionImplementationRegistry).syncWithRemoteRegistry(anyLong());
SimpleQueryRunner simpleQueryRunner = new SimpleQueryRunner(query);
Thread thread1 = new Thread(simpleQueryRunner);
@@ -804,9 +842,10 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
thread1.join();
thread2.join();
- verify(functionImplementationRegistry, times(2)).loadRemoteFunctions(anyLong());
- LocalFunctionRegistry localFunctionRegistry = Deencapsulation.getField(functionImplementationRegistry, "localFunctionRegistry");
- assertEquals("Local functionRegistry version should match", 2L, localFunctionRegistry.getVersion());
+ verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyLong());
+ LocalFunctionRegistry localFunctionRegistry = Deencapsulation.getField(
+ functionImplementationRegistry, "localFunctionRegistry");
+ assertEquals("Sync function registry version should match", 1L, localFunctionRegistry.getVersion());
}
@Test
@@ -819,7 +858,7 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
boolean result = (boolean) invocation.callRealMethod();
- assertTrue("loadRemoteFunctions() should return true", result);
+ assertTrue("syncWithRemoteRegistry() should return true", result);
return true;
}
})
@@ -827,11 +866,11 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
boolean result = (boolean) invocation.callRealMethod();
- assertFalse("loadRemoteFunctions() should return false", result);
+ assertFalse("syncWithRemoteRegistry() should return false", result);
return false;
}
})
- .when(functionImplementationRegistry).loadRemoteFunctions(anyLong());
+ .when(functionImplementationRegistry).syncWithRemoteRegistry(anyLong());
test("select custom_lower('A') from (values(1))");
@@ -841,9 +880,10 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
assertThat(e.getMessage(), containsString("No match found for function signature unknown_lower(<CHARACTER>)"));
}
- verify(functionImplementationRegistry, times(2)).loadRemoteFunctions(anyLong());
- LocalFunctionRegistry localFunctionRegistry = Deencapsulation.getField(functionImplementationRegistry, "localFunctionRegistry");
- assertEquals("Local functionRegistry version should match", 2L, localFunctionRegistry.getVersion());
+ verify(functionImplementationRegistry, times(2)).syncWithRemoteRegistry(anyLong());
+ LocalFunctionRegistry localFunctionRegistry = Deencapsulation.getField(
+ functionImplementationRegistry, "localFunctionRegistry");
+ assertEquals("Sync function registry version should match", 1L, localFunctionRegistry.getVersion());
}
private void copyDefaultJarsToStagingArea() throws IOException {
@@ -866,7 +906,8 @@ public class TestDynamicUDFSupport extends BaseTestQuery {
}
private RemoteFunctionRegistry spyRemoteFunctionRegistry() {
- FunctionImplementationRegistry functionImplementationRegistry = getDrillbitContext().getFunctionImplementationRegistry();
+ FunctionImplementationRegistry functionImplementationRegistry =
+ getDrillbitContext().getFunctionImplementationRegistry();
RemoteFunctionRegistry remoteFunctionRegistry = functionImplementationRegistry.getRemoteFunctionRegistry();
RemoteFunctionRegistry spy = spy(remoteFunctionRegistry);
Deencapsulation.setField(functionImplementationRegistry, "remoteFunctionRegistry", spy);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
index ab886c4e3..88f1fcb03 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/coord/zk/TestZookeeperClient.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,19 +22,13 @@ import java.util.List;
import java.util.Map;
import com.google.common.collect.Lists;
-import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
-import org.apache.curator.framework.api.CreateBuilder;
-import org.apache.curator.framework.api.DeleteBuilder;
-import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
-import org.apache.curator.utils.EnsurePath;
import org.apache.drill.common.collections.ImmutableEntry;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.exception.VersionMismatchException;
@@ -47,7 +41,9 @@ import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
public class TestZookeeperClient {
private final static String root = "/test";
@@ -132,6 +128,26 @@ public class TestZookeeperClient {
}
@Test
+ public void testHasPathTrueWithVersion() {
+ client.put(path, data);
+ DataChangeVersion version0 = new DataChangeVersion();
+ assertTrue(client.hasPath(path, true, version0));
+ assertEquals("Versions should match", 0, version0.getVersion());
+ client.put(path, data);
+ DataChangeVersion version1 = new DataChangeVersion();
+ assertTrue(client.hasPath(path, true, version1));
+ assertEquals("Versions should match", 1, version1.getVersion());
+ }
+
+ @Test
+ public void testHasPathFalseWithVersion() {
+ DataChangeVersion version0 = new DataChangeVersion();
+ version0.setVersion(-1);
+ assertFalse(client.hasPath("unknown_path", true, version0));
+ assertEquals("Versions should not have changed", -1, version0.getVersion());
+ }
+
+ @Test
public void testPutAndGetWorks() {
client.put(path, data);
final byte[] actual = client.get(path, true);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
index 61fa4e5b2..cd4dd9950 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolderTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -60,25 +60,24 @@ public class FunctionRegistryHolderTest {
@Before
public void setup() {
resetRegistry();
- fillInRegistry();
+ fillInRegistry(1);
}
@Test
public void testVersion() {
resetRegistry();
- assertEquals("Initial version should be 0", 0, registryHolder.getVersion());
- registryHolder.addJars(Maps.<String, List<FunctionHolder>>newHashMap());
- assertEquals("Version should not change if no jars were added.", 0, registryHolder.getVersion());
- registryHolder.removeJar("unknown.jar");
- assertEquals("Version should not change if no jars were removed.", 0, registryHolder.getVersion());
- fillInRegistry();
- assertEquals("Version should have incremented by 1", 1, registryHolder.getVersion());
+ long expectedVersion = 0;
+ assertEquals("Initial version should be 0", expectedVersion, registryHolder.getVersion());
+ registryHolder.addJars(Maps.<String, List<FunctionHolder>>newHashMap(), ++expectedVersion);
+ assertEquals("Version can change if no jars were added.", expectedVersion, registryHolder.getVersion());
+ fillInRegistry(++expectedVersion);
+ assertEquals("Version should have incremented by 1", expectedVersion, registryHolder.getVersion());
registryHolder.removeJar(built_in);
- assertEquals("Version should have incremented by 1", 2, registryHolder.getVersion());
- fillInRegistry();
- assertEquals("Version should have incremented by 1", 3, registryHolder.getVersion());
- fillInRegistry();
- assertEquals("Version should have incremented by 1", 4, registryHolder.getVersion());
+ assertEquals("Version should have incremented by 1", expectedVersion, registryHolder.getVersion());
+ fillInRegistry(++expectedVersion);
+ assertEquals("Version should have incremented by 1", expectedVersion, registryHolder.getVersion());
+ fillInRegistry(++expectedVersion);
+ assertEquals("Version should have incremented by 1", expectedVersion, registryHolder.getVersion());
}
@Test
@@ -97,8 +96,9 @@ public class FunctionRegistryHolderTest {
}
}
- registryHolder.addJars(newJars);
- assertEquals("Version number should match", 1, registryHolder.getVersion());
+ long expectedVersion = 0;
+ registryHolder.addJars(newJars, ++expectedVersion);
+ assertEquals("Version number should match", expectedVersion, registryHolder.getVersion());
compareTwoLists(jars, registryHolder.getAllJarNames());
assertEquals(functionsSize, registryHolder.functionsSize());
compareListMultimaps(functionsWithHolders, registryHolder.getAllFunctionsWithHolders());
@@ -120,16 +120,17 @@ public class FunctionRegistryHolderTest {
functionsSize++;
}
}
- registryHolder.addJars(newJars);
- assertEquals("Version number should match", 1, registryHolder.getVersion());
+ long expectedVersion = 0;
+ registryHolder.addJars(newJars, ++expectedVersion);
+ assertEquals("Version number should match", expectedVersion, registryHolder.getVersion());
compareTwoLists(jars, registryHolder.getAllJarNames());
assertEquals(functionsSize, registryHolder.functionsSize());
compareListMultimaps(functionsWithHolders, registryHolder.getAllFunctionsWithHolders());
compareListMultimaps(functionsWithSignatures, registryHolder.getAllFunctionsWithSignatures());
// adding the same jars should not cause adding duplicates, should override existing jars only
- registryHolder.addJars(newJars);
- assertEquals("Version number should match", 2, registryHolder.getVersion());
+ registryHolder.addJars(newJars, ++expectedVersion);
+ assertEquals("Version number should match", expectedVersion, registryHolder.getVersion());
compareTwoLists(jars, registryHolder.getAllJarNames());
assertEquals(functionsSize, registryHolder.functionsSize());
compareListMultimaps(functionsWithHolders, registryHolder.getAllFunctionsWithHolders());
@@ -252,8 +253,8 @@ public class FunctionRegistryHolderTest {
registryHolder = new FunctionRegistryHolder();
}
- private void fillInRegistry() {
- registryHolder.addJars(newJars);
+ private void fillInRegistry(long version) {
+ registryHolder.addJars(newJars, version);
}
private <T> void compareListMultimaps(ListMultimap<String, T> lm1, ListMultimap<String, T> lm2) {
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
index 8b338af8c..284769625 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/record/ExpressionTreeMaterializerTest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -45,7 +45,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
-import org.apache.drill.exec.proto.UserBitShared.Registry;
import org.junit.Test;
import com.google.common.collect.ImmutableList;
@@ -202,8 +201,8 @@ public class ExpressionTreeMaterializerTest extends ExecTest {
new MockUp<RemoteFunctionRegistry>() {
@Mock
- Registry getRegistry() {
- return Registry.getDefaultInstance();
+ long getRegistryVersion() {
+ return 0L;
}
};
diff --git a/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0-sources.jar b/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0-sources.jar
new file mode 100644
index 000000000..f6b250ec0
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0-sources.jar
Binary files differ
diff --git a/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0.jar b/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0.jar
new file mode 100644
index 000000000..4b5ef8bc4
--- /dev/null
+++ b/exec/java-exec/src/test/resources/jars/DrillUDF-overloading-1.0.jar
Binary files differ