diff options
author | Laurent Goujon <laurent@dremio.com> | 2016-11-04 17:36:42 -0700 |
---|---|---|
committer | Jinfeng Ni <jni@apache.org> | 2017-03-01 23:15:30 -0800 |
commit | 16aa0810c6b5ab7466b3b7eeaf8652b765da0f89 (patch) | |
tree | 7cd6c66111c48b3ac5ea0d2e6d94c8d4f756c1ac /exec/java-exec/src/main/java/org/apache/drill/exec/rpc | |
parent | ab60855bf390e8f01369760f019ee06eecf1959e (diff) |
DRILL-4994: Add back JDBC prepared statement for older servers
When the JDBC client is connected to an older Drill server, it always
attempted to use server-side prepared statement with no fallback.
With this change, client will check server version and will fallback to the
previous client-side prepared statement (which is still limited to only execute
queries and does not provide metadata).
close #613
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc')
4 files changed, 61 insertions, 21 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 96b56693b..847b7266a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -17,14 +17,6 @@ */ package org.apache.drill.exec.rpc.user; -import com.google.common.base.Strings; -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.util.concurrent.AbstractCheckedFuture; -import com.google.common.util.concurrent.CheckedFuture; -import com.google.common.util.concurrent.SettableFuture; -import io.netty.buffer.ByteBuf; - import java.io.IOException; import java.util.List; import java.util.Map; @@ -32,10 +24,12 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import io.netty.channel.socket.SocketChannel; +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; + import org.apache.drill.common.KerberosUtil; -import org.apache.drill.common.config.DrillProperties; import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.config.DrillProperties; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -60,29 +54,35 @@ import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; import org.apache.drill.exec.rpc.AbstractClientConnection; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.BasicClient; -import org.apache.drill.exec.rpc.NonTransientRpcException; -import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener; import org.apache.drill.exec.rpc.DrillRpcFuture; +import org.apache.drill.exec.rpc.NonTransientRpcException; import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.rpc.RpcConnectionHandler; import org.apache.drill.exec.rpc.RpcException; - -import com.google.protobuf.MessageLite; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.security.AuthStringUtil; +import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener; import org.apache.drill.exec.rpc.security.AuthenticatorFactory; -import org.apache.drill.exec.rpc.security.plain.PlainFactory; import org.apache.drill.exec.rpc.security.ClientAuthenticatorProvider; +import org.apache.drill.exec.rpc.security.plain.PlainFactory; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; +import com.google.common.base.Strings; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractCheckedFuture; +import com.google.common.util.concurrent.CheckedFuture; +import com.google.common.util.concurrent.SettableFuture; +import com.google.protobuf.MessageLite; +import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; -import org.slf4j.Logger; +import io.netty.channel.socket.SocketChannel; public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnection, UserToBitHandshake, BitToUserHandshake> { @@ -91,9 +91,10 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect private final BufferAllocator allocator; private final QueryResultHandler queryResultHandler = new QueryResultHandler(); private final String clientName; + private final boolean supportComplexTypes; private RpcEndpointInfos serverInfos = null; - private boolean supportComplexTypes = true; + private Set<RpcType> supportedMethods = null; // these are used for authentication private volatile List<String> serverAuthMechanisms = null; @@ -117,6 +118,10 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect return serverInfos; } + public Set<RpcType> getSupportedMethods() { + return supportedMethods; + } + public void submitQuery(UserResultsListener resultsListener, RunQuery query) { send(queryResultHandler.getWrappedListener(resultsListener), RpcType.RUN_QUERY, query, QueryId.class); } @@ -346,6 +351,8 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect if (inbound.hasServerInfos()) { serverInfos = inbound.getServerInfos(); } + supportedMethods = Sets.immutableEnumSet(inbound.getSupportedMethodsList()); + switch (inbound.getStatus()) { case SUCCESS: break; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java index 645ded5c7..ecf15ddf5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.user; +import java.util.Set; import java.util.concurrent.Executor; import org.apache.drill.common.config.DrillConfig; @@ -29,8 +30,8 @@ import org.apache.drill.exec.proto.UserBitShared.SaslMessage; import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake; import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementReq; import org.apache.drill.exec.proto.UserProtos.CreatePreparedStatementResp; -import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp; import org.apache.drill.exec.proto.UserProtos.GetCatalogsReq; +import org.apache.drill.exec.proto.UserProtos.GetCatalogsResp; import org.apache.drill.exec.proto.UserProtos.GetColumnsReq; import org.apache.drill.exec.proto.UserProtos.GetColumnsResp; import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments; @@ -44,6 +45,9 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; import org.apache.drill.exec.rpc.RpcConfig; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + public class UserRpcConfig { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserRpcConfig.class); @@ -75,4 +79,16 @@ public class UserRpcConfig { // prevent instantiation private UserRpcConfig() { } + + /** + * Contains the list of methods supported by the server (from user to bit) + */ + public static final Set<RpcType> SUPPORTED_SERVER_METHODS = Sets.immutableEnumSet( + ImmutableSet + .<RpcType> builder() + .add(RpcType.RUN_QUERY, RpcType.CANCEL_QUERY, RpcType.GET_QUERY_PLAN_FRAGMENTS, RpcType.RESUME_PAUSED_QUERY, + RpcType.GET_CATALOGS, RpcType.GET_SCHEMAS, RpcType.GET_TABLES, RpcType.GET_COLUMNS, + RpcType.CREATE_PREPARED_STATEMENT) + .build() + ); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcUtils.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcUtils.java index e7e9ffdc6..c513d1190 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcUtils.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcUtils.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.rpc.user; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; +import org.apache.drill.common.Version; import org.apache.drill.common.util.DrillVersionInfo; import org.apache.drill.exec.proto.UserProtos.RpcEndpointInfos; @@ -52,8 +53,23 @@ public final class UserRpcUtils { .setMajorVersion(DrillVersionInfo.getMajorVersion()) .setMinorVersion(DrillVersionInfo.getMinorVersion()) .setPatchVersion(DrillVersionInfo.getPatchVersion()) + .setBuildNumber(DrillVersionInfo.getBuildNumber()) + .setVersionQualifier(DrillVersionInfo.getQualifier()) .build(); return infos; } + + /** + * Get the version from a {@code RpcEndpointInfos} instance + */ + public static Version getVersion(RpcEndpointInfos infos) { + return new Version( + infos.getVersion(), + infos.getMajorVersion(), + infos.getMinorVersion(), + infos.getPatchVersion(), + infos.getBuildNumber(), + infos.getVersionQualifier()); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 6854a3ec5..e917b3eb1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -275,7 +275,8 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { BitToUserHandshake.Builder respBuilder = BitToUserHandshake.newBuilder() .setRpcVersion(UserRpcConfig.RPC_VERSION) - .setServerInfos(UserRpcUtils.getRpcEndpointInfos(SERVER_NAME)); + .setServerInfos(UserRpcUtils.getRpcEndpointInfos(SERVER_NAME)) + .addAllSupportedMethods(UserRpcConfig.SUPPORTED_SERVER_METHODS); try { if (inbound.getRpcVersion() != UserRpcConfig.RPC_VERSION) { |