diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java | 124 |
1 files changed, 10 insertions, 114 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java index cba323e96..267b483d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.rpc.data; -import com.google.common.util.concurrent.SettableFuture; +import com.google.common.collect.ImmutableList; import com.google.protobuf.MessageLite; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; @@ -29,21 +29,14 @@ import org.apache.drill.exec.proto.BitData.BitServerHandshake; import org.apache.drill.exec.proto.BitData.RpcType; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.rpc.BasicClient; +import org.apache.drill.exec.rpc.BitRpcUtility; import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; import org.apache.drill.exec.rpc.ResponseSender; -import org.apache.drill.exec.rpc.RpcCommand; +import org.apache.drill.exec.rpc.RpcConnectionHandler; import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener; -import org.apache.drill.exec.rpc.security.SaslProperties; -import org.apache.hadoop.security.UserGroupInformation; -import javax.security.sasl.SaslClient; -import javax.security.sasl.SaslException; -import java.io.IOException; -import java.util.Map; -import java.util.concurrent.ExecutionException; +import java.util.List; public class DataClient extends BasicClient<RpcType, DataClientConnection, BitClientHandshake, BitServerHandshake> { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClient.class); @@ -103,114 +96,17 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl } @Override - protected void validateHandshake(BitServerHandshake handshake) throws RpcException { - if (handshake.getRpcVersion() != DataRpcConfig.RPC_VERSION) { - throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", - handshake.getRpcVersion(), DataRpcConfig.RPC_VERSION)); - } - - if (handshake.getAuthenticationMechanismsCount() != 0) { // remote requires authentication - final SaslClient saslClient; - try { - - final Map<String, String> saslProperties = SaslProperties.getSaslProperties(connection.isEncryptionEnabled(), - connection.getMaxWrappedSize()); - - saslClient = config.getAuthFactory(handshake.getAuthenticationMechanismsList()) - .createSaslClient(UserGroupInformation.getLoginUser(), - config.getSaslClientProperties(remoteEndpoint, saslProperties)); - } catch (final IOException e) { - throw new RpcException(String.format("Failed to initiate authenticate to %s", remoteEndpoint.getAddress()), e); - } - if (saslClient == null) { - throw new RpcException("Unexpected failure. Could not initiate SASL exchange."); - } - connection.setSaslClient(saslClient); - } else { - if (config.getAuthMechanismToUse() != null) { - throw new RpcException(String.format("Drillbit (%s) does not require auth, but auth is enabled.", - remoteEndpoint.getAddress())); - } - } - } - - protected <M extends MessageLite> RpcCommand<M, DataClientConnection> - getInitialCommand(final RpcCommand<M, DataClientConnection> command) { - final RpcCommand<M, DataClientConnection> initialCommand = super.getInitialCommand(command); - if (config.getAuthMechanismToUse() == null) { - return initialCommand; - } else { - return new AuthenticationCommand<>(initialCommand); - } + protected void prepareSaslHandshake(final RpcConnectionHandler<DataClientConnection> connectionHandler, List<String> serverAuthMechanisms) { + BitRpcUtility.prepareSaslHandshake(connectionHandler, serverAuthMechanisms, connection, config, remoteEndpoint, + this, RpcType.SASL_MESSAGE); } - private class AuthenticationCommand<M extends MessageLite> implements RpcCommand<M, DataClientConnection> { - - private final RpcCommand<M, DataClientConnection> command; - - AuthenticationCommand(RpcCommand<M, DataClientConnection> command) { - this.command = command; - } - @Override - public void connectionAvailable(DataClientConnection connection) { - command.connectionFailed(FailureType.AUTHENTICATION, new SaslException("Should not reach here.")); + protected List<String> validateHandshake(BitServerHandshake handshake) throws RpcException { + return BitRpcUtility.validateHandshake(handshake.getRpcVersion(), handshake.getAuthenticationMechanismsList(), + DataRpcConfig.RPC_VERSION, connection, config, this); } - @Override - public void connectionSucceeded(final DataClientConnection connection) { - final UserGroupInformation loginUser; - try { - loginUser = UserGroupInformation.getLoginUser(); - } catch (final IOException e) { - logger.debug("Unexpected failure trying to login.", e); - command.connectionFailed(FailureType.AUTHENTICATION, e); - return; - } - - final SettableFuture<Void> future = SettableFuture.create(); - new AuthenticationOutcomeListener<>(DataClient.this, connection, RpcType.SASL_MESSAGE, - loginUser, - new RpcOutcomeListener<Void>() { - @Override - public void failed(RpcException ex) { - logger.debug("Authentication failed.", ex); - future.setException(ex); - } - - @Override - public void success(Void value, ByteBuf buffer) { - future.set(null); - } - - @Override - public void interrupted(InterruptedException e) { - logger.debug("Authentication failed.", e); - future.setException(e); - } - }).initiate(config.getAuthMechanismToUse()); - - try { - logger.trace("Waiting until authentication completes.."); - future.get(); - command.connectionSucceeded(connection); - } catch (InterruptedException e) { - command.connectionFailed(FailureType.AUTHENTICATION, e); - // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the - // interruption and respond to it if it wants to. - Thread.currentThread().interrupt(); - } catch (ExecutionException e) { - command.connectionFailed(FailureType.AUTHENTICATION, e); - } - } - - @Override - public void connectionFailed(FailureType type, Throwable t) { - logger.debug("Authentication failed.", t); - command.connectionFailed(FailureType.AUTHENTICATION, t); - } - } - @Override public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) { return new DataProtobufLengthDecoder.Client(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE); |