aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java124
1 files changed, 10 insertions, 114 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index cba323e96..267b483d7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.rpc.data;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.collect.ImmutableList;
import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
@@ -29,21 +29,14 @@ import org.apache.drill.exec.proto.BitData.BitServerHandshake;
import org.apache.drill.exec.proto.BitData.RpcType;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.BitRpcUtility;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.ResponseSender;
-import org.apache.drill.exec.rpc.RpcCommand;
+import org.apache.drill.exec.rpc.RpcConnectionHandler;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
-import org.apache.drill.exec.rpc.security.SaslProperties;
-import org.apache.hadoop.security.UserGroupInformation;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.List;
public class DataClient extends BasicClient<RpcType, DataClientConnection, BitClientHandshake, BitServerHandshake> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClient.class);
@@ -103,114 +96,17 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
}
@Override
- protected void validateHandshake(BitServerHandshake handshake) throws RpcException {
- if (handshake.getRpcVersion() != DataRpcConfig.RPC_VERSION) {
- throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
- handshake.getRpcVersion(), DataRpcConfig.RPC_VERSION));
- }
-
- if (handshake.getAuthenticationMechanismsCount() != 0) { // remote requires authentication
- final SaslClient saslClient;
- try {
-
- final Map<String, String> saslProperties = SaslProperties.getSaslProperties(connection.isEncryptionEnabled(),
- connection.getMaxWrappedSize());
-
- saslClient = config.getAuthFactory(handshake.getAuthenticationMechanismsList())
- .createSaslClient(UserGroupInformation.getLoginUser(),
- config.getSaslClientProperties(remoteEndpoint, saslProperties));
- } catch (final IOException e) {
- throw new RpcException(String.format("Failed to initiate authenticate to %s", remoteEndpoint.getAddress()), e);
- }
- if (saslClient == null) {
- throw new RpcException("Unexpected failure. Could not initiate SASL exchange.");
- }
- connection.setSaslClient(saslClient);
- } else {
- if (config.getAuthMechanismToUse() != null) {
- throw new RpcException(String.format("Drillbit (%s) does not require auth, but auth is enabled.",
- remoteEndpoint.getAddress()));
- }
- }
- }
-
- protected <M extends MessageLite> RpcCommand<M, DataClientConnection>
- getInitialCommand(final RpcCommand<M, DataClientConnection> command) {
- final RpcCommand<M, DataClientConnection> initialCommand = super.getInitialCommand(command);
- if (config.getAuthMechanismToUse() == null) {
- return initialCommand;
- } else {
- return new AuthenticationCommand<>(initialCommand);
- }
+ protected void prepareSaslHandshake(final RpcConnectionHandler<DataClientConnection> connectionHandler, List<String> serverAuthMechanisms) {
+ BitRpcUtility.prepareSaslHandshake(connectionHandler, serverAuthMechanisms, connection, config, remoteEndpoint,
+ this, RpcType.SASL_MESSAGE);
}
- private class AuthenticationCommand<M extends MessageLite> implements RpcCommand<M, DataClientConnection> {
-
- private final RpcCommand<M, DataClientConnection> command;
-
- AuthenticationCommand(RpcCommand<M, DataClientConnection> command) {
- this.command = command;
- }
-
@Override
- public void connectionAvailable(DataClientConnection connection) {
- command.connectionFailed(FailureType.AUTHENTICATION, new SaslException("Should not reach here."));
+ protected List<String> validateHandshake(BitServerHandshake handshake) throws RpcException {
+ return BitRpcUtility.validateHandshake(handshake.getRpcVersion(), handshake.getAuthenticationMechanismsList(),
+ DataRpcConfig.RPC_VERSION, connection, config, this);
}
- @Override
- public void connectionSucceeded(final DataClientConnection connection) {
- final UserGroupInformation loginUser;
- try {
- loginUser = UserGroupInformation.getLoginUser();
- } catch (final IOException e) {
- logger.debug("Unexpected failure trying to login.", e);
- command.connectionFailed(FailureType.AUTHENTICATION, e);
- return;
- }
-
- final SettableFuture<Void> future = SettableFuture.create();
- new AuthenticationOutcomeListener<>(DataClient.this, connection, RpcType.SASL_MESSAGE,
- loginUser,
- new RpcOutcomeListener<Void>() {
- @Override
- public void failed(RpcException ex) {
- logger.debug("Authentication failed.", ex);
- future.setException(ex);
- }
-
- @Override
- public void success(Void value, ByteBuf buffer) {
- future.set(null);
- }
-
- @Override
- public void interrupted(InterruptedException e) {
- logger.debug("Authentication failed.", e);
- future.setException(e);
- }
- }).initiate(config.getAuthMechanismToUse());
-
- try {
- logger.trace("Waiting until authentication completes..");
- future.get();
- command.connectionSucceeded(connection);
- } catch (InterruptedException e) {
- command.connectionFailed(FailureType.AUTHENTICATION, e);
- // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
- // interruption and respond to it if it wants to.
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- command.connectionFailed(FailureType.AUTHENTICATION, e);
- }
- }
-
- @Override
- public void connectionFailed(FailureType type, Throwable t) {
- logger.debug("Authentication failed.", t);
- command.connectionFailed(FailureType.AUTHENTICATION, t);
- }
- }
-
@Override
public ProtobufLengthDecoder getDecoder(BufferAllocator allocator) {
return new DataProtobufLengthDecoder.Client(allocator, OutOfMemoryHandler.DEFAULT_INSTANCE);