aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/rpc
diff options
context:
space:
mode:
authorParth Chandra <parthc@apache.org>2017-06-09 22:03:59 -0700
committerParth Chandra <parthc@apache.org>2017-10-11 19:26:13 -0700
commit552d7d825e37d42835bd6bfccfc07fc7d3b5fd94 (patch)
tree669a74d1646c97522969f8acf8c4ed2baff8a68f /exec/java-exec/src/main/java/org/apache/drill/exec/rpc
parentb803405c0f978beb8beaf77211f99731014ac92f (diff)
DRILL-5431: SSL Support (Java) - Java client server SSL implementation
Also enable OpenSSL support Also fix exclusions and java-exec pom file to eliminate netty-tcnative as a transitive dependency on all projects
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java356
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java49
3 files changed, 261 insertions, 157 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 2f4753857..99614bdfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -20,16 +20,25 @@ package org.apache.drill.exec.rpc.user;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.net.ssl.SSLEngine;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslHandler;
import org.apache.drill.common.KerberosUtil;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.client.InvalidConnectionInfoException;
+import org.apache.drill.exec.ssl.SSLConfig;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -55,6 +64,7 @@ import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.AbstractClientConnection;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.ConnectionMultiListener;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.NonTransientRpcException;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
@@ -62,6 +72,7 @@ import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcConstants;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.security.AuthStringUtil;
@@ -70,7 +81,9 @@ import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
import org.apache.drill.exec.rpc.security.ClientAuthenticatorProvider;
import org.apache.drill.exec.rpc.security.plain.PlainFactory;
import org.apache.drill.exec.rpc.security.SaslProperties;
+import org.apache.drill.exec.ssl.SSLConfigBuilder;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.slf4j.Logger;
import com.google.common.base.Strings;
@@ -87,8 +100,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
-public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnection,
- UserToBitHandshake, BitToUserHandshake> {
+public class UserClient
+ extends BasicClient<RpcType, UserClient.UserToBitConnection, UserToBitHandshake, BitToUserHandshake> {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
private final BufferAllocator allocator;
@@ -102,19 +115,47 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
// these are used for authentication
private volatile List<String> serverAuthMechanisms = null;
private volatile boolean authComplete = true;
-
- public UserClient(String clientName, DrillConfig config, boolean supportComplexTypes,
- BufferAllocator allocator, EventLoopGroup eventLoopGroup, Executor eventExecutor) {
- super(
- UserRpcConfig.getMapping(config, eventExecutor),
- allocator.getAsByteBufAllocator(),
- eventLoopGroup,
- RpcType.HANDSHAKE,
- BitToUserHandshake.class,
- BitToUserHandshake.PARSER);
+ private SSLConfig sslConfig;
+ private DrillbitEndpoint endpoint;
+
+ public UserClient(String clientName, DrillConfig config, Properties properties, boolean supportComplexTypes,
+ BufferAllocator allocator, EventLoopGroup eventLoopGroup, Executor eventExecutor,
+ DrillbitEndpoint endpoint) throws NonTransientRpcException {
+ super(UserRpcConfig.getMapping(config, eventExecutor), allocator.getAsByteBufAllocator(),
+ eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
+ this.endpoint = endpoint; // save the endpoint; it might be needed by SSL init.
this.clientName = clientName;
this.allocator = allocator;
this.supportComplexTypes = supportComplexTypes;
+ try {
+ this.sslConfig = new SSLConfigBuilder().properties(properties).mode(SSLFactory.Mode.CLIENT)
+ .initializeSSLContext(true).validateKeyStore(false).build();
+ } catch (DrillException e) {
+ throw new InvalidConnectionInfoException(e.getMessage());
+ }
+
+ }
+
+ @Override protected void setupSSL(ChannelPipeline pipe,
+ ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener) {
+
+ String peerHost = endpoint.getAddress();
+ int peerPort = endpoint.getUserPort();
+ SSLEngine sslEngine = sslConfig.createSSLEngine(allocator, peerHost, peerPort);
+
+ // Add SSL handler into pipeline
+ SslHandler sslHandler = new SslHandler(sslEngine);
+ sslHandler.setHandshakeTimeoutMillis(sslConfig.getHandshakeTimeout());
+
+ // Add a listener for SSL Handshake complete. The Drill client handshake will be enabled only
+ // after this is done.
+ sslHandler.handshakeFuture().addListener(sslHandshakeListener);
+ pipe.addFirst(RpcConstants.SSL_HANDLER, sslHandler);
+ logger.debug(sslConfig.toString());
+ }
+
+ @Override protected boolean isSslEnabled() {
+ return sslConfig.isUserSslEnabled();
}
public RpcEndpointInfos getServerInfos() {
@@ -132,38 +173,51 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
/**
* Connects, and if required, authenticates. This method blocks until both operations are complete.
*
- * @param endpoint endpoint to connect to
- * @param properties properties
+ * @param endpoint endpoint to connect to
+ * @param properties properties
* @param credentials credentials
* @throws RpcException if either connection or authentication fails
*/
public void connect(final DrillbitEndpoint endpoint, final DrillProperties properties,
- final UserCredentials credentials) throws RpcException {
- final UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
- .setRpcVersion(UserRpcConfig.RPC_VERSION)
- .setSupportListening(true)
- .setSupportComplexTypes(supportComplexTypes)
- .setSupportTimeout(true)
- .setCredentials(credentials)
- .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName))
- .setSaslSupport(SaslSupport.SASL_PRIVACY)
- .setProperties(properties.serializeForServer());
+ final UserCredentials credentials) throws RpcException {
+ final UserToBitHandshake.Builder hsBuilder =
+ UserToBitHandshake.newBuilder()
+ .setRpcVersion(UserRpcConfig.RPC_VERSION)
+ .setSupportListening(true)
+ .setSupportComplexTypes(supportComplexTypes)
+ .setSupportTimeout(true).setCredentials(credentials)
+ .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName))
+ .setSaslSupport(SaslSupport.SASL_PRIVACY)
+ .setProperties(properties.serializeForServer());
// Only used for testing purpose
if (properties.containsKey(DrillProperties.TEST_SASL_LEVEL)) {
- hsBuilder.setSaslSupport(SaslSupport.valueOf(
- Integer.parseInt(properties.getProperty(DrillProperties.TEST_SASL_LEVEL))));
+ hsBuilder.setSaslSupport(
+ SaslSupport.valueOf(Integer.parseInt(properties.getProperty(DrillProperties.TEST_SASL_LEVEL))));
}
- connect(hsBuilder.build(), endpoint).checkedGet();
+ if (sslConfig.isUserSslEnabled()) {
+ try {
+ connect(hsBuilder.build(), endpoint)
+ .checkedGet(sslConfig.getHandshakeTimeout(), TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ String msg = new StringBuilder().append(
+ "Connecting to the server timed out. This is sometimes due to a mismatch in the SSL configuration between client and server. [ Exception: ")
+ .append(e.getMessage()).append("]").toString();
+ throw new NonTransientRpcException(msg);
+ }
+ } else {
+ connect(hsBuilder.build(), endpoint).checkedGet();
+ }
// Check if client needs encryption and server is not configured for encryption.
- final boolean clientNeedsEncryption = properties.containsKey(DrillProperties.SASL_ENCRYPT)
- && Boolean.parseBoolean(properties.getProperty(DrillProperties.SASL_ENCRYPT));
+ final boolean clientNeedsEncryption = properties.containsKey(DrillProperties.SASL_ENCRYPT) && Boolean
+ .parseBoolean(properties.getProperty(DrillProperties.SASL_ENCRYPT));
- if(clientNeedsEncryption && !connection.isEncryptionEnabled()) {
- throw new NonTransientRpcException("Client needs encrypted connection but server is not configured for " +
- "encryption. Please check connection parameter or contact your administrator");
+ if (clientNeedsEncryption && !connection.isEncryptionEnabled()) {
+ throw new NonTransientRpcException(
+ "Client needs encrypted connection but server is not configured for "
+ + "encryption. Please check connection parameter or contact your administrator");
}
if (serverAuthMechanisms != null) {
@@ -176,31 +230,28 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
}
private CheckedFuture<Void, RpcException> connect(final UserToBitHandshake handshake,
- final DrillbitEndpoint endpoint) {
+ final DrillbitEndpoint endpoint) {
final SettableFuture<Void> connectionSettable = SettableFuture.create();
final CheckedFuture<Void, RpcException> connectionFuture =
new AbstractCheckedFuture<Void, RpcException>(connectionSettable) {
- @Override
- protected RpcException mapException(Exception e) {
+ @Override protected RpcException mapException(Exception e) {
return RpcException.mapException(e);
}
};
final RpcConnectionHandler<UserToBitConnection> connectionHandler =
new RpcConnectionHandler<UserToBitConnection>() {
- @Override
- public void connectionSucceeded(UserToBitConnection connection) {
+ @Override public void connectionSucceeded(UserToBitConnection connection) {
connectionSettable.set(null);
}
- @Override
- public void connectionFailed(FailureType type, Throwable t) {
- connectionSettable.setException(new RpcException(String.format("%s : %s",
- type.name(), t.getMessage()), t));
+ @Override public void connectionFailed(FailureType type, Throwable t) {
+ connectionSettable
+ .setException(new RpcException(String.format("%s : %s", type.name(), t.getMessage()), t));
}
};
- connectAsClient(queryResultHandler.getWrappedConnectionHandler(connectionHandler),
- handshake, endpoint.getAddress(), endpoint.getUserPort());
+ connectAsClient(queryResultHandler.getWrappedConnectionHandler(connectionHandler), handshake,
+ endpoint.getAddress(), endpoint.getUserPort());
return connectionFuture;
}
@@ -211,15 +262,15 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
// Set correct QOP property and Strength based on server needs encryption or not.
// If ChunkMode is enabled then negotiate for buffer size equal to wrapChunkSize,
// If ChunkMode is disabled then negotiate for MAX_WRAPPED_SIZE buffer size.
- propertiesMap.putAll(SaslProperties.getSaslProperties(connection.isEncryptionEnabled(),
- connection.getMaxWrappedSize()));
+ propertiesMap.putAll(
+ SaslProperties.getSaslProperties(connection.isEncryptionEnabled(), connection.getMaxWrappedSize()));
- final SettableFuture<Void> authSettable = SettableFuture.create(); // use handleAuthFailure to setException
+ final SettableFuture<Void> authSettable =
+ SettableFuture.create(); // use handleAuthFailure to setException
final CheckedFuture<Void, SaslException> authFuture =
new AbstractCheckedFuture<Void, SaslException>(authSettable) {
- @Override
- protected SaslException mapException(Exception e) {
+ @Override protected SaslException mapException(Exception e) {
if (e instanceof ExecutionException) {
final Throwable cause = Throwables.getRootCause(e);
if (cause instanceof SaslException) {
@@ -227,8 +278,9 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
connection.getEncryptionCtxtString(), cause.getMessage()), cause);
}
}
- return new SaslException(String.format("Authentication failed unexpectedly. [Details: %s, Error %s]",
- connection.getEncryptionCtxtString(), e.getMessage()), e);
+ return new SaslException(String
+ .format("Authentication failed unexpectedly. [Details: %s, Error %s]",
+ connection.getEncryptionCtxtString(), e.getMessage()), e);
}
};
@@ -244,8 +296,10 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
ugi = factory.createAndLoginUser(propertiesMap);
saslClient = factory.createSaslClient(ugi, propertiesMap);
if (saslClient == null) {
- throw new SaslException(String.format("Cannot initiate authentication using %s mechanism. Insufficient " +
- "credentials or selected mechanism doesn't support configured security layers?", factory.getSimpleName()));
+ throw new SaslException(String.format(
+ "Cannot initiate authentication using %s mechanism. Insufficient "
+ + "credentials or selected mechanism doesn't support configured security layers?",
+ factory.getSimpleName()));
}
connection.setSaslClient(saslClient);
} catch (final IOException e) {
@@ -256,26 +310,24 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
logger.trace("Initiating SASL exchange.");
new AuthenticationOutcomeListener<>(this, connection, RpcType.SASL_MESSAGE, ugi,
new RpcOutcomeListener<Void>() {
- @Override
- public void failed(RpcException ex) {
+ @Override public void failed(RpcException ex) {
authSettable.setException(ex);
}
- @Override
- public void success(Void value, ByteBuf buffer) {
+ @Override public void success(Void value, ByteBuf buffer) {
authComplete = true;
authSettable.set(null);
}
- @Override
- public void interrupted(InterruptedException e) {
+ @Override public void interrupted(InterruptedException e) {
authSettable.setException(e);
}
}).initiate(mechanismName);
return authFuture;
}
- private AuthenticatorFactory getAuthenticatorFactory(final DrillProperties properties) throws SaslException {
+ private AuthenticatorFactory getAuthenticatorFactory(final DrillProperties properties)
+ throws SaslException {
final Set<String> mechanismSet = AuthStringUtil.asSet(serverAuthMechanisms);
// first, check if a certain mechanism must be used
@@ -285,129 +337,126 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
throw new SaslException(String.format("Unknown mechanism: %s", authMechanism));
}
if (!mechanismSet.contains(authMechanism.toUpperCase())) {
- throw new SaslException(String.format("Server does not support authentication using: %s. [Details: %s]",
- authMechanism, connection.getEncryptionCtxtString()));
+ throw new SaslException(String
+ .format("Server does not support authentication using: %s. [Details: %s]", authMechanism,
+ connection.getEncryptionCtxtString()));
}
- return ClientAuthenticatorProvider.getInstance()
- .getAuthenticatorFactory(authMechanism);
+ return ClientAuthenticatorProvider.getInstance().getAuthenticatorFactory(authMechanism);
}
// check if Kerberos is supported, and the service principal is provided
- if (mechanismSet.contains(KerberosUtil.KERBEROS_SIMPLE_NAME) &&
- properties.containsKey(DrillProperties.SERVICE_PRINCIPAL)) {
+ if (mechanismSet.contains(KerberosUtil.KERBEROS_SIMPLE_NAME) && properties
+ .containsKey(DrillProperties.SERVICE_PRINCIPAL)) {
return ClientAuthenticatorProvider.getInstance()
.getAuthenticatorFactory(KerberosUtil.KERBEROS_SIMPLE_NAME);
}
// check if username/password is supported, and username/password are provided
- if (mechanismSet.contains(PlainFactory.SIMPLE_NAME) &&
- properties.containsKey(DrillProperties.USER) &&
- !Strings.isNullOrEmpty(properties.getProperty(DrillProperties.PASSWORD))) {
- return ClientAuthenticatorProvider.getInstance()
- .getAuthenticatorFactory(PlainFactory.SIMPLE_NAME);
+ if (mechanismSet.contains(PlainFactory.SIMPLE_NAME) && properties.containsKey(DrillProperties.USER)
+ && !Strings.isNullOrEmpty(properties.getProperty(DrillProperties.PASSWORD))) {
+ return ClientAuthenticatorProvider.getInstance().getAuthenticatorFactory(PlainFactory.SIMPLE_NAME);
}
- throw new SaslException(String.format("Server requires authentication using %s. Insufficient credentials?. " +
- "[Details: %s]. ", serverAuthMechanisms, connection.getEncryptionCtxtString()));
+ throw new SaslException(String
+ .format("Server requires authentication using %s. Insufficient credentials?. " + "[Details: %s]. ",
+ serverAuthMechanisms, connection.getEncryptionCtxtString()));
}
- protected <SEND extends MessageLite, RECEIVE extends MessageLite>
- void send(RpcOutcomeListener<RECEIVE> listener, RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz,
- boolean allowInEventLoop, ByteBuf... dataBodies) {
+ protected <SEND extends MessageLite, RECEIVE extends MessageLite> void send(
+ RpcOutcomeListener<RECEIVE> listener, RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz,
+ boolean allowInEventLoop, ByteBuf... dataBodies) {
super.send(listener, connection, rpcType, protobufBody, clazz, allowInEventLoop, dataBodies);
}
- @Override
- protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+ @Override protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
switch (rpcType) {
- case RpcType.ACK_VALUE:
- return Ack.getDefaultInstance();
- case RpcType.HANDSHAKE_VALUE:
- return BitToUserHandshake.getDefaultInstance();
- case RpcType.QUERY_HANDLE_VALUE:
- return QueryId.getDefaultInstance();
- case RpcType.QUERY_RESULT_VALUE:
- return QueryResult.getDefaultInstance();
- case RpcType.QUERY_DATA_VALUE:
- return QueryData.getDefaultInstance();
- case RpcType.QUERY_PLAN_FRAGMENTS_VALUE:
- return QueryPlanFragments.getDefaultInstance();
- case RpcType.CATALOGS_VALUE:
- return GetCatalogsResp.getDefaultInstance();
- case RpcType.SCHEMAS_VALUE:
- return GetSchemasResp.getDefaultInstance();
- case RpcType.TABLES_VALUE:
- return GetTablesResp.getDefaultInstance();
- case RpcType.COLUMNS_VALUE:
- return GetColumnsResp.getDefaultInstance();
- case RpcType.PREPARED_STATEMENT_VALUE:
- return CreatePreparedStatementResp.getDefaultInstance();
- case RpcType.SASL_MESSAGE_VALUE:
- return SaslMessage.getDefaultInstance();
- case RpcType.SERVER_META_VALUE:
- return GetServerMetaResp.getDefaultInstance();
+ case RpcType.ACK_VALUE:
+ return Ack.getDefaultInstance();
+ case RpcType.HANDSHAKE_VALUE:
+ return BitToUserHandshake.getDefaultInstance();
+ case RpcType.QUERY_HANDLE_VALUE:
+ return QueryId.getDefaultInstance();
+ case RpcType.QUERY_RESULT_VALUE:
+ return QueryResult.getDefaultInstance();
+ case RpcType.QUERY_DATA_VALUE:
+ return QueryData.getDefaultInstance();
+ case RpcType.QUERY_PLAN_FRAGMENTS_VALUE:
+ return QueryPlanFragments.getDefaultInstance();
+ case RpcType.CATALOGS_VALUE:
+ return GetCatalogsResp.getDefaultInstance();
+ case RpcType.SCHEMAS_VALUE:
+ return GetSchemasResp.getDefaultInstance();
+ case RpcType.TABLES_VALUE:
+ return GetTablesResp.getDefaultInstance();
+ case RpcType.COLUMNS_VALUE:
+ return GetColumnsResp.getDefaultInstance();
+ case RpcType.PREPARED_STATEMENT_VALUE:
+ return CreatePreparedStatementResp.getDefaultInstance();
+ case RpcType.SASL_MESSAGE_VALUE:
+ return SaslMessage.getDefaultInstance();
+ case RpcType.SERVER_META_VALUE:
+ return GetServerMetaResp.getDefaultInstance();
}
throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
}
- @Override
- protected void handle(UserToBitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
- ResponseSender sender) throws RpcException {
+ @Override protected void handle(UserToBitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+ ResponseSender sender) throws RpcException {
if (!authComplete) {
// Remote should not be making any requests before authenticating, drop connection
- throw new RpcException(String.format("Request of type %d is not allowed without authentication. " +
- "Remote on %s must authenticate before making requests. Connection dropped.",
- rpcType, connection.getRemoteAddress()));
+ throw new RpcException(String.format("Request of type %d is not allowed without authentication. "
+ + "Remote on %s must authenticate before making requests. Connection dropped.", rpcType,
+ connection.getRemoteAddress()));
}
switch (rpcType) {
- case RpcType.QUERY_DATA_VALUE:
- queryResultHandler.batchArrived(connection, pBody, dBody);
- sender.send(new Response(RpcType.ACK, Acks.OK));
- break;
- case RpcType.QUERY_RESULT_VALUE:
- queryResultHandler.resultArrived(pBody);
- sender.send(new Response(RpcType.ACK, Acks.OK));
- break;
- default:
- throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
+ case RpcType.QUERY_DATA_VALUE:
+ queryResultHandler.batchArrived(connection, pBody, dBody);
+ sender.send(new Response(RpcType.ACK, Acks.OK));
+ break;
+ case RpcType.QUERY_RESULT_VALUE:
+ queryResultHandler.resultArrived(pBody);
+ sender.send(new Response(RpcType.ACK, Acks.OK));
+ break;
+ default:
+ throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
}
}
- @Override
- protected void validateHandshake(BitToUserHandshake inbound) throws RpcException {
-// logger.debug("Handling handshake from bit to user. {}", inbound);
+ @Override protected void validateHandshake(BitToUserHandshake inbound) throws RpcException {
+ // logger.debug("Handling handshake from bit to user. {}", inbound);
if (inbound.hasServerInfos()) {
serverInfos = inbound.getServerInfos();
}
supportedMethods = Sets.immutableEnumSet(inbound.getSupportedMethodsList());
switch (inbound.getStatus()) {
- case SUCCESS:
- break;
- case AUTH_REQUIRED: {
- authComplete = false;
- serverAuthMechanisms = ImmutableList.copyOf(inbound.getAuthenticationMechanismsList());
- connection.setEncryption(inbound.hasEncrypted() && inbound.getEncrypted());
-
- if (inbound.hasMaxWrappedSize()) {
- connection.setMaxWrappedSize(inbound.getMaxWrappedSize());
+ case SUCCESS:
+ break;
+ case AUTH_REQUIRED: {
+ authComplete = false;
+ serverAuthMechanisms = ImmutableList.copyOf(inbound.getAuthenticationMechanismsList());
+ connection.setEncryption(inbound.hasEncrypted() && inbound.getEncrypted());
+
+ if (inbound.hasMaxWrappedSize()) {
+ connection.setMaxWrappedSize(inbound.getMaxWrappedSize());
+ }
+ logger.trace(String
+ .format("Server requires authentication with encryption context %s before proceeding.",
+ connection.getEncryptionCtxtString()));
+ break;
}
- logger.trace(String.format("Server requires authentication with encryption context %s before proceeding.",
- connection.getEncryptionCtxtString()));
- break;
- }
- case AUTH_FAILED:
- case RPC_VERSION_MISMATCH:
- case UNKNOWN_FAILURE:
- final String errMsg = String.format("Status: %s, Error Id: %s, Error message: %s",
- inbound.getStatus(), inbound.getErrorId(), inbound.getErrorMessage());
- logger.error(errMsg);
- throw new NonTransientRpcException(errMsg);
+ case AUTH_FAILED:
+ case RPC_VERSION_MISMATCH:
+ case UNKNOWN_FAILURE:
+ final String errMsg = String
+ .format("Status: %s, Error Id: %s, Error message: %s", inbound.getStatus(),
+ inbound.getErrorId(), inbound.getErrorMessage());
+ logger.error(errMsg);
+ throw new NonTransientRpcException(errMsg);
}
}
- @Override
- protected UserToBitConnection initRemoteConnection(SocketChannel channel) {
+ @Override protected UserToBitConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
return new UserToBitConnection(channel);
}
@@ -421,39 +470,34 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
super(channel, "user client");
}
- @Override
- public BufferAllocator getAllocator() {
+ @Override public BufferAllocator getAllocator() {
return allocator;
}
- @Override
- protected Logger getLogger() {
+ @Override protected Logger getLogger() {
return logger;
}
- @Override
- public void incConnectionCounter() {
+ @Override public void incConnectionCounter() {
// no-op
}
- @Override
- public void decConnectionCounter() {
+ @Override public void decConnectionCounter() {
// no-op
}
}
- @Override
- public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
+ @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
}
/**
* planQuery is an API to plan a query without query execution
+ *
* @param req - data necessary to plan query
* @return list of PlanFragments that can later on be submitted for execution
*/
- public DrillRpcFuture<QueryPlanFragments> planQuery(
- GetQueryPlanFragments req) {
+ public DrillRpcFuture<QueryPlanFragments> planQuery(GetQueryPlanFragments req) {
return send(RpcType.GET_QUERY_PLAN_FRAGMENTS, req, QueryPlanFragments.class);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
index 57ad4d51b..4e4c80e97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
@@ -33,6 +33,7 @@ class UserConnectionConfig extends AbstractConnectionConfig {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserConnectionConfig.class);
private final boolean authEnabled;
+ private final boolean sslEnabled;
private final InboundImpersonationManager impersonationManager;
private final UserServerRequestHandler handler;
@@ -75,10 +76,16 @@ class UserConnectionConfig extends AbstractConnectionConfig {
} else {
authEnabled = false;
}
-
impersonationManager = !config.getBoolean(ExecConstants.IMPERSONATION_ENABLED)
? null
: new InboundImpersonationManager();
+
+ sslEnabled = config.getBoolean(ExecConstants.USER_SSL_ENABLED);
+
+ if(isSSLEnabled() && isAuthEnabled() && isEncryptionEnabled()){
+ logger.warn("The server is configured to use both SSL and SASL encryption (only one should be configured).");
+ }
+
}
@Override
@@ -90,6 +97,10 @@ class UserConnectionConfig extends AbstractConnectionConfig {
return authEnabled;
}
+ boolean isSSLEnabled() {
+ return sslEnabled;
+ }
+
InboundImpersonationManager getImpersonationManager() {
return impersonationManager;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 254fdca29..c0573db81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -21,9 +21,15 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.util.UUID;
+import javax.net.ssl.SSLEngine;
import javax.security.sasl.SaslException;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslHandler;
import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.ssl.SSLConfig;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
@@ -53,8 +59,10 @@ import org.apache.drill.exec.rpc.security.plain.PlainFactory;
import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection;
import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.ssl.SSLConfigBuilder;
import org.apache.drill.exec.work.user.UserWorker;
import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.slf4j.Logger;
import com.google.protobuf.MessageLite;
@@ -71,6 +79,8 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
private static final String SERVER_NAME = "Apache Drill Server";
private final UserConnectionConfig config;
+ private final SSLConfig sslConfig;
+ private Channel sslChannel;
private final UserWorker userWorker;
public UserServer(BootStrapContext context, BufferAllocator allocator, EventLoopGroup eventLoopGroup,
@@ -79,6 +89,17 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
allocator.getAsByteBufAllocator(),
eventLoopGroup);
this.config = new UserConnectionConfig(allocator, context, new UserServerRequestHandler(worker));
+ this.sslChannel = null;
+ try {
+ this.sslConfig = new SSLConfigBuilder()
+ .config(context.getConfig())
+ .mode(SSLFactory.Mode.SERVER)
+ .initializeSSLContext(true)
+ .validateKeyStore(true)
+ .build();
+ } catch (DrillException e) {
+ throw new DrillbitStartupException(e.getMessage(), e.getCause());
+ }
this.userWorker = worker;
// Initialize Singleton instance of UserRpcMetrics.
@@ -86,6 +107,34 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
}
@Override
+ protected void setupSSL(ChannelPipeline pipe) {
+
+ SSLEngine sslEngine = sslConfig.createSSLEngine(config.getAllocator(), null, 0);
+ // Add SSL handler into pipeline
+ pipe.addFirst(RpcConstants.SSL_HANDLER, new SslHandler(sslEngine));
+ logger.debug("SSL communication between client and server is enabled.");
+ logger.debug(sslConfig.toString());
+
+ }
+
+ @Override
+ protected boolean isSslEnabled() {
+ return sslConfig.isUserSslEnabled();
+ }
+
+ @Override
+ public void setSslChannel(Channel c) {
+ sslChannel = c;
+ }
+
+ @Override
+ protected void closeSSL(){
+ if(isSslEnabled() && sslChannel != null){
+ sslChannel.close();
+ }
+ }
+
+ @Override
protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
// a user server only expects acknowledgments on messages it creates.
switch (rpcType) {