diff options
author | Parth Chandra <parthc@apache.org> | 2017-06-09 22:03:59 -0700 |
---|---|---|
committer | Parth Chandra <parthc@apache.org> | 2017-10-11 19:26:13 -0700 |
commit | 552d7d825e37d42835bd6bfccfc07fc7d3b5fd94 (patch) | |
tree | 669a74d1646c97522969f8acf8c4ed2baff8a68f /exec/java-exec/src/main/java/org/apache/drill/exec/rpc | |
parent | b803405c0f978beb8beaf77211f99731014ac92f (diff) |
DRILL-5431: SSL Support (Java) - Java client server SSL implementation
Also enable OpenSSL support
Also fix exclusions and java-exec pom file to eliminate netty-tcnative as a transitive dependency on all projects
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc')
3 files changed, 261 insertions, 157 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 2f4753857..99614bdfa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -20,16 +20,25 @@ package org.apache.drill.exec.rpc.user; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.net.ssl.SSLEngine; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.ssl.SslHandler; import org.apache.drill.common.KerberosUtil; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.config.DrillProperties; +import org.apache.drill.common.exceptions.DrillException; +import org.apache.drill.exec.client.InvalidConnectionInfoException; +import org.apache.drill.exec.ssl.SSLConfig; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -55,6 +64,7 @@ import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; import org.apache.drill.exec.rpc.AbstractClientConnection; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.BasicClient; +import org.apache.drill.exec.rpc.ConnectionMultiListener; import org.apache.drill.exec.rpc.DrillRpcFuture; import org.apache.drill.exec.rpc.NonTransientRpcException; import org.apache.drill.exec.rpc.OutOfMemoryHandler; @@ -62,6 +72,7 @@ import org.apache.drill.exec.rpc.ProtobufLengthDecoder; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.rpc.RpcConnectionHandler; +import org.apache.drill.exec.rpc.RpcConstants; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.security.AuthStringUtil; @@ -70,7 +81,9 @@ import org.apache.drill.exec.rpc.security.AuthenticatorFactory; import org.apache.drill.exec.rpc.security.ClientAuthenticatorProvider; import org.apache.drill.exec.rpc.security.plain.PlainFactory; import org.apache.drill.exec.rpc.security.SaslProperties; +import org.apache.drill.exec.ssl.SSLConfigBuilder; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.ssl.SSLFactory; import org.slf4j.Logger; import com.google.common.base.Strings; @@ -87,8 +100,8 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnection, - UserToBitHandshake, BitToUserHandshake> { +public class UserClient + extends BasicClient<RpcType, UserClient.UserToBitConnection, UserToBitHandshake, BitToUserHandshake> { private static final Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class); private final BufferAllocator allocator; @@ -102,19 +115,47 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect // these are used for authentication private volatile List<String> serverAuthMechanisms = null; private volatile boolean authComplete = true; - - public UserClient(String clientName, DrillConfig config, boolean supportComplexTypes, - BufferAllocator allocator, EventLoopGroup eventLoopGroup, Executor eventExecutor) { - super( - UserRpcConfig.getMapping(config, eventExecutor), - allocator.getAsByteBufAllocator(), - eventLoopGroup, - RpcType.HANDSHAKE, - BitToUserHandshake.class, - BitToUserHandshake.PARSER); + private SSLConfig sslConfig; + private DrillbitEndpoint endpoint; + + public UserClient(String clientName, DrillConfig config, Properties properties, boolean supportComplexTypes, + BufferAllocator allocator, EventLoopGroup eventLoopGroup, Executor eventExecutor, + DrillbitEndpoint endpoint) throws NonTransientRpcException { + super(UserRpcConfig.getMapping(config, eventExecutor), allocator.getAsByteBufAllocator(), + eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER); + this.endpoint = endpoint; // save the endpoint; it might be needed by SSL init. this.clientName = clientName; this.allocator = allocator; this.supportComplexTypes = supportComplexTypes; + try { + this.sslConfig = new SSLConfigBuilder().properties(properties).mode(SSLFactory.Mode.CLIENT) + .initializeSSLContext(true).validateKeyStore(false).build(); + } catch (DrillException e) { + throw new InvalidConnectionInfoException(e.getMessage()); + } + + } + + @Override protected void setupSSL(ChannelPipeline pipe, + ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener) { + + String peerHost = endpoint.getAddress(); + int peerPort = endpoint.getUserPort(); + SSLEngine sslEngine = sslConfig.createSSLEngine(allocator, peerHost, peerPort); + + // Add SSL handler into pipeline + SslHandler sslHandler = new SslHandler(sslEngine); + sslHandler.setHandshakeTimeoutMillis(sslConfig.getHandshakeTimeout()); + + // Add a listener for SSL Handshake complete. The Drill client handshake will be enabled only + // after this is done. + sslHandler.handshakeFuture().addListener(sslHandshakeListener); + pipe.addFirst(RpcConstants.SSL_HANDLER, sslHandler); + logger.debug(sslConfig.toString()); + } + + @Override protected boolean isSslEnabled() { + return sslConfig.isUserSslEnabled(); } public RpcEndpointInfos getServerInfos() { @@ -132,38 +173,51 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect /** * Connects, and if required, authenticates. This method blocks until both operations are complete. * - * @param endpoint endpoint to connect to - * @param properties properties + * @param endpoint endpoint to connect to + * @param properties properties * @param credentials credentials * @throws RpcException if either connection or authentication fails */ public void connect(final DrillbitEndpoint endpoint, final DrillProperties properties, - final UserCredentials credentials) throws RpcException { - final UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder() - .setRpcVersion(UserRpcConfig.RPC_VERSION) - .setSupportListening(true) - .setSupportComplexTypes(supportComplexTypes) - .setSupportTimeout(true) - .setCredentials(credentials) - .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName)) - .setSaslSupport(SaslSupport.SASL_PRIVACY) - .setProperties(properties.serializeForServer()); + final UserCredentials credentials) throws RpcException { + final UserToBitHandshake.Builder hsBuilder = + UserToBitHandshake.newBuilder() + .setRpcVersion(UserRpcConfig.RPC_VERSION) + .setSupportListening(true) + .setSupportComplexTypes(supportComplexTypes) + .setSupportTimeout(true).setCredentials(credentials) + .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName)) + .setSaslSupport(SaslSupport.SASL_PRIVACY) + .setProperties(properties.serializeForServer()); // Only used for testing purpose if (properties.containsKey(DrillProperties.TEST_SASL_LEVEL)) { - hsBuilder.setSaslSupport(SaslSupport.valueOf( - Integer.parseInt(properties.getProperty(DrillProperties.TEST_SASL_LEVEL)))); + hsBuilder.setSaslSupport( + SaslSupport.valueOf(Integer.parseInt(properties.getProperty(DrillProperties.TEST_SASL_LEVEL)))); } - connect(hsBuilder.build(), endpoint).checkedGet(); + if (sslConfig.isUserSslEnabled()) { + try { + connect(hsBuilder.build(), endpoint) + .checkedGet(sslConfig.getHandshakeTimeout(), TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + String msg = new StringBuilder().append( + "Connecting to the server timed out. This is sometimes due to a mismatch in the SSL configuration between client and server. [ Exception: ") + .append(e.getMessage()).append("]").toString(); + throw new NonTransientRpcException(msg); + } + } else { + connect(hsBuilder.build(), endpoint).checkedGet(); + } // Check if client needs encryption and server is not configured for encryption. - final boolean clientNeedsEncryption = properties.containsKey(DrillProperties.SASL_ENCRYPT) - && Boolean.parseBoolean(properties.getProperty(DrillProperties.SASL_ENCRYPT)); + final boolean clientNeedsEncryption = properties.containsKey(DrillProperties.SASL_ENCRYPT) && Boolean + .parseBoolean(properties.getProperty(DrillProperties.SASL_ENCRYPT)); - if(clientNeedsEncryption && !connection.isEncryptionEnabled()) { - throw new NonTransientRpcException("Client needs encrypted connection but server is not configured for " + - "encryption. Please check connection parameter or contact your administrator"); + if (clientNeedsEncryption && !connection.isEncryptionEnabled()) { + throw new NonTransientRpcException( + "Client needs encrypted connection but server is not configured for " + + "encryption. Please check connection parameter or contact your administrator"); } if (serverAuthMechanisms != null) { @@ -176,31 +230,28 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect } private CheckedFuture<Void, RpcException> connect(final UserToBitHandshake handshake, - final DrillbitEndpoint endpoint) { + final DrillbitEndpoint endpoint) { final SettableFuture<Void> connectionSettable = SettableFuture.create(); final CheckedFuture<Void, RpcException> connectionFuture = new AbstractCheckedFuture<Void, RpcException>(connectionSettable) { - @Override - protected RpcException mapException(Exception e) { + @Override protected RpcException mapException(Exception e) { return RpcException.mapException(e); } }; final RpcConnectionHandler<UserToBitConnection> connectionHandler = new RpcConnectionHandler<UserToBitConnection>() { - @Override - public void connectionSucceeded(UserToBitConnection connection) { + @Override public void connectionSucceeded(UserToBitConnection connection) { connectionSettable.set(null); } - @Override - public void connectionFailed(FailureType type, Throwable t) { - connectionSettable.setException(new RpcException(String.format("%s : %s", - type.name(), t.getMessage()), t)); + @Override public void connectionFailed(FailureType type, Throwable t) { + connectionSettable + .setException(new RpcException(String.format("%s : %s", type.name(), t.getMessage()), t)); } }; - connectAsClient(queryResultHandler.getWrappedConnectionHandler(connectionHandler), - handshake, endpoint.getAddress(), endpoint.getUserPort()); + connectAsClient(queryResultHandler.getWrappedConnectionHandler(connectionHandler), handshake, + endpoint.getAddress(), endpoint.getUserPort()); return connectionFuture; } @@ -211,15 +262,15 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect // Set correct QOP property and Strength based on server needs encryption or not. // If ChunkMode is enabled then negotiate for buffer size equal to wrapChunkSize, // If ChunkMode is disabled then negotiate for MAX_WRAPPED_SIZE buffer size. - propertiesMap.putAll(SaslProperties.getSaslProperties(connection.isEncryptionEnabled(), - connection.getMaxWrappedSize())); + propertiesMap.putAll( + SaslProperties.getSaslProperties(connection.isEncryptionEnabled(), connection.getMaxWrappedSize())); - final SettableFuture<Void> authSettable = SettableFuture.create(); // use handleAuthFailure to setException + final SettableFuture<Void> authSettable = + SettableFuture.create(); // use handleAuthFailure to setException final CheckedFuture<Void, SaslException> authFuture = new AbstractCheckedFuture<Void, SaslException>(authSettable) { - @Override - protected SaslException mapException(Exception e) { + @Override protected SaslException mapException(Exception e) { if (e instanceof ExecutionException) { final Throwable cause = Throwables.getRootCause(e); if (cause instanceof SaslException) { @@ -227,8 +278,9 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect connection.getEncryptionCtxtString(), cause.getMessage()), cause); } } - return new SaslException(String.format("Authentication failed unexpectedly. [Details: %s, Error %s]", - connection.getEncryptionCtxtString(), e.getMessage()), e); + return new SaslException(String + .format("Authentication failed unexpectedly. [Details: %s, Error %s]", + connection.getEncryptionCtxtString(), e.getMessage()), e); } }; @@ -244,8 +296,10 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect ugi = factory.createAndLoginUser(propertiesMap); saslClient = factory.createSaslClient(ugi, propertiesMap); if (saslClient == null) { - throw new SaslException(String.format("Cannot initiate authentication using %s mechanism. Insufficient " + - "credentials or selected mechanism doesn't support configured security layers?", factory.getSimpleName())); + throw new SaslException(String.format( + "Cannot initiate authentication using %s mechanism. Insufficient " + + "credentials or selected mechanism doesn't support configured security layers?", + factory.getSimpleName())); } connection.setSaslClient(saslClient); } catch (final IOException e) { @@ -256,26 +310,24 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect logger.trace("Initiating SASL exchange."); new AuthenticationOutcomeListener<>(this, connection, RpcType.SASL_MESSAGE, ugi, new RpcOutcomeListener<Void>() { - @Override - public void failed(RpcException ex) { + @Override public void failed(RpcException ex) { authSettable.setException(ex); } - @Override - public void success(Void value, ByteBuf buffer) { + @Override public void success(Void value, ByteBuf buffer) { authComplete = true; authSettable.set(null); } - @Override - public void interrupted(InterruptedException e) { + @Override public void interrupted(InterruptedException e) { authSettable.setException(e); } }).initiate(mechanismName); return authFuture; } - private AuthenticatorFactory getAuthenticatorFactory(final DrillProperties properties) throws SaslException { + private AuthenticatorFactory getAuthenticatorFactory(final DrillProperties properties) + throws SaslException { final Set<String> mechanismSet = AuthStringUtil.asSet(serverAuthMechanisms); // first, check if a certain mechanism must be used @@ -285,129 +337,126 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect throw new SaslException(String.format("Unknown mechanism: %s", authMechanism)); } if (!mechanismSet.contains(authMechanism.toUpperCase())) { - throw new SaslException(String.format("Server does not support authentication using: %s. [Details: %s]", - authMechanism, connection.getEncryptionCtxtString())); + throw new SaslException(String + .format("Server does not support authentication using: %s. [Details: %s]", authMechanism, + connection.getEncryptionCtxtString())); } - return ClientAuthenticatorProvider.getInstance() - .getAuthenticatorFactory(authMechanism); + return ClientAuthenticatorProvider.getInstance().getAuthenticatorFactory(authMechanism); } // check if Kerberos is supported, and the service principal is provided - if (mechanismSet.contains(KerberosUtil.KERBEROS_SIMPLE_NAME) && - properties.containsKey(DrillProperties.SERVICE_PRINCIPAL)) { + if (mechanismSet.contains(KerberosUtil.KERBEROS_SIMPLE_NAME) && properties + .containsKey(DrillProperties.SERVICE_PRINCIPAL)) { return ClientAuthenticatorProvider.getInstance() .getAuthenticatorFactory(KerberosUtil.KERBEROS_SIMPLE_NAME); } // check if username/password is supported, and username/password are provided - if (mechanismSet.contains(PlainFactory.SIMPLE_NAME) && - properties.containsKey(DrillProperties.USER) && - !Strings.isNullOrEmpty(properties.getProperty(DrillProperties.PASSWORD))) { - return ClientAuthenticatorProvider.getInstance() - .getAuthenticatorFactory(PlainFactory.SIMPLE_NAME); + if (mechanismSet.contains(PlainFactory.SIMPLE_NAME) && properties.containsKey(DrillProperties.USER) + && !Strings.isNullOrEmpty(properties.getProperty(DrillProperties.PASSWORD))) { + return ClientAuthenticatorProvider.getInstance().getAuthenticatorFactory(PlainFactory.SIMPLE_NAME); } - throw new SaslException(String.format("Server requires authentication using %s. Insufficient credentials?. " + - "[Details: %s]. ", serverAuthMechanisms, connection.getEncryptionCtxtString())); + throw new SaslException(String + .format("Server requires authentication using %s. Insufficient credentials?. " + "[Details: %s]. ", + serverAuthMechanisms, connection.getEncryptionCtxtString())); } - protected <SEND extends MessageLite, RECEIVE extends MessageLite> - void send(RpcOutcomeListener<RECEIVE> listener, RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, - boolean allowInEventLoop, ByteBuf... dataBodies) { + protected <SEND extends MessageLite, RECEIVE extends MessageLite> void send( + RpcOutcomeListener<RECEIVE> listener, RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, + boolean allowInEventLoop, ByteBuf... dataBodies) { super.send(listener, connection, rpcType, protobufBody, clazz, allowInEventLoop, dataBodies); } - @Override - protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException { + @Override protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException { switch (rpcType) { - case RpcType.ACK_VALUE: - return Ack.getDefaultInstance(); - case RpcType.HANDSHAKE_VALUE: - return BitToUserHandshake.getDefaultInstance(); - case RpcType.QUERY_HANDLE_VALUE: - return QueryId.getDefaultInstance(); - case RpcType.QUERY_RESULT_VALUE: - return QueryResult.getDefaultInstance(); - case RpcType.QUERY_DATA_VALUE: - return QueryData.getDefaultInstance(); - case RpcType.QUERY_PLAN_FRAGMENTS_VALUE: - return QueryPlanFragments.getDefaultInstance(); - case RpcType.CATALOGS_VALUE: - return GetCatalogsResp.getDefaultInstance(); - case RpcType.SCHEMAS_VALUE: - return GetSchemasResp.getDefaultInstance(); - case RpcType.TABLES_VALUE: - return GetTablesResp.getDefaultInstance(); - case RpcType.COLUMNS_VALUE: - return GetColumnsResp.getDefaultInstance(); - case RpcType.PREPARED_STATEMENT_VALUE: - return CreatePreparedStatementResp.getDefaultInstance(); - case RpcType.SASL_MESSAGE_VALUE: - return SaslMessage.getDefaultInstance(); - case RpcType.SERVER_META_VALUE: - return GetServerMetaResp.getDefaultInstance(); + case RpcType.ACK_VALUE: + return Ack.getDefaultInstance(); + case RpcType.HANDSHAKE_VALUE: + return BitToUserHandshake.getDefaultInstance(); + case RpcType.QUERY_HANDLE_VALUE: + return QueryId.getDefaultInstance(); + case RpcType.QUERY_RESULT_VALUE: + return QueryResult.getDefaultInstance(); + case RpcType.QUERY_DATA_VALUE: + return QueryData.getDefaultInstance(); + case RpcType.QUERY_PLAN_FRAGMENTS_VALUE: + return QueryPlanFragments.getDefaultInstance(); + case RpcType.CATALOGS_VALUE: + return GetCatalogsResp.getDefaultInstance(); + case RpcType.SCHEMAS_VALUE: + return GetSchemasResp.getDefaultInstance(); + case RpcType.TABLES_VALUE: + return GetTablesResp.getDefaultInstance(); + case RpcType.COLUMNS_VALUE: + return GetColumnsResp.getDefaultInstance(); + case RpcType.PREPARED_STATEMENT_VALUE: + return CreatePreparedStatementResp.getDefaultInstance(); + case RpcType.SASL_MESSAGE_VALUE: + return SaslMessage.getDefaultInstance(); + case RpcType.SERVER_META_VALUE: + return GetServerMetaResp.getDefaultInstance(); } throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType)); } - @Override - protected void handle(UserToBitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody, - ResponseSender sender) throws RpcException { + @Override protected void handle(UserToBitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody, + ResponseSender sender) throws RpcException { if (!authComplete) { // Remote should not be making any requests before authenticating, drop connection - throw new RpcException(String.format("Request of type %d is not allowed without authentication. " + - "Remote on %s must authenticate before making requests. Connection dropped.", - rpcType, connection.getRemoteAddress())); + throw new RpcException(String.format("Request of type %d is not allowed without authentication. " + + "Remote on %s must authenticate before making requests. Connection dropped.", rpcType, + connection.getRemoteAddress())); } switch (rpcType) { - case RpcType.QUERY_DATA_VALUE: - queryResultHandler.batchArrived(connection, pBody, dBody); - sender.send(new Response(RpcType.ACK, Acks.OK)); - break; - case RpcType.QUERY_RESULT_VALUE: - queryResultHandler.resultArrived(pBody); - sender.send(new Response(RpcType.ACK, Acks.OK)); - break; - default: - throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType)); + case RpcType.QUERY_DATA_VALUE: + queryResultHandler.batchArrived(connection, pBody, dBody); + sender.send(new Response(RpcType.ACK, Acks.OK)); + break; + case RpcType.QUERY_RESULT_VALUE: + queryResultHandler.resultArrived(pBody); + sender.send(new Response(RpcType.ACK, Acks.OK)); + break; + default: + throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType)); } } - @Override - protected void validateHandshake(BitToUserHandshake inbound) throws RpcException { -// logger.debug("Handling handshake from bit to user. {}", inbound); + @Override protected void validateHandshake(BitToUserHandshake inbound) throws RpcException { + // logger.debug("Handling handshake from bit to user. {}", inbound); if (inbound.hasServerInfos()) { serverInfos = inbound.getServerInfos(); } supportedMethods = Sets.immutableEnumSet(inbound.getSupportedMethodsList()); switch (inbound.getStatus()) { - case SUCCESS: - break; - case AUTH_REQUIRED: { - authComplete = false; - serverAuthMechanisms = ImmutableList.copyOf(inbound.getAuthenticationMechanismsList()); - connection.setEncryption(inbound.hasEncrypted() && inbound.getEncrypted()); - - if (inbound.hasMaxWrappedSize()) { - connection.setMaxWrappedSize(inbound.getMaxWrappedSize()); + case SUCCESS: + break; + case AUTH_REQUIRED: { + authComplete = false; + serverAuthMechanisms = ImmutableList.copyOf(inbound.getAuthenticationMechanismsList()); + connection.setEncryption(inbound.hasEncrypted() && inbound.getEncrypted()); + + if (inbound.hasMaxWrappedSize()) { + connection.setMaxWrappedSize(inbound.getMaxWrappedSize()); + } + logger.trace(String + .format("Server requires authentication with encryption context %s before proceeding.", + connection.getEncryptionCtxtString())); + break; } - logger.trace(String.format("Server requires authentication with encryption context %s before proceeding.", - connection.getEncryptionCtxtString())); - break; - } - case AUTH_FAILED: - case RPC_VERSION_MISMATCH: - case UNKNOWN_FAILURE: - final String errMsg = String.format("Status: %s, Error Id: %s, Error message: %s", - inbound.getStatus(), inbound.getErrorId(), inbound.getErrorMessage()); - logger.error(errMsg); - throw new NonTransientRpcException(errMsg); + case AUTH_FAILED: + case RPC_VERSION_MISMATCH: + case UNKNOWN_FAILURE: + final String errMsg = String + .format("Status: %s, Error Id: %s, Error message: %s", inbound.getStatus(), + inbound.getErrorId(), inbound.getErrorMessage()); + logger.error(errMsg); + throw new NonTransientRpcException(errMsg); } } - @Override - protected UserToBitConnection initRemoteConnection(SocketChannel channel) { + @Override protected UserToBitConnection initRemoteConnection(SocketChannel channel) { super.initRemoteConnection(channel); return new UserToBitConnection(channel); } @@ -421,39 +470,34 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect super(channel, "user client"); } - @Override - public BufferAllocator getAllocator() { + @Override public BufferAllocator getAllocator() { return allocator; } - @Override - protected Logger getLogger() { + @Override protected Logger getLogger() { return logger; } - @Override - public void incConnectionCounter() { + @Override public void incConnectionCounter() { // no-op } - @Override - public void decConnectionCounter() { + @Override public void decConnectionCounter() { // no-op } } - @Override - public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { + @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); } /** * planQuery is an API to plan a query without query execution + * * @param req - data necessary to plan query * @return list of PlanFragments that can later on be submitted for execution */ - public DrillRpcFuture<QueryPlanFragments> planQuery( - GetQueryPlanFragments req) { + public DrillRpcFuture<QueryPlanFragments> planQuery(GetQueryPlanFragments req) { return send(RpcType.GET_QUERY_PLAN_FRAGMENTS, req, QueryPlanFragments.class); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java index 57ad4d51b..4e4c80e97 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java @@ -33,6 +33,7 @@ class UserConnectionConfig extends AbstractConnectionConfig { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserConnectionConfig.class); private final boolean authEnabled; + private final boolean sslEnabled; private final InboundImpersonationManager impersonationManager; private final UserServerRequestHandler handler; @@ -75,10 +76,16 @@ class UserConnectionConfig extends AbstractConnectionConfig { } else { authEnabled = false; } - impersonationManager = !config.getBoolean(ExecConstants.IMPERSONATION_ENABLED) ? null : new InboundImpersonationManager(); + + sslEnabled = config.getBoolean(ExecConstants.USER_SSL_ENABLED); + + if(isSSLEnabled() && isAuthEnabled() && isEncryptionEnabled()){ + logger.warn("The server is configured to use both SSL and SASL encryption (only one should be configured)."); + } + } @Override @@ -90,6 +97,10 @@ class UserConnectionConfig extends AbstractConnectionConfig { return authEnabled; } + boolean isSSLEnabled() { + return sslEnabled; + } + InboundImpersonationManager getImpersonationManager() { return impersonationManager; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 254fdca29..c0573db81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -21,9 +21,15 @@ import java.io.IOException; import java.net.SocketAddress; import java.util.UUID; +import javax.net.ssl.SSLEngine; import javax.security.sasl.SaslException; +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.ssl.SslHandler; import org.apache.drill.common.config.DrillProperties; +import org.apache.drill.common.exceptions.DrillException; +import org.apache.drill.exec.ssl.SSLConfig; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; @@ -53,8 +59,10 @@ import org.apache.drill.exec.rpc.security.plain.PlainFactory; import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection; import org.apache.drill.exec.rpc.user.security.UserAuthenticationException; import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.ssl.SSLConfigBuilder; import org.apache.drill.exec.work.user.UserWorker; import org.apache.hadoop.security.HadoopKerberosName; +import org.apache.hadoop.security.ssl.SSLFactory; import org.slf4j.Logger; import com.google.protobuf.MessageLite; @@ -71,6 +79,8 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { private static final String SERVER_NAME = "Apache Drill Server"; private final UserConnectionConfig config; + private final SSLConfig sslConfig; + private Channel sslChannel; private final UserWorker userWorker; public UserServer(BootStrapContext context, BufferAllocator allocator, EventLoopGroup eventLoopGroup, @@ -79,6 +89,17 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { allocator.getAsByteBufAllocator(), eventLoopGroup); this.config = new UserConnectionConfig(allocator, context, new UserServerRequestHandler(worker)); + this.sslChannel = null; + try { + this.sslConfig = new SSLConfigBuilder() + .config(context.getConfig()) + .mode(SSLFactory.Mode.SERVER) + .initializeSSLContext(true) + .validateKeyStore(true) + .build(); + } catch (DrillException e) { + throw new DrillbitStartupException(e.getMessage(), e.getCause()); + } this.userWorker = worker; // Initialize Singleton instance of UserRpcMetrics. @@ -86,6 +107,34 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { } @Override + protected void setupSSL(ChannelPipeline pipe) { + + SSLEngine sslEngine = sslConfig.createSSLEngine(config.getAllocator(), null, 0); + // Add SSL handler into pipeline + pipe.addFirst(RpcConstants.SSL_HANDLER, new SslHandler(sslEngine)); + logger.debug("SSL communication between client and server is enabled."); + logger.debug(sslConfig.toString()); + + } + + @Override + protected boolean isSslEnabled() { + return sslConfig.isUserSslEnabled(); + } + + @Override + public void setSslChannel(Channel c) { + sslChannel = c; + } + + @Override + protected void closeSSL(){ + if(isSslEnabled() && sslChannel != null){ + sslChannel.close(); + } + } + + @Override protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException { // a user server only expects acknowledgments on messages it creates. switch (rpcType) { |