aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache
diff options
context:
space:
mode:
authorArina Ielchiieva <arina.yelchiyeva@gmail.com>2018-09-28 20:47:40 +0200
committerVolodymyr Vysotskyi <vvovyk@gmail.com>2018-10-08 12:56:42 +0300
commit2bd26ffe34108f876184c63188122f2d49db4c4a (patch)
tree4eec0be053925ac066ba6304742fb5dcbe20af02 /exec/java-exec/src/main/java/org/apache
parent0ca84ea40812527b481c5a052687021b43fdfc88 (diff)
DRILL-6762: Fix dynamic UDFs versioning issue
1. Added UndefinedVersionDelegatingStore to serve as versioned wrapper for those stores that do not support versioning. 2. Aligned remote and local function registries version type. Type will be represented as int since ZK version is returned as int. 3. Added NOT_AVAILABLE and UNDEFINED versions to DataChangeVersion holder to indicate proper registry state. 4. Added additional trace logging. 5. Minor refactoring and clean up. closes #1484
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java51
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java55
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java43
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java11
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java23
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java82
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java51
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java29
15 files changed, 263 insertions, 152 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
index 135ccd4e1..35feaa9f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreFactory.java
@@ -25,7 +25,7 @@ public interface TransientStoreFactory extends AutoCloseable {
/**
* Returns a {@link TransientStore transient store} instance for the given configuration.
*
- * Note that implementors have liberty to cache previous {@link PersistentStore store} instances.
+ * Note that implementors have liberty to cache previous {@link TransientStore store} instances.
*
* @param config store configuration
* @param <V> store value type
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
index f24f9aaef..f4b837367 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/FunctionImplementationRegistry.java
@@ -25,11 +25,12 @@ import java.net.JarURLConnection;
import java.net.URL;
import java.net.URLClassLoader;
import java.net.URLConnection;
+import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
@@ -64,7 +65,6 @@ import org.apache.drill.exec.server.options.OptionSet;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.exec.util.JarUtil;
import org.apache.hadoop.fs.FileSystem;
@@ -83,7 +83,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
private final Path localUdfDir;
private boolean deleteTmpDir = false;
private File tmpDir;
- private List<PluggableFunctionRegistry> pluggableFuncRegistries = Lists.newArrayList();
+ private List<PluggableFunctionRegistry> pluggableFuncRegistries = new ArrayList<>();
private OptionSet optionManager;
private final boolean useDynamicUdfs;
@@ -168,7 +168,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
*/
@Override
public DrillFuncHolder findDrillFunction(FunctionResolver functionResolver, FunctionCall functionCall) {
- AtomicLong version = new AtomicLong();
+ AtomicInteger version = new AtomicInteger();
String newFunctionName = functionReplacement(functionCall);
// Dynamic UDFS: First try with exact match. If not found, we may need to
@@ -246,7 +246,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
List<MajorType> argTypes,
MajorType returnType,
boolean retry) {
- AtomicLong version = new AtomicLong();
+ AtomicInteger version = new AtomicInteger();
for (DrillFuncHolder h : localFunctionRegistry.getMethods(name, version)) {
if (h.matches(returnType, argTypes)) {
return h;
@@ -321,19 +321,19 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
/**
* Purpose of this method is to synchronize remote and local function registries if needed
* and to inform if function registry was changed after given version.
- *
+ * <p/>
* To make synchronization as much light-weigh as possible, first only versions of both registries are checked
* without any locking. If synchronization is needed, enters synchronized block to prevent others loading the same jars.
* The need of synchronization is checked again (double-check lock) before comparing jars.
* If any missing jars are found, they are downloaded to local udf area, each is wrapped into {@link JarScan}.
* Once jar download is finished, all missing jars are registered in one batch.
* In case if any errors during jars download / registration, these errors are logged.
- *
+ * <p/>
* During registration local function registry is updated with remote function registry version it is synced with.
* When at least one jar of the missing jars failed to download / register,
* local function registry version are not updated but jars that where successfully downloaded / registered
* are added to local function registry.
- *
+ * <p/>
* If synchronization between remote and local function registry was not needed,
* checks if given registry version matches latest sync version
* to inform if function registry was changed after given version.
@@ -342,16 +342,16 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
* @return true if remote and local function registries were synchronized after given version
*/
@SuppressWarnings("resource")
- public boolean syncWithRemoteRegistry(long version) {
+ public boolean syncWithRemoteRegistry(int version) {
// Do the version check only if a remote registry exists. It does
// not exist for some JMockit-based unit tests.
if (isRegistrySyncNeeded()) {
synchronized (this) {
- long localRegistryVersion = localFunctionRegistry.getVersion();
+ int localRegistryVersion = localFunctionRegistry.getVersion();
if (isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localRegistryVersion)) {
DataChangeVersion remoteVersion = new DataChangeVersion();
List<String> missingJars = getMissingJars(this.remoteFunctionRegistry, localFunctionRegistry, remoteVersion);
- List<JarScan> jars = Lists.newArrayList();
+ List<JarScan> jars = new ArrayList<>();
if (!missingJars.isEmpty()) {
logger.info("Starting dynamic UDFs lazy-init process.\n" +
"The following jars are going to be downloaded and registered locally: " + missingJars);
@@ -381,7 +381,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
}
}
}
- long latestRegistryVersion = jars.size() != missingJars.size() ?
+ int latestRegistryVersion = jars.size() != missingJars.size() ?
localRegistryVersion : remoteVersion.getVersion();
localFunctionRegistry.register(jars, latestRegistryVersion);
return true;
@@ -392,23 +392,38 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
return version != localFunctionRegistry.getVersion();
}
+ /**
+ * Checks if remote and local registries should be synchronized.
+ * Before comparing versions, checks if remote function registry is actually exists.
+ *
+ * @return true is local registry should be refreshed, false otherwise
+ */
private boolean isRegistrySyncNeeded() {
+ logger.trace("Has remote function registry: {}", remoteFunctionRegistry.hasRegistry());
return remoteFunctionRegistry.hasRegistry() &&
isRegistrySyncNeeded(remoteFunctionRegistry.getRegistryVersion(), localFunctionRegistry.getVersion());
}
/**
* Checks if local function registry should be synchronized with remote function registry.
- * If remote function registry version is -1, it means that remote function registry is unreachable
- * or is not configured thus we skip synchronization and return false.
- * In all other cases synchronization is needed if remote and local function registries versions do not match.
+ *
+ * <ul>If remote function registry version is {@link DataChangeVersion#UNDEFINED},
+ * it means that remote function registry does not support versioning
+ * thus we need to synchronize both registries.</ul>
+ * <ul>If remote function registry version is {@link DataChangeVersion#NOT_AVAILABLE},
+ * it means that remote function registry is unreachable
+ * or is not configured thus we skip synchronization and return false.</ul>
+ * <ul>For all other cases synchronization is needed if remote
+ * and local function registries versions do not match.</ul>
*
* @param remoteVersion remote function registry version
* @param localVersion local function registry version
* @return true is local registry should be refreshed, false otherwise
*/
- private boolean isRegistrySyncNeeded(long remoteVersion, long localVersion) {
- return remoteVersion != -1 && remoteVersion != localVersion;
+ private boolean isRegistrySyncNeeded(int remoteVersion, int localVersion) {
+ logger.trace("Compare remote [{}] and local [{}] registry versions.", remoteVersion, localVersion);
+ return remoteVersion == DataChangeVersion.UNDEFINED ||
+ (remoteVersion != DataChangeVersion.NOT_AVAILABLE && remoteVersion != localVersion);
}
/**
@@ -459,7 +474,7 @@ public class FunctionImplementationRegistry implements FunctionLookupContext, Au
DataChangeVersion version) {
List<Jar> remoteJars = remoteFunctionRegistry.getRegistry(version).getJarList();
List<String> localJars = localFunctionRegistry.getAllJarNames();
- List<String> missingJars = Lists.newArrayList();
+ List<String> missingJars = new ArrayList<>();
for (Jar jar : remoteJars) {
if (!localJars.contains(jar.getName())) {
missingJars.add(jar.getName());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
index dc8fd74ff..d1d4fc94d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/FunctionRegistryHolder.java
@@ -19,18 +19,18 @@ package org.apache.drill.exec.expr.fn.registry;
import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
-import org.apache.drill.shaded.guava.com.google.common.collect.Queues;
import org.apache.drill.common.AutoCloseables.Closeable;
import org.apache.drill.common.concurrent.AutoCloseableLock;
import org.apache.drill.exec.expr.fn.DrillFuncHolder;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -45,9 +45,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* Holder is designed to allow concurrent reads and single writes to keep data consistent.
* This is achieved by {@link ReadWriteLock} implementation usage.
* Holder has number version which indicates remote function registry version number it is in sync with.
- *
+ * <p/>
* Structure example:
*
+ * <pre>
* JARS
* built-in -> upper -> upper(VARCHAR-REQUIRED)
* -> lower -> lower(VARCHAR-REQUIRED)
@@ -72,12 +73,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
*
* custom_lower -> custom_lower(VARCHAR-REQUIRED) -> function holder for custom_lower(VARCHAR-REQUIRED)
* -> custom_lower(VARCHAR-OPTIONAL) -> function holder for custom_lower(VARCHAR-OPTIONAL)
- *
+ * </pre>
* where
- * First.jar is jar name represented by String
- * upper is function name represented by String
- * upper(VARCHAR-REQUIRED) is signature name represented by String which consist of function name, list of input parameters
- * function holder for upper(VARCHAR-REQUIRED) is {@link DrillFuncHolder} initiated for each function.
+ * <li><b>First.jar</b> is jar name represented by {@link String}.</li>
+ * <li><b>upper</b> is function name represented by {@link String}.</li>
+ * <li><b>upper(VARCHAR-REQUIRED)</b> is signature name represented by String which consist of function name, list of input parameters.</li>
+ * <li><b>function holder for upper(VARCHAR-REQUIRED)</b> is {@link DrillFuncHolder} initiated for each function.</li>
*
*/
public class FunctionRegistryHolder {
@@ -88,7 +89,7 @@ public class FunctionRegistryHolder {
private final AutoCloseableLock readLock = new AutoCloseableLock(readWriteLock.readLock());
private final AutoCloseableLock writeLock = new AutoCloseableLock(readWriteLock.writeLock());
// remote function registry number, it is in sync with
- private long version;
+ private int version;
// jar name, Map<function name, Queue<function signature>
private final Map<String, Map<String, Queue<String>>> jars;
@@ -97,15 +98,15 @@ public class FunctionRegistryHolder {
private final Map<String, Map<String, DrillFuncHolder>> functions;
public FunctionRegistryHolder() {
- this.functions = Maps.newConcurrentMap();
- this.jars = Maps.newConcurrentMap();
+ this.functions = new ConcurrentHashMap<>();
+ this.jars = new ConcurrentHashMap<>();
}
/**
* This is read operation, so several users at a time can get this data.
* @return local function registry version number
*/
- public long getVersion() {
+ public int getVersion() {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
return version;
}
@@ -122,12 +123,12 @@ public class FunctionRegistryHolder {
*
* @param newJars jars and list of their function holders, each contains function name, signature and holder
*/
- public void addJars(Map<String, List<FunctionHolder>> newJars, long version) {
+ public void addJars(Map<String, List<FunctionHolder>> newJars, int version) {
try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
for (Map.Entry<String, List<FunctionHolder>> newJar : newJars.entrySet()) {
String jarName = newJar.getKey();
removeAllByJar(jarName);
- Map<String, Queue<String>> jar = Maps.newConcurrentMap();
+ Map<String, Queue<String>> jar = new ConcurrentHashMap<>();
jars.put(jarName, jar);
addFunctions(jar, newJar.getValue());
}
@@ -156,7 +157,7 @@ public class FunctionRegistryHolder {
*/
public List<String> getAllJarNames() {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
- return Lists.newArrayList(jars.keySet());
+ return new ArrayList<>(jars.keySet());
}
}
@@ -171,7 +172,7 @@ public class FunctionRegistryHolder {
public List<String> getFunctionNamesByJar(String jarName) {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()){
Map<String, Queue<String>> functions = jars.get(jarName);
- return functions == null ? Lists.<String>newArrayList() : Lists.newArrayList(functions.keySet());
+ return functions == null ? new ArrayList<>() : new ArrayList<>(functions.keySet());
}
}
@@ -185,14 +186,14 @@ public class FunctionRegistryHolder {
* @param version version holder
* @return all functions which their holders
*/
- public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders(AtomicLong version) {
+ public ListMultimap<String, DrillFuncHolder> getAllFunctionsWithHolders(AtomicInteger version) {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
if (version != null) {
version.set(this.version);
}
ListMultimap<String, DrillFuncHolder> functionsWithHolders = ArrayListMultimap.create();
for (Map.Entry<String, Map<String, DrillFuncHolder>> function : functions.entrySet()) {
- functionsWithHolders.putAll(function.getKey(), Lists.newArrayList(function.getValue().values()));
+ functionsWithHolders.putAll(function.getKey(), new ArrayList<>(function.getValue().values()));
}
return functionsWithHolders;
}
@@ -220,7 +221,7 @@ public class FunctionRegistryHolder {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
ListMultimap<String, String> functionsWithSignatures = ArrayListMultimap.create();
for (Map.Entry<String, Map<String, DrillFuncHolder>> function : functions.entrySet()) {
- functionsWithSignatures.putAll(function.getKey(), Lists.newArrayList(function.getValue().keySet()));
+ functionsWithSignatures.putAll(function.getKey(), new ArrayList<>(function.getValue().keySet()));
}
return functionsWithSignatures;
}
@@ -236,13 +237,13 @@ public class FunctionRegistryHolder {
* @param version version holder
* @return list of function holders
*/
- public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, AtomicLong version) {
+ public List<DrillFuncHolder> getHoldersByFunctionName(String functionName, AtomicInteger version) {
try (@SuppressWarnings("unused") Closeable lock = readLock.open()) {
if (version != null) {
version.set(this.version);
}
Map<String, DrillFuncHolder> holders = functions.get(functionName);
- return holders == null ? Lists.<DrillFuncHolder>newArrayList() : Lists.newArrayList(holders.values());
+ return holders == null ? new ArrayList<>() : new ArrayList<>(holders.values());
}
}
@@ -316,17 +317,13 @@ public class FunctionRegistryHolder {
final String functionName = function.getName();
Queue<String> jarFunctions = jar.get(functionName);
if (jarFunctions == null) {
- jarFunctions = Queues.newConcurrentLinkedQueue();
+ jarFunctions = new ConcurrentLinkedQueue<>();
jar.put(functionName, jarFunctions);
}
final String functionSignature = function.getSignature();
jarFunctions.add(functionSignature);
- Map<String, DrillFuncHolder> signatures = functions.get(functionName);
- if (signatures == null) {
- signatures = Maps.newConcurrentMap();
- functions.put(functionName, signatures);
- }
+ Map<String, DrillFuncHolder> signatures = functions.computeIfAbsent(functionName, k -> new ConcurrentHashMap<>());
signatures.put(functionSignature, function.getHolder());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
index 3740a6cc8..cefbd8cf3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/LocalFunctionRegistry.java
@@ -24,8 +24,9 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
import org.apache.drill.shaded.guava.com.google.common.collect.ListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -77,13 +78,16 @@ public class LocalFunctionRegistry {
private final FunctionRegistryHolder registryHolder;
/**
- * Registers all functions present in Drill classpath on start-up. All functions will be marked as built-in.
- * Built-in functions are not allowed to be unregistered. Initially sync registry version will be set to 0.
+ * Registers all functions present in Drill classpath on start-up.
+ * All functions will be marked as built-in. Built-in functions are not allowed to be unregistered.
+ * Since local function registry version is based on remote function registry version,
+ * initially sync version will be set to {@link DataChangeVersion#UNDEFINED}
+ * to ensure that upon first check both registries would be synchronized.
*/
public LocalFunctionRegistry(ScanResult classpathScan) {
registryHolder = new FunctionRegistryHolder();
validate(BUILT_IN, classpathScan);
- register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader())), 0);
+ register(Lists.newArrayList(new JarScan(BUILT_IN, classpathScan, this.getClass().getClassLoader())), DataChangeVersion.UNDEFINED);
if (logger.isTraceEnabled()) {
StringBuilder allFunctions = new StringBuilder();
for (DrillFuncHolder method: registryHolder.getAllFunctionsWithHolders().values()) {
@@ -96,7 +100,7 @@ public class LocalFunctionRegistry {
/**
* @return remote function registry version number with which local function registry is synced
*/
- public long getVersion() {
+ public int getVersion() {
return registryHolder.getVersion();
}
@@ -160,7 +164,7 @@ public class LocalFunctionRegistry {
* @param jars list of jars to be registered
* @param version remote function registry version number with which local function registry is synced
*/
- public void register(List<JarScan> jars, long version) {
+ public void register(List<JarScan> jars, int version) {
Map<String, List<FunctionHolder>> newJars = new HashMap<>();
for (JarScan jarScan : jars) {
FunctionConverter converter = new FunctionConverter();
@@ -219,7 +223,7 @@ public class LocalFunctionRegistry {
* @param name function name
* @return all function holders associated with the function name. Function name is case insensitive.
*/
- public List<DrillFuncHolder> getMethods(String name, AtomicLong version) {
+ public List<DrillFuncHolder> getMethods(String name, AtomicInteger version) {
return registryHolder.getHoldersByFunctionName(name.toLowerCase(), version);
}
@@ -238,7 +242,7 @@ public class LocalFunctionRegistry {
* @param operatorTable drill operator table
*/
public void register(DrillOperatorTable operatorTable) {
- AtomicLong versionHolder = new AtomicLong();
+ AtomicInteger versionHolder = new AtomicInteger();
final Map<String, Collection<DrillFuncHolder>> registeredFunctions =
registryHolder.getAllFunctionsWithHolders(versionHolder).asMap();
operatorTable.setFunctionRegistryVersion(versionHolder.get());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
index 4e947656f..f727a9374 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/registry/RemoteFunctionRegistry.java
@@ -54,34 +54,36 @@ import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
* Creates all remote registry areas at startup and validates them,
* during init establishes connections with three udf related stores.
* Provides tools to work with three udf related stores, gives access to remote registry areas.
- *
+ * <p/>
* There are three udf stores:
- * REGISTRY - persistent store, stores remote function registry {@link Registry} under udf path
+ *
+ * <li><b>REGISTRY</b> - persistent store, stores remote function registry {@link Registry} under udf path
* which contains information about all dynamically registered jars and their function signatures.
- * If connection is created for the first time, puts empty remote registry.
+ * If connection is created for the first time, puts empty remote registry.</li>
*
- * UNREGISTRATION - transient store, stores information under udf/unregister path.
+ * <li><b>UNREGISTRATION</b> - transient store, stores information under udf/unregister path.
* udf/unregister path is persistent by itself but any child created will be transient.
* Whenever user submits request to unregister jar, child path with jar name is created under this store.
* This store also holds unregistration listener, which notifies all drill bits when child path is created,
- * so they can start local unregistration process.
+ * so they can start local unregistration process.</li>
*
- * JARS - transient store, stores information under udf/jars path.
+ * <li><b>JARS</b> - transient store, stores information under udf/jars path.
* udf/jars path is persistent by itself but any child created will be transient.
* Servers as lock, not allowing to perform any action on the same time.
* There two types of actions: {@link Action#REGISTRATION} and {@link Action#UNREGISTRATION}.
* Before starting any action, users tries to create child path with jar name under this store
* and if such path already exists, receives action being performed on that very jar.
- * When user finishes its action, he deletes child path with jar name.
- *
+ * When user finishes its action, he deletes child path with jar name.</li>
+ * <p/>
* There are three udf areas:
- * STAGING - area where user copies binary and source jars before starting registration process.
- * REGISTRY - area where registered jars are stored.
- * TMP - area where source and binary jars are backed up in unique folder during registration process.
+ *
+ * <li><b>STAGING</b> - area where user copies binary and source jars before starting registration process.</li>
+ * <li><b>REGISTRY</b> - area where registered jars are stored.</li>
+ * <li><b>TMP</b> - area where source and binary jars are backed up in unique folder during registration process.</li>
*/
public class RemoteFunctionRegistry implements AutoCloseable {
- private static final String registry_path = "registry";
+ private static final String REGISTRY_PATH = "registry";
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteFunctionRegistry.class);
private static final ObjectMapper mapper = new ObjectMapper().enable(INDENT_OUTPUT);
@@ -112,19 +114,19 @@ public class RemoteFunctionRegistry implements AutoCloseable {
*
* @return remote function registry version if any, -1 otherwise
*/
- public long getRegistryVersion() {
+ public int getRegistryVersion() {
DataChangeVersion version = new DataChangeVersion();
boolean contains = false;
try {
- contains = registry.contains(registry_path, version);
+ contains = registry.contains(REGISTRY_PATH, version);
} catch (Exception e) {
- logger.error("Problem during trying to access remote function registry [{}]", registry_path, e);
+ logger.error("Problem during trying to access remote function registry [{}]", REGISTRY_PATH, e);
}
if (contains) {
return version.getVersion();
} else {
- logger.error("Remote function registry [{}] is unreachable", registry_path);
- return -1;
+ logger.error("Remote function registry [{}] is unreachable", REGISTRY_PATH);
+ return DataChangeVersion.NOT_AVAILABLE;
}
}
@@ -137,11 +139,11 @@ public class RemoteFunctionRegistry implements AutoCloseable {
public boolean hasRegistry() { return registry != null; }
public Registry getRegistry(DataChangeVersion version) {
- return registry.get(registry_path, version);
+ return registry.get(REGISTRY_PATH, version);
}
public void updateRegistry(Registry registryContent, DataChangeVersion version) throws VersionMismatchException {
- registry.put(registry_path, registryContent, version);
+ registry.put(REGISTRY_PATH, registryContent, version);
}
public void submitForUnregistration(String jar) {
@@ -193,7 +195,8 @@ public class RemoteFunctionRegistry implements AutoCloseable {
.persist()
.build();
registry = storeProvider.getOrCreateVersionedStore(registrationConfig);
- registry.putIfAbsent(registry_path, Registry.getDefaultInstance());
+ logger.trace("Remote function registry type: {}.", registry.getClass());
+ registry.putIfAbsent(REGISTRY_PATH, Registry.getDefaultInstance());
} catch (StoreException e) {
throw new DrillRuntimeException("Failure while loading remote registry.", e);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
index e1e33098a..eb79a5a65 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillOperatorTable.java
@@ -54,7 +54,7 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
private final ArrayListMultimap<String, SqlOperator> drillOperatorsWithInferenceMap = ArrayListMultimap.create();
// indicates remote function registry version based on which drill operator were loaded
// is used to define if we need to reload operator table in case remote function registry version has changed
- private long functionRegistryVersion;
+ private int functionRegistryVersion;
private final OptionManager systemOptionManager;
@@ -70,14 +70,14 @@ public class DrillOperatorTable extends SqlStdOperatorTable {
*
* @param version registry version
*/
- public void setFunctionRegistryVersion(long version) {
+ public void setFunctionRegistryVersion(int version) {
functionRegistryVersion = version;
}
/**
* @return function registry version based on which operator table was loaded
*/
- public long getFunctionRegistryVersion() {
+ public int getFunctionRegistryVersion() {
return functionRegistryVersion;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
index e3cd7e460..41faea96c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/DrillSqlWorker.java
@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.calcite.sql.SqlDescribeSchema;
import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.tools.RelConversionException;
import org.apache.calcite.tools.ValidationException;
import org.apache.drill.common.exceptions.UserException;
@@ -58,8 +57,7 @@ public class DrillSqlWorker {
* @param sql sql query
* @return query physical plan
*/
- public static PhysicalPlan getPlan(QueryContext context, String sql) throws SqlParseException, ValidationException,
- ForemanSetupException {
+ public static PhysicalPlan getPlan(QueryContext context, String sql) throws ForemanSetupException {
return getPlan(context, sql, null);
}
@@ -76,15 +74,18 @@ public class DrillSqlWorker {
* @param textPlan text plan
* @return query physical plan
*/
- public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan)
- throws ForemanSetupException {
+ public static PhysicalPlan getPlan(QueryContext context, String sql, Pointer<String> textPlan) throws ForemanSetupException {
Pointer<String> textPlanCopy = textPlan == null ? null : new Pointer<>(textPlan.value);
try {
return getQueryPlan(context, sql, textPlan);
} catch (Exception e) {
+ logger.trace("There was an error during conversion into physical plan. " +
+ "Will sync remote and local function registries if needed and retry " +
+ "in case if issue was due to missing function implementation.");
if (context.getFunctionRegistry().syncWithRemoteRegistry(
context.getDrillOperatorTable().getFunctionRegistryVersion())) {
context.reloadDrillOperatorTable();
+ logger.trace("Local function registry was synchronized with remote. Trying to find function one more time.");
return getQueryPlan(context, sql, textPlanCopy);
}
throw e;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
index 4311f48bd..ca26a2433 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/PersistentStoreProvider.java
@@ -18,24 +18,37 @@
package org.apache.drill.exec.store.sys;
import org.apache.drill.exec.exception.StoreException;
-import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
+import org.apache.drill.exec.store.sys.store.UndefinedVersionDelegatingStore;
/**
* A factory used to create {@link PersistentStore store} instances.
- *
*/
public interface PersistentStoreProvider extends AutoCloseable {
+
/**
* Gets or creates a {@link PersistentStore persistent store} for the given configuration.
*
* Note that implementors have liberty to cache previous {@link PersistentStore store} instances.
*
- * @param config store configuration
- * @param <V> store value type
+ * @param config store configuration
+ * @param <V> store value type
+ * @return persistent store instance
+ * @throws StoreException in case when unable to create store
*/
<V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException;
+
+ /**
+ * Override this method if store supports versioning and return versioning instance.
+ * By default, undefined version wrapper will be used.
+ *
+ * @param config store configuration
+ * @param <V> store value type
+ * @return versioned persistent store instance
+ * @throws StoreException in case when unable to create store
+ */
default <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws StoreException {
- return new VersionedDelegatingStore<>(getOrCreateStore(config));
+ // for those stores that do not support versioning
+ return new UndefinedVersionDelegatingStore<>(getOrCreateStore(config));
}
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
index d182de331..76e5610de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/DataChangeVersion.java
@@ -17,9 +17,17 @@
*/
package org.apache.drill.exec.store.sys.store;
+/**
+ * Holder for store version. By default version is {@link DataChangeVersion#UNDEFINED}.
+ */
public class DataChangeVersion {
- private int version;
+ // is used when store in unreachable
+ public static final int NOT_AVAILABLE = -1;
+ // is used when store does not support versioning
+ public static final int UNDEFINED = -2;
+
+ private int version = UNDEFINED;
public void setVersion(int version) {
this.version = version;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java
new file mode 100644
index 000000000..5873ec0ac
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/UndefinedVersionDelegatingStore.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.sys.store;
+
+import org.apache.drill.exec.store.sys.PersistentStore;
+import org.apache.drill.exec.store.sys.PersistentStoreMode;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Wrapper store that delegates operations to PersistentStore.
+ * Does not keep versioning and returns {@link DataChangeVersion#UNDEFINED} when version is required.
+ *
+ * @param <V> store value type
+ */
+public class UndefinedVersionDelegatingStore<V> implements VersionedPersistentStore<V> {
+
+ private final PersistentStore<V> store;
+
+ public UndefinedVersionDelegatingStore(PersistentStore<V> store) {
+ this.store = store;
+ }
+
+ @Override
+ public boolean contains(String key, DataChangeVersion version) {
+ version.setVersion(DataChangeVersion.UNDEFINED);
+ return store.contains(key);
+ }
+
+ @Override
+ public V get(String key, DataChangeVersion version) {
+ version.setVersion(DataChangeVersion.UNDEFINED);
+ return store.get(key);
+ }
+
+ @Override
+ public void put(String key, V value, DataChangeVersion version) {
+ store.put(key, value);
+ }
+
+ @Override
+ public PersistentStoreMode getMode() {
+ return store.getMode();
+ }
+
+ @Override
+ public void delete(String key) {
+ store.delete(key);
+ }
+
+ @Override
+ public boolean putIfAbsent(String key, V value) {
+ return store.putIfAbsent(key, value);
+ }
+
+ @Override
+ public Iterator<Map.Entry<String, V>> getRange(int skip, int take) {
+ return store.getRange(skip, take);
+ }
+
+ @Override
+ public void close() throws Exception {
+ store.close();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
index 18e0b8262..40576d55c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/VersionedDelegatingStore.java
@@ -30,22 +30,24 @@ import org.apache.drill.exec.store.sys.PersistentStoreMode;
import org.apache.drill.exec.store.sys.VersionedPersistentStore;
/**
- * Versioned Store that delegates operations to PersistentStore
- * @param <V>
+ * Versioned store that delegates operations to PersistentStore and keeps versioning,
+ * incrementing version each time write / delete operation is triggered.
+ * Once created initial version is 0. Can be used only for local versioning, not distributed.
+ *
+ * @param <V> store value type
*/
public class VersionedDelegatingStore<V> implements VersionedPersistentStore<V> {
private final PersistentStore<V> store;
- private final ReadWriteLock readWriteLock;
private final AutoCloseableLock readLock;
private final AutoCloseableLock writeLock;
private int version;
public VersionedDelegatingStore(PersistentStore<V> store) {
this.store = store;
- readWriteLock = new ReentrantReadWriteLock();
+ ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
readLock = new AutoCloseableLock(readWriteLock.readLock());
writeLock = new AutoCloseableLock(readWriteLock.writeLock());
- version = -1;
+ version = 0;
}
@Override
@@ -113,7 +115,7 @@ public class VersionedDelegatingStore<V> implements VersionedPersistentStore<V>
{
try (@SuppressWarnings("unused") Closeable lock = writeLock.open()) {
store.close();
- version = -1;
+ version = DataChangeVersion.NOT_AVAILABLE;
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
index aa6ee9d17..75cef2f47 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/CachingPersistentStoreProvider.java
@@ -17,21 +17,23 @@
*/
package org.apache.drill.exec.store.sys.store.provider;
+import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
-import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.common.util.function.CheckedFunction;
public class CachingPersistentStoreProvider extends BasePersistentStoreProvider {
-// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(CachingPersistentStoreProvider.class);
- private final ConcurrentMap<PersistentStoreConfig<?>, PersistentStore<?>> storeCache = Maps.newConcurrentMap();
+ private final Map<PersistentStoreConfig<?>, PersistentStore<?>> storeCache = new ConcurrentHashMap<>();
+ private final Map<PersistentStoreConfig<?>, VersionedPersistentStore<?>> versionedStoreCache = new ConcurrentHashMap<>();
private final PersistentStoreProvider provider;
public CachingPersistentStoreProvider(PersistentStoreProvider provider) {
@@ -41,21 +43,15 @@ public class CachingPersistentStoreProvider extends BasePersistentStoreProvider
@Override
@SuppressWarnings("unchecked")
public <V> PersistentStore<V> getOrCreateStore(final PersistentStoreConfig<V> config) throws StoreException {
- final PersistentStore<?> store = storeCache.get(config);
- if (store == null) {
- final PersistentStore<?> newStore = provider.getOrCreateStore(config);
- final PersistentStore<?> finalStore = storeCache.putIfAbsent(config, newStore);
- if (finalStore == null) {
- return (PersistentStore<V>)newStore;
- }
- try {
- newStore.close();
- } catch (Exception ex) {
- throw new StoreException(ex);
- }
- }
+ CheckedFunction<PersistentStoreConfig<?>, PersistentStore<?>, StoreException> function = provider::getOrCreateStore;
+ return (PersistentStore<V>) storeCache.computeIfAbsent(config, function);
+ }
- return (PersistentStore<V>) store;
+ @Override
+ @SuppressWarnings("unchecked")
+ public <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) throws StoreException {
+ CheckedFunction<PersistentStoreConfig<?>, VersionedPersistentStore<?>, StoreException> function = provider::getOrCreateVersionedStore;
+ return (VersionedPersistentStore<V>) versionedStoreCache.computeIfAbsent(config, function);
}
@Override
@@ -65,12 +61,19 @@ public class CachingPersistentStoreProvider extends BasePersistentStoreProvider
@Override
public void close() throws Exception {
- final List<AutoCloseable> closeables = Lists.newArrayList();
- for (final AutoCloseable store : storeCache.values()) {
- closeables.add(store);
- }
- closeables.add(provider);
+ List<AutoCloseable> closeables = new ArrayList<>();
+
+ // add un-versioned stores
+ closeables.addAll(storeCache.values());
storeCache.clear();
+
+ // add versioned stores
+ closeables.addAll(versionedStoreCache.values());
+ versionedStoreCache.clear();
+
+ // add provider
+ closeables.add(provider);
+
AutoCloseables.close(closeables);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
index 3ab85ec29..6a70df775 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/InMemoryStoreProvider.java
@@ -17,11 +17,12 @@
*/
package org.apache.drill.exec.store.sys.store.provider;
-import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
import org.apache.drill.exec.store.sys.store.InMemoryStore;
+import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
public class InMemoryStoreProvider implements PersistentStoreProvider {
@@ -35,10 +36,15 @@ public class InMemoryStoreProvider implements PersistentStoreProvider {
public void close() throws Exception { }
@Override
- public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException {
+ public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) {
return new InMemoryStore<>(capacity);
}
@Override
- public void start() throws Exception { }
+ public <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) {
+ return new VersionedDelegatingStore<>(getOrCreateStore(config));
+ }
+
+ @Override
+ public void start() { }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
index af6777111..2dae62def 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/store/provider/LocalPersistentStoreProvider.java
@@ -26,7 +26,9 @@ import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
+import org.apache.drill.exec.store.sys.VersionedPersistentStore;
import org.apache.drill.exec.store.sys.store.LocalPersistentStore;
+import org.apache.drill.exec.store.sys.store.VersionedDelegatingStore;
import org.apache.drill.exec.testing.store.NoWriteLocalStore;
import org.apache.hadoop.fs.Path;
@@ -70,6 +72,10 @@ public class LocalPersistentStoreProvider extends BasePersistentStoreProvider {
}
}
+ @Override
+ public <V> VersionedPersistentStore<V> getOrCreateVersionedStore(PersistentStoreConfig<V> config) {
+ return new VersionedDelegatingStore<>(getOrCreateStore(config));
+ }
@Override
public void close() throws Exception {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java
deleted file mode 100644
index b744ac8c0..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/CheckedSupplier.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.util;
-
-/**
- * The java standard library does not provide a lambda function interface for funtions that take no arguments,
- * but that throw an exception. So, we have to define our own here.
- * @param <T> The return type of the lambda function.
- * @param <E> The type of exception thrown by the lambda function.
- */
-@FunctionalInterface
-public interface CheckedSupplier<T, E extends Exception> {
- T get() throws E;
-}