aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/rpc
diff options
context:
space:
mode:
authorLaurent Goujon <laurent@dremio.com>2016-11-04 17:36:42 -0700
committerJinfeng Ni <jni@apache.org>2017-03-01 23:15:30 -0800
commit16aa0810c6b5ab7466b3b7eeaf8652b765da0f89 (patch)
tree7cd6c66111c48b3ac5ea0d2e6d94c8d4f756c1ac /exec/java-exec/src/main/java/org/apache/drill/exec/rpc
parentab60855bf390e8f01369760f019ee06eecf1959e (diff)
DRILL-4994: Add back JDBC prepared statement for older servers
When the JDBC client is connected to an older Drill server, it always attempted to use server-side prepared statement with no fallback. With this change, client will check server version and will fallback to the previous client-side prepared statement (which is still limited to only execute queries and does not provide metadata). close #613
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java45
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java18
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcUtils.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java3
4 files changed, 61 insertions, 21 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 96b56693b..847b7266a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -17,14 +17,6 @@
*/
package org.apache.drill.exec.rpc.user;
-import com.google.common.base.Strings;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.AbstractCheckedFuture;
-import com.google.common.util.concurrent.CheckedFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.netty.buffer.ByteBuf;
-
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -32,10 +24,12 @@ import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import io.netty.channel.socket.SocketChannel;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
import org.apache.drill.common.KerberosUtil;
-import org.apache.drill.common.config.DrillProperties;
import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -60,29 +54,35 @@ import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.AbstractClientConnection;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.BasicClient;
-import org.apache.drill.exec.rpc.NonTransientRpcException;
-import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
import org.apache.drill.exec.rpc.DrillRpcFuture;
+import org.apache.drill.exec.rpc.NonTransientRpcException;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
-
-import com.google.protobuf.MessageLite;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.security.AuthStringUtil;
+import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
-import org.apache.drill.exec.rpc.security.plain.PlainFactory;
import org.apache.drill.exec.rpc.security.ClientAuthenticatorProvider;
+import org.apache.drill.exec.rpc.security.plain.PlainFactory;
import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
+import com.google.common.base.Strings;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractCheckedFuture;
+import com.google.common.util.concurrent.CheckedFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
-import org.slf4j.Logger;
+import io.netty.channel.socket.SocketChannel;
public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnection,
UserToBitHandshake, BitToUserHandshake> {
@@ -91,9 +91,10 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
private final BufferAllocator allocator;
private final QueryResultHandler queryResultHandler = new QueryResultHandler();
private final String clientName;
+ private final boolean supportComplexTypes;
private RpcEndpointInfos serverInfos = null;
- private boolean supportComplexTypes = true;
+ private Set<RpcType> supportedMethods = null;
// these are used for authentication
private volatile List<String> serverAuthMechanisms = null;
@@ -117,6 +118,10 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
return serverInfos;
}
+ public Set<RpcType> getSupportedMethods() {
+ return supportedMethods;
+ }
+
public void submitQuery(UserResultsListener resultsListener, RunQuery query) {
send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class);
}
@@ -346,6 +351,8 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
if (inbound.hasServerInfos()) {
serverInfos = inbound.getServerInfos();
}
+ supportedMethods = Sets.immutableEnumSet(inbound.getSupportedMethodsList());
+
switch (inbound.getStatus()) {
case SUCCESS:
break;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 645ded5c7..ecf15ddf5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.rpc.user;
+import java.util.Set;
import java.util.concurrent.Executor;
import org.apache.drill.common.config.DrillConfig;
@@ -29,8 +30,8 @@ import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq;
import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp;
-import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq;
+import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp;
import org.apache.drill.exec.proto.UserProtos.GetColumnsReq;
import org.apache.drill.exec.proto.UserProtos.GetColumnsResp;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
@@ -44,6 +45,9 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.RpcConfig;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
public class UserRpcConfig {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcConfig.class);
@@ -75,4 +79,16 @@ public class UserRpcConfig {
// prevent instantiation
private UserRpcConfig() {
}
+
+ /**
+ * Contains the list of methods supported by the server (from user to bit)
+ */
+ public static final Set<RpcType> SUPPORTED_SERVER_METHODS = Sets.immutableEnumSet(
+ ImmutableSet
+ .<RpcType> builder()
+ .add(RpcType.RUN_QUERY, RpcType.CANCEL_QUERY, RpcType.GET_QUERY_PLAN_FRAGMENTS, RpcType.RESUME_PAUSED_QUERY,
+ RpcType.GET_CATALOGS, RpcType.GET_SCHEMAS, RpcType.GET_TABLES, RpcType.GET_COLUMNS,
+ RpcType.CREATE_PREPARED_STATEMENT)
+ .build()
+ );
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcUtils.java
index e7e9ffdc6..c513d1190 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcUtils.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcUtils.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.rpc.user;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
+import org.apache.drill.common.Version;
import org.apache.drill.common.util.DrillVersionInfo;
import org.apache.drill.exec.proto.UserProtos.RpcEndpointInfos;
@@ -52,8 +53,23 @@ public final class UserRpcUtils {
.setMajorVersion(DrillVersionInfo.getMajorVersion())
.setMinorVersion(DrillVersionInfo.getMinorVersion())
.setPatchVersion(DrillVersionInfo.getPatchVersion())
+ .setBuildNumber(DrillVersionInfo.getBuildNumber())
+ .setVersionQualifier(DrillVersionInfo.getQualifier())
.build();
return infos;
}
+
+ /**
+ * Get the version from a {@code RpcEndpointInfos} instance
+ */
+ public static Version getVersion(RpcEndpointInfos infos) {
+ return new Version(
+ infos.getVersion(),
+ infos.getMajorVersion(),
+ infos.getMinorVersion(),
+ infos.getPatchVersion(),
+ infos.getBuildNumber(),
+ infos.getVersionQualifier());
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 6854a3ec5..e917b3eb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -275,7 +275,8 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
BitToUserHandshake.Builder respBuilder = BitToUserHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION)
- .setServerInfos(UserRpcUtils.getRpcEndpointInfos(SERVER_NAME));
+ .setServerInfos(UserRpcUtils.getRpcEndpointInfos(SERVER_NAME))
+ .addAllSupportedMethods(UserRpcConfig.SUPPORTED_SERVER_METHODS);
try {
if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) {