aboutsummaryrefslogtreecommitdiff
path: root/exec
diff options
context:
space:
mode:
authorParth Chandra <parthc@apache.org>2017-06-09 22:03:59 -0700
committerParth Chandra <parthc@apache.org>2017-10-11 19:26:13 -0700
commit552d7d825e37d42835bd6bfccfc07fc7d3b5fd94 (patch)
tree669a74d1646c97522969f8acf8c4ed2baff8a68f /exec
parentb803405c0f978beb8beaf77211f99731014ac92f (diff)
DRILL-5431: SSL Support (Java) - Java client server SSL implementation
Also enable OpenSSL support Also fix exclusions and java-exec pom file to eliminate netty-tcnative as a transitive dependency on all projects
Diffstat (limited to 'exec')
-rw-r--r--exec/java-exec/pom.xml66
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java28
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/SSLConfig.java69
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java356
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java49
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfig.java265
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigBuilder.java94
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigClient.java273
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java330
-rw-r--r--exec/java-exec/src/main/resources/drill-module.conf33
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java10
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/TestSSLConfig.java75
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSL.java344
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSLServer.java143
-rw-r--r--exec/java-exec/src/test/resources/ssl-server-invalid.xml72
-rw-r--r--exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java6
-rw-r--r--exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java27
-rw-r--r--exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java149
-rw-r--r--exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java28
-rw-r--r--exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java235
-rw-r--r--exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java1
24 files changed, 2297 insertions, 387 deletions
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index ab1e02ee6..88039a331 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -20,6 +20,9 @@
<name>exec/Java Execution Engine</name>
<properties>
<libpam4j.version>1.8-rev1</libpam4j.version>
+ <!-- Configure the os-maven-plugin extension to expand the classifier on -->
+ <!-- Fedora-"like" systems. Used for netty-tcnative inclusion -->
+ <os.detection.classifierWithLikes>fedora</os.detection.classifierWithLikes>
</properties>
<dependencies>
@@ -456,6 +459,14 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -469,6 +480,14 @@
<artifactId>commons-codec</artifactId>
</exclusion>
<exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
@@ -486,14 +505,6 @@
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -576,6 +587,14 @@
<artifactId>libpam4j</artifactId>
<version>${libpam4j.version}</version>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative</artifactId>
+ <version>2.0.1.Final</version>
+ <scope>provided</scope>
+ <classifier>${os.detected.classifier}</classifier>
+ </dependency>
+
</dependencies>
<profiles>
@@ -595,6 +614,18 @@
<groupId>org.apache.hadoop</groupId>
<exclusions>
<exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
@@ -614,6 +645,14 @@
<groupId>org.apache.hadoop</groupId>
<exclusions>
<exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
@@ -658,6 +697,17 @@
<build>
+ <extensions>
+ <!--
+ Include the os-maven-plugin to get os.detected.classifier
+ -->
+ <extension>
+ <groupId>kr.motd.maven</groupId>
+ <artifactId>os-maven-plugin</artifactId>
+ <version>1.5.0.Final</version>
+ </extension>
+ </extensions>
+
<plugins>
<plugin>
<artifactId>maven-resources-plugin</artifactId>
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 9b125cb03..5c19e1398 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -117,6 +117,18 @@ public final class ExecConstants {
public static final String HASHAGG_FALLBACK_ENABLED_KEY = "drill.exec.hashagg.fallback.enabled";
public static final BooleanValidator HASHAGG_FALLBACK_ENABLED_VALIDATOR = new BooleanValidator(HASHAGG_FALLBACK_ENABLED_KEY);
+ public static final String SSL_PROVIDER = "drill.exec.ssl.provider"; // valid values are "JDK", "OPENSSL" // default JDK
+ public static final String SSL_PROTOCOL = "drill.exec.ssl.protocol"; // valid values are SSL, SSLV2, SSLV3, TLS, TLSV1, TLSv1.1, TLSv1.2(default)
+ public static final String SSL_KEYSTORE_TYPE = "drill.exec.ssl.keyStoreType";
+ public static final String SSL_KEYSTORE_PATH = "drill.exec.ssl.keyStorePath"; // path to keystore. default : $JRE_HOME/lib/security/keystore.jks
+ public static final String SSL_KEYSTORE_PASSWORD = "drill.exec.ssl.keyStorePassword"; // default: changeit
+ public static final String SSL_KEY_PASSWORD = "drill.exec.ssl.keyPassword"; //
+ public static final String SSL_TRUSTSTORE_TYPE = "drill.exec.ssl.trustStoreType"; // valid values are jks(default), jceks, pkcs12
+ public static final String SSL_TRUSTSTORE_PATH = "drill.exec.ssl.trustStorePath"; // path to keystore. default : $JRE_HOME/lib/security/cacerts.jks
+ public static final String SSL_TRUSTSTORE_PASSWORD = "drill.exec.ssl.trustStorePassword"; // default: changeit
+ public static final String SSL_USE_HADOOP_CONF = "drill.exec.ssl.useHadoopConfig"; // Initialize ssl params from hadoop if not provided by drill. default: true
+ public static final String SSL_HANDSHAKE_TIMEOUT = "drill.exec.security.user.encryption.ssl.handshakeTimeout"; // Default 10 seconds
+
public static final String TEXT_LINE_READER_BATCH_SIZE = "drill.exec.storage.file.text.batch.size";
public static final String TEXT_LINE_READER_BUFFER_SIZE = "drill.exec.storage.file.text.buffer.size";
public static final String HAZELCAST_SUBNETS = "drill.exec.cache.hazel.subnets";
@@ -133,10 +145,10 @@ public final class ExecConstants {
public static final String HTTP_SESSION_MEMORY_RESERVATION = "drill.exec.http.session.memory.reservation";
public static final String HTTP_SESSION_MEMORY_MAXIMUM = "drill.exec.http.session.memory.maximum";
public static final String HTTP_SESSION_MAX_IDLE_SECS = "drill.exec.http.session_max_idle_secs";
- public static final String HTTP_KEYSTORE_PATH = "drill.exec.ssl.keyStorePath";
- public static final String HTTP_KEYSTORE_PASSWORD = "drill.exec.ssl.keyStorePassword";
- public static final String HTTP_TRUSTSTORE_PATH = "drill.exec.ssl.trustStorePath";
- public static final String HTTP_TRUSTSTORE_PASSWORD = "drill.exec.ssl.trustStorePassword";
+ public static final String HTTP_KEYSTORE_PATH = SSL_KEYSTORE_PATH;
+ public static final String HTTP_KEYSTORE_PASSWORD = SSL_KEYSTORE_PASSWORD;
+ public static final String HTTP_TRUSTSTORE_PATH = SSL_TRUSTSTORE_PATH;
+ public static final String HTTP_TRUSTSTORE_PASSWORD = SSL_TRUSTSTORE_PASSWORD;
public static final String SYS_STORE_PROVIDER_CLASS = "drill.exec.sys.store.provider.class";
public static final String SYS_STORE_PROVIDER_LOCAL_PATH = "drill.exec.sys.store.provider.local.path";
public static final String SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE = "drill.exec.sys.store.provider.local.write";
@@ -153,6 +165,8 @@ public final class ExecConstants {
public static final String USE_LOGIN_PRINCIPAL = "drill.exec.security.bit.auth.use_login_principal";
public static final String USER_ENCRYPTION_SASL_ENABLED = "drill.exec.security.user.encryption.sasl.enabled";
public static final String USER_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.user.encryption.sasl.max_wrapped_size";
+
+ public static final String USER_SSL_ENABLED = "drill.exec.security.user.encryption.ssl.enabled";
public static final String BIT_ENCRYPTION_SASL_ENABLED = "drill.exec.security.bit.encryption.sasl.enabled";
public static final String BIT_ENCRYPTION_SASL_MAX_WRAPPED_SIZE = "drill.exec.security.bit.encryption.sasl.max_wrapped_size";
@@ -208,7 +222,7 @@ public final class ExecConstants {
public static final OptionValidator PARQUET_DICT_PAGE_SIZE_VALIDATOR = new PositiveLongValidator(PARQUET_DICT_PAGE_SIZE, Integer.MAX_VALUE);
public static final String PARQUET_WRITER_COMPRESSION_TYPE = "store.parquet.compression";
public static final OptionValidator PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR = new EnumeratedStringValidator(
- PARQUET_WRITER_COMPRESSION_TYPE, "snappy", "gzip", "none");
+ PARQUET_WRITER_COMPRESSION_TYPE, "snappy", "gzip", "none");
public static final String PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING = "store.parquet.enable_dictionary_encoding";
public static final OptionValidator PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR = new BooleanValidator(
PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING);
@@ -464,7 +478,7 @@ public final class ExecConstants {
*/
public static final String IMPERSONATION_POLICIES_KEY = "exec.impersonation.inbound_policies";
public static final StringValidator IMPERSONATION_POLICY_VALIDATOR =
- new InboundImpersonationManager.InboundImpersonationPolicyValidator(IMPERSONATION_POLICIES_KEY);
+ new InboundImpersonationManager.InboundImpersonationPolicyValidator(IMPERSONATION_POLICIES_KEY);
/**
@@ -546,5 +560,5 @@ public final class ExecConstants {
public static String bootDefaultFor(String name) {
return OPTION_DEFAULTS_ROOT + name;
- }
+}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/SSLConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/SSLConfig.java
deleted file mode 100644
index c6d637480..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/SSLConfig.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec;
-
-import com.typesafe.config.Config;
-import org.apache.drill.common.exceptions.DrillException;
-
-public class SSLConfig {
-
- private final String keystorePath;
-
- private final String keystorePassword;
-
- private final String truststorePath;
-
- private final String truststorePassword;
-
-
- public SSLConfig(Config config) throws DrillException {
-
- keystorePath = config.getString(ExecConstants.HTTP_KEYSTORE_PATH).trim();
-
- keystorePassword = config.getString(ExecConstants.HTTP_KEYSTORE_PASSWORD).trim();
-
- truststorePath = config.getString(ExecConstants.HTTP_TRUSTSTORE_PATH).trim();
-
- truststorePassword = config.getString(ExecConstants.HTTP_TRUSTSTORE_PASSWORD).trim();
-
- /*If keystorePath or keystorePassword is provided in the configuration file use that*/
- if (!keystorePath.isEmpty() || !keystorePassword.isEmpty()) {
- if (keystorePath.isEmpty()) {
- throw new DrillException(" *.ssl.keyStorePath in the configuration file is empty, but *.ssl.keyStorePassword is set");
- }
- else if (keystorePassword.isEmpty()){
- throw new DrillException(" *.ssl.keyStorePassword in the configuration file is empty, but *.ssl.keyStorePath is set ");
- }
-
- }
- }
-
- public boolean isSslValid() { return !keystorePath.isEmpty() && !keystorePassword.isEmpty(); }
-
- public String getKeyStorePath() { return keystorePath; }
-
- public String getKeyStorePassword() { return keystorePassword; }
-
- public boolean hasTrustStorePath() { return !truststorePath.isEmpty(); }
-
- public boolean hasTrustStorePassword() { return !truststorePassword.isEmpty(); }
-
- public String getTrustStorePath() { return truststorePath; }
-
- public String getTrustStorePassword() { return truststorePassword; }
-} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 9fbbfddaf..84b34a7ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -367,8 +367,12 @@ public class DrillClient implements Closeable, ConnectionThrottle {
DrillbitEndpoint endpoint;
while (triedEndpointIndex < connectTriesVal) {
- client = new UserClient(clientName, config, supportComplexTypes, allocator, eventLoopGroup, executor);
endpoint = endpoints.get(triedEndpointIndex);
+ // Note: the properties member is a DrillProperties instance which lower cases names of
+ // properties. That does not work too well with properties that are mixed case.
+ // For user client severla properties are mixed case so we do not use the properties member
+ // but instead pass the props parameter.
+ client = new UserClient(clientName, config, props, supportComplexTypes, allocator, eventLoopGroup, executor, endpoint);
logger.debug("Connecting to server {}:{}", endpoint.getAddress(), endpoint.getUserPort());
if (!properties.containsKey(DrillProperties.SERVICE_HOST)) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 2f4753857..99614bdfa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -20,16 +20,25 @@ package org.apache.drill.exec.rpc.user;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.net.ssl.SSLEngine;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslException;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslHandler;
import org.apache.drill.common.KerberosUtil;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.client.InvalidConnectionInfoException;
+import org.apache.drill.exec.ssl.SSLConfig;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -55,6 +64,7 @@ import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
import org.apache.drill.exec.rpc.AbstractClientConnection;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.ConnectionMultiListener;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.NonTransientRpcException;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
@@ -62,6 +72,7 @@ import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcConnectionHandler;
+import org.apache.drill.exec.rpc.RpcConstants;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.security.AuthStringUtil;
@@ -70,7 +81,9 @@ import org.apache.drill.exec.rpc.security.AuthenticatorFactory;
import org.apache.drill.exec.rpc.security.ClientAuthenticatorProvider;
import org.apache.drill.exec.rpc.security.plain.PlainFactory;
import org.apache.drill.exec.rpc.security.SaslProperties;
+import org.apache.drill.exec.ssl.SSLConfigBuilder;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.slf4j.Logger;
import com.google.common.base.Strings;
@@ -87,8 +100,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
-public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnection,
- UserToBitHandshake, BitToUserHandshake> {
+public class UserClient
+ extends BasicClient<RpcType, UserClient.UserToBitConnection, UserToBitHandshake, BitToUserHandshake> {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(UserClient.class);
private final BufferAllocator allocator;
@@ -102,19 +115,47 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
// these are used for authentication
private volatile List<String> serverAuthMechanisms = null;
private volatile boolean authComplete = true;
-
- public UserClient(String clientName, DrillConfig config, boolean supportComplexTypes,
- BufferAllocator allocator, EventLoopGroup eventLoopGroup, Executor eventExecutor) {
- super(
- UserRpcConfig.getMapping(config, eventExecutor),
- allocator.getAsByteBufAllocator(),
- eventLoopGroup,
- RpcType.HANDSHAKE,
- BitToUserHandshake.class,
- BitToUserHandshake.PARSER);
+ private SSLConfig sslConfig;
+ private DrillbitEndpoint endpoint;
+
+ public UserClient(String clientName, DrillConfig config, Properties properties, boolean supportComplexTypes,
+ BufferAllocator allocator, EventLoopGroup eventLoopGroup, Executor eventExecutor,
+ DrillbitEndpoint endpoint) throws NonTransientRpcException {
+ super(UserRpcConfig.getMapping(config, eventExecutor), allocator.getAsByteBufAllocator(),
+ eventLoopGroup, RpcType.HANDSHAKE, BitToUserHandshake.class, BitToUserHandshake.PARSER);
+ this.endpoint = endpoint; // save the endpoint; it might be needed by SSL init.
this.clientName = clientName;
this.allocator = allocator;
this.supportComplexTypes = supportComplexTypes;
+ try {
+ this.sslConfig = new SSLConfigBuilder().properties(properties).mode(SSLFactory.Mode.CLIENT)
+ .initializeSSLContext(true).validateKeyStore(false).build();
+ } catch (DrillException e) {
+ throw new InvalidConnectionInfoException(e.getMessage());
+ }
+
+ }
+
+ @Override protected void setupSSL(ChannelPipeline pipe,
+ ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener) {
+
+ String peerHost = endpoint.getAddress();
+ int peerPort = endpoint.getUserPort();
+ SSLEngine sslEngine = sslConfig.createSSLEngine(allocator, peerHost, peerPort);
+
+ // Add SSL handler into pipeline
+ SslHandler sslHandler = new SslHandler(sslEngine);
+ sslHandler.setHandshakeTimeoutMillis(sslConfig.getHandshakeTimeout());
+
+ // Add a listener for SSL Handshake complete. The Drill client handshake will be enabled only
+ // after this is done.
+ sslHandler.handshakeFuture().addListener(sslHandshakeListener);
+ pipe.addFirst(RpcConstants.SSL_HANDLER, sslHandler);
+ logger.debug(sslConfig.toString());
+ }
+
+ @Override protected boolean isSslEnabled() {
+ return sslConfig.isUserSslEnabled();
}
public RpcEndpointInfos getServerInfos() {
@@ -132,38 +173,51 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
/**
* Connects, and if required, authenticates. This method blocks until both operations are complete.
*
- * @param endpoint endpoint to connect to
- * @param properties properties
+ * @param endpoint endpoint to connect to
+ * @param properties properties
* @param credentials credentials
* @throws RpcException if either connection or authentication fails
*/
public void connect(final DrillbitEndpoint endpoint, final DrillProperties properties,
- final UserCredentials credentials) throws RpcException {
- final UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
- .setRpcVersion(UserRpcConfig.RPC_VERSION)
- .setSupportListening(true)
- .setSupportComplexTypes(supportComplexTypes)
- .setSupportTimeout(true)
- .setCredentials(credentials)
- .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName))
- .setSaslSupport(SaslSupport.SASL_PRIVACY)
- .setProperties(properties.serializeForServer());
+ final UserCredentials credentials) throws RpcException {
+ final UserToBitHandshake.Builder hsBuilder =
+ UserToBitHandshake.newBuilder()
+ .setRpcVersion(UserRpcConfig.RPC_VERSION)
+ .setSupportListening(true)
+ .setSupportComplexTypes(supportComplexTypes)
+ .setSupportTimeout(true).setCredentials(credentials)
+ .setClientInfos(UserRpcUtils.getRpcEndpointInfos(clientName))
+ .setSaslSupport(SaslSupport.SASL_PRIVACY)
+ .setProperties(properties.serializeForServer());
// Only used for testing purpose
if (properties.containsKey(DrillProperties.TEST_SASL_LEVEL)) {
- hsBuilder.setSaslSupport(SaslSupport.valueOf(
- Integer.parseInt(properties.getProperty(DrillProperties.TEST_SASL_LEVEL))));
+ hsBuilder.setSaslSupport(
+ SaslSupport.valueOf(Integer.parseInt(properties.getProperty(DrillProperties.TEST_SASL_LEVEL))));
}
- connect(hsBuilder.build(), endpoint).checkedGet();
+ if (sslConfig.isUserSslEnabled()) {
+ try {
+ connect(hsBuilder.build(), endpoint)
+ .checkedGet(sslConfig.getHandshakeTimeout(), TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ String msg = new StringBuilder().append(
+ "Connecting to the server timed out. This is sometimes due to a mismatch in the SSL configuration between client and server. [ Exception: ")
+ .append(e.getMessage()).append("]").toString();
+ throw new NonTransientRpcException(msg);
+ }
+ } else {
+ connect(hsBuilder.build(), endpoint).checkedGet();
+ }
// Check if client needs encryption and server is not configured for encryption.
- final boolean clientNeedsEncryption = properties.containsKey(DrillProperties.SASL_ENCRYPT)
- && Boolean.parseBoolean(properties.getProperty(DrillProperties.SASL_ENCRYPT));
+ final boolean clientNeedsEncryption = properties.containsKey(DrillProperties.SASL_ENCRYPT) && Boolean
+ .parseBoolean(properties.getProperty(DrillProperties.SASL_ENCRYPT));
- if(clientNeedsEncryption && !connection.isEncryptionEnabled()) {
- throw new NonTransientRpcException("Client needs encrypted connection but server is not configured for " +
- "encryption. Please check connection parameter or contact your administrator");
+ if (clientNeedsEncryption && !connection.isEncryptionEnabled()) {
+ throw new NonTransientRpcException(
+ "Client needs encrypted connection but server is not configured for "
+ + "encryption. Please check connection parameter or contact your administrator");
}
if (serverAuthMechanisms != null) {
@@ -176,31 +230,28 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
}
private CheckedFuture<Void, RpcException> connect(final UserToBitHandshake handshake,
- final DrillbitEndpoint endpoint) {
+ final DrillbitEndpoint endpoint) {
final SettableFuture<Void> connectionSettable = SettableFuture.create();
final CheckedFuture<Void, RpcException> connectionFuture =
new AbstractCheckedFuture<Void, RpcException>(connectionSettable) {
- @Override
- protected RpcException mapException(Exception e) {
+ @Override protected RpcException mapException(Exception e) {
return RpcException.mapException(e);
}
};
final RpcConnectionHandler<UserToBitConnection> connectionHandler =
new RpcConnectionHandler<UserToBitConnection>() {
- @Override
- public void connectionSucceeded(UserToBitConnection connection) {
+ @Override public void connectionSucceeded(UserToBitConnection connection) {
connectionSettable.set(null);
}
- @Override
- public void connectionFailed(FailureType type, Throwable t) {
- connectionSettable.setException(new RpcException(String.format("%s : %s",
- type.name(), t.getMessage()), t));
+ @Override public void connectionFailed(FailureType type, Throwable t) {
+ connectionSettable
+ .setException(new RpcException(String.format("%s : %s", type.name(), t.getMessage()), t));
}
};
- connectAsClient(queryResultHandler.getWrappedConnectionHandler(connectionHandler),
- handshake, endpoint.getAddress(), endpoint.getUserPort());
+ connectAsClient(queryResultHandler.getWrappedConnectionHandler(connectionHandler), handshake,
+ endpoint.getAddress(), endpoint.getUserPort());
return connectionFuture;
}
@@ -211,15 +262,15 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
// Set correct QOP property and Strength based on server needs encryption or not.
// If ChunkMode is enabled then negotiate for buffer size equal to wrapChunkSize,
// If ChunkMode is disabled then negotiate for MAX_WRAPPED_SIZE buffer size.
- propertiesMap.putAll(SaslProperties.getSaslProperties(connection.isEncryptionEnabled(),
- connection.getMaxWrappedSize()));
+ propertiesMap.putAll(
+ SaslProperties.getSaslProperties(connection.isEncryptionEnabled(), connection.getMaxWrappedSize()));
- final SettableFuture<Void> authSettable = SettableFuture.create(); // use handleAuthFailure to setException
+ final SettableFuture<Void> authSettable =
+ SettableFuture.create(); // use handleAuthFailure to setException
final CheckedFuture<Void, SaslException> authFuture =
new AbstractCheckedFuture<Void, SaslException>(authSettable) {
- @Override
- protected SaslException mapException(Exception e) {
+ @Override protected SaslException mapException(Exception e) {
if (e instanceof ExecutionException) {
final Throwable cause = Throwables.getRootCause(e);
if (cause instanceof SaslException) {
@@ -227,8 +278,9 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
connection.getEncryptionCtxtString(), cause.getMessage()), cause);
}
}
- return new SaslException(String.format("Authentication failed unexpectedly. [Details: %s, Error %s]",
- connection.getEncryptionCtxtString(), e.getMessage()), e);
+ return new SaslException(String
+ .format("Authentication failed unexpectedly. [Details: %s, Error %s]",
+ connection.getEncryptionCtxtString(), e.getMessage()), e);
}
};
@@ -244,8 +296,10 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
ugi = factory.createAndLoginUser(propertiesMap);
saslClient = factory.createSaslClient(ugi, propertiesMap);
if (saslClient == null) {
- throw new SaslException(String.format("Cannot initiate authentication using %s mechanism. Insufficient " +
- "credentials or selected mechanism doesn't support configured security layers?", factory.getSimpleName()));
+ throw new SaslException(String.format(
+ "Cannot initiate authentication using %s mechanism. Insufficient "
+ + "credentials or selected mechanism doesn't support configured security layers?",
+ factory.getSimpleName()));
}
connection.setSaslClient(saslClient);
} catch (final IOException e) {
@@ -256,26 +310,24 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
logger.trace("Initiating SASL exchange.");
new AuthenticationOutcomeListener<>(this, connection, RpcType.SASL_MESSAGE, ugi,
new RpcOutcomeListener<Void>() {
- @Override
- public void failed(RpcException ex) {
+ @Override public void failed(RpcException ex) {
authSettable.setException(ex);
}
- @Override
- public void success(Void value, ByteBuf buffer) {
+ @Override public void success(Void value, ByteBuf buffer) {
authComplete = true;
authSettable.set(null);
}
- @Override
- public void interrupted(InterruptedException e) {
+ @Override public void interrupted(InterruptedException e) {
authSettable.setException(e);
}
}).initiate(mechanismName);
return authFuture;
}
- private AuthenticatorFactory getAuthenticatorFactory(final DrillProperties properties) throws SaslException {
+ private AuthenticatorFactory getAuthenticatorFactory(final DrillProperties properties)
+ throws SaslException {
final Set<String> mechanismSet = AuthStringUtil.asSet(serverAuthMechanisms);
// first, check if a certain mechanism must be used
@@ -285,129 +337,126 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
throw new SaslException(String.format("Unknown mechanism: %s", authMechanism));
}
if (!mechanismSet.contains(authMechanism.toUpperCase())) {
- throw new SaslException(String.format("Server does not support authentication using: %s. [Details: %s]",
- authMechanism, connection.getEncryptionCtxtString()));
+ throw new SaslException(String
+ .format("Server does not support authentication using: %s. [Details: %s]", authMechanism,
+ connection.getEncryptionCtxtString()));
}
- return ClientAuthenticatorProvider.getInstance()
- .getAuthenticatorFactory(authMechanism);
+ return ClientAuthenticatorProvider.getInstance().getAuthenticatorFactory(authMechanism);
}
// check if Kerberos is supported, and the service principal is provided
- if (mechanismSet.contains(KerberosUtil.KERBEROS_SIMPLE_NAME) &&
- properties.containsKey(DrillProperties.SERVICE_PRINCIPAL)) {
+ if (mechanismSet.contains(KerberosUtil.KERBEROS_SIMPLE_NAME) && properties
+ .containsKey(DrillProperties.SERVICE_PRINCIPAL)) {
return ClientAuthenticatorProvider.getInstance()
.getAuthenticatorFactory(KerberosUtil.KERBEROS_SIMPLE_NAME);
}
// check if username/password is supported, and username/password are provided
- if (mechanismSet.contains(PlainFactory.SIMPLE_NAME) &&
- properties.containsKey(DrillProperties.USER) &&
- !Strings.isNullOrEmpty(properties.getProperty(DrillProperties.PASSWORD))) {
- return ClientAuthenticatorProvider.getInstance()
- .getAuthenticatorFactory(PlainFactory.SIMPLE_NAME);
+ if (mechanismSet.contains(PlainFactory.SIMPLE_NAME) && properties.containsKey(DrillProperties.USER)
+ && !Strings.isNullOrEmpty(properties.getProperty(DrillProperties.PASSWORD))) {
+ return ClientAuthenticatorProvider.getInstance().getAuthenticatorFactory(PlainFactory.SIMPLE_NAME);
}
- throw new SaslException(String.format("Server requires authentication using %s. Insufficient credentials?. " +
- "[Details: %s]. ", serverAuthMechanisms, connection.getEncryptionCtxtString()));
+ throw new SaslException(String
+ .format("Server requires authentication using %s. Insufficient credentials?. " + "[Details: %s]. ",
+ serverAuthMechanisms, connection.getEncryptionCtxtString()));
}
- protected <SEND extends MessageLite, RECEIVE extends MessageLite>
- void send(RpcOutcomeListener<RECEIVE> listener, RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz,
- boolean allowInEventLoop, ByteBuf... dataBodies) {
+ protected <SEND extends MessageLite, RECEIVE extends MessageLite> void send(
+ RpcOutcomeListener<RECEIVE> listener, RpcType rpcType, SEND protobufBody, Class<RECEIVE> clazz,
+ boolean allowInEventLoop, ByteBuf... dataBodies) {
super.send(listener, connection, rpcType, protobufBody, clazz, allowInEventLoop, dataBodies);
}
- @Override
- protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
+ @Override protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
switch (rpcType) {
- case RpcType.ACK_VALUE:
- return Ack.getDefaultInstance();
- case RpcType.HANDSHAKE_VALUE:
- return BitToUserHandshake.getDefaultInstance();
- case RpcType.QUERY_HANDLE_VALUE:
- return QueryId.getDefaultInstance();
- case RpcType.QUERY_RESULT_VALUE:
- return QueryResult.getDefaultInstance();
- case RpcType.QUERY_DATA_VALUE:
- return QueryData.getDefaultInstance();
- case RpcType.QUERY_PLAN_FRAGMENTS_VALUE:
- return QueryPlanFragments.getDefaultInstance();
- case RpcType.CATALOGS_VALUE:
- return GetCatalogsResp.getDefaultInstance();
- case RpcType.SCHEMAS_VALUE:
- return GetSchemasResp.getDefaultInstance();
- case RpcType.TABLES_VALUE:
- return GetTablesResp.getDefaultInstance();
- case RpcType.COLUMNS_VALUE:
- return GetColumnsResp.getDefaultInstance();
- case RpcType.PREPARED_STATEMENT_VALUE:
- return CreatePreparedStatementResp.getDefaultInstance();
- case RpcType.SASL_MESSAGE_VALUE:
- return SaslMessage.getDefaultInstance();
- case RpcType.SERVER_META_VALUE:
- return GetServerMetaResp.getDefaultInstance();
+ case RpcType.ACK_VALUE:
+ return Ack.getDefaultInstance();
+ case RpcType.HANDSHAKE_VALUE:
+ return BitToUserHandshake.getDefaultInstance();
+ case RpcType.QUERY_HANDLE_VALUE:
+ return QueryId.getDefaultInstance();
+ case RpcType.QUERY_RESULT_VALUE:
+ return QueryResult.getDefaultInstance();
+ case RpcType.QUERY_DATA_VALUE:
+ return QueryData.getDefaultInstance();
+ case RpcType.QUERY_PLAN_FRAGMENTS_VALUE:
+ return QueryPlanFragments.getDefaultInstance();
+ case RpcType.CATALOGS_VALUE:
+ return GetCatalogsResp.getDefaultInstance();
+ case RpcType.SCHEMAS_VALUE:
+ return GetSchemasResp.getDefaultInstance();
+ case RpcType.TABLES_VALUE:
+ return GetTablesResp.getDefaultInstance();
+ case RpcType.COLUMNS_VALUE:
+ return GetColumnsResp.getDefaultInstance();
+ case RpcType.PREPARED_STATEMENT_VALUE:
+ return CreatePreparedStatementResp.getDefaultInstance();
+ case RpcType.SASL_MESSAGE_VALUE:
+ return SaslMessage.getDefaultInstance();
+ case RpcType.SERVER_META_VALUE:
+ return GetServerMetaResp.getDefaultInstance();
}
throw new RpcException(String.format("Unable to deal with RpcType of %d", rpcType));
}
- @Override
- protected void handle(UserToBitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
- ResponseSender sender) throws RpcException {
+ @Override protected void handle(UserToBitConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+ ResponseSender sender) throws RpcException {
if (!authComplete) {
// Remote should not be making any requests before authenticating, drop connection
- throw new RpcException(String.format("Request of type %d is not allowed without authentication. " +
- "Remote on %s must authenticate before making requests. Connection dropped.",
- rpcType, connection.getRemoteAddress()));
+ throw new RpcException(String.format("Request of type %d is not allowed without authentication. "
+ + "Remote on %s must authenticate before making requests. Connection dropped.", rpcType,
+ connection.getRemoteAddress()));
}
switch (rpcType) {
- case RpcType.QUERY_DATA_VALUE:
- queryResultHandler.batchArrived(connection, pBody, dBody);
- sender.send(new Response(RpcType.ACK, Acks.OK));
- break;
- case RpcType.QUERY_RESULT_VALUE:
- queryResultHandler.resultArrived(pBody);
- sender.send(new Response(RpcType.ACK, Acks.OK));
- break;
- default:
- throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
+ case RpcType.QUERY_DATA_VALUE:
+ queryResultHandler.batchArrived(connection, pBody, dBody);
+ sender.send(new Response(RpcType.ACK, Acks.OK));
+ break;
+ case RpcType.QUERY_RESULT_VALUE:
+ queryResultHandler.resultArrived(pBody);
+ sender.send(new Response(RpcType.ACK, Acks.OK));
+ break;
+ default:
+ throw new RpcException(String.format("Unknown Rpc Type %d. ", rpcType));
}
}
- @Override
- protected void validateHandshake(BitToUserHandshake inbound) throws RpcException {
-// logger.debug("Handling handshake from bit to user. {}", inbound);
+ @Override protected void validateHandshake(BitToUserHandshake inbound) throws RpcException {
+ // logger.debug("Handling handshake from bit to user. {}", inbound);
if (inbound.hasServerInfos()) {
serverInfos = inbound.getServerInfos();
}
supportedMethods = Sets.immutableEnumSet(inbound.getSupportedMethodsList());
switch (inbound.getStatus()) {
- case SUCCESS:
- break;
- case AUTH_REQUIRED: {
- authComplete = false;
- serverAuthMechanisms = ImmutableList.copyOf(inbound.getAuthenticationMechanismsList());
- connection.setEncryption(inbound.hasEncrypted() && inbound.getEncrypted());
-
- if (inbound.hasMaxWrappedSize()) {
- connection.setMaxWrappedSize(inbound.getMaxWrappedSize());
+ case SUCCESS:
+ break;
+ case AUTH_REQUIRED: {
+ authComplete = false;
+ serverAuthMechanisms = ImmutableList.copyOf(inbound.getAuthenticationMechanismsList());
+ connection.setEncryption(inbound.hasEncrypted() && inbound.getEncrypted());
+
+ if (inbound.hasMaxWrappedSize()) {
+ connection.setMaxWrappedSize(inbound.getMaxWrappedSize());
+ }
+ logger.trace(String
+ .format("Server requires authentication with encryption context %s before proceeding.",
+ connection.getEncryptionCtxtString()));
+ break;
}
- logger.trace(String.format("Server requires authentication with encryption context %s before proceeding.",
- connection.getEncryptionCtxtString()));
- break;
- }
- case AUTH_FAILED:
- case RPC_VERSION_MISMATCH:
- case UNKNOWN_FAILURE:
- final String errMsg = String.format("Status: %s, Error Id: %s, Error message: %s",
- inbound.getStatus(), inbound.getErrorId(), inbound.getErrorMessage());
- logger.error(errMsg);
- throw new NonTransientRpcException(errMsg);
+ case AUTH_FAILED:
+ case RPC_VERSION_MISMATCH:
+ case UNKNOWN_FAILURE:
+ final String errMsg = String
+ .format("Status: %s, Error Id: %s, Error message: %s", inbound.getStatus(),
+ inbound.getErrorId(), inbound.getErrorMessage());
+ logger.error(errMsg);
+ throw new NonTransientRpcException(errMsg);
}
}
- @Override
- protected UserToBitConnection initRemoteConnection(SocketChannel channel) {
+ @Override protected UserToBitConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
return new UserToBitConnection(channel);
}
@@ -421,39 +470,34 @@ public class UserClient extends BasicClient<RpcType, UserClient.UserToBitConnect
super(channel, "user client");
}
- @Override
- public BufferAllocator getAllocator() {
+ @Override public BufferAllocator getAllocator() {
return allocator;
}
- @Override
- protected Logger getLogger() {
+ @Override protected Logger getLogger() {
return logger;
}
- @Override
- public void incConnectionCounter() {
+ @Override public void incConnectionCounter() {
// no-op
}
- @Override
- public void decConnectionCounter() {
+ @Override public void decConnectionCounter() {
// no-op
}
}
- @Override
- public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
+ @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
return new UserProtobufLengthDecoder(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);
}
/**
* planQuery is an API to plan a query without query execution
+ *
* @param req - data necessary to plan query
* @return list of PlanFragments that can later on be submitted for execution
*/
- public DrillRpcFuture<QueryPlanFragments> planQuery(
- GetQueryPlanFragments req) {
+ public DrillRpcFuture<QueryPlanFragments> planQuery(GetQueryPlanFragments req) {
return send(RpcType.GET_QUERY_PLAN_FRAGMENTS, req, QueryPlanFragments.class);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
index 57ad4d51b..4e4c80e97 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserConnectionConfig.java
@@ -33,6 +33,7 @@ class UserConnectionConfig extends AbstractConnectionConfig {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UserConnectionConfig.class);
private final boolean authEnabled;
+ private final boolean sslEnabled;
private final InboundImpersonationManager impersonationManager;
private final UserServerRequestHandler handler;
@@ -75,10 +76,16 @@ class UserConnectionConfig extends AbstractConnectionConfig {
} else {
authEnabled = false;
}
-
impersonationManager = !config.getBoolean(ExecConstants.IMPERSONATION_ENABLED)
? null
: new InboundImpersonationManager();
+
+ sslEnabled = config.getBoolean(ExecConstants.USER_SSL_ENABLED);
+
+ if(isSSLEnabled() && isAuthEnabled() && isEncryptionEnabled()){
+ logger.warn("The server is configured to use both SSL and SASL encryption (only one should be configured).");
+ }
+
}
@Override
@@ -90,6 +97,10 @@ class UserConnectionConfig extends AbstractConnectionConfig {
return authEnabled;
}
+ boolean isSSLEnabled() {
+ return sslEnabled;
+ }
+
InboundImpersonationManager getImpersonationManager() {
return impersonationManager;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index 254fdca29..c0573db81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -21,9 +21,15 @@ import java.io.IOException;
import java.net.SocketAddress;
import java.util.UUID;
+import javax.net.ssl.SSLEngine;
import javax.security.sasl.SaslException;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslHandler;
import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.ssl.SSLConfig;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
@@ -53,8 +59,10 @@ import org.apache.drill.exec.rpc.security.plain.PlainFactory;
import org.apache.drill.exec.rpc.user.UserServer.BitToUserConnection;
import org.apache.drill.exec.rpc.user.security.UserAuthenticationException;
import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.ssl.SSLConfigBuilder;
import org.apache.drill.exec.work.user.UserWorker;
import org.apache.hadoop.security.HadoopKerberosName;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.slf4j.Logger;
import com.google.protobuf.MessageLite;
@@ -71,6 +79,8 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
private static final String SERVER_NAME = "Apache Drill Server";
private final UserConnectionConfig config;
+ private final SSLConfig sslConfig;
+ private Channel sslChannel;
private final UserWorker userWorker;
public UserServer(BootStrapContext context, BufferAllocator allocator, EventLoopGroup eventLoopGroup,
@@ -79,6 +89,17 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
allocator.getAsByteBufAllocator(),
eventLoopGroup);
this.config = new UserConnectionConfig(allocator, context, new UserServerRequestHandler(worker));
+ this.sslChannel = null;
+ try {
+ this.sslConfig = new SSLConfigBuilder()
+ .config(context.getConfig())
+ .mode(SSLFactory.Mode.SERVER)
+ .initializeSSLContext(true)
+ .validateKeyStore(true)
+ .build();
+ } catch (DrillException e) {
+ throw new DrillbitStartupException(e.getMessage(), e.getCause());
+ }
this.userWorker = worker;
// Initialize Singleton instance of UserRpcMetrics.
@@ -86,6 +107,34 @@ public class UserServer extends BasicServer<RpcType, BitToUserConnection> {
}
@Override
+ protected void setupSSL(ChannelPipeline pipe) {
+
+ SSLEngine sslEngine = sslConfig.createSSLEngine(config.getAllocator(), null, 0);
+ // Add SSL handler into pipeline
+ pipe.addFirst(RpcConstants.SSL_HANDLER, new SslHandler(sslEngine));
+ logger.debug("SSL communication between client and server is enabled.");
+ logger.debug(sslConfig.toString());
+
+ }
+
+ @Override
+ protected boolean isSslEnabled() {
+ return sslConfig.isUserSslEnabled();
+ }
+
+ @Override
+ public void setSslChannel(Channel c) {
+ sslChannel = c;
+ }
+
+ @Override
+ protected void closeSSL(){
+ if(isSslEnabled() && sslChannel != null){
+ sslChannel.close();
+ }
+ }
+
+ @Override
protected MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
// a user server only expects acknowledgments on messages it creates.
switch (rpcType) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index 69f2cab76..e35f5421e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -26,12 +26,14 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.drill.common.exceptions.DrillException;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.SSLConfig;
+import org.apache.drill.exec.ssl.SSLConfig;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.rpc.security.plain.PlainFactory;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.rest.auth.DrillRestLoginService;
+import org.apache.drill.exec.ssl.SSLConfigBuilder;
import org.apache.drill.exec.work.WorkManager;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.bouncycastle.asn1.x500.X500NameBuilder;
import org.bouncycastle.asn1.x500.style.BCStyle;
import org.bouncycastle.cert.X509v3CertificateBuilder;
@@ -331,12 +333,18 @@ public class WebServer implements AutoCloseable {
logger.info("Setting up HTTPS connector for web server");
final SslContextFactory sslContextFactory = new SslContextFactory();
- SSLConfig ssl = new SSLConfig(config);
+ SSLConfig ssl = new SSLConfigBuilder()
+ .config(config)
+ .mode(SSLFactory.Mode.SERVER)
+ .initializeSSLContext(false)
+ .validateKeyStore(true)
+ .build();
if(ssl.isSslValid()){
logger.info("Using configured SSL settings for web server");
sslContextFactory.setKeyStorePath(ssl.getKeyStorePath());
sslContextFactory.setKeyStorePassword(ssl.getKeyStorePassword());
+ sslContextFactory.setKeyManagerPassword(ssl.getKeyPassword());
if(ssl.hasTrustStorePath()){
sslContextFactory.setTrustStorePath(ssl.getTrustStorePath());
if(ssl.hasTrustStorePassword()){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfig.java
new file mode 100644
index 000000000..98a8c8d4f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfig.java
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ssl;
+
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.security.KeyStore;
+
+public abstract class SSLConfig {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SSLConfig.class);
+
+ public static final String DEFAULT_SSL_PROVIDER = "JDK"; // JDK or OPENSSL
+ public static final String DEFAULT_SSL_PROTOCOL = "TLSv1.2";
+ public static final int DEFAULT_SSL_HANDSHAKE_TIMEOUT_MS = 10 * 1000; // 10 seconds
+
+ // Either the Netty SSL context or the JDK SSL context will be initialized
+ // The JDK SSL context is use iff the useSystemTrustStore setting is enabled.
+ protected SslContext nettySslContext;
+ protected SSLContext jdkSSlContext;
+
+ private static final boolean isWindows = System.getProperty("os.name").toLowerCase().indexOf("win") >= 0;
+ private static final boolean isMacOs = System.getProperty("os.name").toLowerCase().indexOf("mac") >= 0;
+
+ public static final String HADOOP_SSL_CONF_TPL_KEY = "hadoop.ssl.{0}.conf";
+ public static final String HADOOP_SSL_KEYSTORE_LOCATION_TPL_KEY = "ssl.{0}.keystore.location";
+ public static final String HADOOP_SSL_KEYSTORE_PASSWORD_TPL_KEY = "ssl.{0}.keystore.password";
+ public static final String HADOOP_SSL_KEYSTORE_TYPE_TPL_KEY = "ssl.{0}.keystore.type";
+ public static final String HADOOP_SSL_KEYSTORE_KEYPASSWORD_TPL_KEY =
+ "ssl.{0}.keystore.keypassword";
+ public static final String HADOOP_SSL_TRUSTSTORE_LOCATION_TPL_KEY = "ssl.{0}.truststore.location";
+ public static final String HADOOP_SSL_TRUSTSTORE_PASSWORD_TPL_KEY = "ssl.{0}.truststore.password";
+ public static final String HADOOP_SSL_TRUSTSTORE_TYPE_TPL_KEY = "ssl.{0}.truststore.type";
+
+ public SSLConfig() {
+ }
+
+ public abstract void validateKeyStore() throws DrillException;
+
+ // We need to use different SSLContext objects depending on what the user has chosen
+ // For most uses we will use the Netty SslContext class. This allows us to use either
+ // the JDK implementation or the OpenSSL implementation. However if the user wants to
+ // use the system trust store, then the only way to access it is via the JDK's
+ // SSLContext class. (See the createSSLEngine method below).
+
+ public abstract SslContext initNettySslContext() throws DrillException;
+
+ public abstract SSLContext initJDKSSLContext() throws DrillException;
+
+ public abstract boolean isUserSslEnabled();
+
+ public abstract boolean isHttpsEnabled();
+
+ public abstract String getKeyStoreType();
+
+ public abstract String getKeyStorePath();
+
+ public abstract String getKeyStorePassword();
+
+ public abstract String getKeyPassword();
+
+ public abstract String getTrustStoreType();
+
+ public abstract boolean hasTrustStorePath();
+
+ public abstract String getTrustStorePath();
+
+ public abstract boolean hasTrustStorePassword();
+
+ public abstract String getTrustStorePassword();
+
+ public abstract String getProtocol();
+
+ public abstract SslProvider getProvider();
+
+ public abstract int getHandshakeTimeout();
+
+ public abstract SSLFactory.Mode getMode();
+
+ public abstract boolean disableHostVerification();
+
+ public abstract boolean disableCertificateVerification();
+
+ public abstract boolean useSystemTrustStore();
+
+ public abstract boolean isSslValid();
+
+ public SslContext getNettySslContext() {
+ return nettySslContext;
+ }
+
+ public TrustManagerFactory initializeTrustManagerFactory() throws DrillException {
+ TrustManagerFactory tmf;
+ KeyStore ts = null;
+ //Support Windows/MacOs system trust store
+ try {
+ String trustStoreType = getTrustStoreType();
+ if ((isWindows || isMacOs) && useSystemTrustStore()) {
+ // This is valid for MS-Windows and MacOs
+ logger.debug("Initializing System truststore.");
+ ts = KeyStore.getInstance(!trustStoreType.isEmpty() ? trustStoreType : KeyStore.getDefaultType());
+ ts.load(null, null);
+ } else if (!getTrustStorePath().isEmpty()) {
+ // if truststore is not provided then we will use the default. Note that the default depends on
+ // the TrustManagerFactory that in turn depends on the Security Provider.
+ // Use null as the truststore which will result in the default truststore being picked up
+ logger.debug("Initializing truststore {}.", getTrustStorePath());
+ ts = KeyStore.getInstance(!trustStoreType.isEmpty() ? trustStoreType : KeyStore.getDefaultType());
+ InputStream tsStream = new FileInputStream(getTrustStorePath());
+ ts.load(tsStream, getTrustStorePassword().toCharArray());
+ } else {
+ logger.debug("Initializing default truststore.");
+ }
+ if (disableCertificateVerification()) {
+ tmf = InsecureTrustManagerFactory.INSTANCE;
+ } else {
+ tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
+ }
+ tmf.init(ts);
+ } catch (Exception e) {
+ // Catch any SSL initialization Exceptions here and abort.
+ throw new DrillException(
+ new StringBuilder()
+ .append("Exception while initializing the truststore: [")
+ .append(e.getMessage())
+ .append("]. ")
+ .toString(), e);
+ }
+ return tmf;
+ }
+
+ public KeyManagerFactory initializeKeyManagerFactory() throws DrillException {
+ KeyManagerFactory kmf;
+ String keyStorePath = getKeyStorePath();
+ String keyStorePassword = getKeyStorePassword();
+ String keyStoreType = getKeyStoreType();
+ try {
+ if (keyStorePath.isEmpty()) {
+ throw new DrillException("No Keystore provided.");
+ }
+ KeyStore ks =
+ KeyStore.getInstance(!keyStoreType.isEmpty() ? keyStoreType : KeyStore.getDefaultType());
+ //initialize the key manager factory
+ // Will throw an exception if the file is not found/accessible.
+ InputStream ksStream = new FileInputStream(keyStorePath);
+ // A key password CANNOT be null or an empty string.
+ if (keyStorePassword.isEmpty()) {
+ throw new DrillException("The Keystore password cannot be empty.");
+ }
+ ks.load(ksStream, keyStorePassword.toCharArray());
+ // Empty Keystore. (Remarkably, it is possible to do this).
+ if (ks.size() == 0) {
+ throw new DrillException("The Keystore has no entries.");
+ }
+ kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
+ kmf.init(ks, getKeyPassword().toCharArray());
+
+ } catch (Exception e) {
+ throw new DrillException(
+ new StringBuilder()
+ .append("Exception while initializing the keystore: [")
+ .append(e.getMessage())
+ .append("]. ")
+ .toString());
+ }
+ return kmf;
+ }
+
+ public void initContext() throws DrillException {
+ if ((isWindows || isMacOs) && useSystemTrustStore()) {
+ initJDKSSLContext();
+ logger.debug("Initialized Windows/MacOs SSL context using JDK.");
+ } else {
+ initNettySslContext();
+ logger.debug("Initialized SSL context.");
+ }
+ return;
+ }
+
+ public SSLEngine createSSLEngine(BufferAllocator allocator, String peerHost, int peerPort) {
+ SSLEngine engine;
+ if ((isWindows || isMacOs) && useSystemTrustStore()) {
+ if (peerHost != null) {
+ engine = jdkSSlContext.createSSLEngine(peerHost, peerPort);
+ logger.debug("Initializing Windows/MacOs SSLEngine with hostname.");
+ } else {
+ engine = jdkSSlContext.createSSLEngine();
+ logger.debug("Initializing Windows/MacOs SSLEngine with no hostname.");
+ }
+ } else {
+ if (peerHost != null) {
+ engine = nettySslContext.newEngine(allocator.getAsByteBufAllocator(), peerHost, peerPort);
+ logger.debug("Initializing SSLEngine with hostname.");
+ } else {
+ engine = nettySslContext.newEngine(allocator.getAsByteBufAllocator());
+ logger.debug("Initializing SSLEngine with no hostname.");
+ }
+ }
+ return engine;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("SSL is ")
+ .append(isUserSslEnabled()?"":" not ")
+ .append("enabled.\n");
+ sb.append("HTTPS is ")
+ .append(isHttpsEnabled()?"":" not ")
+ .append("enabled.\n");
+ if(isUserSslEnabled() || isHttpsEnabled()) {
+ sb.append("SSL Configuration :")
+ .append("OS:").append(System.getProperty("os.name"))
+ .append("\n\tUsing system trust store: ").append(useSystemTrustStore())
+ .append("\n\tprotocol: ").append(getProtocol())
+ .append("\n\tkeyStoreType: ").append(getKeyStoreType())
+ .append("\n\tkeyStorePath: ").append(getKeyStorePath())
+ .append("\n\tkeyStorePassword: ").append(getPrintablePassword(getKeyStorePassword()))
+ .append("\n\tkeyPassword: ").append(getPrintablePassword(getKeyPassword()))
+ .append("\n\ttrustStoreType: ").append(getTrustStoreType())
+ .append("\n\ttrustStorePath: ").append(getTrustStorePath())
+ .append("\n\ttrustStorePassword: ").append(getPrintablePassword(getTrustStorePassword()))
+ .append("\n\thandshakeTimeout: ").append(getHandshakeTimeout())
+ .append("\n\tdisableHostVerification: ").append(disableHostVerification())
+ .append("\n\tdisableCertificateVerification: ").append(disableCertificateVerification())
+ ;
+ }
+ return sb.toString();
+ }
+
+ private String getPrintablePassword(String password) {
+ StringBuilder sb = new StringBuilder();
+ if(password == null || password.length()<2 ){
+ return password;
+ }
+ sb.append(password.charAt(0)).append("****").append(password.charAt(password.length()-1));
+ return sb.toString();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigBuilder.java
new file mode 100644
index 000000000..0941960fa
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigBuilder.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ssl;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillConfigurationException;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ssl.SSLFactory;
+
+import java.util.Properties;
+
+
+public class SSLConfigBuilder {
+
+ private static final org.slf4j.Logger logger =
+ org.slf4j.LoggerFactory.getLogger(org.apache.drill.exec.ssl.SSLConfigBuilder.class);
+
+ private DrillConfig config = null;
+ private Configuration hadoopConfig = null;
+ private Properties properties;
+ private SSLFactory.Mode mode = SSLFactory.Mode.SERVER;
+ private boolean initializeSSLContext = false;
+ private boolean validateKeyStore = false;
+
+ public SSLConfigBuilder() {
+
+ }
+
+ public SSLConfig build() throws DrillException {
+ if (mode == SSLFactory.Mode.SERVER && config == null) {
+ throw new DrillConfigurationException(
+ "Cannot create SSL configuration from null Drill configuration.");
+ }
+ SSLConfig sslConfig;
+ if (mode == SSLFactory.Mode.SERVER) {
+ sslConfig = new SSLConfigServer(config, hadoopConfig);
+ } else {
+ sslConfig = new SSLConfigClient(properties);
+ }
+ if(validateKeyStore){
+ sslConfig.validateKeyStore();
+ }
+ if(initializeSSLContext){
+ sslConfig.initContext();
+ }
+ return sslConfig;
+ }
+
+ public SSLConfigBuilder config(DrillConfig config) {
+ this.config = config;
+ return this;
+ }
+
+ public SSLConfigBuilder hadoopConfig(Configuration hadoopConfig) {
+ this.hadoopConfig = hadoopConfig;
+ return this;
+ }
+
+ public SSLConfigBuilder properties(Properties props) {
+ this.properties = props;
+ return this;
+ }
+
+ public SSLConfigBuilder mode(SSLFactory.Mode mode) {
+ this.mode = mode;
+ return this;
+ }
+
+ public SSLConfigBuilder initializeSSLContext(boolean initializeSSLContext) {
+ this.initializeSSLContext = initializeSSLContext;
+ return this;
+ }
+
+ public SSLConfigBuilder validateKeyStore(boolean validateKeyStore) {
+ this.validateKeyStore = validateKeyStore;
+ return this;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigClient.java
new file mode 100644
index 000000000..6a70d436a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigClient.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ssl;
+
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ssl.SSLFactory;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
+import java.util.Properties;
+
+public class SSLConfigClient extends SSLConfig {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SSLConfigClient.class);
+
+ private final Properties properties;
+ private final boolean userSslEnabled;
+ private final String trustStoreType;
+ private final String trustStorePath;
+ private final String trustStorePassword;
+ private final boolean disableHostVerification;
+ private final boolean disableCertificateVerification;
+ private final boolean useSystemTrustStore;
+ private final String protocol;
+ private final int handshakeTimeout;
+ private final String provider;
+
+ private final String emptyString = new String();
+
+ public SSLConfigClient(Properties properties) throws DrillException {
+ this.properties = properties;
+ userSslEnabled = getBooleanProperty(DrillProperties.ENABLE_TLS);
+ trustStoreType = getStringProperty(DrillProperties.TRUSTSTORE_TYPE, "JKS");
+ trustStorePath = getStringProperty(DrillProperties.TRUSTSTORE_PATH, "");
+ trustStorePassword = getStringProperty(DrillProperties.TRUSTSTORE_PASSWORD, "");
+ disableHostVerification = getBooleanProperty(DrillProperties.DISABLE_HOST_VERIFICATION);
+ disableCertificateVerification = getBooleanProperty(DrillProperties.DISABLE_CERT_VERIFICATION);
+ useSystemTrustStore = getBooleanProperty(DrillProperties.USE_SYSTEM_TRUSTSTORE);
+ protocol = getStringProperty(DrillProperties.TLS_PROTOCOL, DEFAULT_SSL_PROTOCOL);
+ int hsTimeout = getIntProperty(DrillProperties.TLS_HANDSHAKE_TIMEOUT, DEFAULT_SSL_HANDSHAKE_TIMEOUT_MS);
+ if (hsTimeout <= 0) {
+ hsTimeout = DEFAULT_SSL_HANDSHAKE_TIMEOUT_MS;
+ }
+ handshakeTimeout = hsTimeout;
+ provider = getStringProperty(DrillProperties.TLS_PROVIDER, DEFAULT_SSL_PROVIDER);
+ }
+
+ private boolean getBooleanProperty(String propName) {
+ return (properties != null) && (properties.containsKey(propName))
+ && (properties.getProperty(propName).compareToIgnoreCase("true") == 0);
+ }
+
+ private String getStringProperty(String name, String defaultValue) {
+ String value = "";
+ if ( (properties != null) && (properties.containsKey(name))) {
+ value = properties.getProperty(name);
+ }
+ if (value.isEmpty()) {
+ value = defaultValue;
+ }
+ value = value.trim();
+ return value;
+ }
+
+ private int getIntProperty(String name, int defaultValue) {
+ int value = defaultValue;
+ if ( (properties != null) && (properties.containsKey(name))) {
+ value = new Integer(properties.getProperty(name)).intValue();
+ }
+ return value;
+ }
+
+ public void validateKeyStore() throws DrillException {
+
+ }
+
+ @Override
+ public SslContext initNettySslContext() throws DrillException {
+ final SslContext sslCtx;
+
+ if (!userSslEnabled) {
+ return null;
+ }
+
+ TrustManagerFactory tmf;
+ try {
+ tmf = initializeTrustManagerFactory();
+ sslCtx = SslContextBuilder.forClient()
+ .sslProvider(getProvider())
+ .trustManager(tmf)
+ .protocols(protocol)
+ .build();
+ } catch (Exception e) {
+ // Catch any SSL initialization Exceptions here and abort.
+ throw new DrillException(new StringBuilder()
+ .append("SSL is enabled but cannot be initialized due to the following exception: ")
+ .append("[ ")
+ .append(e.getMessage())
+ .append("]. ")
+ .toString());
+ }
+ this.nettySslContext = sslCtx;
+ return sslCtx;
+ }
+
+ @Override
+ public SSLContext initJDKSSLContext() throws DrillException {
+ final SSLContext sslCtx;
+
+ if (!userSslEnabled) {
+ return null;
+ }
+
+ TrustManagerFactory tmf;
+ try {
+ tmf = initializeTrustManagerFactory();
+ sslCtx = SSLContext.getInstance(protocol);
+ sslCtx.init(null, tmf.getTrustManagers(), null);
+ } catch (Exception e) {
+ // Catch any SSL initialization Exceptions here and abort.
+ throw new DrillException(new StringBuilder()
+ .append("SSL is enabled but cannot be initialized due to the following exception: ")
+ .append("[ ")
+ .append(e.getMessage())
+ .append("]. ")
+ .toString());
+ }
+ this.jdkSSlContext = sslCtx;
+ return sslCtx;
+ }
+
+ @Override
+ public SSLEngine createSSLEngine(BufferAllocator allocator, String peerHost, int peerPort) {
+ SSLEngine engine = super.createSSLEngine(allocator, peerHost, peerPort);
+
+ if (!this.disableHostVerification()) {
+ SSLParameters sslParameters = engine.getSSLParameters();
+ // only available since Java 7
+ sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
+ engine.setSSLParameters(sslParameters);
+ }
+
+ engine.setUseClientMode(true);
+
+ try {
+ engine.setEnableSessionCreation(true);
+ } catch (Exception e) {
+ // Openssl implementation may throw this.
+ logger.debug("Session creation not enabled. Exception: {}", e.getMessage());
+ }
+
+ return engine;
+ }
+
+ @Override
+ public boolean isUserSslEnabled() {
+ return userSslEnabled;
+ }
+
+ @Override
+ public boolean isHttpsEnabled() {
+ return false;
+ }
+
+ @Override
+ public String getKeyStoreType() {
+ return emptyString;
+ }
+
+ @Override
+ public String getKeyStorePath() {
+ return emptyString;
+ }
+
+ @Override
+ public String getKeyStorePassword() {
+ return emptyString;
+ }
+
+ @Override
+ public String getKeyPassword() {
+ return emptyString;
+ }
+
+ @Override
+ public String getTrustStoreType() {
+ return trustStoreType;
+ }
+
+ @Override
+ public boolean hasTrustStorePath() {
+ return !trustStorePath.isEmpty();
+ }
+
+ @Override
+ public String getTrustStorePath() {
+ return trustStorePath;
+ }
+
+ @Override
+ public boolean hasTrustStorePassword() {
+ return !trustStorePassword.isEmpty();
+ }
+
+ @Override
+ public String getTrustStorePassword() {
+ return trustStorePassword;
+ }
+
+ @Override
+ public String getProtocol() {
+ return protocol;
+ }
+
+ @Override
+ public SslProvider getProvider() {
+ return provider.equalsIgnoreCase("JDK") ? SslProvider.JDK : SslProvider.OPENSSL;
+ }
+
+ @Override
+ public int getHandshakeTimeout() {
+ return handshakeTimeout;
+ }
+
+ @Override
+ public SSLFactory.Mode getMode() {
+ return SSLFactory.Mode.CLIENT;
+ }
+
+ @Override
+ public boolean disableHostVerification() {
+ return disableHostVerification;
+ }
+
+ @Override
+ public boolean disableCertificateVerification() {
+ return disableCertificateVerification;
+ }
+
+ @Override
+ public boolean useSystemTrustStore() {
+ return useSystemTrustStore;
+ }
+
+ public boolean isSslValid() {
+ return true;
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java
new file mode 100644
index 000000000..da31d2e7f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ssl/SSLConfigServer.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.ssl;
+
+import com.google.common.base.Preconditions;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslProvider;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ssl.SSLFactory;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManagerFactory;
+import java.text.MessageFormat;
+
+public class SSLConfigServer extends SSLConfig {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SSLConfigServer.class);
+
+ private final DrillConfig config;
+ private final Configuration hadoopConfig;
+ private final boolean userSslEnabled;
+ private final boolean httpsEnabled;
+ private final String keyStoreType;
+ private final String keyStorePath;
+ private final String keyStorePassword;
+ private final String keyPassword;
+ private final String trustStoreType;
+ private final String trustStorePath;
+ private final String trustStorePassword;
+ private final String protocol;
+ private final String provider;
+
+ public SSLConfigServer(DrillConfig config, Configuration hadoopConfig) throws DrillException {
+ this.config = config;
+ SSLFactory.Mode mode = SSLFactory.Mode.SERVER;
+ httpsEnabled =
+ config.hasPath(ExecConstants.HTTP_ENABLE_SSL) && config.getBoolean(ExecConstants.HTTP_ENABLE_SSL);
+ // For testing we will mock up a hadoop configuration, however for regular use, we find the actual hadoop config.
+ boolean enableHadoopConfig = config.getBoolean(ExecConstants.SSL_USE_HADOOP_CONF);
+ if (enableHadoopConfig) {
+ if (hadoopConfig == null) {
+ this.hadoopConfig = new Configuration(); // get hadoop configuration
+ } else {
+ this.hadoopConfig = hadoopConfig;
+ }
+ String hadoopSSLConfigFile =
+ this.hadoopConfig.get(resolveHadoopPropertyName(HADOOP_SSL_CONF_TPL_KEY, getMode()));
+ logger.debug("Using Hadoop configuration for SSL");
+ logger.debug("Hadoop SSL configuration file: {}", hadoopSSLConfigFile);
+ this.hadoopConfig.addResource(hadoopSSLConfigFile);
+ } else {
+ this.hadoopConfig = null;
+ }
+ userSslEnabled =
+ config.hasPath(ExecConstants.USER_SSL_ENABLED) && config.getBoolean(ExecConstants.USER_SSL_ENABLED);
+ trustStoreType = getConfigParam(ExecConstants.SSL_TRUSTSTORE_TYPE,
+ resolveHadoopPropertyName(HADOOP_SSL_TRUSTSTORE_TYPE_TPL_KEY, mode));
+ trustStorePath = getConfigParam(ExecConstants.SSL_TRUSTSTORE_PATH,
+ resolveHadoopPropertyName(HADOOP_SSL_TRUSTSTORE_LOCATION_TPL_KEY, mode));
+ trustStorePassword = getConfigParam(ExecConstants.SSL_TRUSTSTORE_PASSWORD,
+ resolveHadoopPropertyName(HADOOP_SSL_TRUSTSTORE_PASSWORD_TPL_KEY, mode));
+ keyStoreType = getConfigParam(ExecConstants.SSL_KEYSTORE_TYPE,
+ resolveHadoopPropertyName(HADOOP_SSL_KEYSTORE_TYPE_TPL_KEY, mode));
+ keyStorePath = getConfigParam(ExecConstants.SSL_KEYSTORE_PATH,
+ resolveHadoopPropertyName(HADOOP_SSL_KEYSTORE_LOCATION_TPL_KEY, mode));
+ keyStorePassword = getConfigParam(ExecConstants.SSL_KEYSTORE_PASSWORD,
+ resolveHadoopPropertyName(HADOOP_SSL_KEYSTORE_PASSWORD_TPL_KEY, mode));
+ // if no keypassword specified, use keystore password
+ String keyPass = getConfigParam(ExecConstants.SSL_KEY_PASSWORD,
+ resolveHadoopPropertyName(HADOOP_SSL_KEYSTORE_KEYPASSWORD_TPL_KEY, mode));
+ keyPassword = keyPass.isEmpty() ? keyStorePassword : keyPass;
+ protocol = getConfigParamWithDefault(ExecConstants.SSL_PROTOCOL, DEFAULT_SSL_PROTOCOL);
+ provider = getConfigParamWithDefault(ExecConstants.SSL_PROVIDER, DEFAULT_SSL_PROVIDER);
+ }
+
+ public void validateKeyStore() throws DrillException {
+ //HTTPS validates the keystore is not empty. User Server SSL context initialization also validates keystore, but
+ // much more strictly. User Client context initialization does not validate keystore.
+ /*If keystorePath or keystorePassword is provided in the configuration file use that*/
+ if ((isUserSslEnabled() || isHttpsEnabled())) {
+ if (!keyStorePath.isEmpty() || !keyStorePassword.isEmpty()) {
+ if (keyStorePath.isEmpty()) {
+ throw new DrillException(
+ " *.ssl.keyStorePath in the configuration file is empty, but *.ssl.keyStorePassword is set");
+ } else if (keyStorePassword.isEmpty()) {
+ throw new DrillException(
+ " *.ssl.keyStorePassword in the configuration file is empty, but *.ssl.keyStorePath is set ");
+ }
+ }
+ }
+ }
+
+ @Override
+ public SslContext initNettySslContext() throws DrillException {
+ final SslContext sslCtx;
+
+ if (!userSslEnabled) {
+ return null;
+ }
+
+ KeyManagerFactory kmf;
+ TrustManagerFactory tmf;
+ try {
+ if (keyStorePath.isEmpty()) {
+ throw new DrillException("No Keystore provided.");
+ }
+ kmf = initializeKeyManagerFactory();
+ tmf = initializeTrustManagerFactory();
+ sslCtx = SslContextBuilder.forServer(kmf)
+ .trustManager(tmf)
+ .protocols(protocol)
+ .sslProvider(getProvider())
+ .build(); // Will throw an exception if the key password is not correct
+ } catch (Exception e) {
+ // Catch any SSL initialization Exceptions here and abort.
+ throw new DrillException(new StringBuilder()
+ .append("SSL is enabled but cannot be initialized - ")
+ .append("[ ")
+ .append(e.getMessage())
+ .append("]. ")
+ .toString());
+ }
+ this.nettySslContext = sslCtx;
+ return sslCtx;
+ }
+
+ @Override
+ public SSLContext initJDKSSLContext() throws DrillException {
+ final SSLContext sslCtx;
+
+ if (!userSslEnabled) {
+ return null;
+ }
+
+ KeyManagerFactory kmf;
+ TrustManagerFactory tmf;
+ try {
+ if (keyStorePath.isEmpty()) {
+ throw new DrillException("No Keystore provided.");
+ }
+ kmf = initializeKeyManagerFactory();
+ tmf = initializeTrustManagerFactory();
+ sslCtx = SSLContext.getInstance(protocol);
+ sslCtx.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+ } catch (Exception e) {
+ // Catch any SSL initialization Exceptions here and abort.
+ throw new DrillException(
+ new StringBuilder().append("SSL is enabled but cannot be initialized - ")
+ .append("[ ")
+ .append(e.getMessage())
+ .append("]. ")
+ .toString());
+ }
+ this.jdkSSlContext = sslCtx;
+ return sslCtx;
+ }
+
+ @Override
+ public SSLEngine createSSLEngine(BufferAllocator allocator, String peerHost, int peerPort) {
+ SSLEngine engine = super.createSSLEngine(allocator, peerHost, peerPort);
+
+ engine.setUseClientMode(false);
+
+ // No need for client side authentication (HTTPS like behaviour)
+ engine.setNeedClientAuth(false);
+
+ try {
+ engine.setEnableSessionCreation(true);
+ } catch (Exception e) {
+ // Openssl implementation may throw this.
+ logger.debug("Session creation not enabled. Exception: {}", e.getMessage());
+ }
+
+ return engine;
+ }
+
+ private String getConfigParam(String name, String hadoopName) {
+ String value = "";
+ if (hadoopConfig != null) {
+ value = getHadoopConfigParam(hadoopName);
+ }
+ if (value.isEmpty() && config.hasPath(name)) {
+ value = config.getString(name);
+ }
+ value = value.trim();
+ return value;
+ }
+
+ private String getHadoopConfigParam(String name) {
+ Preconditions.checkArgument(this.hadoopConfig != null);
+ String value = hadoopConfig.get(name, "");
+ value = value.trim();
+ return value;
+ }
+
+ private String getConfigParamWithDefault(String name, String defaultValue) {
+ String value = "";
+ if (config.hasPath(name)) {
+ value = config.getString(name);
+ }
+ if (value.isEmpty()) {
+ value = defaultValue;
+ }
+ value = value.trim();
+ return value;
+ }
+
+ private String resolveHadoopPropertyName(String nameTemplate, SSLFactory.Mode mode) {
+ return MessageFormat.format(nameTemplate, mode.toString().toLowerCase());
+ }
+
+
+
+ @Override
+ public boolean isUserSslEnabled() {
+ return userSslEnabled;
+ }
+
+ @Override
+ public boolean isHttpsEnabled() {
+ return httpsEnabled;
+ }
+
+ @Override
+ public String getKeyStoreType() {
+ return keyStoreType;
+ }
+
+ @Override
+ public String getKeyStorePath() {
+ return keyStorePath;
+ }
+
+ @Override
+ public String getKeyStorePassword() {
+ return keyStorePassword;
+ }
+
+ @Override
+ public String getKeyPassword() {
+ return keyPassword;
+ }
+
+ @Override
+ public String getTrustStoreType() {
+ return trustStoreType;
+ }
+
+ @Override
+ public boolean hasTrustStorePath() {
+ return !trustStorePath.isEmpty();
+ }
+
+ @Override
+ public String getTrustStorePath() {
+ return trustStorePath;
+ }
+
+ @Override
+ public boolean hasTrustStorePassword() {
+ return !trustStorePassword.isEmpty();
+ }
+
+ @Override
+ public String getTrustStorePassword() {
+ return trustStorePassword;
+ }
+
+ @Override
+ public String getProtocol() {
+ return protocol;
+ }
+
+ @Override
+ public SslProvider getProvider() {
+ return provider.equalsIgnoreCase("JDK") ? SslProvider.JDK : SslProvider.OPENSSL;
+ }
+
+ @Override
+ public int getHandshakeTimeout() {
+ return 0;
+ }
+
+ @Override
+ public SSLFactory.Mode getMode() {
+ return SSLFactory.Mode.SERVER;
+ }
+
+ @Override
+ public boolean disableHostVerification() {
+ return false;
+ }
+
+ @Override
+ public boolean disableCertificateVerification() {
+ return false;
+ }
+
+ @Override
+ public boolean useSystemTrustStore() {
+ return false; // Client only, notsupported by the server
+ }
+
+ public boolean isSslValid() {
+ return !keyStorePath.isEmpty() && !keyStorePassword.isEmpty();
+ }
+
+}
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index bfdb74c42..26c8df0b6 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -51,12 +51,6 @@ drill.client: {
// By default ${DRILL_TMP_DIR} is used if set or ${drill.tmp-dir} if it's been overridden.
drill.tmp-dir: "/tmp"
drill.tmp-dir: ${?DRILL_TMP_DIR}
-javax.net.ssl: {
- keyStore = "",
- keyStorePassword = "",
- trustStore = "",
- trustStorePassword = ""
-},
drill.exec: {
cluster-id: "drillbits1"
rpc: {
@@ -142,10 +136,20 @@ drill.exec: {
},
//setting javax variables for ssl configurations is being deprecated.
ssl: {
- keyStorePath = ${?javax.net.ssl.keyStore}
- keyStorePassword = ${?javax.net.ssl.keyStorePassword}
- trustStorePath = ${?javax.net.ssl.trustStore}
+ keyStoreType = ${?javax.net.ssl.keyStoreType},
+ keyStorePath = ${?javax.net.ssl.keyStore},
+ keyStorePassword = ${?javax.net.ssl.keyStorePassword},
+ trustStoreType = ${?javax.net.ssl.trustStoreType},
+ trustStorePath = ${?javax.net.ssl.trustStore},
trustStorePassword = ${?javax.net.ssl.trustStorePassword}
+ # default key password to keystore password
+ keyPassword = ${?javax.net.ssl.keyStorePassword},
+ protocol: "TLSv1.2",
+ # if true, then Drill will read SSL parameters from the
+ # Hadoop configuration files.
+ useHadoopConfig : true,
+ #Drill can use either the JDK implementation or the OpenSSL implementation.
+ provider: "JDK"
},
network: {
start: 35000
@@ -168,21 +172,24 @@ drill.exec: {
enabled: false,
max_chained_user_hops: 3
},
- security.user.auth {
+ security.user.auth: {
enabled: false
},
- security.bit.auth {
+ security.bit.auth: {
enabled: false
use_login_principal: false
}
- security.user.encryption.sasl {
+ security.user.encryption.sasl: {
enabled : false,
max_wrapped_size : 65536
}
- security.bit.encryption.sasl {
+ security.bit.encryption.sasl: {
enabled : false,
max_wrapped_size : 65536
}
+ security.user.encryption.ssl: {
+ enabled : false
+ }
trace: {
directory: "/tmp/drill-trace",
filesystem: "file:///"
diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
index e59c384f0..a094f518d 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java
@@ -94,6 +94,7 @@ public class BaseTestQuery extends ExecTest {
put(ExecConstants.HTTP_ENABLE, "false");
// Increasing retry attempts for testing
put(ExecConstants.UDF_RETRY_ATTEMPTS, "10");
+ put(ExecConstants.SSL_USE_HADOOP_CONF, "false");
}
};
@@ -220,12 +221,12 @@ public class BaseTestQuery extends ExecTest {
}
if (!properties.containsKey(DrillProperties.DRILLBIT_CONNECTION)) {
- properties = new Properties(properties);
properties.setProperty(DrillProperties.DRILLBIT_CONNECTION,
- String.format("localhost:%s", bits[0].getUserPort()));
+ String.format("localhost:%s", bits[0].getUserPort()));
}
- client = QueryTestUtil.createClient(config, serviceSet, MAX_WIDTH_PER_NODE, properties);
+ DrillConfig clientConfig = DrillConfig.forClient();
+ client = QueryTestUtil.createClient(clientConfig, serviceSet, MAX_WIDTH_PER_NODE, properties);
}
/**
@@ -241,7 +242,8 @@ public class BaseTestQuery extends ExecTest {
client = null;
}
- client = QueryTestUtil.createClient(config, serviceSet, MAX_WIDTH_PER_NODE, properties);
+ DrillConfig clientConfig = DrillConfig.forClient();
+ client = QueryTestUtil.createClient(clientConfig, serviceSet, MAX_WIDTH_PER_NODE, properties);
}
/*
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestSSLConfig.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestSSLConfig.java
index 0b2847351..729cc6f0a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestSSLConfig.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestSSLConfig.java
@@ -21,11 +21,19 @@ package org.apache.drill.exec;
import org.apache.drill.categories.SecurityTest;
import org.apache.drill.common.exceptions.DrillException;
+import org.apache.drill.exec.ssl.SSLConfig;
+import org.apache.drill.exec.ssl.SSLConfigBuilder;
import org.apache.drill.test.ConfigBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+
+import java.text.MessageFormat;
+
import static junit.framework.TestCase.fail;
+import static org.apache.drill.exec.ssl.SSLConfig.HADOOP_SSL_CONF_TPL_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -38,8 +46,15 @@ public class TestSSLConfig {
ConfigBuilder config = new ConfigBuilder();
config.put(ExecConstants.HTTP_KEYSTORE_PATH, "");
config.put(ExecConstants.HTTP_KEYSTORE_PASSWORD, "root");
+ config.put(ExecConstants.SSL_USE_HADOOP_CONF, false);
+ config.put(ExecConstants.USER_SSL_ENABLED, true);
try {
- SSLConfig sslv = new SSLConfig(config.build());
+ SSLConfig sslv = new SSLConfigBuilder()
+ .config(config.build())
+ .mode(SSLFactory.Mode.SERVER)
+ .initializeSSLContext(false)
+ .validateKeyStore(true)
+ .build();
fail();
//Expected
} catch (Exception e) {
@@ -53,8 +68,15 @@ public class TestSSLConfig {
ConfigBuilder config = new ConfigBuilder();
config.put(ExecConstants.HTTP_KEYSTORE_PATH, "/root");
config.put(ExecConstants.HTTP_KEYSTORE_PASSWORD, "");
+ config.put(ExecConstants.SSL_USE_HADOOP_CONF, false);
+ config.put(ExecConstants.USER_SSL_ENABLED, true);
try {
- SSLConfig sslv = new SSLConfig(config.build());
+ SSLConfig sslv = new SSLConfigBuilder()
+ .config(config.build())
+ .mode(SSLFactory.Mode.SERVER)
+ .initializeSSLContext(false)
+ .validateKeyStore(true)
+ .build();
fail();
//Expected
} catch (Exception e) {
@@ -69,7 +91,12 @@ public class TestSSLConfig {
config.put(ExecConstants.HTTP_KEYSTORE_PATH, "/root");
config.put(ExecConstants.HTTP_KEYSTORE_PASSWORD, "root");
try {
- SSLConfig sslv = new SSLConfig(config.build());
+ SSLConfig sslv = new SSLConfigBuilder()
+ .config(config.build())
+ .mode(SSLFactory.Mode.SERVER)
+ .initializeSSLContext(false)
+ .validateKeyStore(true)
+ .build();
assertEquals("/root", sslv.getKeyStorePath());
assertEquals("root", sslv.getKeyStorePassword());
} catch (Exception e) {
@@ -84,7 +111,12 @@ public class TestSSLConfig {
ConfigBuilder config = new ConfigBuilder();
config.put("javax.net.ssl.keyStore", "/root");
config.put("javax.net.ssl.keyStorePassword", "root");
- SSLConfig sslv = new SSLConfig(config.build());
+ SSLConfig sslv = new SSLConfigBuilder()
+ .config(config.build())
+ .mode(SSLFactory.Mode.SERVER)
+ .initializeSSLContext(false)
+ .validateKeyStore(true)
+ .build();
assertEquals("/root",sslv.getKeyStorePath());
assertEquals("root", sslv.getKeyStorePassword());
}
@@ -95,10 +127,41 @@ public class TestSSLConfig {
ConfigBuilder config = new ConfigBuilder();
config.put(ExecConstants.HTTP_TRUSTSTORE_PATH, "/root");
config.put(ExecConstants.HTTP_TRUSTSTORE_PASSWORD, "root");
- SSLConfig sslv = new SSLConfig(config.build());
+ config.put(ExecConstants.SSL_USE_HADOOP_CONF, false);
+ SSLConfig sslv = new SSLConfigBuilder()
+ .config(config.build())
+ .mode(SSLFactory.Mode.SERVER)
+ .initializeSSLContext(false)
+ .validateKeyStore(true)
+ .build();
assertEquals(true, sslv.hasTrustStorePath());
assertEquals(true,sslv.hasTrustStorePassword());
assertEquals("/root",sslv.getTrustStorePath());
assertEquals("root",sslv.getTrustStorePassword());
}
-} \ No newline at end of file
+
+ @Test
+ public void testInvalidHadoopKeystore() throws Exception {
+ Configuration hadoopConfig = new Configuration();
+ String hadoopSSLFileProp = MessageFormat
+ .format(HADOOP_SSL_CONF_TPL_KEY, SSLFactory.Mode.SERVER.toString().toLowerCase());
+ hadoopConfig.set(hadoopSSLFileProp, "ssl-server-invalid.xml");
+ ConfigBuilder config = new ConfigBuilder();
+ config.put(ExecConstants.USER_SSL_ENABLED, true);
+ config.put(ExecConstants.SSL_USE_HADOOP_CONF, true);
+ SSLConfig sslv;
+ try {
+ sslv = new SSLConfigBuilder()
+ .config(config.build())
+ .mode(SSLFactory.Mode.SERVER)
+ .initializeSSLContext(false)
+ .validateKeyStore(true)
+ .hadoopConfig(hadoopConfig)
+ .build();
+ fail();
+ } catch (Exception e) {
+ assertTrue(e instanceof DrillException);
+ }
+ }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSL.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSL.java
new file mode 100644
index 000000000..2228e30d7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSL.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user.security;
+
+import com.typesafe.config.ConfigValueFactory;
+import io.netty.handler.ssl.util.SelfSignedCertificate;
+import junit.framework.TestCase;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.exec.ExecConstants;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.net.InetAddress;
+import java.security.KeyStore;
+import java.util.Properties;
+
+import static junit.framework.TestCase.fail;
+import static org.junit.Assert.assertEquals;
+
+public class TestUserBitSSL extends BaseTestQuery {
+ private static final org.slf4j.Logger logger =
+ org.slf4j.LoggerFactory.getLogger(TestUserBitSSL.class);
+
+ private static DrillConfig newConfig;
+ private static Properties initProps; // initial client properties
+ private static ClassLoader classLoader;
+ private static String ksPath;
+ private static String tsPath;
+ private static String emptyTSPath;
+ private static String unknownKsPath;
+
+ @BeforeClass
+ public static void setupTest() throws Exception {
+
+ // Create a new DrillConfig
+ classLoader = TestUserBitSSL.class.getClassLoader();
+ ksPath = new File(classLoader.getResource("ssl/keystore.ks").getFile()).getAbsolutePath();
+ unknownKsPath = new File(classLoader.getResource("ssl/unknownkeystore.ks").getFile()).getAbsolutePath();
+ tsPath = new File(classLoader.getResource("ssl/truststore.ks").getFile()).getAbsolutePath();
+ emptyTSPath = new File(classLoader.getResource("ssl/emptytruststore.ks").getFile()).getAbsolutePath();
+ newConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+ .withValue(ExecConstants.SSL_USE_HADOOP_CONF,
+ ConfigValueFactory.fromAnyRef(false))
+ .withValue(ExecConstants.USER_SSL_ENABLED,
+ ConfigValueFactory.fromAnyRef(true))
+ .withValue(ExecConstants.SSL_KEYSTORE_TYPE,
+ ConfigValueFactory.fromAnyRef("JKS"))
+ .withValue(ExecConstants.SSL_KEYSTORE_PATH,
+ ConfigValueFactory.fromAnyRef(ksPath))
+ .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD,
+ ConfigValueFactory.fromAnyRef("drill123"))
+ .withValue(ExecConstants.SSL_KEY_PASSWORD,
+ ConfigValueFactory.fromAnyRef("drill123"))
+ .withValue(ExecConstants.SSL_TRUSTSTORE_TYPE,
+ ConfigValueFactory.fromAnyRef("JKS"))
+ .withValue(ExecConstants.SSL_TRUSTSTORE_PATH,
+ ConfigValueFactory.fromAnyRef(tsPath))
+ .withValue(ExecConstants.SSL_TRUSTSTORE_PASSWORD,
+ ConfigValueFactory.fromAnyRef("drill123"))
+ .withValue(ExecConstants.SSL_PROTOCOL,
+ ConfigValueFactory.fromAnyRef("TLSv1.2")),
+ false);
+
+ initProps = new Properties();
+ initProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ initProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath);
+ initProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123");
+ initProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true");
+
+ // Start an SSL enabled cluster
+ updateTestCluster(1, newConfig, initProps);
+ }
+
+ @AfterClass
+ public static void cleanTest() throws Exception {
+ DrillConfig restoreConfig =
+ new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()), false);
+ updateTestCluster(1, restoreConfig);
+ }
+
+ @Test
+ public void testSSLConnection() throws Exception {
+ final Properties connectionProps = new Properties();
+ connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath);
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123");
+ connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true");
+ try {
+ updateClient(connectionProps);
+ } catch (Exception e) {
+ TestCase.fail( new StringBuilder()
+ .append("SSL Connection failed with exception [" )
+ .append( e.getMessage() )
+ .append("]")
+ .toString());
+ }
+ }
+
+ @Test
+ public void testSSLConnectionWithKeystore() throws Exception {
+ final Properties connectionProps = new Properties();
+ connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, ksPath);
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123");
+ connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true");
+ try {
+ updateClient(connectionProps);
+ } catch (Exception e) {
+ TestCase.fail( new StringBuilder()
+ .append("SSL Connection failed with exception [" )
+ .append( e.getMessage() )
+ .append("]")
+ .toString());
+ }
+ }
+
+ @Test
+ public void testSSLConnectionFailBadTrustStore() throws Exception {
+ final Properties connectionProps = new Properties();
+ connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, ""); // NO truststore
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123");
+ connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true");
+ boolean failureCaught = false;
+ try {
+ updateClient(connectionProps);
+ } catch (Exception e) {
+ failureCaught = true;
+ }
+ assertEquals(failureCaught, true);
+ }
+
+ @Test
+ public void testSSLConnectionFailBadPassword() throws Exception {
+ final Properties connectionProps = new Properties();
+ connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath);
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "bad_password");
+ connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true");
+ boolean failureCaught = false;
+ try {
+ updateClient(connectionProps);
+ } catch (Exception e) {
+ failureCaught = true;
+ }
+ assertEquals(failureCaught, true);
+ }
+
+ @Test
+ public void testSSLConnectionFailEmptyTrustStore() throws Exception {
+ final Properties connectionProps = new Properties();
+ connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, emptyTSPath);
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123");
+ connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true");
+ boolean failureCaught = false;
+ try {
+ updateClient(connectionProps);
+ } catch (Exception e) {
+ failureCaught = true;
+ }
+ assertEquals(failureCaught, true);
+ }
+
+ @Test
+ public void testSSLQuery() throws Exception {
+ final Properties connectionProps = new Properties();
+ connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath);
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123");
+ connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true");
+ try {
+ updateClient(connectionProps);
+ } catch (Exception e) {
+ TestCase.fail( new StringBuilder()
+ .append("SSL Connection failed with exception [" )
+ .append( e.getMessage() )
+ .append("]")
+ .toString());
+ }
+ test("SELECT * FROM cp.`region.json`");
+ }
+
+ @Ignore("This test fails in some cases where the host name may be set up inconsistently.")
+ @Test
+ public void testClientConfigHostnameVerification() {
+ String password = "test_password";
+ String trustStoreFileName = "drillTestTrustStore";
+ String keyStoreFileName = "drillTestKeyStore";
+ KeyStore ts, ks;
+ File tempFile1, tempFile2;
+ String trustStorePath;
+ String keyStorePath;
+
+ try {
+ String fqdn = InetAddress.getLocalHost().getHostName();
+ SelfSignedCertificate certificate = new SelfSignedCertificate(fqdn);
+
+ tempFile1 = File.createTempFile(trustStoreFileName, ".ks");
+ tempFile1.deleteOnExit();
+ trustStorePath = tempFile1.getAbsolutePath();
+ //generate a truststore.
+ ts = KeyStore.getInstance(KeyStore.getDefaultType());
+ ts.load(null, password.toCharArray());
+ ts.setCertificateEntry("drillTest", certificate.cert());
+ // Store away the truststore.
+ try (FileOutputStream fos1 = new FileOutputStream(tempFile1);) {
+ ts.store(fos1, password.toCharArray());
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ tempFile2 = File.createTempFile(keyStoreFileName, ".ks");
+ tempFile2.deleteOnExit();
+ keyStorePath = tempFile2.getAbsolutePath();
+ //generate a keystore.
+ ts = KeyStore.getInstance(KeyStore.getDefaultType());
+ ts.load(null, password.toCharArray());
+ ts.setKeyEntry("drillTest", certificate.key(), password.toCharArray(), new java.security.cert.Certificate[]{certificate.cert()});
+ // Store away the keystore.
+ try (FileOutputStream fos2 = new FileOutputStream(tempFile2);) {
+ ts.store(fos2, password.toCharArray());
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+
+ final Properties connectionProps = new Properties();
+ connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, trustStorePath);
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, password);
+ connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "false");
+
+ DrillConfig sslConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+ .withValue(ExecConstants.USER_SSL_ENABLED, ConfigValueFactory.fromAnyRef(true))
+ .withValue(ExecConstants.SSL_KEYSTORE_TYPE, ConfigValueFactory.fromAnyRef("JKS"))
+ .withValue(ExecConstants.SSL_KEYSTORE_PATH, ConfigValueFactory.fromAnyRef(keyStorePath))
+ .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, ConfigValueFactory.fromAnyRef("test_password"))
+ .withValue(ExecConstants.SSL_PROTOCOL, ConfigValueFactory.fromAnyRef("TLSv1.2")), false);
+
+ updateTestCluster(1, sslConfig, connectionProps);
+
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ //reset cluster
+ updateTestCluster(1, newConfig, initProps);
+
+ }
+
+ @Test
+ public void testClientConfigHostNameVerificationFail() throws Exception {
+ final Properties connectionProps = new Properties();
+ connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath);
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "password");
+ connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "false");
+ boolean failureCaught = false;
+ try {
+ updateClient(connectionProps);
+ } catch (Exception e) {
+ failureCaught = true;
+ }
+ assertEquals(failureCaught, true);
+ }
+
+ @Test
+ public void testClientConfigCertificateVerification() {
+ // Fail if certificate is not valid
+ boolean failureCaught = false;
+ try {
+ final Properties connectionProps = new Properties();
+ connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath);
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123");
+ connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true");
+ //connectionProps.setProperty(DrillProperties.DISABLE_CERT_VERIFICATION, "true");
+
+ DrillConfig sslConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+ .withValue(ExecConstants.USER_SSL_ENABLED, ConfigValueFactory.fromAnyRef(true))
+ .withValue(ExecConstants.SSL_KEYSTORE_TYPE, ConfigValueFactory.fromAnyRef("JKS"))
+ .withValue(ExecConstants.SSL_KEYSTORE_PATH, ConfigValueFactory.fromAnyRef(unknownKsPath))
+ .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, ConfigValueFactory.fromAnyRef("drill123"))
+ .withValue(ExecConstants.SSL_PROTOCOL, ConfigValueFactory.fromAnyRef("TLSv1.2")), false);
+
+ updateTestCluster(1, sslConfig, connectionProps);
+
+ } catch (Exception e) {
+ failureCaught = true;
+ }
+ //reset cluster
+ updateTestCluster(1, newConfig, initProps);
+ assertEquals(failureCaught, true);
+ }
+
+ @Test
+ public void testClientConfigNoCertificateVerification() {
+ // Pass if certificate is not valid, but mode is insecure.
+ try {
+ final Properties connectionProps = new Properties();
+ connectionProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath);
+ connectionProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123");
+ connectionProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true");
+ connectionProps.setProperty(DrillProperties.DISABLE_CERT_VERIFICATION, "true");
+
+ DrillConfig sslConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+ .withValue(ExecConstants.USER_SSL_ENABLED, ConfigValueFactory.fromAnyRef(true))
+ .withValue(ExecConstants.SSL_KEYSTORE_TYPE, ConfigValueFactory.fromAnyRef("JKS"))
+ .withValue(ExecConstants.SSL_KEYSTORE_PATH, ConfigValueFactory.fromAnyRef(unknownKsPath))
+ .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, ConfigValueFactory.fromAnyRef("drill123"))
+ .withValue(ExecConstants.SSL_PROTOCOL, ConfigValueFactory.fromAnyRef("TLSv1.2")), false);
+
+ updateTestCluster(1, sslConfig, connectionProps);
+
+ } catch (Exception e) {
+ fail(e.getMessage());
+ }
+ //reset cluster
+ updateTestCluster(1, newConfig, initProps);
+
+ }
+
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSLServer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSLServer.java
new file mode 100644
index 000000000..05d124512
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/rpc/user/security/TestUserBitSSLServer.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.user.security;
+
+import com.typesafe.config.ConfigValueFactory;
+import junit.framework.TestCase;
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.config.DrillProperties;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.ssl.SSLFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.text.MessageFormat;
+import java.util.Properties;
+
+import static org.apache.drill.exec.ssl.SSLConfig.HADOOP_SSL_CONF_TPL_KEY;
+import static org.junit.Assert.assertEquals;
+
+public class TestUserBitSSLServer extends BaseTestQuery {
+ private static final org.slf4j.Logger logger =
+ org.slf4j.LoggerFactory.getLogger(TestUserBitSSLServer.class);
+
+ private static DrillConfig sslConfig;
+ private static Properties initProps; // initial client properties
+ private static ClassLoader classLoader;
+ private static String ksPath;
+ private static String tsPath;
+ private static String emptyTSPath;
+
+ @BeforeClass
+ public static void setupTest() throws Exception {
+
+ classLoader = TestUserBitSSLServer.class.getClassLoader();
+ ksPath = new File(classLoader.getResource("ssl/keystore.ks").getFile()).getAbsolutePath();
+ tsPath = new File(classLoader.getResource("ssl/truststore.ks").getFile()).getAbsolutePath();
+ emptyTSPath = new File(classLoader.getResource("ssl/emptytruststore.ks").getFile()).getAbsolutePath();
+ sslConfig = new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties())
+ .withValue(ExecConstants.USER_SSL_ENABLED, ConfigValueFactory.fromAnyRef(true))
+ .withValue(ExecConstants.SSL_KEYSTORE_TYPE, ConfigValueFactory.fromAnyRef("JKS"))
+ .withValue(ExecConstants.SSL_KEYSTORE_PATH, ConfigValueFactory.fromAnyRef(ksPath))
+ .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, ConfigValueFactory.fromAnyRef("drill123"))
+ .withValue(ExecConstants.SSL_KEY_PASSWORD, ConfigValueFactory.fromAnyRef("drill123"))
+ .withValue(ExecConstants.SSL_PROTOCOL, ConfigValueFactory.fromAnyRef("TLSv1.2")), false);
+ initProps = new Properties();
+ initProps.setProperty(DrillProperties.ENABLE_TLS, "true");
+ initProps.setProperty(DrillProperties.TRUSTSTORE_PATH, tsPath);
+ initProps.setProperty(DrillProperties.TRUSTSTORE_PASSWORD, "drill123");
+ initProps.setProperty(DrillProperties.DISABLE_HOST_VERIFICATION, "true");
+ }
+
+ @AfterClass
+ public static void cleanTest() throws Exception {
+ DrillConfig restoreConfig =
+ new DrillConfig(DrillConfig.create(cloneDefaultTestConfigProperties()), false);
+ updateTestCluster(1, restoreConfig);
+ }
+
+ @Test
+ public void testInvalidKeystorePath() throws Exception {
+ DrillConfig testConfig = new DrillConfig(DrillConfig.create(sslConfig)
+ .withValue(ExecConstants.SSL_KEYSTORE_PATH, ConfigValueFactory.fromAnyRef("/bad/path")),
+ false);
+
+ // Start an SSL enabled cluster
+ boolean failureCaught = false;
+ try {
+ updateTestCluster(1, testConfig, initProps);
+ } catch (Exception e) {
+ failureCaught = true;
+ }
+ assertEquals(failureCaught, true);
+ }
+
+ @Test
+ public void testInvalidKeystorePassword() throws Exception {
+ DrillConfig testConfig = new DrillConfig(DrillConfig.create(sslConfig)
+ .withValue(ExecConstants.SSL_KEYSTORE_PASSWORD, ConfigValueFactory.fromAnyRef("badpassword")),
+ false);
+
+ // Start an SSL enabled cluster
+ boolean failureCaught = false;
+ try {
+ updateTestCluster(1, testConfig, initProps);
+ } catch (Exception e) {
+ failureCaught = true;
+ }
+ assertEquals(failureCaught, true);
+ }
+
+ @Test
+ public void testInvalidKeyPassword() throws Exception {
+ DrillConfig testConfig = new DrillConfig(DrillConfig.create(sslConfig)
+ .withValue(ExecConstants.SSL_KEY_PASSWORD, ConfigValueFactory.fromAnyRef("badpassword")),
+ false);
+
+ // Start an SSL enabled cluster
+ boolean failureCaught = false;
+ try {
+ updateTestCluster(1, testConfig, initProps);
+ } catch (Exception e) {
+ failureCaught = true;
+ }
+ assertEquals(failureCaught, true);
+ }
+
+ @Test
+ // Should pass because the keystore password will be used.
+ public void testNoKeyPassword() throws Exception {
+ DrillConfig testConfig = new DrillConfig(DrillConfig.create(sslConfig)
+ .withValue(ExecConstants.SSL_KEY_PASSWORD, ConfigValueFactory.fromAnyRef("")),
+ false);
+
+ // Start an SSL enabled cluster
+ boolean failureCaught = false;
+ try {
+ updateTestCluster(1, testConfig, initProps);
+ } catch (Exception e) {
+ failureCaught = true;
+ }
+ assertEquals(failureCaught, false);
+ }
+
+}
diff --git a/exec/java-exec/src/test/resources/ssl-server-invalid.xml b/exec/java-exec/src/test/resources/ssl-server-invalid.xml
new file mode 100644
index 000000000..6bfac5bce
--- /dev/null
+++ b/exec/java-exec/src/test/resources/ssl-server-invalid.xml
@@ -0,0 +1,72 @@
+<?xml version="1.0"?>
+ <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+
+ <property>
+ <name>ssl.server.truststore.location</name>
+ <value></value>
+ <description>Truststore to be used by NN and DN. Must be specified.
+ </description>
+ </property>
+
+ <property>
+ <name>ssl.server.truststore.password</name>
+ <value></value>
+ <description>Optional. Default value is "".
+ </description>
+ </property>
+
+ <property>
+ <name>ssl.server.truststore.type</name>
+ <value>jks</value>
+ <description>Optional. Default value is "jks".
+ </description>
+ </property>
+
+ <property>
+ <name>ssl.server.keystore.location</name>
+ <value></value>
+ <description>Keystore to be used by NN and DN. Must be specified.
+ </description>
+ </property>
+
+ <property>
+ <name>ssl.server.keystore.password</name>
+ <value>FAIL</value>
+ <description>Must be specified.
+ </description>
+ </property>
+
+ <property>
+ <name>ssl.server.keystore.keypassword</name>
+ <value></value>
+ <description>Must be specified.
+ </description>
+ </property>
+
+ <property>
+ <name>ssl.server.keystore.type</name>
+ <value>jks</value>
+ <description>Optional. Default value is "jks".
+ </description>
+ </property>
+
+</configuration>
diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java
index 0e4726d99..c0804c5a3 100644
--- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java
+++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/impl/DrillConnectionImpl.java
@@ -157,14 +157,14 @@ class DrillConnectionImpl extends AvaticaConnection
this.client.setClientName("Apache Drill JDBC Driver");
this.client.connect(connect, info);
} catch (OutOfMemoryException e) {
- throw new SQLException("Failure creating root allocator", e);
+ throw new SQLNonTransientConnectionException("Failure creating root allocator", e);
} catch (InvalidConnectionInfoException e) {
- throw new SQLException("Invalid parameter in connection string: " + e.getMessage(), e);
+ throw new SQLNonTransientConnectionException("Invalid parameter in connection string: " + e.getMessage(), e);
} catch (RpcException e) {
// (Include cause exception's text in wrapping exception's text so
// it's more likely to get to user (e.g., via SQLLine), and use
// toString() since getMessage() text doesn't always mention error:)
- throw new SQLException("Failure in connecting to Drill: " + e, e);
+ throw new SQLNonTransientConnectionException("Failure in connecting to Drill: " + e, e);
}
}
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
index face1af1c..6e11236fc 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/AbstractRemoteConnection.java
@@ -271,17 +271,26 @@ public abstract class AbstractRemoteConnection implements RemoteConnection, Encr
public void addSecurityHandlers() {
final ChannelPipeline channelPipeline = getChannel().pipeline();
- channelPipeline.addFirst(RpcConstants.SASL_DECRYPTION_HANDLER,
- new SaslDecryptionHandler(saslCodec, getMaxWrappedSize(), OutOfMemoryHandler.DEFAULT_INSTANCE));
-
- channelPipeline.addFirst(RpcConstants.LENGTH_DECODER_HANDLER,
- new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE,
- RpcConstants.LENGTH_FIELD_OFFSET, RpcConstants.LENGTH_FIELD_LENGTH,
- RpcConstants.LENGTH_ADJUSTMENT, RpcConstants.INITIAL_BYTES_TO_STRIP, true));
+ if (channelPipeline.names().contains(RpcConstants.SSL_HANDLER)) {
+ channelPipeline.addAfter(RpcConstants.SSL_HANDLER, RpcConstants.SASL_DECRYPTION_HANDLER,
+ new SaslDecryptionHandler(saslCodec, getMaxWrappedSize(), OutOfMemoryHandler.DEFAULT_INSTANCE));
+
+ channelPipeline.addAfter(RpcConstants.SSL_HANDLER, RpcConstants.LENGTH_DECODER_HANDLER,
+ new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE,
+ RpcConstants.LENGTH_FIELD_OFFSET, RpcConstants.LENGTH_FIELD_LENGTH,
+ RpcConstants.LENGTH_ADJUSTMENT, RpcConstants.INITIAL_BYTES_TO_STRIP, true));
+ } else {
+ channelPipeline.addFirst(RpcConstants.SASL_DECRYPTION_HANDLER,
+ new SaslDecryptionHandler(saslCodec, getMaxWrappedSize(), OutOfMemoryHandler.DEFAULT_INSTANCE));
+
+ channelPipeline.addFirst(RpcConstants.LENGTH_DECODER_HANDLER,
+ new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, Integer.MAX_VALUE,
+ RpcConstants.LENGTH_FIELD_OFFSET, RpcConstants.LENGTH_FIELD_LENGTH,
+ RpcConstants.LENGTH_ADJUSTMENT, RpcConstants.INITIAL_BYTES_TO_STRIP, true));
+ }
channelPipeline.addAfter(RpcConstants.MESSAGE_DECODER, RpcConstants.SASL_ENCRYPTION_HANDLER,
- new SaslEncryptionHandler(saslCodec, getWrapSizeLimit(),
- OutOfMemoryHandler.DEFAULT_INSTANCE));
+ new SaslEncryptionHandler(saslCodec, getWrapSizeLimit(), OutOfMemoryHandler.DEFAULT_INSTANCE));
channelPipeline.addAfter(RpcConstants.SASL_ENCRYPTION_HANDLER, RpcConstants.CHUNK_CREATION_HANDLER,
new ChunkCreationHandler(getWrapSizeLimit()));
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index d51b748e5..0d80df6da 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.rpc;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.ChannelFuture;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -33,12 +33,10 @@ import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
-import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
-import org.apache.drill.exec.rpc.RpcConnectionHandler.FailureType;
import com.google.common.base.Preconditions;
import com.google.protobuf.Internal.EnumLite;
@@ -69,6 +67,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
private final Parser<HR> handshakeParser;
private final IdlePingHandler pingHandler;
+ private ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener = null;
public BasicClient(RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup, T handshakeType,
Class<HR> responseClass, Parser<HR> handshakeParser) {
@@ -100,6 +99,10 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
ch.closeFuture().addListener(getCloseHandler(ch, connection));
final ChannelPipeline pipe = ch.pipeline();
+ // Make sure that the SSL handler is the first handler in the pipeline so everything is encrypted
+ if (isSslEnabled()) {
+ setupSSL(pipe, sslHandshakeListener);
+ }
pipe.addLast(RpcConstants.PROTOCOL_DECODER, getDecoder(connection.getAllocator()));
pipe.addLast(RpcConstants.MESSAGE_DECODER, new RpcDecoder("c-" + rpcConfig.getName()));
@@ -111,7 +114,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
}
pipe.addLast(RpcConstants.MESSAGE_HANDLER, new InboundHandler(connection));
- pipe.addLast(RpcConstants.EXCEPTION_HANDLER, new RpcExceptionHandler<CC>(connection));
+ pipe.addLast(RpcConstants.EXCEPTION_HANDLER, new RpcExceptionHandler<>(connection));
}
}); //
@@ -120,6 +123,21 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
// }
}
+ // Adds a SSL handler if enabled. Required only for client and server communications, so
+ // a real implementation is only available for UserClient
+ protected void setupSSL(ChannelPipeline pipe, ConnectionMultiListener.SSLHandshakeListener sslHandshakeListener) {
+ throw new UnsupportedOperationException("SSL is implemented only by the User Client.");
+ }
+
+ protected boolean isSslEnabled() {
+ return false;
+ }
+
+ // Save the SslChannel after the SSL handshake so it can be closed later
+ public void setSslChannel(Channel c) {
+
+ }
+
@Override
protected CC initRemoteConnection(SocketChannel channel){
local=channel.localAddress();
@@ -144,7 +162,7 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
}
};
- public IdlePingHandler(long idleWaitInMillis) {
+ IdlePingHandler(long idleWaitInMillis) {
super(0, idleWaitInMillis, 0, TimeUnit.MILLISECONDS);
}
@@ -179,6 +197,13 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
return super.send(connection, rpcType, protobufBody, clazz, dataBodies);
}
+ public <SEND extends MessageLite, RECEIVE extends MessageLite>
+ void send(RpcOutcomeListener<RECEIVE> listener, SEND protobufBody, boolean allowInEventLoop,
+ ByteBuf... dataBodies) {
+ super.send(listener, connection, handshakeType, protobufBody, (Class<RECEIVE>) responseClass,
+ allowInEventLoop, dataBodies);
+ }
+
// the command itself must be "run" by the caller (to avoid calling inEventLoop)
protected <M extends MessageLite> RpcCommand<M, CC>
getInitialCommand(final RpcCommand<M, CC> command) {
@@ -187,116 +212,24 @@ public abstract class BasicClient<T extends EnumLite, CC extends ClientConnectio
protected void connectAsClient(RpcConnectionHandler<CC> connectionListener, HS handshakeValue,
String host, int port) {
- ConnectionMultiListener cml = new ConnectionMultiListener(connectionListener, handshakeValue);
- b.connect(host, port).addListener(cml.connectionHandler);
- }
-
- private class ConnectionMultiListener {
- private final RpcConnectionHandler<CC> l;
- private final HS handshakeValue;
-
- public ConnectionMultiListener(RpcConnectionHandler<CC> l, HS handshakeValue) {
- assert l != null;
- assert handshakeValue != null;
-
- this.l = l;
- this.handshakeValue = handshakeValue;
- }
-
- public final ConnectionHandler connectionHandler = new ConnectionHandler();
- public final HandshakeSendHandler handshakeSendHandler = new HandshakeSendHandler();
-
- /**
- * Manages connection establishment outcomes.
- */
- private class ConnectionHandler implements GenericFutureListener<ChannelFuture> {
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- boolean isInterrupted = false;
-
- // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
- // So there is no point propagating the interruption as failure immediately.
- long remainingWaitTimeMills = 120000;
- long startTime = System.currentTimeMillis();
- // logger.debug("Connection operation finished. Success: {}", future.isSuccess());
- while(true) {
- try {
- future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
- if (future.isSuccess()) {
- SocketAddress remote = future.channel().remoteAddress();
- SocketAddress local = future.channel().localAddress();
- setAddresses(remote, local);
- // send a handshake on the current thread. This is the only time we will send from within the event thread.
- // We can do this because the connection will not be backed up.
- send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
- } else {
- l.connectionFailed(FailureType.CONNECTION, new RpcException("General connection failure."));
- }
- // logger.debug("Handshake queued for send.");
- break;
- } catch (final InterruptedException interruptEx) {
- remainingWaitTimeMills -= (System.currentTimeMillis() - startTime);
- startTime = System.currentTimeMillis();
- isInterrupted = true;
- if (remainingWaitTimeMills < 1) {
- l.connectionFailed(FailureType.CONNECTION, interruptEx);
- break;
- }
- // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills.
- } catch (final Exception ex) {
- logger.error("Failed to establish connection", ex);
- l.connectionFailed(FailureType.CONNECTION, ex);
- break;
- }
- }
-
- if (isInterrupted) {
- // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
- // interruption and respond to it if it wants to.
- Thread.currentThread().interrupt();
- }
- }
- }
-
- /**
- * manages handshake outcomes.
- */
- private class HandshakeSendHandler implements RpcOutcomeListener<HR> {
-
- @Override
- public void failed(RpcException ex) {
- logger.debug("Failure while initiating handshake", ex);
- l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
- }
-
- @Override
- public void success(HR value, ByteBuf buffer) {
- // logger.debug("Handshake received. {}", value);
- try {
- validateHandshake(value);
- finalizeConnection(value, connection);
- l.connectionSucceeded(connection);
- // logger.debug("Handshake completed succesfully.");
- } catch (Exception ex) {
- logger.debug("Failure while validating handshake", ex);
- l.connectionFailed(FailureType.HANDSHAKE_VALIDATION, ex);
- }
- }
-
- @Override
- public void interrupted(final InterruptedException ex) {
- logger.warn("Interrupted while waiting for handshake response", ex);
- l.connectionFailed(FailureType.HANDSHAKE_COMMUNICATION, ex);
- }
+ ConnectionMultiListener<T, CC, HS, HR, BasicClient<T, CC, HS, HR>> cml;
+ ConnectionMultiListener.Builder<T, CC, HS, HR, BasicClient<T, CC, HS, HR> > builder =
+ ConnectionMultiListener.newBuilder(connectionListener, handshakeValue, this);
+ if (isSslEnabled()) {
+ cml = builder.enableSSL().build();
+ sslHandshakeListener = new ConnectionMultiListener.SSLHandshakeListener();
+ sslHandshakeListener.setParent(cml);
+ } else {
+ cml = builder.build();
}
+ b.connect(host, port).addListener(cml.connectionHandler);
}
private class ClientHandshakeHandler extends AbstractHandshakeHandler<HR> {
private final CC connection;
- public ClientHandshakeHandler(CC connection) {
+ ClientHandshakeHandler(CC connection) {
super(BasicClient.this.handshakeType, BasicClient.this.handshakeParser);
Preconditions.checkNotNull(connection);
this.connection = connection;
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index a7258dd96..67fc89a19 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -82,6 +83,11 @@ public abstract class BasicServer<T extends EnumLite, SC extends ServerConnectio
ch.closeFuture().addListener(getCloseHandler(ch, connection));
final ChannelPipeline pipe = ch.pipeline();
+ // Make sure that the SSL handler is the first handler in the pipeline so everything is encrypted
+ if (isSslEnabled()) {
+ setupSSL(pipe);
+ }
+
pipe.addLast(RpcConstants.PROTOCOL_DECODER, getDecoder(connection.getAllocator(), getOutOfMemoryHandler()));
pipe.addLast(RpcConstants.MESSAGE_DECODER, new RpcDecoder("s-" + rpcConfig.getName()));
pipe.addLast(RpcConstants.PROTOCOL_ENCODER, new RpcEncoder("s-" + rpcConfig.getName()));
@@ -105,6 +111,25 @@ public abstract class BasicServer<T extends EnumLite, SC extends ServerConnectio
// }
}
+ // Adds a SSL handler if enabled. Required only for client and server communications, so
+ // a real implementation is only available for UserServer
+ protected void setupSSL(ChannelPipeline pipe) {
+ throw new UnsupportedOperationException("SSL is implemented only by the User Server.");
+ }
+
+ protected boolean isSslEnabled() {
+ return false;
+ }
+
+ // Save the SslChannel after the SSL handshake so it can be closed later
+ public void setSslChannel(Channel c) {
+ return;
+ }
+
+ protected void closeSSL() {
+ return;
+ }
+
private class LoggingReadTimeoutHandler extends ReadTimeoutHandler {
private final SC connection;
@@ -203,6 +228,9 @@ public abstract class BasicServer<T extends EnumLite, SC extends ServerConnectio
if (elapsed > 500) {
logger.info("closed eventLoopGroup " + eventLoopGroup + " in " + elapsed + " ms");
}
+ if(isSslEnabled()) {
+ closeSSL();
+ }
} catch (final InterruptedException | ExecutionException e) {
logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e);
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
new file mode 100644
index 000000000..0cdca13a6
--- /dev/null
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/ConnectionMultiListener.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc;
+
+import com.google.protobuf.Internal.EnumLite;
+import com.google.protobuf.MessageLite;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.apache.drill.common.exceptions.DrillException;
+import org.slf4j.Logger;
+
+import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @param <CC> Client Connection Listener
+ * @param <HS> Outbound handshake message type
+ * @param <HR> Inbound handshake message type
+ * @param <BC> BasicClient type
+ * <p>
+ * Implements a wrapper class that allows a client connection to associate different behaviours after
+ * establishing a connection with the server. The client can choose to send an application handshake, or
+ * in the case of SSL, wait for a SSL handshake completion and then send an application handshake.
+ */
+
+public class ConnectionMultiListener<T extends EnumLite, CC extends ClientConnection, HS extends MessageLite, HR extends MessageLite, BC extends BasicClient<T, CC, HS, HR>> {
+
+ private static final Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionMultiListener.class);
+
+ private final RpcConnectionHandler<CC> connectionListener;
+ private final HS handshakeValue;
+ private final BC parent;
+
+ private ConnectionMultiListener(RpcConnectionHandler<CC> connectionListener, HS handshakeValue,
+ BC basicClient) {
+ assert connectionListener != null;
+ assert handshakeValue != null;
+
+ this.connectionListener = connectionListener;
+ this.handshakeValue = handshakeValue;
+ this.parent = basicClient;
+ }
+
+ public static <T extends EnumLite, CC extends ClientConnection, HS extends MessageLite, HR extends MessageLite, BC extends BasicClient<T, CC, HS, HR>>
+ Builder<T, CC, HS, HR, BC>
+ newBuilder(RpcConnectionHandler<CC> connectionListener, HS handshakeValue,
+ BC basicClient) {
+ return new Builder<>(connectionListener, handshakeValue, basicClient);
+ }
+
+ ConnectionHandler connectionHandler = null;
+ private HandshakeSendHandler handshakeSendHandler = null;
+ private SSLConnectionHandler sslConnectionHandler = null;
+
+ /**
+ * Manages connection establishment outcomes.
+ */
+ private class ConnectionHandler implements GenericFutureListener<ChannelFuture> {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ boolean isInterrupted = false;
+
+ // We want to wait for at least 120 secs when interrupts occur. Establishing a connection fails/succeeds quickly,
+ // So there is no point propagating the interruption as failure immediately.
+ long remainingWaitTimeMills = 120000;
+ long startTime = System.currentTimeMillis();
+ // logger.debug("Connection operation finished. Success: {}", future.isSuccess());
+ while (true) {
+ try {
+ future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
+ if (future.isSuccess()) {
+ SocketAddress remote = future.channel().remoteAddress();
+ SocketAddress local = future.channel().localAddress();
+ parent.setAddresses(remote, local);
+ // if SSL is enabled send the handshake after the ssl handshake is completed, otherwise send it
+ // now
+ if(!parent.isSslEnabled()) {
+ // send a handshake on the current thread. This is the only time we will send from within the event thread.
+ // We can do this because the connection will not be backed up.
+ parent.send(handshakeSendHandler, handshakeValue, true);
+ }
+ } else {
+ connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION,
+ new RpcException("General connection failure."));
+ }
+ // logger.debug("Handshake queued for send.");
+ break;
+ } catch (final InterruptedException interruptEx) {
+ remainingWaitTimeMills -= (System.currentTimeMillis() - startTime);
+ startTime = System.currentTimeMillis();
+ isInterrupted = true;
+ if (remainingWaitTimeMills < 1) {
+ connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, interruptEx);
+ break;
+ }
+ // Ignore the interrupt and continue to wait until we elapse remainingWaitTimeMills.
+ } catch (final Exception ex) {
+ logger.error("Failed to establish connection", ex);
+ connectionListener.connectionFailed(RpcConnectionHandler.FailureType.CONNECTION, ex);
+ break;
+ }
+ }
+
+ if (isInterrupted) {
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ private class SSLConnectionHandler implements GenericFutureListener<Future<Channel>> {
+ @Override
+ public void operationComplete(Future<Channel> future) throws Exception {
+ // send the handshake
+ parent.send(handshakeSendHandler, handshakeValue, true);
+ }
+ }
+
+ /**
+ * manages handshake outcomes.
+ */
+ private class HandshakeSendHandler implements RpcOutcomeListener<HR> {
+
+ @Override
+ public void failed(RpcException ex) {
+ logger.debug("Failure while initiating handshake", ex);
+ connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION, ex);
+ }
+
+ @Override
+ public void success(HR value, ByteBuf buffer) {
+ // logger.debug("Handshake received. {}", value);
+ try {
+ parent.validateHandshake(value);
+ parent.finalizeConnection(value, parent.connection);
+ connectionListener.connectionSucceeded(parent.connection);
+ // logger.debug("Handshake completed succesfully.");
+ } catch (Exception ex) {
+ logger.debug("Failure while validating handshake", ex);
+ connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_VALIDATION, ex);
+ }
+ }
+
+ @Override
+ public void interrupted(final InterruptedException ex) {
+ logger.warn("Interrupted while waiting for handshake response", ex);
+ connectionListener.connectionFailed(RpcConnectionHandler.FailureType.HANDSHAKE_COMMUNICATION, ex);
+ }
+ }
+
+ /*
+ The SSL Handshake listener is special in that it is needed at the time of initializing an SSL
+ enabled pipeline and so is instantiated before the instance of the outer class may be needed.
+ We create an instance and set a reference back to the outer class instance when it is created
+ at the time of connection.
+ */
+ public static class SSLHandshakeListener implements GenericFutureListener<Future<Channel>> {
+ ConnectionMultiListener parent;
+ SSLHandshakeListener() {
+ }
+
+ public void setParent(ConnectionMultiListener cml){
+ this.parent = cml;
+ }
+
+ @Override
+ public void operationComplete(Future<Channel> future) throws Exception {
+ if(parent != null){
+ if(future.isSuccess()) {
+ Channel c = future.get();
+ parent.sslConnectionHandler.operationComplete(future);
+ parent.parent.setSslChannel(c);
+ } else {
+ throw new DrillException("SSL handshake failed.", future.cause());
+ }
+ } else {
+ throw new RpcException("RPC Setup error. SSL handshake complete handler is not set up.");
+ }
+ }
+ }
+
+
+ public static class Builder<T extends EnumLite, CC extends ClientConnection, HS extends MessageLite, HR extends MessageLite, BC extends BasicClient<T, CC, HS, HR> > {
+
+ private final RpcConnectionHandler<CC> connectionListener;
+ private final HS handshakeValue;
+ private final BC basicClient;
+ boolean enableSSL = false;
+
+ private ConnectionMultiListener<T, CC, HS, HR, BC> cml;
+
+ private Builder(RpcConnectionHandler<CC> connectionListener, HS handshakeValue, BC basicClient) {
+ this.connectionListener = connectionListener;
+ this.handshakeValue = handshakeValue;
+ this.basicClient = basicClient;
+ }
+
+ Builder<T, CC, HS, HR, BC> enableSSL() {
+ enableSSL = true;
+ return this;
+ }
+
+ public ConnectionMultiListener<T, CC, HS, HR, BC> build() {
+ this.cml = new ConnectionMultiListener<>(connectionListener, handshakeValue, basicClient);
+ cml.connectionHandler = cml.new ConnectionHandler();
+ cml.handshakeSendHandler = cml.new HandshakeSendHandler();
+ if(enableSSL) {
+ cml.sslConnectionHandler = cml.new SSLConnectionHandler();
+ }
+ return cml;
+ }
+
+ }
+
+}
diff --git a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
index be58f371c..455ada84c 100644
--- a/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
+++ b/exec/rpc/src/main/java/org/apache/drill/exec/rpc/RpcConstants.java
@@ -38,6 +38,7 @@ public class RpcConstants {
public static final String SASL_ENCRYPTION_HANDLER = "sasl-encryption-handler";
public static final String LENGTH_DECODER_HANDLER = "length-decoder";
public static final String CHUNK_CREATION_HANDLER = "chunk-creation-handler";
+ public static final String SSL_HANDLER = "ssl-handler";