aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/rpc
diff options
context:
space:
mode:
authorSudheesh Katkam <sudheesh@apache.org>2017-01-25 19:04:33 -0800
committerSudheesh Katkam <sudheesh@apache.org>2017-02-24 19:01:43 -0800
commite2582a10bfb6475e691be326c6d0498a697e6a0f (patch)
treeb293a919a84463a9fee19457a4a0dea9abab01c8 /exec/java-exec/src/main/java/org/apache/drill/exec/rpc
parent180dd5648ab604b6396d91ba69a2f777f19bf79c (diff)
DRILL-4280: CORE (bit to bit authentication, data)
+ Support authentication in DataServer and DataClient + Add AuthenticationCommand as an initial command after handshake and before the command that initiates a connection + Add DataConnectionConfig to encapsulate configuration + Add DataServerRequestHandler to encapsulate all handling of requests to DataServer data
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java145
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionConfig.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java)34
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java39
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java18
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java130
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java42
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java115
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java1
11 files changed, 365 insertions, 187 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
index 9db551b46..a37008d5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClient.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.rpc.data;
+import com.google.common.util.concurrent.SettableFuture;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
@@ -30,42 +31,54 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.rpc.BasicClient;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcCommand;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.rpc.security.AuthenticationOutcomeListener;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
import com.google.protobuf.MessageLite;
+import org.apache.hadoop.security.UserGroupInformation;
-public class DataClient extends BasicClient<RpcType, DataClientConnection, BitClientHandshake, BitServerHandshake>{
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClient.class);
+public class DataClient extends BasicClient<RpcType, DataClientConnection, BitClientHandshake, BitServerHandshake> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClient.class);
+ private final DrillbitEndpoint remoteEndpoint;
private volatile DataClientConnection connection;
- private final BufferAllocator allocator;
private final DataConnectionManager.CloseHandlerCreator closeHandlerFactory;
+ private final DataConnectionConfig config;
-
- public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, DataConnectionManager.CloseHandlerCreator closeHandlerFactory) {
+ public DataClient(DrillbitEndpoint remoteEndpoint, DataConnectionConfig config,
+ DataConnectionManager.CloseHandlerCreator closeHandlerFactory) {
super(
- DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
- context.getAllocator().getAsByteBufAllocator(),
- context.getBitClientLoopGroup(),
+ DataRpcConfig.getMapping(config.getBootstrapContext().getConfig(),
+ config.getBootstrapContext().getExecutor()),
+ config.getBootstrapContext().getAllocator().getAsByteBufAllocator(),
+ config.getBootstrapContext().getBitClientLoopGroup(),
RpcType.HANDSHAKE,
BitServerHandshake.class,
BitServerHandshake.PARSER);
+
+ this.remoteEndpoint = remoteEndpoint;
+ this.config = config;
this.closeHandlerFactory = closeHandlerFactory;
- this.allocator = context.getAllocator();
}
@Override
- public DataClientConnection initRemoteConnection(SocketChannel channel) {
+ protected DataClientConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
this.connection = new DataClientConnection(channel, this);
return connection;
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, DataClientConnection clientConnection) {
+ protected GenericFutureListener<ChannelFuture>
+ getCloseHandler(SocketChannel ch, DataClientConnection clientConnection) {
return closeHandlerFactory.getHandler(clientConnection, super.getCloseHandler(ch, clientConnection));
}
@@ -75,27 +88,119 @@ public class DataClient extends BasicClient<RpcType, DataClientConnection, BitCl
}
@Override
- protected Response handle(DataClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ protected void handle(DataClientConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+ ResponseSender sender) throws RpcException {
throw new UnsupportedOperationException("DataClient is unidirectional by design.");
}
BufferAllocator getAllocator() {
- return allocator;
+ return config.getAllocator();
}
@Override
protected void validateHandshake(BitServerHandshake handshake) throws RpcException {
if (handshake.getRpcVersion() != DataRpcConfig.RPC_VERSION) {
- throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.", handshake.getRpcVersion(), DataRpcConfig.RPC_VERSION));
+ throw new RpcException(String.format("Invalid rpc version. Expected %d, actual %d.",
+ handshake.getRpcVersion(), DataRpcConfig.RPC_VERSION));
+ }
+
+ if (handshake.getAuthenticationMechanismsCount() != 0) { // remote requires authentication
+ final SaslClient saslClient;
+ try {
+ saslClient = config.getAuthFactory(handshake.getAuthenticationMechanismsList())
+ .createSaslClient(UserGroupInformation.getLoginUser(),
+ config.getSaslClientProperties(remoteEndpoint));
+ } catch (final IOException e) {
+ throw new RpcException(String.format("Failed to initiate authenticate to %s", remoteEndpoint.getAddress()), e);
+ }
+ if (saslClient == null) {
+ throw new RpcException("Unexpected failure. Could not initiate SASL exchange.");
+ }
+ connection.setSaslClient(saslClient);
+ } else {
+ if (config.getAuthMechanismToUse() != null) {
+ throw new RpcException(String.format("Drillbit (%s) does not require auth, but auth is enabled.",
+ remoteEndpoint.getAddress()));
+ }
}
}
@Override
- protected void finalizeConnection(BitServerHandshake handshake, DataClientConnection connection) {
+ protected <M extends MessageLite> RpcCommand<M, DataClientConnection>
+ getInitialCommand(final RpcCommand<M, DataClientConnection> command) {
+ final RpcCommand<M, DataClientConnection> initialCommand = super.getInitialCommand(command);
+ if (config.getAuthMechanismToUse() == null) {
+ return initialCommand;
+ } else {
+ return new AuthenticationCommand<>(initialCommand);
+ }
}
- public DataClientConnection getConnection() {
- return this.connection;
+ private class AuthenticationCommand<M extends MessageLite> implements RpcCommand<M, DataClientConnection> {
+
+ private final RpcCommand<M, DataClientConnection> command;
+
+ AuthenticationCommand(RpcCommand<M, DataClientConnection> command) {
+ this.command = command;
+ }
+
+ @Override
+ public void connectionAvailable(DataClientConnection connection) {
+ command.connectionFailed(FailureType.AUTHENTICATION, new SaslException("Should not reach here."));
+ }
+
+ @Override
+ public void connectionSucceeded(final DataClientConnection connection) {
+ final UserGroupInformation loginUser;
+ try {
+ loginUser = UserGroupInformation.getLoginUser();
+ } catch (final IOException e) {
+ logger.debug("Unexpected failure trying to login.", e);
+ command.connectionFailed(FailureType.AUTHENTICATION, e);
+ return;
+ }
+
+ final SettableFuture<Void> future = SettableFuture.create();
+ new AuthenticationOutcomeListener<>(DataClient.this, connection, RpcType.SASL_MESSAGE,
+ loginUser,
+ new RpcOutcomeListener<Void>() {
+ @Override
+ public void failed(RpcException ex) {
+ logger.debug("Authentication failed.", ex);
+ future.setException(ex);
+ }
+
+ @Override
+ public void success(Void value, ByteBuf buffer) {
+ future.set(null);
+ }
+
+ @Override
+ public void interrupted(InterruptedException e) {
+ logger.debug("Authentication failed.", e);
+ future.setException(e);
+ }
+ }).initiate(config.getAuthMechanismToUse());
+
+ try {
+ logger.trace("Waiting until authentication completes..");
+ future.get();
+ command.connectionSucceeded(connection);
+ } catch (InterruptedException e) {
+ command.connectionFailed(FailureType.AUTHENTICATION, e);
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+ } catch (ExecutionException e) {
+ command.connectionFailed(FailureType.AUTHENTICATION, e);
+ }
+ }
+
+ @Override
+ public void connectionFailed(FailureType type, Throwable t) {
+ logger.debug("Authentication failed.", t);
+ command.connectionFailed(FailureType.AUTHENTICATION, t);
+ }
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
index d6d83e5d3..625ab25ae 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataClientConnection.java
@@ -24,13 +24,15 @@ import java.util.UUID;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData.RpcType;
-import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.AbstractClientConnection;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import com.google.protobuf.MessageLite;
+import org.slf4j.Logger;
-public class DataClientConnection extends RemoteConnection{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClientConnection.class);
+// data connection on client-side (i.e. bit making request or sending data)
+public class DataClientConnection extends AbstractClientConnection {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataClientConnection.class);
private final DataClient client;
private final UUID id;
@@ -38,7 +40,6 @@ public class DataClientConnection extends RemoteConnection{
public DataClientConnection(SocketChannel channel, DataClient client) {
super(channel, "data client");
this.client = client;
- // we use a local listener pool unless a global one is provided.
this.id = UUID.randomUUID();
}
@@ -47,10 +48,10 @@ public class DataClientConnection extends RemoteConnection{
return client.getAllocator();
}
- public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType,
- SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
+ public <SEND extends MessageLite, RECEIVE extends MessageLite>
+ void send(RpcOutcomeListener<RECEIVE> outcomeListener, RpcType rpcType, SEND protobufBody,
+ Class<RECEIVE> clazz, ByteBuf... dataBodies) {
client.send(outcomeListener, this, rpcType, protobufBody, clazz, dataBodies);
-
}
@Override
@@ -83,5 +84,8 @@ public class DataClientConnection extends RemoteConnection{
return true;
}
-
+ @Override
+ protected Logger getLogger() {
+ return logger;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionConfig.java
index 44c8ddd42..0d03d7fdc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/BitServerConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionConfig.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,24 +17,30 @@
*/
package org.apache.drill.exec.rpc.data;
-import io.netty.channel.socket.SocketChannel;
-
+import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
-import org.apache.drill.exec.rpc.RemoteConnection;
+import org.apache.drill.exec.rpc.BitConnectionConfig;
+import org.apache.drill.exec.server.BootStrapContext;
-public class BitServerConnection extends RemoteConnection{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BitServerConnection.class);
+// config for bit to bit data connection
+// package private
+class DataConnectionConfig extends BitConnectionConfig {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataConnectionConfig.class);
- private final BufferAllocator allocator;
+ private final DataServerRequestHandler handler;
- public BitServerConnection(SocketChannel channel, BufferAllocator allocator) {
- super(channel, "data server");
- this.allocator = allocator;
+ DataConnectionConfig(BufferAllocator allocator, BootStrapContext context, DataServerRequestHandler handler)
+ throws DrillbitStartupException {
+ super(allocator, context);
+ this.handler = handler;
}
@Override
- public BufferAllocator getAllocator() {
- return allocator;
+ public String getName() {
+ return "data server";
}
+ DataServerRequestHandler getMessageHandler() {
+ return handler;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index 5c71b91cb..25c83b380 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -37,40 +37,27 @@ public class DataConnectionCreator implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataConnectionCreator.class);
private volatile DataServer server;
- private final BootStrapContext context;
- private final WorkEventBus workBus;
- private final WorkerBee bee;
- private final boolean allowPortHunting;
- private ConcurrentMap<DrillbitEndpoint, DataConnectionManager> connectionManager = Maps.newConcurrentMap();
- private final BufferAllocator dataAllocator;
+ private final ConcurrentMap<DrillbitEndpoint, DataConnectionManager> connectionManager = Maps.newConcurrentMap();
- public DataConnectionCreator(
- BootStrapContext context,
- BufferAllocator allocator,
- WorkEventBus workBus,
- WorkerBee bee,
- boolean allowPortHunting) {
- super();
- this.context = context;
- this.workBus = workBus;
- this.bee = bee;
- this.allowPortHunting = allowPortHunting;
- this.dataAllocator = allocator;
+ private final DataConnectionConfig config;
+
+ public DataConnectionCreator(BootStrapContext context, BufferAllocator allocator, WorkEventBus workBus,
+ WorkerBee bee) throws DrillbitStartupException {
+ config = new DataConnectionConfig(allocator, context, new DataServerRequestHandler(workBus, bee));
}
- public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException {
- server = new DataServer(context, dataAllocator, workBus, bee);
+ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint, boolean allowPortHunting) {
+ server = new DataServer(config);
int port = partialEndpoint.getControlPort() + 1;
- if (context.getConfig().hasPath(ExecConstants.INITIAL_DATA_PORT)) {
- port = context.getConfig().getInt(ExecConstants.INITIAL_DATA_PORT);
+ if (config.getBootstrapContext().getConfig().hasPath(ExecConstants.INITIAL_DATA_PORT)) {
+ port = config.getBootstrapContext().getConfig().getInt(ExecConstants.INITIAL_DATA_PORT);
}
port = server.bind(port, allowPortHunting);
- DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setDataPort(port).build();
- return completeEndpoint;
+ return partialEndpoint.toBuilder().setDataPort(port).build();
}
public DataTunnel getTunnel(DrillbitEndpoint endpoint) {
- DataConnectionManager newManager = new DataConnectionManager(endpoint, context);
+ DataConnectionManager newManager = new DataConnectionManager(endpoint, config);
DataConnectionManager oldManager = connectionManager.putIfAbsent(endpoint, newManager);
if(oldManager != null){
newManager = oldManager;
@@ -80,7 +67,7 @@ public class DataConnectionCreator implements AutoCloseable {
@Override
public void close() throws Exception {
- AutoCloseables.close(server, dataAllocator);
+ AutoCloseables.close(server, config.getAllocator());
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
index 267b7e3ce..f620a80cc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionManager.java
@@ -21,14 +21,12 @@ import org.apache.drill.exec.proto.BitData.BitClientHandshake;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.UserBitShared.RpcChannel;
import org.apache.drill.exec.rpc.ReconnectingConnection;
-import org.apache.drill.exec.server.BootStrapContext;
public class DataConnectionManager extends ReconnectingConnection<DataClientConnection, BitClientHandshake>{
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataConnectionManager.class);
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataConnectionManager.class);
-
- private final DrillbitEndpoint endpoint;
- private final BootStrapContext context;
+ private final DrillbitEndpoint remoteEndpoint;
+ private final DataConnectionConfig config;
private final static BitClientHandshake HANDSHAKE = BitClientHandshake //
.newBuilder() //
@@ -36,15 +34,15 @@ public class DataConnectionManager extends ReconnectingConnection<DataClientConn
.setChannel(RpcChannel.BIT_DATA) //
.build();
- public DataConnectionManager(DrillbitEndpoint endpoint, BootStrapContext context) {
- super(HANDSHAKE, endpoint.getAddress(), endpoint.getDataPort());
- this.endpoint = endpoint;
- this.context = context;
+ public DataConnectionManager(DrillbitEndpoint remoteEndpoint, DataConnectionConfig config) {
+ super(HANDSHAKE, remoteEndpoint.getAddress(), remoteEndpoint.getDataPort());
+ this.remoteEndpoint = remoteEndpoint;
+ this.config = config;
}
@Override
protected DataClient getNewClient() {
- return new DataClient(endpoint, context, new CloseHandlerCreator());
+ return new DataClient(remoteEndpoint, config, new CloseHandlerCreator());
}
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java
index fab275ac6..1dfe4c053 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataDefaultInstanceHandler.java
@@ -22,6 +22,7 @@ import org.apache.drill.exec.proto.BitData.BitServerHandshake;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.proto.BitData.RpcType;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
import org.apache.drill.exec.rpc.RpcException;
import com.google.protobuf.MessageLite;
@@ -29,13 +30,14 @@ import com.google.protobuf.MessageLite;
public class DataDefaultInstanceHandler {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataDefaultInstanceHandler.class);
-
public static MessageLite getResponseDefaultInstanceClient(int rpcType) throws RpcException {
switch (rpcType) {
case RpcType.ACK_VALUE:
return Ack.getDefaultInstance();
case RpcType.HANDSHAKE_VALUE:
return BitServerHandshake.getDefaultInstance();
+ case RpcType.SASL_MESSAGE_VALUE:
+ return SaslMessage.getDefaultInstance();
default:
throw new UnsupportedOperationException();
@@ -50,6 +52,8 @@ public class DataDefaultInstanceHandler {
return BitClientHandshake.getDefaultInstance();
case RpcType.REQ_RECORD_BATCH_VALUE:
return FragmentRecordBatch.getDefaultInstance();
+ case RpcType.SASL_MESSAGE_VALUE:
+ return SaslMessage.getDefaultInstance();
default:
throw new UnsupportedOperationException();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
index cd3cdfe46..88bd37944 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataRpcConfig.java
@@ -26,6 +26,7 @@ import org.apache.drill.exec.proto.BitData.BitServerHandshake;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.proto.BitData.RpcType;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.SaslMessage;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcConfig;
@@ -40,6 +41,7 @@ public class DataRpcConfig {
.timeout(config.getInt(ExecConstants.BIT_RPC_TIMEOUT))
.add(RpcType.HANDSHAKE, BitClientHandshake.class, RpcType.HANDSHAKE, BitServerHandshake.class)
.add(RpcType.REQ_RECORD_BATCH, FragmentRecordBatch.class, RpcType.ACK, Ack.class)
+ .add(RpcType.SASL_MESSAGE, SaslMessage.class, RpcType.SASL_MESSAGE, SaslMessage.class)
.build();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 03118d754..33270fd86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,55 +17,34 @@
*/
package org.apache.drill.exec.rpc.data;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData.BitClientHandshake;
import org.apache.drill.exec.proto.BitData.BitServerHandshake;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.proto.BitData.RpcType;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.RpcChannel;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.fragment.FragmentManager;
import com.google.protobuf.MessageLite;
-public class DataServer extends BasicServer<RpcType, BitServerConnection> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class);
+public class DataServer extends BasicServer<RpcType, DataServerConnection> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class);
- private volatile ProxyCloseHandler proxyCloseHandler;
- private final BootStrapContext context;
- private final WorkEventBus workBus;
- private final WorkerBee bee;
+ private final DataConnectionConfig config;
- public DataServer(BootStrapContext context, BufferAllocator alloc, WorkEventBus workBus,
- WorkerBee bee) {
+ public DataServer(DataConnectionConfig config) {
super(
- DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
- alloc.getAsByteBufAllocator(),
- context.getBitLoopGroup());
- this.context = context;
- this.workBus = workBus;
- this.bee = bee;
+ DataRpcConfig.getMapping(config.getBootstrapContext().getConfig(),
+ config.getBootstrapContext().getExecutor()),
+ config.getAllocator().getAsByteBufAllocator(),
+ config.getBootstrapContext().getBitLoopGroup());
+ this.config = config;
}
@Override
@@ -74,19 +53,18 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, BitServerConnection connection) {
- this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
- return proxyCloseHandler;
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, DataServerConnection connection) {
+ return new ProxyCloseHandler(super.getCloseHandler(ch, connection));
}
@Override
- public BitServerConnection initRemoteConnection(SocketChannel channel) {
+ protected DataServerConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
- return new BitServerConnection(channel, context.getAllocator());
+ return new DataServerConnection(channel, config);
}
@Override
- protected ServerHandshakeHandler<BitClientHandshake> getHandshakeHandler(final BitServerConnection connection) {
+ protected ServerHandshakeHandler<BitClientHandshake> getHandshakeHandler(final DataServerConnection connection) {
return new ServerHandshakeHandler<BitClientHandshake>(RpcType.HANDSHAKE, BitClientHandshake.PARSER) {
@Override
@@ -101,79 +79,17 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
inbound.getChannel()));
}
- return BitServerHandshake.newBuilder().setRpcVersion(DataRpcConfig.RPC_VERSION).build();
+ final BitServerHandshake.Builder builder = BitServerHandshake.newBuilder();
+ builder.setRpcVersion(DataRpcConfig.RPC_VERSION);
+ if (config.getAuthMechanismToUse() != null) {
+ builder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
+ }
+ return builder.build();
}
};
}
- private static FragmentHandle getHandle(FragmentRecordBatch batch, int index) {
- return FragmentHandle.newBuilder()
- .setQueryId(batch.getQueryId())
- .setMajorFragmentId(batch.getReceivingMajorFragmentId())
- .setMinorFragmentId(batch.getReceivingMinorFragmentId(index))
- .build();
- }
-
- private void submit(IncomingDataBatch batch, int minorStart, int minorStopExclusive) throws FragmentSetupException,
- IOException {
- for (int minor = minorStart; minor < minorStopExclusive; minor++) {
- final FragmentManager manager = workBus.getFragmentManager(getHandle(batch.getHeader(), minor));
- if (manager == null) {
- // A missing manager means the query already terminated. We can simply drop this data.
- continue;
- }
-
- final boolean canRun = manager.handle(batch);
- if (canRun) {
- // logger.debug("Arriving batch means local batch can run, starting local batch.");
- /*
- * If we've reached the canRun threshold, we'll proceed. This expects manager.handle() to only return a single
- * true. This is guaranteed by the interface.
- */
- bee.startFragmentPendingRemote(manager);
- }
- }
-
- }
-
- @Override
- protected void handle(BitServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf body, ResponseSender sender) throws RpcException {
- assert rpcType == RpcType.REQ_RECORD_BATCH_VALUE;
-
- final FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER);
- final AckSender ack = new AckSender(sender);
-
-
- // increment so we don't get false returns.
- ack.increment();
-
- try {
-
- final IncomingDataBatch batch = new IncomingDataBatch(fragmentBatch, (DrillBuf) body, ack);
- final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
-
- // randomize who gets first transfer (and thus ownership) so memory usage is balanced when we're sharing amongst
- // multiple fragments.
- final int firstOwner = ThreadLocalRandom.current().nextInt(targetCount);
- submit(batch, firstOwner, targetCount);
- submit(batch, 0, firstOwner);
-
- } catch (IOException | FragmentSetupException e) {
- logger.error("Failure while getting fragment manager. {}",
- QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
- fragmentBatch.getReceivingMajorFragmentId(),
- fragmentBatch.getReceivingMinorFragmentIdList()), e);
- ack.clear();
- sender.send(new Response(RpcType.ACK, Acks.FAIL));
- } finally {
-
- // decrement the extra reference we grabbed at the top.
- ack.sendOk();
- }
- }
-
-
private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
private volatile GenericFutureListener<ChannelFuture> handler;
@@ -191,7 +107,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
@Override
- public OutOfMemoryHandler getOutOfMemoryHandler() {
+ protected OutOfMemoryHandler getOutOfMemoryHandler() {
return new OutOfMemoryHandler() {
@Override
public void handle() {
@@ -201,7 +117,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
@Override
- public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+ protected ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
return new DataProtobufLengthDecoder.Server(allocator, outOfMemoryHandler);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java
new file mode 100644
index 000000000..70e262f31
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerConnection.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.data;
+
+import io.netty.channel.socket.SocketChannel;
+import org.apache.drill.exec.proto.BitData.RpcType;
+import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;
+import org.apache.drill.exec.rpc.AbstractServerConnection;
+import org.slf4j.Logger;
+
+// data connection on server-side (i.e. bit handling request or receiving data)
+public class DataServerConnection extends AbstractServerConnection<DataServerConnection> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServerConnection.class);
+
+ DataServerConnection(SocketChannel channel, DataConnectionConfig config) {
+ super(channel, config, config.getAuthMechanismToUse() == null
+ ? config.getMessageHandler()
+ : new ServerAuthenticationHandler<>(config.getMessageHandler(),
+ RpcType.SASL_MESSAGE_VALUE, RpcType.SASL_MESSAGE));
+ }
+
+ @Override
+ protected Logger getLogger() {
+ return logger;
+ }
+
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java
new file mode 100644
index 000000000..ff2e4a082
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServerRequestHandler.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.data;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.exception.FragmentSetupException;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
+import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.rpc.Acks;
+import org.apache.drill.exec.rpc.RequestHandler;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.ResponseSender;
+import org.apache.drill.exec.rpc.RpcBus;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.control.WorkEventBus;
+import org.apache.drill.exec.work.WorkManager;
+import org.apache.drill.exec.work.fragment.FragmentManager;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+
+// package private
+class DataServerRequestHandler implements RequestHandler<DataServerConnection> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServerRequestHandler.class);
+
+ private final WorkEventBus workBus;
+ private final WorkManager.WorkerBee bee;
+
+ public DataServerRequestHandler(WorkEventBus workBus, WorkManager.WorkerBee bee) {
+ this.workBus = workBus;
+ this.bee = bee;
+ }
+
+ @Override
+ public void handle(DataServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody,
+ ResponseSender sender) throws RpcException {
+ assert rpcType == BitData.RpcType.REQ_RECORD_BATCH_VALUE;
+
+ final FragmentRecordBatch fragmentBatch = RpcBus.get(pBody, FragmentRecordBatch.PARSER);
+ final AckSender ack = new AckSender(sender);
+
+ // increment so we don't get false returns.
+ ack.increment();
+
+ try {
+ final IncomingDataBatch batch = new IncomingDataBatch(fragmentBatch, (DrillBuf) dBody, ack);
+ final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
+
+ // randomize who gets first transfer (and thus ownership) so memory usage is balanced when we're sharing amongst
+ // multiple fragments.
+ final int firstOwner = ThreadLocalRandom.current().nextInt(targetCount);
+ submit(batch, firstOwner, targetCount);
+ submit(batch, 0, firstOwner);
+
+ } catch (IOException | FragmentSetupException e) {
+ logger.error("Failure while getting fragment manager. {}",
+ QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
+ fragmentBatch.getReceivingMajorFragmentId(),
+ fragmentBatch.getReceivingMinorFragmentIdList()), e);
+ ack.clear();
+ sender.send(new Response(BitData.RpcType.ACK, Acks.FAIL));
+ } finally {
+
+ // decrement the extra reference we grabbed at the top.
+ ack.sendOk();
+ }
+ }
+
+ private void submit(IncomingDataBatch batch, int minorStart, int minorStopExclusive) throws FragmentSetupException,
+ IOException {
+ for (int minor = minorStart; minor < minorStopExclusive; minor++) {
+ final FragmentManager manager = workBus.getFragmentManager(getHandle(batch.getHeader(), minor));
+ if (manager == null) {
+ // A missing manager means the query already terminated. We can simply drop this data.
+ continue;
+ }
+
+ final boolean canRun = manager.handle(batch);
+ if (canRun) {
+ // logger.debug("Arriving batch means local batch can run, starting local batch.");
+ /*
+ * If we've reached the canRun threshold, we'll proceed. This expects manager.handle() to only return a single
+ * true. This is guaranteed by the interface.
+ */
+ bee.startFragmentPendingRemote(manager);
+ }
+ }
+ }
+
+ private static FragmentHandle getHandle(final FragmentRecordBatch batch, int index) {
+ return FragmentHandle.newBuilder()
+ .setQueryId(batch.getQueryId())
+ .setMajorFragmentId(batch.getReceivingMajorFragmentId())
+ .setMinorFragmentId(batch.getReceivingMinorFragmentId(index))
+ .build();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index c38138c09..e7c0ee571 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -33,7 +33,6 @@ import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ExecutionControls;
-import org.apache.drill.exec.testing.ExecutionControlsInjector;
public class DataTunnel {