diff options
author | Parth Chandra <parthc@apache.org> | 2017-06-09 22:03:59 -0700 |
---|---|---|
committer | Parth Chandra <parthc@apache.org> | 2017-10-11 19:26:13 -0700 |
commit | 552d7d825e37d42835bd6bfccfc07fc7d3b5fd94 (patch) | |
tree | 669a74d1646c97522969f8acf8c4ed2baff8a68f /exec | |
parent | b803405c0f978beb8beaf77211f99731014ac92f (diff) |
DRILL-5431: SSL Support (Java) - Java client server SSL implementation
Also enable OpenSSL support
Also fix exclusions and java-exec pom file to eliminate netty-tcnative as a transitive dependency on all projects
Diffstat (limited to 'exec')
24 files changed, 2297 insertions, 387 deletions
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index ab1e02ee6..88039a331 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -20,6 +20,9 @@ <name>exec/Java Execution Engine</name> <properties> <libpam4j.version>1.8-rev1</libpam4j.version> + <!-- Configure the os-maven-plugin extension to expand the classifier on --> + <!-- Fedora-"like" systems. Used for netty-tcnative inclusion --> + <os.detection.classifierWithLikes>fedora</os.detection.classifierWithLikes> </properties> <dependencies> @@ -456,6 +459,14 @@ <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -469,6 +480,14 @@ <artifactId>commons-codec</artifactId> </exclusion> <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> <groupId>io.netty</groupId> <artifactId>netty</artifactId> </exclusion> @@ -486,14 +505,6 @@ <groupId>commons-codec</groupId> <artifactId>commons-codec</artifactId> </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty-all</artifactId> - </exclusion> </exclusions> </dependency> <dependency> @@ -576,6 +587,14 @@ <artifactId>libpam4j</artifactId> <version>${libpam4j.version}</version> </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-tcnative</artifactId> + <version>2.0.1.Final</version> + <scope>provided</scope> + <classifier>${os.detected.classifier}</classifier> + </dependency> + </dependencies> <profiles> @@ -595,6 +614,18 @@ <groupId>org.apache.hadoop</groupId> <exclusions> <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> <groupId>io.netty</groupId> <artifactId>netty</artifactId> </exclusion> @@ -614,6 +645,14 @@ <groupId>org.apache.hadoop</groupId> <exclusions> <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> <groupId>io.netty</groupId> <artifactId>netty</artifactId> </exclusion> @@ -658,6 +697,17 @@ <build> + <extensions> + <!-- + Include the os-maven-plugin to get os.detected.classifier + --> + <extension> + <groupId>kr.motd.maven</groupId> + <artifactId>os-maven-plugin</artifactId> + <version>1.5.0.Final</version> + </extension> + </extensions> + <plugins> <plugin> <artifactId>maven-resources-plugin</artifactId> diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 9b125cb03..5c19e1398 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -117,6 +117,18 @@ public final class ExecConstants { public static final String HASHAGG_FALLBACK_ENABLED_KEY = "drill.exec.hashagg.fallback.enabled"; public static final BooleanValidator HASHAGG_FALLBACK_ENABLED_VALIDATOR = new BooleanValidator(HASHAGG_FALLBACK_ENABLED_KEY); + public static final String SSL_PROVIDER = "drill.exec.ssl.provider"; // valid values are "JDK", "OPENSSL" // default JDK + public static final String SSL_PROTOCOL = "drill.exec.ssl.protocol"; // valid values are SSL, SSLV2, SSLV3, TLS, TLSV1, TLSv1.1, TLSv1.2(default) + public static final String SSL_KEYSTORE_TYPE = "drill.exec.ssl.keyStoreType"; + public static final String SSL_KEYSTORE_PATH = "drill.exec.ssl.keyStorePath"; // path to keystore. default : $JRE_HOME/lib/security/keystore.jks + public static final String SSL_KEYSTORE_PASSWORD = "drill.exec.ssl.keyStorePassword"; // default: changeit + public static final String SSL_KEY_PASSWORD = "drill.exec.ssl.keyPassword"; // + public static final String SSL_TRUSTSTORE_TYPE = "drill.exec.ssl.trustStoreType"; // valid values are jks(default), jceks, pkcs12 + public static final String SSL_TRUSTSTORE_PATH = "drill.exec.ssl.trustStorePath"; // path to keystore. default : $JRE_HOME/lib/security/cacerts.jks + public static final String SSL_TRUSTSTORE_PASSWORD = "drill.exec.ssl.trustStorePassword"; // default: changeit + public static final String SSL_USE_HADOOP_CONF = "drill.exec.ssl.useHadoopConfig"; // Initialize ssl params from hadoop if not provided by drill. default: true + public static final String SSL_HANDSHAKE_TIMEOUT = "drill.exec.security.user.encryption.ssl.handshakeTimeout"; // Default 10 seconds + public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size"; public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size"; public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets"; @@ -133,10 +145,10 @@ public final class ExecConstants { public static final String HTTP_SESSION_MEMORY_RESERVATION = "drill.exec.http.session.memory.reservation"; public static final String HTTP_SESSION_MEMORY_MAXIMUM = "drill.exec.http.session.memory.maximum"; public static final String HTTP_SESSION_MAX_IDLE_SECS = "drill.exec.http.session_max_idle_secs"; - public static final String HTTP_KEYSTORE_PATH = "drill.exec.ssl.keyStorePath"; - public static final String HTTP_KEYSTORE_PASSWORD = "drill.exec.ssl.keyStorePassword"; - public static final String HTTP_TRUSTSTORE_PATH = "drill.exec.ssl.trustStorePath"; - public static final String HTTP_TRUSTSTORE_PASSWORD = "drill.exec.ssl.trustStorePassword"; + public static final String HTTP_KEYSTORE_PATH = SSL_KEYSTORE_PATH; + public static final String HTTP_KEYSTORE_PASSWORD = SSL_KEYSTORE_PASSWORD; + public static final String HTTP_TRUSTSTORE_PATH = SSL_TRUSTSTORE_PATH; + public static final String HTTP_TRUSTSTORE_PASSWORD = SSL_TRUSTSTORE_PASSWORD; public static final String SYS_STORE_PROVIDER_CLASS = "drill.exec.sys.store.provider.class"; public static final String SYS_STORE_PROVIDER_LOCAL_PATH = "drill.exec.sys.store.provider.local.path"; public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write"; @@ -153,6 +165,8 @@ public final class ExecConstants { public static final String USE_LOGIN_PRINCIPAL = "drill.exec.security.bit.auth.use_login_principal"; public static final String USER_ENCRYPTION_SASL_ENABLED = "drill.exec.security.user.encryption.sasl.enabled"; public static final String USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.user.encryption.sasl.max_wrapped_size"; + + public static final String USER_SSL_ENABLED = "drill.exec.security.user.encryption.ssl.enabled"; public static final String BIT_ENCRYPTION_SASL_ENABLED = "drill.exec.security.bit.encryption.sasl.enabled"; public static final String BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.bit.encryption.sasl.max_wrapped_size"; @@ -208,7 +222,7 @@ public final class ExecConstants { public static final OptionValidator PARQUET_DICT_PAGE_SIZE_VALIDATOR = new PositiveLongValidator(PARQUET_DICT_PAGE_SIZE, Integer.MAX_VALUE); public static final String PARQUET_WRITER_COMPRESSION_TYPE = "store.parquet.compression"; public static final OptionValidator PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR = new EnumeratedStringValidator( - PARQUET_WRITER_COMPRESSION_TYPE, "snappy", "gzip", "none"); + PARQUET_WRITER_COMPRESSION_TYPE, "snappy", "gzip", "none"); public static final String PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING = "store.parquet.enable_dictionary_encoding"; public static final OptionValidator PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR = new BooleanValidator( PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING); @@ -464,7 +478,7 @@ public final class ExecConstants { */ public static final String IMPERSONATION_POLICIES_KEY = "exec.impersonation.inbound_policies"; public static final StringValidator IMPERSONATION_POLICY_VALIDATOR = - new InboundImpersonationManager.InboundImpersonationPolicyValidator(IMPERSONATION_POLICIES_KEY); + new InboundImpersonationManager.InboundImpersonationPolicyValidator(IMPERSONATION_POLICIES_KEY); /** @@ -546,5 +560,5 @@ public final class ExecConstants { public static String bootDefaultFor(String name) { return OPTION_DEFAULTS_ROOT + name; - } +} } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/SSLConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/SSLConfig.java deleted file mode 100644 index c6d637480..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/SSLConfig.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec; - -import com.typesafe.config.Config; -import org.apache.drill.common.exceptions.DrillException; - -public class SSLConfig { - - private final String keystorePath; - - private final String keystorePassword; - - private final String truststorePath; - - private final String truststorePassword; - - - public SSLConfig(Config config) throws DrillException { - - keystorePath = config.getString(ExecConstants.HTTP_KEYSTORE_PATH).trim(); - - keystorePassword = config.getString(ExecConstants.HTTP_KEYSTORE_PASSWORD).trim(); - - truststorePath = config.getString(ExecConstants.HTTP_TRUSTSTORE_PATH).trim(); - - truststorePassword = config.getString(ExecConstants.HTTP_TRUSTSTORE_PASSWORD).trim(); - - /*If keystorePath or keystorePassword is provided in the configuration file use that*/ - if (!keystorePath.isEmpty() || !keystorePassword.isEmpty()) { - if (keystorePath.isEmpty()) { - throw new DrillException(" *.ssl.keyStorePath in the configuration file is empty, but *.ssl.keyStorePassword is set"); - } - else if (keystorePassword.isEmpty()){ - throw new DrillException(" *.ssl.keyStorePassword in the configuration file is empty, but *.ssl.keyStorePath is set "); - } - - } - } - - public boolean isSslValid() { return !keystorePath.isEmpty() && !keystorePassword.isEmpty(); } - - public String getKeyStorePath() { return keystorePath; } - - public String getKeyStorePassword() { return keystorePassword; } - - public boolean hasTrustStorePath() { return !truststorePath.isEmpty(); } - - public boolean hasTrustStorePassword() { return !truststorePassword.isEmpty(); } - - public String getTrustStorePath() { return truststorePath; } - - public String getTrustStorePassword() { return truststorePassword; } -}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 9fbbfddaf..84b34a7ca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -367,8 +367,12 @@ public class DrillClient implements Closeable, ConnectionThrottle { DrillbitEndpoint endpoint; while (triedEndpointIndex < connectTriesVal) { - client = new UserClient(clientName, config, supportComplexTypes, allocator, eventLoopGroup, executor); endpoint = endpoints.get(triedEndpointIndex); + // Note: the properties member is a DrillProperties instance which lower cases names of + // properties. That does not work too well with properties that are mixed case. + // For user client severla properties are mixed case so we do not use the properties member + // but instead pass the props parameter. + client = new UserClient(clientName, config, props, supportComplexTypes, allocator, eventLoopGroup, executor, endpoint); logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort()); if (!properties.containsKey(DrillProperties.SERVICE_HOST)) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 2f4753857..99614bdfa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -20,16 +20,25 @@ package org.apache.drill.exec.rpc.user; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.net.ssl.SSLEngine; import javax.security.sasl.SaslClient; import javax.security.sasl.SaslException; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.ssl.SslHandler; import org.apache.drill.common.KerberosUtil; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.config.DrillProperties; +import org.apache.drill.common.exceptions.DrillException; +import org.apache.drill.exec.client.InvalidConnectionInfoException; +import org.apache.drill.exec.ssl.SSLConfig; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; @@ -55,6 +64,7 @@ import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; import org.apache.drill.exec.rpc.AbstractClientConnection; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.BasicClient; +import org.apache.drill.exec.rpc.ConnectionMultiListener; import org.apache.drill.exec.rpc.DrillRpcFuture; import org.apache.drill.exec.rpc.NonTransientRpcException; import org.apache.drill.exec.rpc.OutOfMemoryHandler; @@ -62,6 +72,7 @@ import org.apache.drill.exec.rpc.ProtobufLengthDecoder; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.rpc.RpcConnectionHandler; +import org.apache.drill.exec.rpc.RpcConstants; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.security.AuthStringUtil; @@ -70,7 +81,9 @@ import org.apache.drill.exec.rpc.security.AuthenticatorFactory; import org.apache.drill.exec.rpc.security.ClientAuthenticatorProvider; import org.apache.drill.exec.rpc.security.plain.PlainFactory; import org.apache.drill.exec.rpc.security.SaslProperties; +import org.apache.drill.exec.ssl.SSLConfigBuilder; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.ssl.SSLFactory; import org.slf4j.Logger; import com.google.common.base.Strings; @@ -87,8 +100,8 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.SocketChannel; -public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnection, - UserToBitHandshake, BitToUserHandshake> { +public class UserClient + extends BasicClient<RpcType, UserClient.UserToBitConnection, UserToBitHandshake, BitToUserHandshake> { private static final Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class); private final BufferAllocator allocator; @@ -102,19 +115,47 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect // these are used for authentication private volatile List<String> serverAuthMechanisms = null; private volatile boolean authComplete = true; - - public UserClient(String clientName, DrillConfig config, boolean supportComplexTypes, - BufferAllocator allocator, EventLoopGroup eventLoopGroup, Executor eventExecutor) { - super( - UserRpcConfig.getMapping(config, eventExecutor), - allocator.getAsByteBufAllocator(), - eventLoopGroup, - RpcType.HANDSHAKE, - BitToUserHandshake.class, - BitToUserHandshake.PARSER); + private SSLConfig sslConfig; + private DrillbitEndpoint endpoint; + + public UserClient(String clientName, DrillConfig config, Properties properties, boolean supportComplexTypes, + BufferAllocator allocator, EventLoopGroup eventLoopGroup, Executor eventExecutor, + DrillbitEndpoint endpoint) throws NonTransientRpcException { + super(UserRpcConfig.getMapping(config, eventExecutor), allocator.getAsByteBufAllocator(), + eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER); + this.endpoint = endpoint; // save the endpoint; it might be needed by SSL init. this.clientName = clientName; this.allocator = allocator; this.supportComplexTypes = supportComplexTypes; + try { + this.sslConfig = new SSLConfigBuilder().properties(properties).mode(SSLFactory.Mode.CLIENT) + .initializeSSLContext(true).validateKeyStore(false).build(); + } catch (DrillException e) { + throw new InvalidConnectionInfoException(e.getMessage()); + } + + } + + @Override protected void setupSSL(ChannelPipeline pipe, + ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener) { + + String peerHost = endpoint.getAddress(); + int peerPort = endpoint.getUserPort(); + SSLEngine sslEngine = sslConfig.createSSLEngine(allocator, peerHost, peerPort); + + // Add SSL handler into pipeline + SslHandler sslHandler = new SslHandler(sslEngine); + sslHandler.setHandshakeTimeoutMillis(sslConfig.getHandshakeTimeout()); + + // Add a listener for SSL Handshake complete. The Drill client handshake will be enabled only + // after this is done. + sslHandler.handshakeFuture().addListener(sslHandshakeListener); + pipe.addFirst(RpcConstants.SSL_HANDLER, sslHandler); + logger.debug(sslConfig.toString()); + } + + @Override protected boolean isSslEnabled() { + return sslConfig.isUserSslEnabled(); } public RpcEndpointInfos getServerInfos() { @@ -132,38 +173,51 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect /** * Connects, and if required, authenticates. This method blocks until both operations are complete. * - * @param endpoint endpoint to connect to - * @param properties properties + * @param endpoint endpoint to connect to + * @param properties properties * @param credentials credentials * @throws RpcException if either connection or authentication fails */ public void connect(final DrillbitEndpoint endpoint, final DrillProperties properties, - final UserCredentials credentials) throws RpcException { - final UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder() - .setRpcVersion(UserRpcConfig.RPC_VERSION) - .setSupportListening(true) - .setSupportComplexTypes(supportComplexTypes) - .setSupportTimeout(true) - .setCredentials(credentials) - .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName)) - .setSaslSupport(SaslSupport.SASL_PRIVACY) - .setProperties(properties.serializeForServer()); + final UserCredentials credentials) throws RpcException { + final UserToBitHandshake.Builder hsBuilder = + UserToBitHandshake.newBuilder() + .setRpcVersion(UserRpcConfig.RPC_VERSION) + .setSupportListening(true) + .setSupportComplexTypes(supportComplexTypes) + .setSupportTimeout(true).setCredentials(credentials) + .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName)) + .setSaslSupport(SaslSupport.SASL_PRIVACY) + .setProperties(properties.serializeForServer()); // Only used for testing purpose if (properties.containsKey(DrillProperties.TEST_SASL_LEVEL)) { - hsBuilder.setSaslSupport(SaslSupport.valueOf( - Integer.parseInt(properties.getProperty(DrillProperties.TEST_SASL_LEVEL)))); + hsBuilder.setSaslSupport( + SaslSupport.valueOf(Integer.parseInt(properties.getProperty(DrillProperties.TEST_SASL_LEVEL)))); } - connect(hsBuilder.build(), endpoint).checkedGet(); + if (sslConfig.isUserSslEnabled()) { + try { + connect(hsBuilder.build(), endpoint) + .checkedGet(sslConfig.getHandshakeTimeout(), TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + String msg = new StringBuilder().append( + "Connecting to the server timed out. This is sometimes due to a mismatch in the SSL configuration between client and server. [ Exception: ") + .append(e.getMessage()).append("]").toString(); + throw new NonTransientRpcException(msg); + } + } else { + connect(hsBuilder.build(), endpoint).checkedGet(); + } // Check if client needs encryption and server is not configured for encryption. - final boolean clientNeedsEncryption = properties.containsKey(DrillProperties.SASL_ENCRYPT) - && Boolean.parseBoolean(properties.getProperty(DrillProperties.SASL_ENCRYPT)); + final boolean clientNeedsEncryption = properties.containsKey(DrillProperties.SASL_ENCRYPT) && Boolean + .parseBoolean(properties.getProperty(DrillProperties.SASL_ENCRYPT)); - if(clientNeedsEncryption && !connection.isEncryptionEnabled()) { - throw new NonTransientRpcException("Client needs encrypted connection but server is not configured for " + - "encryption. Please check connection parameter or contact your administrator"); + if (clientNeedsEncryption && !connection.isEncryptionEnabled()) { + throw new NonTransientRpcException( + "Client needs encrypted connection but server is not configured for " + + "encryption. Please check connection parameter or contact your administrator"); } if (serverAuthMechanisms != null) { @@ -176,31 +230,28 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect } private CheckedFuture<Void, RpcException> connect(final UserToBitHandshake handshake, - final DrillbitEndpoint endpoint) { + final DrillbitEndpoint endpoint) { final SettableFuture<Void> connectionSettable = SettableFuture.create(); final CheckedFuture<Void, RpcException> connectionFuture = new AbstractCheckedFuture<Void, RpcException>(connectionSettable) { - @Override - protected RpcException mapException(Exception e) { + @Override protected RpcException mapException(Exception e) { return RpcException.mapException(e); } }; final RpcConnectionHandler<UserToBitConnection> connectionHandler = new RpcConnectionHandler<UserToBitConnection>() { - @Override - public void connectionSucceeded(UserToBitConnection connection) { + @Override public void connectionSucceeded(UserToBitConnection connection) { connectionSettable.set(null); } - @Override - public void connectionFailed(FailureType type, Throwable t) { - connectionSettable.setException(new RpcException(String.format("%s : %s", - type.name(), t.getMessage()), t)); + @Override public void connectionFailed(FailureType type, Throwable t) { + connectionSettable + .setException(new RpcException(String.format("%s : %s", type.name(), t.getMessage()), t)); } }; - connectAsClient(queryResultHandler.getWrappedConnectionHandler(connectionHandler), - handshake, endpoint.getAddress(), endpoint.getUserPort()); + connectAsClient(queryResultHandler.getWrappedConnectionHandler(connectionHandler), handshake, + endpoint.getAddress(), endpoint.getUserPort()); return connectionFuture; } @@ -211,15 +262,15 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect // Set correct QOP property and Strength based on server needs encryption or not. // If ChunkMode is enabled then negotiate for buffer size equal to wrapChunkSize, // If ChunkMode is disabled then negotiate for MAX_WRAPPED_SIZE buffer size. - propertiesMap.putAll(SaslProperties.getSaslProperties(connection.isEncryptionEnabled(), - connection.getMaxWrappedSize())); + propertiesMap.putAll( + SaslProperties.getSaslProperties(connection.isEncryptionEnabled(), connection.getMaxWrappedSize())); - final SettableFuture<Void> authSettable = SettableFuture.create(); // use handleAuthFailure to setException + final SettableFuture<Void> authSettable = + SettableFuture.create(); // use handleAuthFailure to setException final CheckedFuture<Void, SaslException> authFuture = new AbstractCheckedFuture<Void, SaslException>(authSettable) { - @Override - protected SaslException mapException(Exception e) { + @Override protected SaslException mapException(Exception e) { if (e instanceof ExecutionException) { final Throwable cause = Throwables.getRootCause(e); if (cause instanceof SaslException) { @@ -227,8 +278,9 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect connection.getEncryptionCtxtString(), cause.getMessage()), cause); } } - return new SaslException(String.format("Authentication failed unexpectedly. [Details: %s, Error %s]", - connection.getEncryptionCtxtString(), e.getMessage()), e); + return new SaslException(String + .format("Authentication failed unexpectedly. [Details: %s, Error %s]", + connection.getEncryptionCtxtString(), e.getMessage()), e); } }; @@ -244,8 +296,10 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect ugi = factory.createAndLoginUser(propertiesMap); saslClient = factory.createSaslClient(ugi, propertiesMap); if (saslClient == null) { - throw new SaslException(String.format("Cannot initiate authentication using %s mechanism. Insufficient " + - "credentials or selected mechanism doesn't support configured security layers?", factory.getSimpleName())); + throw new SaslException(String.format( + "Cannot initiate authentication using %s mechanism. Insufficient " + + "credentials or selected mechanism doesn't support configured security layers?", + factory.getSimpleName())); } connection.setSaslClient(saslClient); } catch (final IOException e) { @@ -256,26 +310,24 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect logger.trace("Initiating SASL exchange."); new AuthenticationOutcomeListener<>(this, connection, RpcType.SASL_MESSAGE, ugi, new RpcOutcomeListener<Void>() { - @Override - public void failed(RpcException ex) { + @Override public void failed(RpcException ex) { authSettable.setException(ex); } - @Override - public void success(Void value, ByteBuf buffer) { + @Override public void success(Void value, ByteBuf buffer) { authComplete = true; authSettable.set(null); } - @Override - public void interrupted(InterruptedException e) { + @Override public void interrupted(InterruptedException e) { authSettable.setException(e); } }).initiate(mechanismName); return authFuture; } - private AuthenticatorFactory getAuthenticatorFactory(final DrillProperties properties) throws SaslException { + private AuthenticatorFactory getAuthenticatorFactory(final DrillProperties properties) + throws SaslException { final Set<String> mechanismSet = AuthStringUtil.asSet(serverAuthMechanisms); // first, check if a certain mechanism must be used @@ -285,129 +337,126 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect throw new SaslException(String.format("Unknown mechanism: %s", authMechanism)); } if (!mechanismSet.contains(authMechanism.toUpperCase())) { - throw new SaslException(String.format("Server does not support authentication using: %s. [Details: %s]", - authMechanism, connection.getEncryptionCtxtString())); + throw new SaslException(String + .format("Server does not support authentication using: %s. [Details: %s]", authMechanism, + connection.getEncryptionCtxtString())); } - return ClientAuthenticatorProvider.getInstance() - .getAuthenticatorFactory(authMechanism); + return ClientAuthenticatorProvider.getInstance().getAuthenticatorFactory(authMechanism); } // check if Kerberos is supported, and the service principal is provided - if (mechanismSet.contains(KerberosUtil.KERBEROS_SIMPLE_NAME) && - properties.containsKey(DrillProperties.SERVICE_PRINCIPAL)) { + if (mechanismSet.contains(KerberosUtil.KERBEROS_SIMPLE_NAME) && properties + .containsKey(DrillProperties.SERVICE_PRINCIPAL)) { return ClientAuthenticatorProvider.getInstance() .getAuthenticatorFactory(KerberosUtil.KERBEROS_SIMPLE_NAME); } // check if username/password is supported, and username/password are provided - if (mechanismSet.contains(PlainFactory.SIMPLE_NAME) && - properties.containsKey(DrillProperties.USER) && - !Strings.isNullOrEmpty(properties.getProperty(DrillProperties.PASSWORD))) { - return ClientAuthenticatorProvider.getInstance() - .getAuthenticatorFactory(PlainFactory.SIMPLE_NAME); + if (mechanismSet.contains(PlainFactory.SIMPLE_NAME) && properties.containsKey(DrillProperties.USER) + && !Strings.isNullOrEmpty(properties.getProperty(DrillProperties.PASSWORD))) { + return ClientAuthenticatorProvider.getInstance().getAuthenticatorFactory(PlainFactory.SIMPLE_NAME); } - throw new SaslException(String.format("Server requires authentication using %s. Insufficient credentials?. " + - "[Details: %s]. ", serverAuthMechanisms, connection.getEncryptionCtxtString())); + throw new SaslException(String + .format("Server requires authentication using %s. Insufficient credentials?. " + "[Details: %s]. ", + serverAuthMechanisms, connection.getEncryptionCtxtString())); } - protected <SEND extends MessageLite, RECEIVE extends MessageLite> - void send(RpcOutcomeListener<RECEIVE> listener, RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, - boolean allowInEventLoop, ByteBuf... dataBodies) { + protected <SEND extends MessageLite, RECEIVE extends MessageLite> void send( + RpcOutcomeListener<RECEIVE> listener, RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz, + boolean allowInEventLoop, ByteBuf... dataBodies) { super.send(listener, connection, rpcType, protobufBody, clazz, allowInEventLoop, dataBodies); } - @Override - protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException { + @Override protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException { switch (rpcType) { - case RpcType.ACK_VALUE: - return Ack.getDefaultInstance(); - case RpcType.HANDSHAKE_VALUE: - return BitToUserHandshake.getDefaultInstance(); - case RpcType.QUERY_HANDLE_VALUE: - return QueryId.getDefaultInstance(); - case RpcType.QUERY_RESULT_VALUE: - return QueryResult.getDefaultInstance(); - case RpcType.QUERY_DATA_VALUE: - return QueryData.getDefaultInstance(); - case RpcType.QUERY_PLAN_FRAGMENTS_VALUE: - return QueryPlanFragments.getDefaultInstance(); - case RpcType.CATALOGS_VALUE: - return GetCatalogsResp.getDefaultInstance(); - case RpcType.SCHEMAS_VALUE: - return GetSchemasResp.getDefaultInstance(); - case RpcType.TABLES_VALUE: - return GetTablesResp.getDefaultInstance(); - case RpcType.COLUMNS_VALUE: - return GetColumnsResp.getDefaultInstance(); - case RpcType.PREPARED_STATEMENT_VALUE: - return CreatePreparedStatementResp.getDefaultInstance(); - case RpcType.SASL_MESSAGE_VALUE: - return SaslMessage.getDefaultInstance(); - case RpcType.SERVER_META_VALUE: - return GetServerMetaResp.getDefaultInstance(); + case RpcType.ACK_VALUE: + return Ack.getDefaultInstance(); + case RpcType.HANDSHAKE_VALUE: + return BitToUserHandshake.getDefaultInstance(); + case RpcType.QUERY_HANDLE_VALUE: + return QueryId.getDefaultInstance(); + case RpcType.QUERY_RESULT_VALUE: + return QueryResult.getDefaultInstance(); + case RpcType.QUERY_DATA_VALUE: + return QueryData.getDefaultInstance(); + case RpcType.QUERY_PLAN_FRAGMENTS_VALUE: + return QueryPlanFragments.getDefaultInstance(); + case RpcType.CATALOGS_VALUE: + return GetCatalogsResp.getDefaultInstance(); + case RpcType.SCHEMAS_VALUE: + return GetSchemasResp.getDefaultInstance(); + case RpcType.TABLES_VALUE: + return GetTablesResp.getDefaultInstance(); + case RpcType.COLUMNS_VALUE: + return GetColumnsResp.getDefaultInstance(); + case RpcType.PREPARED_STATEMENT_VALUE: + return CreatePreparedStatementResp.getDefaultInstance(); + case RpcType.SASL_MESSAGE_VALUE: + return SaslMessage.getDefaultInstance(); + case RpcType.SERVER_META_VALUE: + return GetServerMetaResp.getDefaultInstance(); } throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType)); } - @Override - protected void handle(UserToBitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody, - ResponseSender sender) throws RpcException { + @Override protected void handle(UserToBitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody, + ResponseSender sender) throws RpcException { if (!authComplete) { // Remote should not be making any requests before authenticating, drop connection - throw new RpcException(String.format("Request of type %d is not allowed without authentication. " + - "Remote on %s must authenticate before making requests. Connection dropped.", - rpcType, connection.getRemoteAddress())); + throw new RpcException(String.format("Request of type %d is not allowed without authentication. " + + "Remote on %s must authenticate before making requests. Connection dropped.", rpcType, + connection.getRemoteAddress())); } switch (rpcType) { - case RpcType.QUERY_DATA_VALUE: - queryResultHandler.batchArrived(connection, pBody, dBody); - sender.send(new Response(RpcType.ACK, Acks.OK)); - break; - case RpcType.QUERY_RESULT_VALUE: - queryResultHandler.resultArrived(pBody); - sender.send(new Response(RpcType.ACK, Acks.OK)); - break; - default: - throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType)); + case RpcType.QUERY_DATA_VALUE: + queryResultHandler.batchArrived(connection, pBody, dBody); + sender.send(new Response(RpcType.ACK, Acks.OK)); + break; + case RpcType.QUERY_RESULT_VALUE: + queryResultHandler.resultArrived(pBody); + sender.send(new Response(RpcType.ACK, Acks.OK)); + break; + default: + throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType)); } } - @Override - protected void validateHandshake(BitToUserHandshake inbound) throws RpcException { -// logger.debug("Handling handshake from bit to user. {}", inbound); + @Override protected void validateHandshake(BitToUserHandshake inbound) throws RpcException { + // logger.debug("Handling handshake from bit to user. {}", inbound); if (inbound.hasServerInfos()) { serverInfos = inbound.getServerInfos(); } supportedMethods = Sets.immutableEnumSet(inbound.getSupportedMethodsList()); switch (inbound.getStatus()) { - case SUCCESS: - break; - case AUTH_REQUIRED: { - authComplete = false; - serverAuthMechanisms = ImmutableList.copyOf(inbound.getAuthenticationMechanismsList()); - connection.setEncryption(inbound.hasEncrypted() && inbound.getEncrypted()); - - if (inbound.hasMaxWrappedSize()) { - connection.setMaxWrappedSize(inbound.getMaxWrappedSize()); + case SUCCESS: + break; + case AUTH_REQUIRED: { + authComplete = false; + serverAuthMechanisms = ImmutableList.copyOf(inbound.getAuthenticationMechanismsList()); + connection.setEncryption(inbound.hasEncrypted() && inbound.getEncrypted()); + + if (inbound.hasMaxWrappedSize()) { + connection.setMaxWrappedSize(inbound.getMaxWrappedSize()); + } + logger.trace(String + .format("Server requires authentication with encryption context %s before proceeding.", + connection.getEncryptionCtxtString())); + break; } - logger.trace(String.format("Server requires authentication with encryption context %s before proceeding.", - connection.getEncryptionCtxtString())); - break; - } - case AUTH_FAILED: - case RPC_VERSION_MISMATCH: - case UNKNOWN_FAILURE: - final String errMsg = String.format("Status: %s, Error Id: %s, Error message: %s", - inbound.getStatus(), inbound.getErrorId(), inbound.getErrorMessage()); - logger.error(errMsg); - throw new NonTransientRpcException(errMsg); + case AUTH_FAILED: + case RPC_VERSION_MISMATCH: + case UNKNOWN_FAILURE: + final String errMsg = String + .format("Status: %s, Error Id: %s, Error message: %s", inbound.getStatus(), + inbound.getErrorId(), inbound.getErrorMessage()); + logger.error(errMsg); + throw new NonTransientRpcException(errMsg); } } - @Override - protected UserToBitConnection initRemoteConnection(SocketChannel channel) { + @Override protected UserToBitConnection initRemoteConnection(SocketChannel channel) { super.initRemoteConnection(channel); return new UserToBitConnection(channel); } @@ -421,39 +470,34 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect super(channel, "user client"); } - @Override - public BufferAllocator getAllocator() { + @Override public BufferAllocator getAllocator() { return allocator; } - @Override - protected Logger getLogger() { + @Override protected Logger getLogger() { return logger; } - @Override - public void incConnectionCounter() { + @Override public void incConnectionCounter() { // no-op } - @Override - public void decConnectionCounter() { + @Override public void decConnectionCounter() { // no-op } } - @Override - public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { + @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); } /** * planQuery is an API to plan a query without query execution + * * @param req - data necessary to plan query * @return list of PlanFragments that can later on be submitted for execution */ - public DrillRpcFuture<QueryPlanFragments> planQuery( - GetQueryPlanFragments req) { + public DrillRpcFuture<QueryPlanFragments> planQuery(GetQueryPlanFragments req) { return send(RpcType.GET_QUERY_PLAN_FRAGMENTS, req, QueryPlanFragments.class); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java index 57ad4d51b..4e4c80e97 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java @@ -33,6 +33,7 @@ class UserConnectionConfig extends AbstractConnectionConfig { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserConnectionConfig.class); private final boolean authEnabled; + private final boolean sslEnabled; private final InboundImpersonationManager impersonationManager; private final UserServerRequestHandler handler; @@ -75,10 +76,16 @@ class UserConnectionConfig extends AbstractConnectionConfig { } else { authEnabled = false; } - impersonationManager = !config.getBoolean(ExecConstants.IMPERSONATION_ENABLED) ? null : new InboundImpersonationManager(); + + sslEnabled = config.getBoolean(ExecConstants.USER_SSL_ENABLED); + + if(isSSLEnabled() && isAuthEnabled() && isEncryptionEnabled()){ + logger.warn("The server is configured to use both SSL and SASL encryption (only one should be configured)."); + } + } @Override @@ -90,6 +97,10 @@ class UserConnectionConfig extends AbstractConnectionConfig { return authEnabled; } + boolean isSSLEnabled() { + return sslEnabled; + } + InboundImpersonationManager getImpersonationManager() { return impersonationManager; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index 254fdca29..c0573db81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -21,9 +21,15 @@ import java.io.IOException; import java.net.SocketAddress; import java.util.UUID; +import javax.net.ssl.SSLEngine; import javax.security.sasl.SaslException; +import io.netty.channel.Channel; +import io.netty.channel.ChannelPipeline; +import io.netty.handler.ssl.SslHandler; import org.apache.drill.common.config.DrillProperties; +import org.apache.drill.common.exceptions.DrillException; +import org.apache.drill.exec.ssl.SSLConfig; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; @@ -53,8 +59,10 @@ import org.apache.drill.exec.rpc.security.plain.PlainFactory; import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection; import org.apache.drill.exec.rpc.user.security.UserAuthenticationException; import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.ssl.SSLConfigBuilder; import org.apache.drill.exec.work.user.UserWorker; import org.apache.hadoop.security.HadoopKerberosName; +import org.apache.hadoop.security.ssl.SSLFactory; import org.slf4j.Logger; import com.google.protobuf.MessageLite; @@ -71,6 +79,8 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { private static final String SERVER_NAME = "Apache Drill Server"; private final UserConnectionConfig config; + private final SSLConfig sslConfig; + private Channel sslChannel; private final UserWorker userWorker; public UserServer(BootStrapContext context, BufferAllocator allocator, EventLoopGroup eventLoopGroup, @@ -79,6 +89,17 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { allocator.getAsByteBufAllocator(), eventLoopGroup); this.config = new UserConnectionConfig(allocator, context, new UserServerRequestHandler(worker)); + this.sslChannel = null; + try { + this.sslConfig = new SSLConfigBuilder() + .config(context.getConfig()) + .mode(SSLFactory.Mode.SERVER) + .initializeSSLContext(true) + .validateKeyStore(true) + .build(); + } catch (DrillException e) { + throw new DrillbitStartupException(e.getMessage(), e.getCause()); + } this.userWorker = worker; // Initialize Singleton instance of UserRpcMetrics. @@ -86,6 +107,34 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> { } @Override + protected void setupSSL(ChannelPipeline pipe) { + + SSLEngine sslEngine = sslConfig.createSSLEngine(config.getAllocator(), null, 0); + // Add SSL handler into pipeline + pipe.addFirst(RpcConstants.SSL_HANDLER, new SslHandler(sslEngine)); + logger.debug("SSL communication between client and server is enabled."); + logger.debug(sslConfig.toString()); + + } + + @Override + protected boolean isSslEnabled() { + return sslConfig.isUserSslEnabled(); + } + + @Override + public void setSslChannel(Channel c) { + sslChannel = c; + } + + @Override + protected void closeSSL(){ + if(isSslEnabled() && sslChannel != null){ + sslChannel.close(); + } + } + + @Override protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException { // a user server only expects acknowledgments on messages it creates. switch (rpcType) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java index 69f2cab76..e35f5421e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java @@ -26,12 +26,14 @@ import org.apache.commons.lang3.StringUtils; import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.SSLConfig; +import org.apache.drill.exec.ssl.SSLConfig; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.rpc.security.plain.PlainFactory; import org.apache.drill.exec.server.BootStrapContext; import org.apache.drill.exec.server.rest.auth.DrillRestLoginService; +import org.apache.drill.exec.ssl.SSLConfigBuilder; import org.apache.drill.exec.work.WorkManager; +import org.apache.hadoop.security.ssl.SSLFactory; import org.bouncycastle.asn1.x500.X500NameBuilder; import org.bouncycastle.asn1.x500.style.BCStyle; import org.bouncycastle.cert.X509v3CertificateBuilder; @@ -331,12 +333,18 @@ public class WebServer implements AutoCloseable { logger.info("Setting up HTTPS connector for web server"); final SslContextFactory sslContextFactory = new SslContextFactory(); - SSLConfig ssl = new SSLConfig(config); + SSLConfig ssl = new SSLConfigBuilder() + .config(config) + .mode(SSLFactory.Mode.SERVER) + .initializeSSLContext(false) + .validateKeyStore(true) + .build(); if(ssl.isSslValid()){ logger.info("Using configured SSL settings for web server"); sslContextFactory.setKeyStorePath(ssl.getKeyStorePath()); sslContextFactory.setKeyStorePassword(ssl.getKeyStorePassword()); + sslContextFactory.setKeyManagerPassword(ssl.getKeyPassword()); if(ssl.hasTrustStorePath()){ sslContextFactory.setTrustStorePath(ssl.getTrustStorePath()); if(ssl.hasTrustStorePassword()){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfig.java new file mode 100644 index 000000000..98a8c8d4f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfig.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ssl; + +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslProvider; +import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.apache.drill.common.exceptions.DrillException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.hadoop.security.ssl.SSLFactory; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManagerFactory; +import java.io.FileInputStream; +import java.io.InputStream; +import java.security.KeyStore; + +public abstract class SSLConfig { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SSLConfig.class); + + public static final String DEFAULT_SSL_PROVIDER = "JDK"; // JDK or OPENSSL + public static final String DEFAULT_SSL_PROTOCOL = "TLSv1.2"; + public static final int DEFAULT_SSL_HANDSHAKE_TIMEOUT_MS = 10 * 1000; // 10 seconds + + // Either the Netty SSL context or the JDK SSL context will be initialized + // The JDK SSL context is use iff the useSystemTrustStore setting is enabled. + protected SslContext nettySslContext; + protected SSLContext jdkSSlContext; + + private static final boolean isWindows = System.getProperty("os.name").toLowerCase().indexOf("win") >= 0; + private static final boolean isMacOs = System.getProperty("os.name").toLowerCase().indexOf("mac") >= 0; + + public static final String HADOOP_SSL_CONF_TPL_KEY = "hadoop.ssl.{0}.conf"; + public static final String HADOOP_SSL_KEYSTORE_LOCATION_TPL_KEY = "ssl.{0}.keystore.location"; + public static final String HADOOP_SSL_KEYSTORE_PASSWORD_TPL_KEY = "ssl.{0}.keystore.password"; + public static final String HADOOP_SSL_KEYSTORE_TYPE_TPL_KEY = "ssl.{0}.keystore.type"; + public static final String HADOOP_SSL_KEYSTORE_KEYPASSWORD_TPL_KEY = + "ssl.{0}.keystore.keypassword"; + public static final String HADOOP_SSL_TRUSTSTORE_LOCATION_TPL_KEY = "ssl.{0}.truststore.location"; + public static final String HADOOP_SSL_TRUSTSTORE_PASSWORD_TPL_KEY = "ssl.{0}.truststore.password"; + public static final String HADOOP_SSL_TRUSTSTORE_TYPE_TPL_KEY = "ssl.{0}.truststore.type"; + + public SSLConfig() { + } + + public abstract void validateKeyStore() throws DrillException; + + // We need to use different SSLContext objects depending on what the user has chosen + // For most uses we will use the Netty SslContext class. This allows us to use either + // the JDK implementation or the OpenSSL implementation. However if the user wants to + // use the system trust store, then the only way to access it is via the JDK's + // SSLContext class. (See the createSSLEngine method below). + + public abstract SslContext initNettySslContext() throws DrillException; + + public abstract SSLContext initJDKSSLContext() throws DrillException; + + public abstract boolean isUserSslEnabled(); + + public abstract boolean isHttpsEnabled(); + + public abstract String getKeyStoreType(); + + public abstract String getKeyStorePath(); + + public abstract String getKeyStorePassword(); + + public abstract String getKeyPassword(); + + public abstract String getTrustStoreType(); + + public abstract boolean hasTrustStorePath(); + + public abstract String getTrustStorePath(); + + public abstract boolean hasTrustStorePassword(); + + public abstract String getTrustStorePassword(); + + public abstract String getProtocol(); + + public abstract SslProvider getProvider(); + + public abstract int getHandshakeTimeout(); + + public abstract SSLFactory.Mode getMode(); + + public abstract boolean disableHostVerification(); + + public abstract boolean disableCertificateVerification(); + + public abstract boolean useSystemTrustStore(); + + public abstract boolean isSslValid(); + + public SslContext getNettySslContext() { + return nettySslContext; + } + + public TrustManagerFactory initializeTrustManagerFactory() throws DrillException { + TrustManagerFactory tmf; + KeyStore ts = null; + //Support Windows/MacOs system trust store + try { + String trustStoreType = getTrustStoreType(); + if ((isWindows || isMacOs) && useSystemTrustStore()) { + // This is valid for MS-Windows and MacOs + logger.debug("Initializing System truststore."); + ts = KeyStore.getInstance(!trustStoreType.isEmpty() ? trustStoreType : KeyStore.getDefaultType()); + ts.load(null, null); + } else if (!getTrustStorePath().isEmpty()) { + // if truststore is not provided then we will use the default. Note that the default depends on + // the TrustManagerFactory that in turn depends on the Security Provider. + // Use null as the truststore which will result in the default truststore being picked up + logger.debug("Initializing truststore {}.", getTrustStorePath()); + ts = KeyStore.getInstance(!trustStoreType.isEmpty() ? trustStoreType : KeyStore.getDefaultType()); + InputStream tsStream = new FileInputStream(getTrustStorePath()); + ts.load(tsStream, getTrustStorePassword().toCharArray()); + } else { + logger.debug("Initializing default truststore."); + } + if (disableCertificateVerification()) { + tmf = InsecureTrustManagerFactory.INSTANCE; + } else { + tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + } + tmf.init(ts); + } catch (Exception e) { + // Catch any SSL initialization Exceptions here and abort. + throw new DrillException( + new StringBuilder() + .append("Exception while initializing the truststore: [") + .append(e.getMessage()) + .append("]. ") + .toString(), e); + } + return tmf; + } + + public KeyManagerFactory initializeKeyManagerFactory() throws DrillException { + KeyManagerFactory kmf; + String keyStorePath = getKeyStorePath(); + String keyStorePassword = getKeyStorePassword(); + String keyStoreType = getKeyStoreType(); + try { + if (keyStorePath.isEmpty()) { + throw new DrillException("No Keystore provided."); + } + KeyStore ks = + KeyStore.getInstance(!keyStoreType.isEmpty() ? keyStoreType : KeyStore.getDefaultType()); + //initialize the key manager factory + // Will throw an exception if the file is not found/accessible. + InputStream ksStream = new FileInputStream(keyStorePath); + // A key password CANNOT be null or an empty string. + if (keyStorePassword.isEmpty()) { + throw new DrillException("The Keystore password cannot be empty."); + } + ks.load(ksStream, keyStorePassword.toCharArray()); + // Empty Keystore. (Remarkably, it is possible to do this). + if (ks.size() == 0) { + throw new DrillException("The Keystore has no entries."); + } + kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, getKeyPassword().toCharArray()); + + } catch (Exception e) { + throw new DrillException( + new StringBuilder() + .append("Exception while initializing the keystore: [") + .append(e.getMessage()) + .append("]. ") + .toString()); + } + return kmf; + } + + public void initContext() throws DrillException { + if ((isWindows || isMacOs) && useSystemTrustStore()) { + initJDKSSLContext(); + logger.debug("Initialized Windows/MacOs SSL context using JDK."); + } else { + initNettySslContext(); + logger.debug("Initialized SSL context."); + } + return; + } + + public SSLEngine createSSLEngine(BufferAllocator allocator, String peerHost, int peerPort) { + SSLEngine engine; + if ((isWindows || isMacOs) && useSystemTrustStore()) { + if (peerHost != null) { + engine = jdkSSlContext.createSSLEngine(peerHost, peerPort); + logger.debug("Initializing Windows/MacOs SSLEngine with hostname."); + } else { + engine = jdkSSlContext.createSSLEngine(); + logger.debug("Initializing Windows/MacOs SSLEngine with no hostname."); + } + } else { + if (peerHost != null) { + engine = nettySslContext.newEngine(allocator.getAsByteBufAllocator(), peerHost, peerPort); + logger.debug("Initializing SSLEngine with hostname."); + } else { + engine = nettySslContext.newEngine(allocator.getAsByteBufAllocator()); + logger.debug("Initializing SSLEngine with no hostname."); + } + } + return engine; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("SSL is ") + .append(isUserSslEnabled()?"":" not ") + .append("enabled.\n"); + sb.append("HTTPS is ") + .append(isHttpsEnabled()?"":" not ") + .append("enabled.\n"); + if(isUserSslEnabled() || isHttpsEnabled()) { + sb.append("SSL Configuration :") + .append("OS:").append(System.getProperty("os.name")) + .append("\n\tUsing system trust store: ").append(useSystemTrustStore()) + .append("\n\tprotocol: ").append(getProtocol()) + .append("\n\tkeyStoreType: ").append(getKeyStoreType()) + .append("\n\tkeyStorePath: ").append(getKeyStorePath()) + .append("\n\tkeyStorePassword: ").append(getPrintablePassword(getKeyStorePassword())) + .append("\n\tkeyPassword: ").append(getPrintablePassword(getKeyPassword())) + .append("\n\ttrustStoreType: ").append(getTrustStoreType()) + .append("\n\ttrustStorePath: ").append(getTrustStorePath()) + .append("\n\ttrustStorePassword: ").append(getPrintablePassword(getTrustStorePassword())) + .append("\n\thandshakeTimeout: ").append(getHandshakeTimeout()) + .append("\n\tdisableHostVerification: ").append(disableHostVerification()) + .append("\n\tdisableCertificateVerification: ").append(disableCertificateVerification()) + ; + } + return sb.toString(); + } + + private String getPrintablePassword(String password) { + StringBuilder sb = new StringBuilder(); + if(password == null || password.length()<2 ){ + return password; + } + sb.append(password.charAt(0)).append("****").append(password.charAt(password.length()-1)); + return sb.toString(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigBuilder.java new file mode 100644 index 000000000..0941960fa --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigBuilder.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ssl; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillConfigurationException; +import org.apache.drill.common.exceptions.DrillException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.ssl.SSLFactory; + +import java.util.Properties; + + +public class SSLConfigBuilder { + + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(org.apache.drill.exec.ssl.SSLConfigBuilder.class); + + private DrillConfig config = null; + private Configuration hadoopConfig = null; + private Properties properties; + private SSLFactory.Mode mode = SSLFactory.Mode.SERVER; + private boolean initializeSSLContext = false; + private boolean validateKeyStore = false; + + public SSLConfigBuilder() { + + } + + public SSLConfig build() throws DrillException { + if (mode == SSLFactory.Mode.SERVER && config == null) { + throw new DrillConfigurationException( + "Cannot create SSL configuration from null Drill configuration."); + } + SSLConfig sslConfig; + if (mode == SSLFactory.Mode.SERVER) { + sslConfig = new SSLConfigServer(config, hadoopConfig); + } else { + sslConfig = new SSLConfigClient(properties); + } + if(validateKeyStore){ + sslConfig.validateKeyStore(); + } + if(initializeSSLContext){ + sslConfig.initContext(); + } + return sslConfig; + } + + public SSLConfigBuilder config(DrillConfig config) { + this.config = config; + return this; + } + + public SSLConfigBuilder hadoopConfig(Configuration hadoopConfig) { + this.hadoopConfig = hadoopConfig; + return this; + } + + public SSLConfigBuilder properties(Properties props) { + this.properties = props; + return this; + } + + public SSLConfigBuilder mode(SSLFactory.Mode mode) { + this.mode = mode; + return this; + } + + public SSLConfigBuilder initializeSSLContext(boolean initializeSSLContext) { + this.initializeSSLContext = initializeSSLContext; + return this; + } + + public SSLConfigBuilder validateKeyStore(boolean validateKeyStore) { + this.validateKeyStore = validateKeyStore; + return this; + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigClient.java new file mode 100644 index 000000000..6a70d436a --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigClient.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ssl; + +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.config.DrillProperties; +import org.apache.drill.common.exceptions.DrillException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.ssl.SSLFactory; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.TrustManagerFactory; +import java.util.Properties; + +public class SSLConfigClient extends SSLConfig { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SSLConfigClient.class); + + private final Properties properties; + private final boolean userSslEnabled; + private final String trustStoreType; + private final String trustStorePath; + private final String trustStorePassword; + private final boolean disableHostVerification; + private final boolean disableCertificateVerification; + private final boolean useSystemTrustStore; + private final String protocol; + private final int handshakeTimeout; + private final String provider; + + private final String emptyString = new String(); + + public SSLConfigClient(Properties properties) throws DrillException { + this.properties = properties; + userSslEnabled = getBooleanProperty(DrillProperties.ENABLE_TLS); + trustStoreType = getStringProperty(DrillProperties.TRUSTSTORE_TYPE, "JKS"); + trustStorePath = getStringProperty(DrillProperties.TRUSTSTORE_PATH, ""); + trustStorePassword = getStringProperty(DrillProperties.TRUSTSTORE_PASSWORD, ""); + disableHostVerification = getBooleanProperty(DrillProperties.DISABLE_HOST_VERIFICATION); + disableCertificateVerification = getBooleanProperty(DrillProperties.DISABLE_CERT_VERIFICATION); + useSystemTrustStore = getBooleanProperty(DrillProperties.USE_SYSTEM_TRUSTSTORE); + protocol = getStringProperty(DrillProperties.TLS_PROTOCOL, DEFAULT_SSL_PROTOCOL); + int hsTimeout = getIntProperty(DrillProperties.TLS_HANDSHAKE_TIMEOUT, DEFAULT_SSL_HANDSHAKE_TIMEOUT_MS); + if (hsTimeout <= 0) { + hsTimeout = DEFAULT_SSL_HANDSHAKE_TIMEOUT_MS; + } + handshakeTimeout = hsTimeout; + provider = getStringProperty(DrillProperties.TLS_PROVIDER, DEFAULT_SSL_PROVIDER); + } + + private boolean getBooleanProperty(String propName) { + return (properties != null) && (properties.containsKey(propName)) + && (properties.getProperty(propName).compareToIgnoreCase("true") == 0); + } + + private String getStringProperty(String name, String defaultValue) { + String value = ""; + if ( (properties != null) && (properties.containsKey(name))) { + value = properties.getProperty(name); + } + if (value.isEmpty()) { + value = defaultValue; + } + value = value.trim(); + return value; + } + + private int getIntProperty(String name, int defaultValue) { + int value = defaultValue; + if ( (properties != null) && (properties.containsKey(name))) { + value = new Integer(properties.getProperty(name)).intValue(); + } + return value; + } + + public void validateKeyStore() throws DrillException { + + } + + @Override + public SslContext initNettySslContext() throws DrillException { + final SslContext sslCtx; + + if (!userSslEnabled) { + return null; + } + + TrustManagerFactory tmf; + try { + tmf = initializeTrustManagerFactory(); + sslCtx = SslContextBuilder.forClient() + .sslProvider(getProvider()) + .trustManager(tmf) + .protocols(protocol) + .build(); + } catch (Exception e) { + // Catch any SSL initialization Exceptions here and abort. + throw new DrillException(new StringBuilder() + .append("SSL is enabled but cannot be initialized due to the following exception: ") + .append("[ ") + .append(e.getMessage()) + .append("]. ") + .toString()); + } + this.nettySslContext = sslCtx; + return sslCtx; + } + + @Override + public SSLContext initJDKSSLContext() throws DrillException { + final SSLContext sslCtx; + + if (!userSslEnabled) { + return null; + } + + TrustManagerFactory tmf; + try { + tmf = initializeTrustManagerFactory(); + sslCtx = SSLContext.getInstance(protocol); + sslCtx.init(null, tmf.getTrustManagers(), null); + } catch (Exception e) { + // Catch any SSL initialization Exceptions here and abort. + throw new DrillException(new StringBuilder() + .append("SSL is enabled but cannot be initialized due to the following exception: ") + .append("[ ") + .append(e.getMessage()) + .append("]. ") + .toString()); + } + this.jdkSSlContext = sslCtx; + return sslCtx; + } + + @Override + public SSLEngine createSSLEngine(BufferAllocator allocator, String peerHost, int peerPort) { + SSLEngine engine = super.createSSLEngine(allocator, peerHost, peerPort); + + if (!this.disableHostVerification()) { + SSLParameters sslParameters = engine.getSSLParameters(); + // only available since Java 7 + sslParameters.setEndpointIdentificationAlgorithm("HTTPS"); + engine.setSSLParameters(sslParameters); + } + + engine.setUseClientMode(true); + + try { + engine.setEnableSessionCreation(true); + } catch (Exception e) { + // Openssl implementation may throw this. + logger.debug("Session creation not enabled. Exception: {}", e.getMessage()); + } + + return engine; + } + + @Override + public boolean isUserSslEnabled() { + return userSslEnabled; + } + + @Override + public boolean isHttpsEnabled() { + return false; + } + + @Override + public String getKeyStoreType() { + return emptyString; + } + + @Override + public String getKeyStorePath() { + return emptyString; + } + + @Override + public String getKeyStorePassword() { + return emptyString; + } + + @Override + public String getKeyPassword() { + return emptyString; + } + + @Override + public String getTrustStoreType() { + return trustStoreType; + } + + @Override + public boolean hasTrustStorePath() { + return !trustStorePath.isEmpty(); + } + + @Override + public String getTrustStorePath() { + return trustStorePath; + } + + @Override + public boolean hasTrustStorePassword() { + return !trustStorePassword.isEmpty(); + } + + @Override + public String getTrustStorePassword() { + return trustStorePassword; + } + + @Override + public String getProtocol() { + return protocol; + } + + @Override + public SslProvider getProvider() { + return provider.equalsIgnoreCase("JDK") ? SslProvider.JDK : SslProvider.OPENSSL; + } + + @Override + public int getHandshakeTimeout() { + return handshakeTimeout; + } + + @Override + public SSLFactory.Mode getMode() { + return SSLFactory.Mode.CLIENT; + } + + @Override + public boolean disableHostVerification() { + return disableHostVerification; + } + + @Override + public boolean disableCertificateVerification() { + return disableCertificateVerification; + } + + @Override + public boolean useSystemTrustStore() { + return useSystemTrustStore; + } + + public boolean isSslValid() { + return true; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java new file mode 100644 index 000000000..da31d2e7f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java @@ -0,0 +1,330 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.ssl; + +import com.google.common.base.Preconditions; +import io.netty.handler.ssl.SslContext; +import io.netty.handler.ssl.SslContextBuilder; +import io.netty.handler.ssl.SslProvider; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.exceptions.DrillException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.ssl.SSLFactory; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.TrustManagerFactory; +import java.text.MessageFormat; + +public class SSLConfigServer extends SSLConfig { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SSLConfigServer.class); + + private final DrillConfig config; + private final Configuration hadoopConfig; + private final boolean userSslEnabled; + private final boolean httpsEnabled; + private final String keyStoreType; + private final String keyStorePath; + private final String keyStorePassword; + private final String keyPassword; + private final String trustStoreType; + private final String trustStorePath; + private final String trustStorePassword; + private final String protocol; + private final String provider; + + public SSLConfigServer(DrillConfig config, Configuration hadoopConfig) throws DrillException { + this.config = config; + SSLFactory.Mode mode = SSLFactory.Mode.SERVER; + httpsEnabled = + config.hasPath(ExecConstants.HTTP_ENABLE_SSL) && config.getBoolean(ExecConstants.HTTP_ENABLE_SSL); + // For testing we will mock up a hadoop configuration, however for regular use, we find the actual hadoop config. + boolean enableHadoopConfig = config.getBoolean(ExecConstants.SSL_USE_HADOOP_CONF); + if (enableHadoopConfig) { + if (hadoopConfig == null) { + this.hadoopConfig = new Configuration(); // get hadoop configuration + } else { + this.hadoopConfig = hadoopConfig; + } + String hadoopSSLConfigFile = + this.hadoopConfig.get(resolveHadoopPropertyName(HADOOP_SSL_CONF_TPL_KEY, getMode())); + logger.debug("Using Hadoop configuration for SSL"); + logger.debug("Hadoop SSL configuration file: {}", hadoopSSLConfigFile); + this.hadoopConfig.addResource(hadoopSSLConfigFile); + } else { + this.hadoopConfig = null; + } + userSslEnabled = + config.hasPath(ExecConstants.USER_SSL_ENABLED) && config.getBoolean(ExecConstants.USER_SSL_ENABLED); + trustStoreType = getConfigParam(ExecConstants.SSL_TRUSTSTORE_TYPE, + resolveHadoopPropertyName(HADOOP_SSL_TRUSTSTORE_TYPE_TPL_KEY, mode)); + trustStorePath = getConfigParam(ExecConstants.SSL_TRUSTSTORE_PATH, + resolveHadoopPropertyName(HADOOP_SSL_TRUSTSTORE_LOCATION_TPL_KEY, mode)); + trustStorePassword = getConfigParam(ExecConstants.SSL_TRUSTSTORE_PASSWORD, + resolveHadoopPropertyName(HADOOP_SSL_TRUSTSTORE_PASSWORD_TPL_KEY, mode)); + keyStoreType = getConfigParam(ExecConstants.SSL_KEYSTORE_TYPE, + resolveHadoopPropertyName(HADOOP_SSL_KEYSTORE_TYPE_TPL_KEY, mode)); + keyStorePath = getConfigParam(ExecConstants.SSL_KEYSTORE_PATH, + resolveHadoopPropertyName(HADOOP_SSL_KEYSTORE_LOCATION_TPL_KEY, mode)); + keyStorePassword = getConfigParam(ExecConstants.SSL_KEYSTORE_PASSWORD, + resolveHadoopPropertyName(HADOOP_SSL_KEYSTORE_PASSWORD_TPL_KEY, mode)); + // if no keypassword specified, use keystore password + String keyPass = getConfigParam(ExecConstants.SSL_KEY_PASSWORD, + resolveHadoopPropertyName(HADOOP_SSL_KEYSTORE_KEYPASSWORD_TPL_KEY, mode)); + keyPassword = keyPass.isEmpty() ? keyStorePassword : keyPass; + protocol = getConfigParamWithDefault(ExecConstants.SSL_PROTOCOL, DEFAULT_SSL_PROTOCOL); + provider = getConfigParamWithDefault(ExecConstants.SSL_PROVIDER, DEFAULT_SSL_PROVIDER); + } + + public void validateKeyStore() throws DrillException { + //HTTPS validates the keystore is not empty. User Server SSL context initialization also validates keystore, but + // much more strictly. User Client context initialization does not validate keystore. + /*If keystorePath or keystorePassword is provided in the configuration file use that*/ + if ((isUserSslEnabled() || isHttpsEnabled())) { + if (!keyStorePath.isEmpty() || !keyStorePassword.isEmpty()) { + if (keyStorePath.isEmpty()) { + throw new DrillException( + " *.ssl.keyStorePath in the configuration file is empty, but *.ssl.keyStorePassword is set"); + } else if (keyStorePassword.isEmpty()) { + throw new DrillException( + " *.ssl.keyStorePassword in the configuration file is empty, but *.ssl.keyStorePath is set "); + } + } + } + } + + @Override + public SslContext initNettySslContext() throws DrillException { + final SslContext sslCtx; + + if (!userSslEnabled) { + return null; + } + + KeyManagerFactory kmf; + TrustManagerFactory tmf; + try { + if (keyStorePath.isEmpty()) { + throw new DrillException("No Keystore provided."); + } + kmf = initializeKeyManagerFactory(); + tmf = initializeTrustManagerFactory(); + sslCtx = SslContextBuilder.forServer(kmf) + .trustManager(tmf) + .protocols(protocol) + .sslProvider(getProvider()) + .build(); // Will throw an exception if the key password is not correct + } catch (Exception e) { + // Catch any SSL initialization Exceptions here and abort. + throw new DrillException(new StringBuilder() + .append("SSL is enabled but cannot be initialized - ") + .append("[ ") + .append(e.getMessage()) + .append("]. ") + .toString()); + } + this.nettySslContext = sslCtx; + return sslCtx; + } + + @Override + public SSLContext initJDKSSLContext() throws DrillException { + final SSLContext sslCtx; + + if (!userSslEnabled) { + return null; + } + + KeyManagerFactory kmf; + TrustManagerFactory tmf; + try { + if (keyStorePath.isEmpty()) { + throw new DrillException("No Keystore provided."); + } + kmf = initializeKeyManagerFactory(); + tmf = initializeTrustManagerFactory(); + sslCtx = SSLContext.getInstance(protocol); + sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + } catch (Exception e) { + // Catch any SSL initialization Exceptions here and abort. + throw new DrillException( + new StringBuilder().append("SSL is enabled but cannot be initialized - ") + .append("[ ") + .append(e.getMessage()) + .append("]. ") + .toString()); + } + this.jdkSSlContext = sslCtx; + return sslCtx; + } + + @Override + public SSLEngine createSSLEngine(BufferAllocator allocator, String peerHost, int peerPort) { + SSLEngine engine = super.createSSLEngine(allocator, peerHost, peerPort); + + engine.setUseClientMode(false); + + // No need for client side authentication (HTTPS like behaviour) + engine.setNeedClientAuth(false); + + try { + engine.setEnableSessionCreation(true); + } catch (Exception e) { + // Openssl implementation may throw this. + logger.debug("Session creation not enabled. Exception: {}", e.getMessage()); + } + + return engine; + } + + private String getConfigParam(String name, String hadoopName) { + String value = ""; + if (hadoopConfig != null) { + value = getHadoopConfigParam(hadoopName); + } + if (value.isEmpty() && config.hasPath(name)) { + value = config.getString(name); + } + value = value.trim(); + return value; + } + + private String getHadoopConfigParam(String name) { + Preconditions.checkArgument(this.hadoopConfig != null); + String value = hadoopConfig.get(name, ""); + value = value.trim(); + return value; + } + + private String getConfigParamWithDefault(String name, String defaultValue) { + String value = ""; + if (config.hasPath(name)) { + value = config.getString(name); + } + if (value.isEmpty()) { + value = defaultValue; + } + value = value.trim(); + return value; + } + + private String resolveHadoopPropertyName(String nameTemplate, SSLFactory.Mode mode) { + return MessageFormat.format(nameTemplate, mode.toString().toLowerCase()); + } + + + + @Override + public boolean isUserSslEnabled() { + return userSslEnabled; + } + + @Override + public boolean isHttpsEnabled() { + return httpsEnabled; + } + + @Override + public String getKeyStoreType() { + return keyStoreType; + } + + @Override + public String getKeyStorePath() { + return keyStorePath; + } + + @Override + public String getKeyStorePassword() { + return keyStorePassword; + } + + @Override + public String getKeyPassword() { + return keyPassword; + } + + @Override + public String getTrustStoreType() { + return trustStoreType; + } + + @Override + public boolean hasTrustStorePath() { + return !trustStorePath.isEmpty(); + } + + @Override + public String getTrustStorePath() { + return trustStorePath; + } + + @Override + public boolean hasTrustStorePassword() { + return !trustStorePassword.isEmpty(); + } + + @Override + public String getTrustStorePassword() { + return trustStorePassword; + } + + @Override + public String getProtocol() { + return protocol; + } + + @Override + public SslProvider getProvider() { + return provider.equalsIgnoreCase("JDK") ? SslProvider.JDK : SslProvider.OPENSSL; + } + + @Override + public int getHandshakeTimeout() { + return 0; + } + + @Override + public SSLFactory.Mode getMode() { + return SSLFactory.Mode.SERVER; + } + + @Override + public boolean disableHostVerification() { + return false; + } + + @Override + public boolean disableCertificateVerification() { + return false; + } + + @Override + public boolean useSystemTrustStore() { + return false; // Client only, notsupported by the server + } + + public boolean isSslValid() { + return !keyStorePath.isEmpty() && !keyStorePassword.isEmpty(); + } + +} diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf index bfdb74c42..26c8df0b6 100644 --- a/exec/java-exec/src/main/resources/drill-module.conf +++ b/exec/java-exec/src/main/resources/drill-module.conf @@ -51,12 +51,6 @@ drill.client: { // By default ${DRILL_TMP_DIR} is used if set or ${drill.tmp-dir} if it's been overridden. drill.tmp-dir: "/tmp" drill.tmp-dir: ${?DRILL_TMP_DIR} -javax.net.ssl: { - keyStore = "", - keyStorePassword = "", - trustStore = "", - trustStorePassword = "" -}, drill.exec: { cluster-id: "drillbits1" rpc: { @@ -142,10 +136,20 @@ drill.exec: { }, //setting javax variables for ssl configurations is being deprecated. ssl: { - keyStorePath = ${?javax.net.ssl.keyStore} - keyStorePassword = ${?javax.net.ssl.keyStorePassword} - trustStorePath = ${?javax.net.ssl.trustStore} + keyStoreType = ${?javax.net.ssl.keyStoreType}, + keyStorePath = ${?javax.net.ssl.keyStore}, + keyStorePassword = ${?javax.net.ssl.keyStorePassword}, + trustStoreType = ${?javax.net.ssl.trustStoreType}, + trustStorePath = ${?javax.net.ssl.trustStore}, trustStorePassword = ${?javax.net.ssl.trustStorePassword} + # default key password to keystore password + keyPassword = ${?javax.net.ssl.keyStorePassword}, + protocol: "TLSv1.2", + # if true, then Drill will read SSL parameters from the + # Hadoop configuration files. + useHadoopConfig : true, + #Drill can use either the JDK implementation or the OpenSSL implementation. + provider: "JDK" }, network: { start: 35000 @@ -168,21 +172,24 @@ drill.exec: { enabled: false, max_chained_user_hops: 3 }, - security.user.auth { + security.user.auth: { enabled: false }, - security.bit.auth { + security.bit.auth: { enabled: false use_login_principal: false } - security.user.encryption.sasl { + security.user.encryption.sasl: { enabled : false, max_wrapped_size : 65536 } - security.bit.encryption.sasl { + security.bit.encryption.sasl: { enabled : false, max_wrapped_size : 65536 } + security.user.encryption.ssl: { + enabled : false + } trace: { directory: "/tmp/drill-trace", filesystem: "file:///" diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index e59c384f0..a094f518d 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -94,6 +94,7 @@ public class BaseTestQuery extends ExecTest { put(ExecConstants.HTTP_ENABLE, "false"); // Increasing retry attempts for testing put(ExecConstants.UDF_RETRY_ATTEMPTS, "10"); + put(ExecConstants.SSL_USE_HADOOP_CONF, "false"); } }; @@ -220,12 +221,12 @@ public class BaseTestQuery extends ExecTest { } if (!properties.containsKey(DrillProperties.DRILLBIT_CONNECTION)) { - properties = new Properties(properties); properties.setProperty(DrillProperties.DRILLBIT_CONNECTION, - String.format("localhost:%s", bits[0].getUserPort())); + String.format("localhost:%s", bits[0].getUserPort())); } - client = QueryTestUtil.createClient(config, serviceSet, MAX_WIDTH_PER_NODE, properties); + DrillConfig clientConfig = DrillConfig.forClient(); + client = QueryTestUtil.createClient(clientConfig, serviceSet, MAX_WIDTH_PER_NODE, properties); } /** @@ -241,7 +242,8 @@ public class BaseTestQuery extends ExecTest { client = null; } - client = QueryTestUtil.createClient(config, serviceSet, MAX_WIDTH_PER_NODE, properties); + DrillConfig clientConfig = DrillConfig.forClient(); + client = QueryTestUtil.createClient(clientConfig, serviceSet, MAX_WIDTH_PER_NODE, properties); } /* diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestSSLConfig.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestSSLConfig.java index 0b2847351..729cc6f0a 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestSSLConfig.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestSSLConfig.java @@ -21,11 +21,19 @@ package org.apache.drill.exec; import org.apache.drill.categories.SecurityTest; import org.apache.drill.common.exceptions.DrillException; +import org.apache.drill.exec.ssl.SSLConfig; +import org.apache.drill.exec.ssl.SSLConfigBuilder; import org.apache.drill.test.ConfigBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.ssl.SSLFactory; import org.junit.Test; import org.junit.experimental.categories.Category; + +import java.text.MessageFormat; + import static junit.framework.TestCase.fail; +import static org.apache.drill.exec.ssl.SSLConfig.HADOOP_SSL_CONF_TPL_KEY; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -38,8 +46,15 @@ public class TestSSLConfig { ConfigBuilder config = new ConfigBuilder(); config.put(ExecConstants.HTTP_KEYSTORE_PATH, ""); config.put(ExecConstants.HTTP_KEYSTORE_PASSWORD, "root"); + config.put(ExecConstants.SSL_USE_HADOOP_CONF, false); + config.put(ExecConstants.USER_SSL_ENABLED, true); try { - SSLConfig sslv = new SSLConfig(config.build()); + SSLConfig sslv = new SSLConfigBuilder() + .config(config.build()) + .mode(SSLFactory.Mode.SERVER) + .initializeSSLContext(false) + .validateKeyStore(true) + .build(); fail(); //Expected } catch (Exception e) { @@ -53,8 +68,15 @@ public class TestSSLConfig { ConfigBuilder config = new ConfigBuilder(); config.put(ExecConstants.HTTP_KEYSTORE_PATH, "/root"); config.put(ExecConstants.HTTP_KEYSTORE_PASSWORD, ""); + config.put(ExecConstants.SSL_USE_HADOOP_CONF, false); + config.put(ExecConstants.USER_SSL_ENABLED, true); try { - SSLConfig sslv = new SSLConfig(config.build()); + SSLConfig sslv = new SSLConfigBuilder() + .config(config.build()) + .mode(SSLFactory.Mode.SERVER) + .initializeSSLContext(false) + .validateKeyStore(true) + .build(); fail(); //Expected } catch (Exception e) { @@ -69,7 +91,12 @@ public class TestSSLConfig { config.put(ExecConstants.HTTP_KEYSTORE_PATH, "/root"); config.put(ExecConstants.HTTP_KEYSTORE_PASSWORD, "root"); try { - SSLConfig sslv = new SSLConfig(config.build()); + SSLConfig sslv = new SSLConfigBuilder() + .config(config.build()) + .mode(SSLFactory.Mode.SERVER) + .initializeSSLContext(false) + .validateKeyStore(true) + .build(); assertEquals("/root", sslv.getKeyStorePath()); assertEquals("root", sslv.getKeyStorePassword()); } catch (Exception e) { @@ -84,7 +111,12 @@ public class TestSSLConfig { ConfigBuilder config = new ConfigBuilder(); config.put("javax.net.ssl.keyStore", "/root"); config.put("javax.net.ssl.keyStorePassword", "root"); - SSLConfig sslv = new SSLConfig(config.build()); + SSLConfig sslv = new SSLConfigBuilder() + .config(config.build()) + .mode(SSLFactory.Mode.SERVER) + .initializeSSLContext(false) + .validateKeyStore(true) + .build(); assertEquals("/root",sslv.getKeyStorePath()); assertEquals("root", sslv.getKeyStorePassword()); } @@ -95,10 +127,41 @@ public class TestSSLConfig { ConfigBuilder config = new ConfigBuilder(); config.put(ExecConstants.HTTP_TRUSTSTORE_PATH, "/root"); config.put(ExecConstants.HTTP_TRUSTSTORE_PASSWORD, "root"); - SSLConfig sslv = new SSLConfig(config.build()); + config.put(ExecConstants.SSL_USE_HADOOP_CONF, false); + SSLConfig sslv = new SSLConfigBuilder() + .config(config.build()) + .mode(SSLFactory.Mode.SERVER) + .initializeSSLContext(false) + .validateKeyStore(true) + .build(); assertEquals(true, sslv.hasTrustStorePath()); assertEquals(true,sslv.hasTrustStorePassword()); assertEquals("/root",sslv.getTrustStorePath()); assertEquals("root",sslv.getTrustStorePassword()); } -}
\ No newline at end of file + + @Test + public void testInvalidHadoopKeystore() throws Exception { + Configuration hadoopConfig = new Configuration(); + String hadoopSSLFileProp = MessageFormat + .format(HADOOP_SSL_CONF_TPL_KEY, SSLFactory.Mode.SERVER.toString().toLowerCase()); + hadoopConfig.set(hadoopSSLFileProp, "ssl-server-invalid.xml"); + ConfigBuilder config = new ConfigBuilder(); + config.put(ExecConstants.USER_SSL_ENABLED, true); + config.put(ExecConstants.SSL_USE_HADOOP_CONF, true); + SSLConfig sslv; + try { + sslv = new SSLConfigBuilder() + .config(config.build()) + .mode(SSLFactory.Mode.SERVER) + .initializeSSLContext(false) + .validateKeyStore(true) + .hadoopConfig(hadoopConfig) + .build(); + fail(); + } catch (Exception e) { + assertTrue(e instanceof DrillException); + } + } + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSL.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSL.java new file mode 100644 index 000000000..2228e30d7 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSL.java @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.user.security; + +import com.typesafe.config.ConfigValueFactory; +import io.netty.handler.ssl.util.SelfSignedCertificate; +import junit.framework.TestCase; +import org.apache.drill.BaseTestQuery; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.config.DrillProperties; +import org.apache.drill.exec.ExecConstants; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.File; +import java.io.FileOutputStream; +import java.net.InetAddress; +import java.security.KeyStore; +import java.util.Properties; + +import static junit.framework.TestCase.fail; +import static org.junit.Assert.assertEquals; + +public class TestUserBitSSL extends BaseTestQuery { + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(TestUserBitSSL.class); + + private static DrillConfig newConfig; + private static Properties initProps; // initial client properties + private static ClassLoader classLoader; + private static String ksPath; + private static String tsPath; + private static String emptyTSPath; + private static String unknownKsPath; + + @BeforeClass + public static void setupTest() throws Exception { + + // Create a new DrillConfig + classLoader = TestUserBitSSL.class.getClassLoader(); + ksPath = new File(classLoader.getResource("ssl/keystore.ks").getFile()).getAbsolutePath(); + unknownKsPath = new File(classLoader.getResource("ssl/unknownkeystore.ks").getFile()).getAbsolutePath(); + tsPath = new File(classLoader.getResource("ssl/truststore.ks").getFile()).getAbsolutePath(); + emptyTSPath = new File(classLoader.getResource("ssl/emptytruststore.ks").getFile()).getAbsolutePath(); + newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()) + .withValue(ExecConstants.SSL_USE_HADOOP_CONF, + ConfigValueFactory.fromAnyRef(false)) + .withValue(ExecConstants.USER_SSL_ENABLED, + ConfigValueFactory.fromAnyRef(true)) + .withValue(ExecConstants.SSL_KEYSTORE_TYPE, + ConfigValueFactory.fromAnyRef("JKS")) + .withValue(ExecConstants.SSL_KEYSTORE_PATH, + ConfigValueFactory.fromAnyRef(ksPath)) + .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, + ConfigValueFactory.fromAnyRef("drill123")) + .withValue(ExecConstants.SSL_KEY_PASSWORD, + ConfigValueFactory.fromAnyRef("drill123")) + .withValue(ExecConstants.SSL_TRUSTSTORE_TYPE, + ConfigValueFactory.fromAnyRef("JKS")) + .withValue(ExecConstants.SSL_TRUSTSTORE_PATH, + ConfigValueFactory.fromAnyRef(tsPath)) + .withValue(ExecConstants.SSL_TRUSTSTORE_PASSWORD, + ConfigValueFactory.fromAnyRef("drill123")) + .withValue(ExecConstants.SSL_PROTOCOL, + ConfigValueFactory.fromAnyRef("TLSv1.2")), + false); + + initProps = new Properties(); + initProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + initProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath); + initProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123"); + initProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true"); + + // Start an SSL enabled cluster + updateTestCluster(1, newConfig, initProps); + } + + @AfterClass + public static void cleanTest() throws Exception { + DrillConfig restoreConfig = + new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()), false); + updateTestCluster(1, restoreConfig); + } + + @Test + public void testSSLConnection() throws Exception { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123"); + connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true"); + try { + updateClient(connectionProps); + } catch (Exception e) { + TestCase.fail( new StringBuilder() + .append("SSL Connection failed with exception [" ) + .append( e.getMessage() ) + .append("]") + .toString()); + } + } + + @Test + public void testSSLConnectionWithKeystore() throws Exception { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, ksPath); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123"); + connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true"); + try { + updateClient(connectionProps); + } catch (Exception e) { + TestCase.fail( new StringBuilder() + .append("SSL Connection failed with exception [" ) + .append( e.getMessage() ) + .append("]") + .toString()); + } + } + + @Test + public void testSSLConnectionFailBadTrustStore() throws Exception { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, ""); // NO truststore + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123"); + connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true"); + boolean failureCaught = false; + try { + updateClient(connectionProps); + } catch (Exception e) { + failureCaught = true; + } + assertEquals(failureCaught, true); + } + + @Test + public void testSSLConnectionFailBadPassword() throws Exception { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "bad_password"); + connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true"); + boolean failureCaught = false; + try { + updateClient(connectionProps); + } catch (Exception e) { + failureCaught = true; + } + assertEquals(failureCaught, true); + } + + @Test + public void testSSLConnectionFailEmptyTrustStore() throws Exception { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, emptyTSPath); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123"); + connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true"); + boolean failureCaught = false; + try { + updateClient(connectionProps); + } catch (Exception e) { + failureCaught = true; + } + assertEquals(failureCaught, true); + } + + @Test + public void testSSLQuery() throws Exception { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123"); + connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true"); + try { + updateClient(connectionProps); + } catch (Exception e) { + TestCase.fail( new StringBuilder() + .append("SSL Connection failed with exception [" ) + .append( e.getMessage() ) + .append("]") + .toString()); + } + test("SELECT * FROM cp.`region.json`"); + } + + @Ignore("This test fails in some cases where the host name may be set up inconsistently.") + @Test + public void testClientConfigHostnameVerification() { + String password = "test_password"; + String trustStoreFileName = "drillTestTrustStore"; + String keyStoreFileName = "drillTestKeyStore"; + KeyStore ts, ks; + File tempFile1, tempFile2; + String trustStorePath; + String keyStorePath; + + try { + String fqdn = InetAddress.getLocalHost().getHostName(); + SelfSignedCertificate certificate = new SelfSignedCertificate(fqdn); + + tempFile1 = File.createTempFile(trustStoreFileName, ".ks"); + tempFile1.deleteOnExit(); + trustStorePath = tempFile1.getAbsolutePath(); + //generate a truststore. + ts = KeyStore.getInstance(KeyStore.getDefaultType()); + ts.load(null, password.toCharArray()); + ts.setCertificateEntry("drillTest", certificate.cert()); + // Store away the truststore. + try (FileOutputStream fos1 = new FileOutputStream(tempFile1);) { + ts.store(fos1, password.toCharArray()); + } catch (Exception e) { + fail(e.getMessage()); + } + + tempFile2 = File.createTempFile(keyStoreFileName, ".ks"); + tempFile2.deleteOnExit(); + keyStorePath = tempFile2.getAbsolutePath(); + //generate a keystore. + ts = KeyStore.getInstance(KeyStore.getDefaultType()); + ts.load(null, password.toCharArray()); + ts.setKeyEntry("drillTest", certificate.key(), password.toCharArray(), new java.security.cert.Certificate[]{certificate.cert()}); + // Store away the keystore. + try (FileOutputStream fos2 = new FileOutputStream(tempFile2);) { + ts.store(fos2, password.toCharArray()); + } catch (Exception e) { + fail(e.getMessage()); + } + + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, trustStorePath); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, password); + connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "false"); + + DrillConfig sslConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()) + .withValue(ExecConstants.USER_SSL_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(ExecConstants.SSL_KEYSTORE_TYPE, ConfigValueFactory.fromAnyRef("JKS")) + .withValue(ExecConstants.SSL_KEYSTORE_PATH, ConfigValueFactory.fromAnyRef(keyStorePath)) + .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, ConfigValueFactory.fromAnyRef("test_password")) + .withValue(ExecConstants.SSL_PROTOCOL, ConfigValueFactory.fromAnyRef("TLSv1.2")), false); + + updateTestCluster(1, sslConfig, connectionProps); + + } catch (Exception e) { + fail(e.getMessage()); + } + //reset cluster + updateTestCluster(1, newConfig, initProps); + + } + + @Test + public void testClientConfigHostNameVerificationFail() throws Exception { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "password"); + connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "false"); + boolean failureCaught = false; + try { + updateClient(connectionProps); + } catch (Exception e) { + failureCaught = true; + } + assertEquals(failureCaught, true); + } + + @Test + public void testClientConfigCertificateVerification() { + // Fail if certificate is not valid + boolean failureCaught = false; + try { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123"); + connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true"); + //connectionProps.setProperty(DrillProperties.DISABLE_CERT_VERIFICATION, "true"); + + DrillConfig sslConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()) + .withValue(ExecConstants.USER_SSL_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(ExecConstants.SSL_KEYSTORE_TYPE, ConfigValueFactory.fromAnyRef("JKS")) + .withValue(ExecConstants.SSL_KEYSTORE_PATH, ConfigValueFactory.fromAnyRef(unknownKsPath)) + .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, ConfigValueFactory.fromAnyRef("drill123")) + .withValue(ExecConstants.SSL_PROTOCOL, ConfigValueFactory.fromAnyRef("TLSv1.2")), false); + + updateTestCluster(1, sslConfig, connectionProps); + + } catch (Exception e) { + failureCaught = true; + } + //reset cluster + updateTestCluster(1, newConfig, initProps); + assertEquals(failureCaught, true); + } + + @Test + public void testClientConfigNoCertificateVerification() { + // Pass if certificate is not valid, but mode is insecure. + try { + final Properties connectionProps = new Properties(); + connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath); + connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123"); + connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true"); + connectionProps.setProperty(DrillProperties.DISABLE_CERT_VERIFICATION, "true"); + + DrillConfig sslConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()) + .withValue(ExecConstants.USER_SSL_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(ExecConstants.SSL_KEYSTORE_TYPE, ConfigValueFactory.fromAnyRef("JKS")) + .withValue(ExecConstants.SSL_KEYSTORE_PATH, ConfigValueFactory.fromAnyRef(unknownKsPath)) + .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, ConfigValueFactory.fromAnyRef("drill123")) + .withValue(ExecConstants.SSL_PROTOCOL, ConfigValueFactory.fromAnyRef("TLSv1.2")), false); + + updateTestCluster(1, sslConfig, connectionProps); + + } catch (Exception e) { + fail(e.getMessage()); + } + //reset cluster + updateTestCluster(1, newConfig, initProps); + + } + +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSLServer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSLServer.java new file mode 100644 index 000000000..05d124512 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSLServer.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.user.security; + +import com.typesafe.config.ConfigValueFactory; +import junit.framework.TestCase; +import org.apache.drill.BaseTestQuery; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.config.DrillProperties; +import org.apache.drill.exec.ExecConstants; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.ssl.SSLFactory; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.text.MessageFormat; +import java.util.Properties; + +import static org.apache.drill.exec.ssl.SSLConfig.HADOOP_SSL_CONF_TPL_KEY; +import static org.junit.Assert.assertEquals; + +public class TestUserBitSSLServer extends BaseTestQuery { + private static final org.slf4j.Logger logger = + org.slf4j.LoggerFactory.getLogger(TestUserBitSSLServer.class); + + private static DrillConfig sslConfig; + private static Properties initProps; // initial client properties + private static ClassLoader classLoader; + private static String ksPath; + private static String tsPath; + private static String emptyTSPath; + + @BeforeClass + public static void setupTest() throws Exception { + + classLoader = TestUserBitSSLServer.class.getClassLoader(); + ksPath = new File(classLoader.getResource("ssl/keystore.ks").getFile()).getAbsolutePath(); + tsPath = new File(classLoader.getResource("ssl/truststore.ks").getFile()).getAbsolutePath(); + emptyTSPath = new File(classLoader.getResource("ssl/emptytruststore.ks").getFile()).getAbsolutePath(); + sslConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()) + .withValue(ExecConstants.USER_SSL_ENABLED, ConfigValueFactory.fromAnyRef(true)) + .withValue(ExecConstants.SSL_KEYSTORE_TYPE, ConfigValueFactory.fromAnyRef("JKS")) + .withValue(ExecConstants.SSL_KEYSTORE_PATH, ConfigValueFactory.fromAnyRef(ksPath)) + .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, ConfigValueFactory.fromAnyRef("drill123")) + .withValue(ExecConstants.SSL_KEY_PASSWORD, ConfigValueFactory.fromAnyRef("drill123")) + .withValue(ExecConstants.SSL_PROTOCOL, ConfigValueFactory.fromAnyRef("TLSv1.2")), false); + initProps = new Properties(); + initProps.setProperty(DrillProperties.ENABLE_TLS, "true"); + initProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath); + initProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123"); + initProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true"); + } + + @AfterClass + public static void cleanTest() throws Exception { + DrillConfig restoreConfig = + new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()), false); + updateTestCluster(1, restoreConfig); + } + + @Test + public void testInvalidKeystorePath() throws Exception { + DrillConfig testConfig = new DrillConfig(DrillConfig.create(sslConfig) + .withValue(ExecConstants.SSL_KEYSTORE_PATH, ConfigValueFactory.fromAnyRef("/bad/path")), + false); + + // Start an SSL enabled cluster + boolean failureCaught = false; + try { + updateTestCluster(1, testConfig, initProps); + } catch (Exception e) { + failureCaught = true; + } + assertEquals(failureCaught, true); + } + + @Test + public void testInvalidKeystorePassword() throws Exception { + DrillConfig testConfig = new DrillConfig(DrillConfig.create(sslConfig) + .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, ConfigValueFactory.fromAnyRef("badpassword")), + false); + + // Start an SSL enabled cluster + boolean failureCaught = false; + try { + updateTestCluster(1, testConfig, initProps); + } catch (Exception e) { + failureCaught = true; + } + assertEquals(failureCaught, true); + } + + @Test + public void testInvalidKeyPassword() throws Exception { + DrillConfig testConfig = new DrillConfig(DrillConfig.create(sslConfig) + .withValue(ExecConstants.SSL_KEY_PASSWORD, ConfigValueFactory.fromAnyRef("badpassword")), + false); + + // Start an SSL enabled cluster + boolean failureCaught = false; + try { + updateTestCluster(1, testConfig, initProps); + } catch (Exception e) { + failureCaught = true; + } + assertEquals(failureCaught, true); + } + + @Test + // Should pass because the keystore password will be used. + public void testNoKeyPassword() throws Exception { + DrillConfig testConfig = new DrillConfig(DrillConfig.create(sslConfig) + .withValue(ExecConstants.SSL_KEY_PASSWORD, ConfigValueFactory.fromAnyRef("")), + false); + + // Start an SSL enabled cluster + boolean failureCaught = false; + try { + updateTestCluster(1, testConfig, initProps); + } catch (Exception e) { + failureCaught = true; + } + assertEquals(failureCaught, false); + } + +} diff --git a/exec/java-exec/src/test/resources/ssl-server-invalid.xml b/exec/java-exec/src/test/resources/ssl-server-invalid.xml new file mode 100644 index 000000000..6bfac5bce --- /dev/null +++ b/exec/java-exec/src/test/resources/ssl-server-invalid.xml @@ -0,0 +1,72 @@ +<?xml version="1.0"?> + <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<configuration> + + <property> + <name>ssl.server.truststore.location</name> + <value></value> + <description>Truststore to be used by NN and DN. Must be specified. + </description> + </property> + + <property> + <name>ssl.server.truststore.password</name> + <value></value> + <description>Optional. Default value is "". + </description> + </property> + + <property> + <name>ssl.server.truststore.type</name> + <value>jks</value> + <description>Optional. Default value is "jks". + </description> + </property> + + <property> + <name>ssl.server.keystore.location</name> + <value></value> + <description>Keystore to be used by NN and DN. Must be specified. + </description> + </property> + + <property> + <name>ssl.server.keystore.password</name> + <value>FAIL</value> + <description>Must be specified. + </description> + </property> + + <property> + <name>ssl.server.keystore.keypassword</name> + <value></value> + <description>Must be specified. + </description> + </property> + + <property> + <name>ssl.server.keystore.type</name> + <value>jks</value> + <description>Optional. Default value is "jks". + </description> + </property> + +</configuration> diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java index 0e4726d99..c0804c5a3 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java @@ -157,14 +157,14 @@ class DrillConnectionImpl extends AvaticaConnection this.client.setClientName("Apache Drill JDBC Driver"); this.client.connect(connect, info); } catch (OutOfMemoryException e) { - throw new SQLException("Failure creating root allocator", e); + throw new SQLNonTransientConnectionException("Failure creating root allocator", e); } catch (InvalidConnectionInfoException e) { - throw new SQLException("Invalid parameter in connection string: " + e.getMessage(), e); + throw new SQLNonTransientConnectionException("Invalid parameter in connection string: " + e.getMessage(), e); } catch (RpcException e) { // (Include cause exception's text in wrapping exception's text so // it's more likely to get to user (e.g., via SQLLine), and use // toString() since getMessage() text doesn't always mention error:) - throw new SQLException("Failure in connecting to Drill: " + e, e); + throw new SQLNonTransientConnectionException("Failure in connecting to Drill: " + e, e); } } diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java index face1af1c..6e11236fc 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java @@ -271,17 +271,26 @@ public abstract class AbstractRemoteConnection implements RemoteConnection, Encr public void addSecurityHandlers() { final ChannelPipeline channelPipeline = getChannel().pipeline(); - channelPipeline.addFirst(RpcConstants.SASL_DECRYPTION_HANDLER, - new SaslDecryptionHandler(saslCodec, getMaxWrappedSize(), OutOfMemoryHandler.DEFAULT_INSTANCE)); - - channelPipeline.addFirst(RpcConstants.LENGTH_DECODER_HANDLER, - new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE, - RpcConstants.LENGTH_FIELD_OFFSET, RpcConstants.LENGTH_FIELD_LENGTH, - RpcConstants.LENGTH_ADJUSTMENT, RpcConstants.INITIAL_BYTES_TO_STRIP, true)); + if (channelPipeline.names().contains(RpcConstants.SSL_HANDLER)) { + channelPipeline.addAfter(RpcConstants.SSL_HANDLER, RpcConstants.SASL_DECRYPTION_HANDLER, + new SaslDecryptionHandler(saslCodec, getMaxWrappedSize(), OutOfMemoryHandler.DEFAULT_INSTANCE)); + + channelPipeline.addAfter(RpcConstants.SSL_HANDLER, RpcConstants.LENGTH_DECODER_HANDLER, + new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE, + RpcConstants.LENGTH_FIELD_OFFSET, RpcConstants.LENGTH_FIELD_LENGTH, + RpcConstants.LENGTH_ADJUSTMENT, RpcConstants.INITIAL_BYTES_TO_STRIP, true)); + } else { + channelPipeline.addFirst(RpcConstants.SASL_DECRYPTION_HANDLER, + new SaslDecryptionHandler(saslCodec, getMaxWrappedSize(), OutOfMemoryHandler.DEFAULT_INSTANCE)); + + channelPipeline.addFirst(RpcConstants.LENGTH_DECODER_HANDLER, + new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE, + RpcConstants.LENGTH_FIELD_OFFSET, RpcConstants.LENGTH_FIELD_LENGTH, + RpcConstants.LENGTH_ADJUSTMENT, RpcConstants.INITIAL_BYTES_TO_STRIP, true)); + } channelPipeline.addAfter(RpcConstants.MESSAGE_DECODER, RpcConstants.SASL_ENCRYPTION_HANDLER, - new SaslEncryptionHandler(saslCodec, getWrapSizeLimit(), - OutOfMemoryHandler.DEFAULT_INSTANCE)); + new SaslEncryptionHandler(saslCodec, getWrapSizeLimit(), OutOfMemoryHandler.DEFAULT_INSTANCE)); channelPipeline.addAfter(RpcConstants.SASL_ENCRYPTION_HANDLER, RpcConstants.CHUNK_CREATION_HANDLER, new ChunkCreationHandler(getWrapSizeLimit())); diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java index d51b748e5..0d80df6da 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.rpc; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.channel.ChannelFuture; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -33,12 +33,10 @@ import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; -import java.net.SocketAddress; import java.util.concurrent.TimeUnit; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; -import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType; import com.google.common.base.Preconditions; import com.google.protobuf.Internal.EnumLite; @@ -69,6 +67,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio private final Parser<HR> handshakeParser; private final IdlePingHandler pingHandler; + private ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener = null; public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType, Class<HR> responseClass, Parser<HR> handshakeParser) { @@ -100,6 +99,10 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio ch.closeFuture().addListener(getCloseHandler(ch, connection)); final ChannelPipeline pipe = ch.pipeline(); + // Make sure that the SSL handler is the first handler in the pipeline so everything is encrypted + if (isSslEnabled()) { + setupSSL(pipe, sslHandshakeListener); + } pipe.addLast(RpcConstants.PROTOCOL_DECODER, getDecoder(connection.getAllocator())); pipe.addLast(RpcConstants.MESSAGE_DECODER, new RpcDecoder("c-" + rpcConfig.getName())); @@ -111,7 +114,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio } pipe.addLast(RpcConstants.MESSAGE_HANDLER, new InboundHandler(connection)); - pipe.addLast(RpcConstants.EXCEPTION_HANDLER, new RpcExceptionHandler<CC>(connection)); + pipe.addLast(RpcConstants.EXCEPTION_HANDLER, new RpcExceptionHandler<>(connection)); } }); // @@ -120,6 +123,21 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio // } } + // Adds a SSL handler if enabled. Required only for client and server communications, so + // a real implementation is only available for UserClient + protected void setupSSL(ChannelPipeline pipe, ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener) { + throw new UnsupportedOperationException("SSL is implemented only by the User Client."); + } + + protected boolean isSslEnabled() { + return false; + } + + // Save the SslChannel after the SSL handshake so it can be closed later + public void setSslChannel(Channel c) { + + } + @Override protected CC initRemoteConnection(SocketChannel channel){ local=channel.localAddress(); @@ -144,7 +162,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio } }; - public IdlePingHandler(long idleWaitInMillis) { + IdlePingHandler(long idleWaitInMillis) { super(0, idleWaitInMillis, 0, TimeUnit.MILLISECONDS); } @@ -179,6 +197,13 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio return super.send(connection, rpcType, protobufBody, clazz, dataBodies); } + public <SEND extends MessageLite, RECEIVE extends MessageLite> + void send(RpcOutcomeListener<RECEIVE> listener, SEND protobufBody, boolean allowInEventLoop, + ByteBuf... dataBodies) { + super.send(listener, connection, handshakeType, protobufBody, (Class<RECEIVE>) responseClass, + allowInEventLoop, dataBodies); + } + // the command itself must be "run" by the caller (to avoid calling inEventLoop) protected <M extends MessageLite> RpcCommand<M, CC> getInitialCommand(final RpcCommand<M, CC> command) { @@ -187,116 +212,24 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio protected void connectAsClient(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, String host, int port) { - ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue); - b.connect(host, port).addListener(cml.connectionHandler); - } - - private class ConnectionMultiListener { - private final RpcConnectionHandler<CC> l; - private final HS handshakeValue; - - public ConnectionMultiListener(RpcConnectionHandler<CC> l, HS handshakeValue) { - assert l != null; - assert handshakeValue != null; - - this.l = l; - this.handshakeValue = handshakeValue; - } - - public final ConnectionHandler connectionHandler = new ConnectionHandler(); - public final HandshakeSendHandler handshakeSendHandler = new HandshakeSendHandler(); - - /** - * Manages connection establishment outcomes. - */ - private class ConnectionHandler implements GenericFutureListener<ChannelFuture> { - - @Override - public void operationComplete(ChannelFuture future) throws Exception { - boolean isInterrupted = false; - - // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly, - // So there is no point propagating the interruption as failure immediately. - long remainingWaitTimeMills = 120000; - long startTime = System.currentTimeMillis(); - // logger.debug("Connection operation finished. Success: {}", future.isSuccess()); - while(true) { - try { - future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS); - if (future.isSuccess()) { - SocketAddress remote = future.channel().remoteAddress(); - SocketAddress local = future.channel().localAddress(); - setAddresses(remote, local); - // send a handshake on the current thread. This is the only time we will send from within the event thread. - // We can do this because the connection will not be backed up. - send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true); - } else { - l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure.")); - } - // logger.debug("Handshake queued for send."); - break; - } catch (final InterruptedException interruptEx) { - remainingWaitTimeMills -= (System.currentTimeMillis() - startTime); - startTime = System.currentTimeMillis(); - isInterrupted = true; - if (remainingWaitTimeMills < 1) { - l.connectionFailed(FailureType.CONNECTION, interruptEx); - break; - } - // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills. - } catch (final Exception ex) { - logger.error("Failed to establish connection", ex); - l.connectionFailed(FailureType.CONNECTION, ex); - break; - } - } - - if (isInterrupted) { - // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the - // interruption and respond to it if it wants to. - Thread.currentThread().interrupt(); - } - } - } - - /** - * manages handshake outcomes. - */ - private class HandshakeSendHandler implements RpcOutcomeListener<HR> { - - @Override - public void failed(RpcException ex) { - logger.debug("Failure while initiating handshake", ex); - l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex); - } - - @Override - public void success(HR value, ByteBuf buffer) { - // logger.debug("Handshake received. {}", value); - try { - validateHandshake(value); - finalizeConnection(value, connection); - l.connectionSucceeded(connection); - // logger.debug("Handshake completed succesfully."); - } catch (Exception ex) { - logger.debug("Failure while validating handshake", ex); - l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex); - } - } - - @Override - public void interrupted(final InterruptedException ex) { - logger.warn("Interrupted while waiting for handshake response", ex); - l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex); - } + ConnectionMultiListener<T, CC, HS, HR, BasicClient<T, CC, HS, HR>> cml; + ConnectionMultiListener.Builder<T, CC, HS, HR, BasicClient<T, CC, HS, HR> > builder = + ConnectionMultiListener.newBuilder(connectionListener, handshakeValue, this); + if (isSslEnabled()) { + cml = builder.enableSSL().build(); + sslHandshakeListener = new ConnectionMultiListener.SSLHandshakeListener(); + sslHandshakeListener.setParent(cml); + } else { + cml = builder.build(); } + b.connect(host, port).addListener(cml.connectionHandler); } private class ClientHandshakeHandler extends AbstractHandshakeHandler<HR> { private final CC connection; - public ClientHandshakeHandler(CC connection) { + ClientHandshakeHandler(CC connection) { super(BasicClient.this.handshakeType, BasicClient.this.handshakeParser); Preconditions.checkNotNull(connection); this.connection = connection; diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java index a7258dd96..67fc89a19 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java @@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -82,6 +83,11 @@ public abstract class BasicServer<T extends EnumLite, SC extends ServerConnectio ch.closeFuture().addListener(getCloseHandler(ch, connection)); final ChannelPipeline pipe = ch.pipeline(); + // Make sure that the SSL handler is the first handler in the pipeline so everything is encrypted + if (isSslEnabled()) { + setupSSL(pipe); + } + pipe.addLast(RpcConstants.PROTOCOL_DECODER, getDecoder(connection.getAllocator(), getOutOfMemoryHandler())); pipe.addLast(RpcConstants.MESSAGE_DECODER, new RpcDecoder("s-" + rpcConfig.getName())); pipe.addLast(RpcConstants.PROTOCOL_ENCODER, new RpcEncoder("s-" + rpcConfig.getName())); @@ -105,6 +111,25 @@ public abstract class BasicServer<T extends EnumLite, SC extends ServerConnectio // } } + // Adds a SSL handler if enabled. Required only for client and server communications, so + // a real implementation is only available for UserServer + protected void setupSSL(ChannelPipeline pipe) { + throw new UnsupportedOperationException("SSL is implemented only by the User Server."); + } + + protected boolean isSslEnabled() { + return false; + } + + // Save the SslChannel after the SSL handshake so it can be closed later + public void setSslChannel(Channel c) { + return; + } + + protected void closeSSL() { + return; + } + private class LoggingReadTimeoutHandler extends ReadTimeoutHandler { private final SC connection; @@ -203,6 +228,9 @@ public abstract class BasicServer<T extends EnumLite, SC extends ServerConnectio if (elapsed > 500) { logger.info("closed eventLoopGroup " + eventLoopGroup + " in " + elapsed + " ms"); } + if(isSslEnabled()) { + closeSSL(); + } } catch (final InterruptedException | ExecutionException e) { logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e); diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java new file mode 100644 index 000000000..0cdca13a6 --- /dev/null +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc; + +import com.google.protobuf.Internal.EnumLite; +import com.google.protobuf.MessageLite; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.drill.common.exceptions.DrillException; +import org.slf4j.Logger; + +import java.net.SocketAddress; +import java.util.concurrent.TimeUnit; + +/** + * @param <CC> Client Connection Listener + * @param <HS> Outbound handshake message type + * @param <HR> Inbound handshake message type + * @param <BC> BasicClient type + * <p> + * Implements a wrapper class that allows a client connection to associate different behaviours after + * establishing a connection with the server. The client can choose to send an application handshake, or + * in the case of SSL, wait for a SSL handshake completion and then send an application handshake. + */ + +public class ConnectionMultiListener<T extends EnumLite, CC extends ClientConnection, HS extends MessageLite, HR extends MessageLite, BC extends BasicClient<T, CC, HS, HR>> { + + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionMultiListener.class); + + private final RpcConnectionHandler<CC> connectionListener; + private final HS handshakeValue; + private final BC parent; + + private ConnectionMultiListener(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, + BC basicClient) { + assert connectionListener != null; + assert handshakeValue != null; + + this.connectionListener = connectionListener; + this.handshakeValue = handshakeValue; + this.parent = basicClient; + } + + public static <T extends EnumLite, CC extends ClientConnection, HS extends MessageLite, HR extends MessageLite, BC extends BasicClient<T, CC, HS, HR>> + Builder<T, CC, HS, HR, BC> + newBuilder(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, + BC basicClient) { + return new Builder<>(connectionListener, handshakeValue, basicClient); + } + + ConnectionHandler connectionHandler = null; + private HandshakeSendHandler handshakeSendHandler = null; + private SSLConnectionHandler sslConnectionHandler = null; + + /** + * Manages connection establishment outcomes. + */ + private class ConnectionHandler implements GenericFutureListener<ChannelFuture> { + + @Override + public void operationComplete(ChannelFuture future) throws Exception { + boolean isInterrupted = false; + + // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly, + // So there is no point propagating the interruption as failure immediately. + long remainingWaitTimeMills = 120000; + long startTime = System.currentTimeMillis(); + // logger.debug("Connection operation finished. Success: {}", future.isSuccess()); + while (true) { + try { + future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS); + if (future.isSuccess()) { + SocketAddress remote = future.channel().remoteAddress(); + SocketAddress local = future.channel().localAddress(); + parent.setAddresses(remote, local); + // if SSL is enabled send the handshake after the ssl handshake is completed, otherwise send it + // now + if(!parent.isSslEnabled()) { + // send a handshake on the current thread. This is the only time we will send from within the event thread. + // We can do this because the connection will not be backed up. + parent.send(handshakeSendHandler, handshakeValue, true); + } + } else { + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, + new RpcException("General connection failure.")); + } + // logger.debug("Handshake queued for send."); + break; + } catch (final InterruptedException interruptEx) { + remainingWaitTimeMills -= (System.currentTimeMillis() - startTime); + startTime = System.currentTimeMillis(); + isInterrupted = true; + if (remainingWaitTimeMills < 1) { + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, interruptEx); + break; + } + // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills. + } catch (final Exception ex) { + logger.error("Failed to establish connection", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, ex); + break; + } + } + + if (isInterrupted) { + // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the + // interruption and respond to it if it wants to. + Thread.currentThread().interrupt(); + } + } + } + + private class SSLConnectionHandler implements GenericFutureListener<Future<Channel>> { + @Override + public void operationComplete(Future<Channel> future) throws Exception { + // send the handshake + parent.send(handshakeSendHandler, handshakeValue, true); + } + } + + /** + * manages handshake outcomes. + */ + private class HandshakeSendHandler implements RpcOutcomeListener<HR> { + + @Override + public void failed(RpcException ex) { + logger.debug("Failure while initiating handshake", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION, ex); + } + + @Override + public void success(HR value, ByteBuf buffer) { + // logger.debug("Handshake received. {}", value); + try { + parent.validateHandshake(value); + parent.finalizeConnection(value, parent.connection); + connectionListener.connectionSucceeded(parent.connection); + // logger.debug("Handshake completed succesfully."); + } catch (Exception ex) { + logger.debug("Failure while validating handshake", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_VALIDATION, ex); + } + } + + @Override + public void interrupted(final InterruptedException ex) { + logger.warn("Interrupted while waiting for handshake response", ex); + connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION, ex); + } + } + + /* + The SSL Handshake listener is special in that it is needed at the time of initializing an SSL + enabled pipeline and so is instantiated before the instance of the outer class may be needed. + We create an instance and set a reference back to the outer class instance when it is created + at the time of connection. + */ + public static class SSLHandshakeListener implements GenericFutureListener<Future<Channel>> { + ConnectionMultiListener parent; + SSLHandshakeListener() { + } + + public void setParent(ConnectionMultiListener cml){ + this.parent = cml; + } + + @Override + public void operationComplete(Future<Channel> future) throws Exception { + if(parent != null){ + if(future.isSuccess()) { + Channel c = future.get(); + parent.sslConnectionHandler.operationComplete(future); + parent.parent.setSslChannel(c); + } else { + throw new DrillException("SSL handshake failed.", future.cause()); + } + } else { + throw new RpcException("RPC Setup error. SSL handshake complete handler is not set up."); + } + } + } + + + public static class Builder<T extends EnumLite, CC extends ClientConnection, HS extends MessageLite, HR extends MessageLite, BC extends BasicClient<T, CC, HS, HR> > { + + private final RpcConnectionHandler<CC> connectionListener; + private final HS handshakeValue; + private final BC basicClient; + boolean enableSSL = false; + + private ConnectionMultiListener<T, CC, HS, HR, BC> cml; + + private Builder(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, BC basicClient) { + this.connectionListener = connectionListener; + this.handshakeValue = handshakeValue; + this.basicClient = basicClient; + } + + Builder<T, CC, HS, HR, BC> enableSSL() { + enableSSL = true; + return this; + } + + public ConnectionMultiListener<T, CC, HS, HR, BC> build() { + this.cml = new ConnectionMultiListener<>(connectionListener, handshakeValue, basicClient); + cml.connectionHandler = cml.new ConnectionHandler(); + cml.handshakeSendHandler = cml.new HandshakeSendHandler(); + if(enableSSL) { + cml.sslConnectionHandler = cml.new SSLConnectionHandler(); + } + return cml; + } + + } + +} diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java index be58f371c..455ada84c 100644 --- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java +++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java @@ -38,6 +38,7 @@ public class RpcConstants { public static final String SASL_ENCRYPTION_HANDLER = "sasl-encryption-handler"; public static final String LENGTH_DECODER_HANDLER = "length-decoder"; public static final String CHUNK_CREATION_HANDLER = "chunk-creation-handler"; + public static final String SSL_HANDLER = "ssl-handler"; |