diff options
author | Sudheesh Katkam <sudheesh@apache.org> | 2017-01-25 19:04:33 -0800 |
---|---|---|
committer | Sudheesh Katkam <sudheesh@apache.org> | 2017-02-24 19:01:43 -0800 |
commit | e2582a10bfb6475e691be326c6d0498a697e6a0f (patch) | |
tree | b293a919a84463a9fee19457a4a0dea9abab01c8 /exec/java-exec/src/main/java/org/apache/drill/exec/rpc | |
parent | 180dd5648ab604b6396d91ba69a2f777f19bf79c (diff) |
DRILL-4280: CORE (bit to bit authentication, data)
+ Support authentication in DataServer and DataClient
+ Add AuthenticationCommand as an initial command after handshake
and before the command that initiates a connection
+ Add DataConnectionConfig to encapsulate configuration
+ Add DataServerRequestHandler to encapsulate all handling of
requests to DataServer
data
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc')
11 files changed, 365 insertions, 187 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java index 9db551b46..a37008d5d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.data; +import com.google.common.util.concurrent.SettableFuture; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.socket.SocketChannel; @@ -30,42 +31,54 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.rpc.BasicClient; import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -import org.apache.drill.exec.rpc.Response; +import org.apache.drill.exec.rpc.ResponseSender; +import org.apache.drill.exec.rpc.RpcCommand; import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener; +import org.apache.drill.exec.rpc.RpcOutcomeListener; import com.google.protobuf.MessageLite; +import org.apache.hadoop.security.UserGroupInformation; -public class DataClient extends BasicClient<RpcType, DataClientConnection, BitClientHandshake, BitServerHandshake>{ +import javax.security.sasl.SaslClient; +import javax.security.sasl.SaslException; +import java.io.IOException; +import java.util.concurrent.ExecutionException; - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClient.class); +public class DataClient extends BasicClient<RpcType, DataClientConnection, BitClientHandshake, BitServerHandshake> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClient.class); + private final DrillbitEndpoint remoteEndpoint; private volatile DataClientConnection connection; - private final BufferAllocator allocator; private final DataConnectionManager.CloseHandlerCreator closeHandlerFactory; + private final DataConnectionConfig config; - - public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, DataConnectionManager.CloseHandlerCreator closeHandlerFactory) { + public DataClient(DrillbitEndpoint remoteEndpoint, DataConnectionConfig config, + DataConnectionManager.CloseHandlerCreator closeHandlerFactory) { super( - DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()), - context.getAllocator().getAsByteBufAllocator(), - context.getBitClientLoopGroup(), + DataRpcConfig.getMapping(config.getBootstrapContext().getConfig(), + config.getBootstrapContext().getExecutor()), + config.getBootstrapContext().getAllocator().getAsByteBufAllocator(), + config.getBootstrapContext().getBitClientLoopGroup(), RpcType.HANDSHAKE, BitServerHandshake.class, BitServerHandshake.PARSER); + + this.remoteEndpoint = remoteEndpoint; + this.config = config; this.closeHandlerFactory = closeHandlerFactory; - this.allocator = context.getAllocator(); } @Override - public DataClientConnection initRemoteConnection(SocketChannel channel) { + protected DataClientConnection initRemoteConnection(SocketChannel channel) { super.initRemoteConnection(channel); this.connection = new DataClientConnection(channel, this); return connection; } @Override - protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, DataClientConnection clientConnection) { + protected GenericFutureListener<ChannelFuture> + getCloseHandler(SocketChannel ch, DataClientConnection clientConnection) { return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(ch, clientConnection)); } @@ -75,27 +88,119 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl } @Override - protected Response handle(DataClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException { + protected void handle(DataClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody, + ResponseSender sender) throws RpcException { throw new UnsupportedOperationException("DataClient is unidirectional by design."); } BufferAllocator getAllocator() { - return allocator; + return config.getAllocator(); } @Override protected void validateHandshake(BitServerHandshake handshake) throws RpcException { if (handshake.getRpcVersion() != DataRpcConfig.RPC_VERSION) { - throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", handshake.getRpcVersion(), DataRpcConfig.RPC_VERSION)); + throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", + handshake.getRpcVersion(), DataRpcConfig.RPC_VERSION)); + } + + if (handshake.getAuthenticationMechanismsCount() != 0) { // remote requires authentication + final SaslClient saslClient; + try { + saslClient = config.getAuthFactory(handshake.getAuthenticationMechanismsList()) + .createSaslClient(UserGroupInformation.getLoginUser(), + config.getSaslClientProperties(remoteEndpoint)); + } catch (final IOException e) { + throw new RpcException(String.format("Failed to initiate authenticate to %s", remoteEndpoint.getAddress()), e); + } + if (saslClient == null) { + throw new RpcException("Unexpected failure. Could not initiate SASL exchange."); + } + connection.setSaslClient(saslClient); + } else { + if (config.getAuthMechanismToUse() != null) { + throw new RpcException(String.format("Drillbit (%s) does not require auth, but auth is enabled.", + remoteEndpoint.getAddress())); + } } } @Override - protected void finalizeConnection(BitServerHandshake handshake, DataClientConnection connection) { + protected <M extends MessageLite> RpcCommand<M, DataClientConnection> + getInitialCommand(final RpcCommand<M, DataClientConnection> command) { + final RpcCommand<M, DataClientConnection> initialCommand = super.getInitialCommand(command); + if (config.getAuthMechanismToUse() == null) { + return initialCommand; + } else { + return new AuthenticationCommand<>(initialCommand); + } } - public DataClientConnection getConnection() { - return this.connection; + private class AuthenticationCommand<M extends MessageLite> implements RpcCommand<M, DataClientConnection> { + + private final RpcCommand<M, DataClientConnection> command; + + AuthenticationCommand(RpcCommand<M, DataClientConnection> command) { + this.command = command; + } + + @Override + public void connectionAvailable(DataClientConnection connection) { + command.connectionFailed(FailureType.AUTHENTICATION, new SaslException("Should not reach here.")); + } + + @Override + public void connectionSucceeded(final DataClientConnection connection) { + final UserGroupInformation loginUser; + try { + loginUser = UserGroupInformation.getLoginUser(); + } catch (final IOException e) { + logger.debug("Unexpected failure trying to login.", e); + command.connectionFailed(FailureType.AUTHENTICATION, e); + return; + } + + final SettableFuture<Void> future = SettableFuture.create(); + new AuthenticationOutcomeListener<>(DataClient.this, connection, RpcType.SASL_MESSAGE, + loginUser, + new RpcOutcomeListener<Void>() { + @Override + public void failed(RpcException ex) { + logger.debug("Authentication failed.", ex); + future.setException(ex); + } + + @Override + public void success(Void value, ByteBuf buffer) { + future.set(null); + } + + @Override + public void interrupted(InterruptedException e) { + logger.debug("Authentication failed.", e); + future.setException(e); + } + }).initiate(config.getAuthMechanismToUse()); + + try { + logger.trace("Waiting until authentication completes.."); + future.get(); + command.connectionSucceeded(connection); + } catch (InterruptedException e) { + command.connectionFailed(FailureType.AUTHENTICATION, e); + // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the + // interruption and respond to it if it wants to. + Thread.currentThread().interrupt(); + } catch (ExecutionException e) { + command.connectionFailed(FailureType.AUTHENTICATION, e); + } + } + + @Override + public void connectionFailed(FailureType type, Throwable t) { + logger.debug("Authentication failed.", t); + command.connectionFailed(FailureType.AUTHENTICATION, t); + } } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java index d6d83e5d3..625ab25ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java @@ -24,13 +24,15 @@ import java.util.UUID; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.BitData.RpcType; -import org.apache.drill.exec.rpc.RemoteConnection; +import org.apache.drill.exec.rpc.AbstractClientConnection; import org.apache.drill.exec.rpc.RpcOutcomeListener; import com.google.protobuf.MessageLite; +import org.slf4j.Logger; -public class DataClientConnection extends RemoteConnection{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClientConnection.class); +// data connection on client-side (i.e. bit making request or sending data) +public class DataClientConnection extends AbstractClientConnection { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClientConnection.class); private final DataClient client; private final UUID id; @@ -38,7 +40,6 @@ public class DataClientConnection extends RemoteConnection{ public DataClientConnection(SocketChannel channel, DataClient client) { super(channel, "data client"); this.client = client; - // we use a local listener pool unless a global one is provided. this.id = UUID.randomUUID(); } @@ -47,10 +48,10 @@ public class DataClientConnection extends RemoteConnection{ return client.getAllocator(); } - public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType, - SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) { + public <SEND extends MessageLite, RECEIVE extends MessageLite> + void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType, SEND protobufBody, + Class<RECEIVE> clazz, ByteBuf... dataBodies) { client.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies); - } @Override @@ -83,5 +84,8 @@ public class DataClientConnection extends RemoteConnection{ return true; } - + @Override + protected Logger getLogger() { + return logger; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionConfig.java index 44c8ddd42..0d03d7fdc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionConfig.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * <p> * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -17,24 +17,30 @@ */ package org.apache.drill.exec.rpc.data; -import io.netty.channel.socket.SocketChannel; - +import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.rpc.RemoteConnection; +import org.apache.drill.exec.rpc.BitConnectionConfig; +import org.apache.drill.exec.server.BootStrapContext; -public class BitServerConnection extends RemoteConnection{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServerConnection.class); +// config for bit to bit data connection +// package private +class DataConnectionConfig extends BitConnectionConfig { +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataConnectionConfig.class); - private final BufferAllocator allocator; + private final DataServerRequestHandler handler; - public BitServerConnection(SocketChannel channel, BufferAllocator allocator) { - super(channel, "data server"); - this.allocator = allocator; + DataConnectionConfig(BufferAllocator allocator, BootStrapContext context, DataServerRequestHandler handler) + throws DrillbitStartupException { + super(allocator, context); + this.handler = handler; } @Override - public BufferAllocator getAllocator() { - return allocator; + public String getName() { + return "data server"; } + DataServerRequestHandler getMessageHandler() { + return handler; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java index 5c71b91cb..25c83b380 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java @@ -37,40 +37,27 @@ public class DataConnectionCreator implements AutoCloseable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataConnectionCreator.class); private volatile DataServer server; - private final BootStrapContext context; - private final WorkEventBus workBus; - private final WorkerBee bee; - private final boolean allowPortHunting; - private ConcurrentMap<DrillbitEndpoint, DataConnectionManager> connectionManager = Maps.newConcurrentMap(); - private final BufferAllocator dataAllocator; + private final ConcurrentMap<DrillbitEndpoint, DataConnectionManager> connectionManager = Maps.newConcurrentMap(); - public DataConnectionCreator( - BootStrapContext context, - BufferAllocator allocator, - WorkEventBus workBus, - WorkerBee bee, - boolean allowPortHunting) { - super(); - this.context = context; - this.workBus = workBus; - this.bee = bee; - this.allowPortHunting = allowPortHunting; - this.dataAllocator = allocator; + private final DataConnectionConfig config; + + public DataConnectionCreator(BootStrapContext context, BufferAllocator allocator, WorkEventBus workBus, + WorkerBee bee) throws DrillbitStartupException { + config = new DataConnectionConfig(allocator, context, new DataServerRequestHandler(workBus, bee)); } - public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException { - server = new DataServer(context, dataAllocator, workBus, bee); + public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, boolean allowPortHunting) { + server = new DataServer(config); int port = partialEndpoint.getControlPort() + 1; - if (context.getConfig().hasPath(ExecConstants.INITIAL_DATA_PORT)) { - port = context.getConfig().getInt(ExecConstants.INITIAL_DATA_PORT); + if (config.getBootstrapContext().getConfig().hasPath(ExecConstants.INITIAL_DATA_PORT)) { + port = config.getBootstrapContext().getConfig().getInt(ExecConstants.INITIAL_DATA_PORT); } port = server.bind(port, allowPortHunting); - DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setDataPort(port).build(); - return completeEndpoint; + return partialEndpoint.toBuilder().setDataPort(port).build(); } public DataTunnel getTunnel(DrillbitEndpoint endpoint) { - DataConnectionManager newManager = new DataConnectionManager(endpoint, context); + DataConnectionManager newManager = new DataConnectionManager(endpoint, config); DataConnectionManager oldManager = connectionManager.putIfAbsent(endpoint, newManager); if(oldManager != null){ newManager = oldManager; @@ -80,7 +67,7 @@ public class DataConnectionCreator implements AutoCloseable { @Override public void close() throws Exception { - AutoCloseables.close(server, dataAllocator); + AutoCloseables.close(server, config.getAllocator()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java index 267b7e3ce..f620a80cc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java @@ -21,14 +21,12 @@ import org.apache.drill.exec.proto.BitData.BitClientHandshake; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.RpcChannel; import org.apache.drill.exec.rpc.ReconnectingConnection; -import org.apache.drill.exec.server.BootStrapContext; public class DataConnectionManager extends ReconnectingConnection<DataClientConnection, BitClientHandshake>{ +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataConnectionManager.class); - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataConnectionManager.class); - - private final DrillbitEndpoint endpoint; - private final BootStrapContext context; + private final DrillbitEndpoint remoteEndpoint; + private final DataConnectionConfig config; private final static BitClientHandshake HANDSHAKE = BitClientHandshake // .newBuilder() // @@ -36,15 +34,15 @@ public class DataConnectionManager extends ReconnectingConnection<DataClientConn .setChannel(RpcChannel.BIT_DATA) // .build(); - public DataConnectionManager(DrillbitEndpoint endpoint, BootStrapContext context) { - super(HANDSHAKE, endpoint.getAddress(), endpoint.getDataPort()); - this.endpoint = endpoint; - this.context = context; + public DataConnectionManager(DrillbitEndpoint remoteEndpoint, DataConnectionConfig config) { + super(HANDSHAKE, remoteEndpoint.getAddress(), remoteEndpoint.getDataPort()); + this.remoteEndpoint = remoteEndpoint; + this.config = config; } @Override protected DataClient getNewClient() { - return new DataClient(endpoint, context, new CloseHandlerCreator()); + return new DataClient(remoteEndpoint, config, new CloseHandlerCreator()); } }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java index fab275ac6..1dfe4c053 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java @@ -22,6 +22,7 @@ import org.apache.drill.exec.proto.BitData.BitServerHandshake; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.proto.BitData.RpcType; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.UserBitShared.SaslMessage; import org.apache.drill.exec.rpc.RpcException; import com.google.protobuf.MessageLite; @@ -29,13 +30,14 @@ import com.google.protobuf.MessageLite; public class DataDefaultInstanceHandler { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataDefaultInstanceHandler.class); - public static MessageLite getResponseDefaultInstanceClient(int rpcType) throws RpcException { switch (rpcType) { case RpcType.ACK_VALUE: return Ack.getDefaultInstance(); case RpcType.HANDSHAKE_VALUE: return BitServerHandshake.getDefaultInstance(); + case RpcType.SASL_MESSAGE_VALUE: + return SaslMessage.getDefaultInstance(); default: throw new UnsupportedOperationException(); @@ -50,6 +52,8 @@ public class DataDefaultInstanceHandler { return BitClientHandshake.getDefaultInstance(); case RpcType.REQ_RECORD_BATCH_VALUE: return FragmentRecordBatch.getDefaultInstance(); + case RpcType.SASL_MESSAGE_VALUE: + return SaslMessage.getDefaultInstance(); default: throw new UnsupportedOperationException(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java index cd3cdfe46..88bd37944 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java @@ -26,6 +26,7 @@ import org.apache.drill.exec.proto.BitData.BitServerHandshake; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.proto.BitData.RpcType; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.UserBitShared.SaslMessage; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.RpcConfig; @@ -40,6 +41,7 @@ public class DataRpcConfig { .timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT)) .add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class) .add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class) + .add(RpcType.SASL_MESSAGE, SaslMessage.class, RpcType.SASL_MESSAGE, SaslMessage.class) .build(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index 03118d754..33270fd86 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,55 +17,34 @@ */ package org.apache.drill.exec.rpc.data; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.DrillBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.GenericFutureListener; -import java.io.IOException; -import java.util.concurrent.ThreadLocalRandom; - -import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.BitData.BitClientHandshake; import org.apache.drill.exec.proto.BitData.BitServerHandshake; -import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.proto.BitData.RpcType; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.RpcChannel; -import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.BasicServer; import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -import org.apache.drill.exec.rpc.Response; -import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.apache.drill.exec.server.BootStrapContext; -import org.apache.drill.exec.work.WorkManager.WorkerBee; -import org.apache.drill.exec.work.fragment.FragmentManager; import com.google.protobuf.MessageLite; -public class DataServer extends BasicServer<RpcType, BitServerConnection> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class); +public class DataServer extends BasicServer<RpcType, DataServerConnection> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class); - private volatile ProxyCloseHandler proxyCloseHandler; - private final BootStrapContext context; - private final WorkEventBus workBus; - private final WorkerBee bee; + private final DataConnectionConfig config; - public DataServer(BootStrapContext context, BufferAllocator alloc, WorkEventBus workBus, - WorkerBee bee) { + public DataServer(DataConnectionConfig config) { super( - DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()), - alloc.getAsByteBufAllocator(), - context.getBitLoopGroup()); - this.context = context; - this.workBus = workBus; - this.bee = bee; + DataRpcConfig.getMapping(config.getBootstrapContext().getConfig(), + config.getBootstrapContext().getExecutor()), + config.getAllocator().getAsByteBufAllocator(), + config.getBootstrapContext().getBitLoopGroup()); + this.config = config; } @Override @@ -74,19 +53,18 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { } @Override - protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, BitServerConnection connection) { - this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection)); - return proxyCloseHandler; + protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, DataServerConnection connection) { + return new ProxyCloseHandler(super.getCloseHandler(ch, connection)); } @Override - public BitServerConnection initRemoteConnection(SocketChannel channel) { + protected DataServerConnection initRemoteConnection(SocketChannel channel) { super.initRemoteConnection(channel); - return new BitServerConnection(channel, context.getAllocator()); + return new DataServerConnection(channel, config); } @Override - protected ServerHandshakeHandler<BitClientHandshake> getHandshakeHandler(final BitServerConnection connection) { + protected ServerHandshakeHandler<BitClientHandshake> getHandshakeHandler(final DataServerConnection connection) { return new ServerHandshakeHandler<BitClientHandshake>(RpcType.HANDSHAKE, BitClientHandshake.PARSER) { @Override @@ -101,79 +79,17 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { inbound.getChannel())); } - return BitServerHandshake.newBuilder().setRpcVersion(DataRpcConfig.RPC_VERSION).build(); + final BitServerHandshake.Builder builder = BitServerHandshake.newBuilder(); + builder.setRpcVersion(DataRpcConfig.RPC_VERSION); + if (config.getAuthMechanismToUse() != null) { + builder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames()); + } + return builder.build(); } }; } - private static FragmentHandle getHandle(FragmentRecordBatch batch, int index) { - return FragmentHandle.newBuilder() - .setQueryId(batch.getQueryId()) - .setMajorFragmentId(batch.getReceivingMajorFragmentId()) - .setMinorFragmentId(batch.getReceivingMinorFragmentId(index)) - .build(); - } - - private void submit(IncomingDataBatch batch, int minorStart, int minorStopExclusive) throws FragmentSetupException, - IOException { - for (int minor = minorStart; minor < minorStopExclusive; minor++) { - final FragmentManager manager = workBus.getFragmentManager(getHandle(batch.getHeader(), minor)); - if (manager == null) { - // A missing manager means the query already terminated. We can simply drop this data. - continue; - } - - final boolean canRun = manager.handle(batch); - if (canRun) { - // logger.debug("Arriving batch means local batch can run, starting local batch."); - /* - * If we've reached the canRun threshold, we'll proceed. This expects manager.handle() to only return a single - * true. This is guaranteed by the interface. - */ - bee.startFragmentPendingRemote(manager); - } - } - - } - - @Override - protected void handle(BitServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf body, ResponseSender sender) throws RpcException { - assert rpcType == RpcType.REQ_RECORD_BATCH_VALUE; - - final FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER); - final AckSender ack = new AckSender(sender); - - - // increment so we don't get false returns. - ack.increment(); - - try { - - final IncomingDataBatch batch = new IncomingDataBatch(fragmentBatch, (DrillBuf) body, ack); - final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount(); - - // randomize who gets first transfer (and thus ownership) so memory usage is balanced when we're sharing amongst - // multiple fragments. - final int firstOwner = ThreadLocalRandom.current().nextInt(targetCount); - submit(batch, firstOwner, targetCount); - submit(batch, 0, firstOwner); - - } catch (IOException | FragmentSetupException e) { - logger.error("Failure while getting fragment manager. {}", - QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(), - fragmentBatch.getReceivingMajorFragmentId(), - fragmentBatch.getReceivingMinorFragmentIdList()), e); - ack.clear(); - sender.send(new Response(RpcType.ACK, Acks.FAIL)); - } finally { - - // decrement the extra reference we grabbed at the top. - ack.sendOk(); - } - } - - private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> { private volatile GenericFutureListener<ChannelFuture> handler; @@ -191,7 +107,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { } @Override - public OutOfMemoryHandler getOutOfMemoryHandler() { + protected OutOfMemoryHandler getOutOfMemoryHandler() { return new OutOfMemoryHandler() { @Override public void handle() { @@ -201,7 +117,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { } @Override - public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { + protected ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { return new DataProtobufLengthDecoder.Server(allocator, outOfMemoryHandler); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java new file mode 100644 index 000000000..70e262f31 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.data; + +import io.netty.channel.socket.SocketChannel; +import org.apache.drill.exec.proto.BitData.RpcType; +import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler; +import org.apache.drill.exec.rpc.AbstractServerConnection; +import org.slf4j.Logger; + +// data connection on server-side (i.e. bit handling request or receiving data) +public class DataServerConnection extends AbstractServerConnection<DataServerConnection> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServerConnection.class); + + DataServerConnection(SocketChannel channel, DataConnectionConfig config) { + super(channel, config, config.getAuthMechanismToUse() == null + ? config.getMessageHandler() + : new ServerAuthenticationHandler<>(config.getMessageHandler(), + RpcType.SASL_MESSAGE_VALUE, RpcType.SASL_MESSAGE)); + } + + @Override + protected Logger getLogger() { + return logger; + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java new file mode 100644 index 000000000..ff2e4a082 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.data; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.exception.FragmentSetupException; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; +import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.rpc.Acks; +import org.apache.drill.exec.rpc.RequestHandler; +import org.apache.drill.exec.rpc.Response; +import org.apache.drill.exec.rpc.ResponseSender; +import org.apache.drill.exec.rpc.RpcBus; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.control.WorkEventBus; +import org.apache.drill.exec.work.WorkManager; +import org.apache.drill.exec.work.fragment.FragmentManager; + +import java.io.IOException; +import java.util.concurrent.ThreadLocalRandom; + +// package private +class DataServerRequestHandler implements RequestHandler<DataServerConnection> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServerRequestHandler.class); + + private final WorkEventBus workBus; + private final WorkManager.WorkerBee bee; + + public DataServerRequestHandler(WorkEventBus workBus, WorkManager.WorkerBee bee) { + this.workBus = workBus; + this.bee = bee; + } + + @Override + public void handle(DataServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody, + ResponseSender sender) throws RpcException { + assert rpcType == BitData.RpcType.REQ_RECORD_BATCH_VALUE; + + final FragmentRecordBatch fragmentBatch = RpcBus.get(pBody, FragmentRecordBatch.PARSER); + final AckSender ack = new AckSender(sender); + + // increment so we don't get false returns. + ack.increment(); + + try { + final IncomingDataBatch batch = new IncomingDataBatch(fragmentBatch, (DrillBuf) dBody, ack); + final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount(); + + // randomize who gets first transfer (and thus ownership) so memory usage is balanced when we're sharing amongst + // multiple fragments. + final int firstOwner = ThreadLocalRandom.current().nextInt(targetCount); + submit(batch, firstOwner, targetCount); + submit(batch, 0, firstOwner); + + } catch (IOException | FragmentSetupException e) { + logger.error("Failure while getting fragment manager. {}", + QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(), + fragmentBatch.getReceivingMajorFragmentId(), + fragmentBatch.getReceivingMinorFragmentIdList()), e); + ack.clear(); + sender.send(new Response(BitData.RpcType.ACK, Acks.FAIL)); + } finally { + + // decrement the extra reference we grabbed at the top. + ack.sendOk(); + } + } + + private void submit(IncomingDataBatch batch, int minorStart, int minorStopExclusive) throws FragmentSetupException, + IOException { + for (int minor = minorStart; minor < minorStopExclusive; minor++) { + final FragmentManager manager = workBus.getFragmentManager(getHandle(batch.getHeader(), minor)); + if (manager == null) { + // A missing manager means the query already terminated. We can simply drop this data. + continue; + } + + final boolean canRun = manager.handle(batch); + if (canRun) { + // logger.debug("Arriving batch means local batch can run, starting local batch."); + /* + * If we've reached the canRun threshold, we'll proceed. This expects manager.handle() to only return a single + * true. This is guaranteed by the interface. + */ + bee.startFragmentPendingRemote(manager); + } + } + } + + private static FragmentHandle getHandle(final FragmentRecordBatch batch, int index) { + return FragmentHandle.newBuilder() + .setQueryId(batch.getQueryId()) + .setMajorFragmentId(batch.getReceivingMajorFragmentId()) + .setMinorFragmentId(batch.getReceivingMinorFragmentId(index)) + .build(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index c38138c09..e7c0ee571 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -33,7 +33,6 @@ import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ExecutionControls; -import org.apache.drill.exec.testing.ExecutionControlsInjector; public class DataTunnel { |