aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java130
1 files changed, 23 insertions, 107 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 03118d754..33270fd86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,55 +17,34 @@
*/
package org.apache.drill.exec.rpc.data;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.DrillBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
-import java.io.IOException;
-import java.util.concurrent.ThreadLocalRandom;
-
-import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData.BitClientHandshake;
import org.apache.drill.exec.proto.BitData.BitServerHandshake;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.proto.BitData.RpcType;
-import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.RpcChannel;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.apache.drill.exec.server.BootStrapContext;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.fragment.FragmentManager;
import com.google.protobuf.MessageLite;
-public class DataServer extends BasicServer<RpcType, BitServerConnection> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class);
+public class DataServer extends BasicServer<RpcType, DataServerConnection> {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class);
- private volatile ProxyCloseHandler proxyCloseHandler;
- private final BootStrapContext context;
- private final WorkEventBus workBus;
- private final WorkerBee bee;
+ private final DataConnectionConfig config;
- public DataServer(BootStrapContext context, BufferAllocator alloc, WorkEventBus workBus,
- WorkerBee bee) {
+ public DataServer(DataConnectionConfig config) {
super(
- DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()),
- alloc.getAsByteBufAllocator(),
- context.getBitLoopGroup());
- this.context = context;
- this.workBus = workBus;
- this.bee = bee;
+ DataRpcConfig.getMapping(config.getBootstrapContext().getConfig(),
+ config.getBootstrapContext().getExecutor()),
+ config.getAllocator().getAsByteBufAllocator(),
+ config.getBootstrapContext().getBitLoopGroup());
+ this.config = config;
}
@Override
@@ -74,19 +53,18 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
@Override
- protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, BitServerConnection connection) {
- this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
- return proxyCloseHandler;
+ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, DataServerConnection connection) {
+ return new ProxyCloseHandler(super.getCloseHandler(ch, connection));
}
@Override
- public BitServerConnection initRemoteConnection(SocketChannel channel) {
+ protected DataServerConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
- return new BitServerConnection(channel, context.getAllocator());
+ return new DataServerConnection(channel, config);
}
@Override
- protected ServerHandshakeHandler<BitClientHandshake> getHandshakeHandler(final BitServerConnection connection) {
+ protected ServerHandshakeHandler<BitClientHandshake> getHandshakeHandler(final DataServerConnection connection) {
return new ServerHandshakeHandler<BitClientHandshake>(RpcType.HANDSHAKE, BitClientHandshake.PARSER) {
@Override
@@ -101,79 +79,17 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
inbound.getChannel()));
}
- return BitServerHandshake.newBuilder().setRpcVersion(DataRpcConfig.RPC_VERSION).build();
+ final BitServerHandshake.Builder builder = BitServerHandshake.newBuilder();
+ builder.setRpcVersion(DataRpcConfig.RPC_VERSION);
+ if (config.getAuthMechanismToUse() != null) {
+ builder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames());
+ }
+ return builder.build();
}
};
}
- private static FragmentHandle getHandle(FragmentRecordBatch batch, int index) {
- return FragmentHandle.newBuilder()
- .setQueryId(batch.getQueryId())
- .setMajorFragmentId(batch.getReceivingMajorFragmentId())
- .setMinorFragmentId(batch.getReceivingMinorFragmentId(index))
- .build();
- }
-
- private void submit(IncomingDataBatch batch, int minorStart, int minorStopExclusive) throws FragmentSetupException,
- IOException {
- for (int minor = minorStart; minor < minorStopExclusive; minor++) {
- final FragmentManager manager = workBus.getFragmentManager(getHandle(batch.getHeader(), minor));
- if (manager == null) {
- // A missing manager means the query already terminated. We can simply drop this data.
- continue;
- }
-
- final boolean canRun = manager.handle(batch);
- if (canRun) {
- // logger.debug("Arriving batch means local batch can run, starting local batch.");
- /*
- * If we've reached the canRun threshold, we'll proceed. This expects manager.handle() to only return a single
- * true. This is guaranteed by the interface.
- */
- bee.startFragmentPendingRemote(manager);
- }
- }
-
- }
-
- @Override
- protected void handle(BitServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf body, ResponseSender sender) throws RpcException {
- assert rpcType == RpcType.REQ_RECORD_BATCH_VALUE;
-
- final FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER);
- final AckSender ack = new AckSender(sender);
-
-
- // increment so we don't get false returns.
- ack.increment();
-
- try {
-
- final IncomingDataBatch batch = new IncomingDataBatch(fragmentBatch, (DrillBuf) body, ack);
- final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
-
- // randomize who gets first transfer (and thus ownership) so memory usage is balanced when we're sharing amongst
- // multiple fragments.
- final int firstOwner = ThreadLocalRandom.current().nextInt(targetCount);
- submit(batch, firstOwner, targetCount);
- submit(batch, 0, firstOwner);
-
- } catch (IOException | FragmentSetupException e) {
- logger.error("Failure while getting fragment manager. {}",
- QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
- fragmentBatch.getReceivingMajorFragmentId(),
- fragmentBatch.getReceivingMinorFragmentIdList()), e);
- ack.clear();
- sender.send(new Response(RpcType.ACK, Acks.FAIL));
- } finally {
-
- // decrement the extra reference we grabbed at the top.
- ack.sendOk();
- }
- }
-
-
private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> {
private volatile GenericFutureListener<ChannelFuture> handler;
@@ -191,7 +107,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
@Override
- public OutOfMemoryHandler getOutOfMemoryHandler() {
+ protected OutOfMemoryHandler getOutOfMemoryHandler() {
return new OutOfMemoryHandler() {
@Override
public void handle() {
@@ -201,7 +117,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
}
@Override
- public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
+ protected ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
return new DataProtobufLengthDecoder.Server(allocator, outOfMemoryHandler);
}