diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java | 130 |
1 files changed, 23 insertions, 107 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index 03118d754..33270fd86 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,55 +17,34 @@ */ package org.apache.drill.exec.rpc.data; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.DrillBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.socket.SocketChannel; import io.netty.util.concurrent.GenericFutureListener; -import java.io.IOException; -import java.util.concurrent.ThreadLocalRandom; - -import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.BitData.BitClientHandshake; import org.apache.drill.exec.proto.BitData.BitServerHandshake; -import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.proto.BitData.RpcType; -import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.RpcChannel; -import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.BasicServer; import org.apache.drill.exec.rpc.OutOfMemoryHandler; import org.apache.drill.exec.rpc.ProtobufLengthDecoder; -import org.apache.drill.exec.rpc.Response; -import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.apache.drill.exec.server.BootStrapContext; -import org.apache.drill.exec.work.WorkManager.WorkerBee; -import org.apache.drill.exec.work.fragment.FragmentManager; import com.google.protobuf.MessageLite; -public class DataServer extends BasicServer<RpcType, BitServerConnection> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class); +public class DataServer extends BasicServer<RpcType, DataServerConnection> { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataServer.class); - private volatile ProxyCloseHandler proxyCloseHandler; - private final BootStrapContext context; - private final WorkEventBus workBus; - private final WorkerBee bee; + private final DataConnectionConfig config; - public DataServer(BootStrapContext context, BufferAllocator alloc, WorkEventBus workBus, - WorkerBee bee) { + public DataServer(DataConnectionConfig config) { super( - DataRpcConfig.getMapping(context.getConfig(), context.getExecutor()), - alloc.getAsByteBufAllocator(), - context.getBitLoopGroup()); - this.context = context; - this.workBus = workBus; - this.bee = bee; + DataRpcConfig.getMapping(config.getBootstrapContext().getConfig(), + config.getBootstrapContext().getExecutor()), + config.getAllocator().getAsByteBufAllocator(), + config.getBootstrapContext().getBitLoopGroup()); + this.config = config; } @Override @@ -74,19 +53,18 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { } @Override - protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, BitServerConnection connection) { - this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection)); - return proxyCloseHandler; + protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, DataServerConnection connection) { + return new ProxyCloseHandler(super.getCloseHandler(ch, connection)); } @Override - public BitServerConnection initRemoteConnection(SocketChannel channel) { + protected DataServerConnection initRemoteConnection(SocketChannel channel) { super.initRemoteConnection(channel); - return new BitServerConnection(channel, context.getAllocator()); + return new DataServerConnection(channel, config); } @Override - protected ServerHandshakeHandler<BitClientHandshake> getHandshakeHandler(final BitServerConnection connection) { + protected ServerHandshakeHandler<BitClientHandshake> getHandshakeHandler(final DataServerConnection connection) { return new ServerHandshakeHandler<BitClientHandshake>(RpcType.HANDSHAKE, BitClientHandshake.PARSER) { @Override @@ -101,79 +79,17 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { inbound.getChannel())); } - return BitServerHandshake.newBuilder().setRpcVersion(DataRpcConfig.RPC_VERSION).build(); + final BitServerHandshake.Builder builder = BitServerHandshake.newBuilder(); + builder.setRpcVersion(DataRpcConfig.RPC_VERSION); + if (config.getAuthMechanismToUse() != null) { + builder.addAllAuthenticationMechanisms(config.getAuthProvider().getAllFactoryNames()); + } + return builder.build(); } }; } - private static FragmentHandle getHandle(FragmentRecordBatch batch, int index) { - return FragmentHandle.newBuilder() - .setQueryId(batch.getQueryId()) - .setMajorFragmentId(batch.getReceivingMajorFragmentId()) - .setMinorFragmentId(batch.getReceivingMinorFragmentId(index)) - .build(); - } - - private void submit(IncomingDataBatch batch, int minorStart, int minorStopExclusive) throws FragmentSetupException, - IOException { - for (int minor = minorStart; minor < minorStopExclusive; minor++) { - final FragmentManager manager = workBus.getFragmentManager(getHandle(batch.getHeader(), minor)); - if (manager == null) { - // A missing manager means the query already terminated. We can simply drop this data. - continue; - } - - final boolean canRun = manager.handle(batch); - if (canRun) { - // logger.debug("Arriving batch means local batch can run, starting local batch."); - /* - * If we've reached the canRun threshold, we'll proceed. This expects manager.handle() to only return a single - * true. This is guaranteed by the interface. - */ - bee.startFragmentPendingRemote(manager); - } - } - - } - - @Override - protected void handle(BitServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf body, ResponseSender sender) throws RpcException { - assert rpcType == RpcType.REQ_RECORD_BATCH_VALUE; - - final FragmentRecordBatch fragmentBatch = get(pBody, FragmentRecordBatch.PARSER); - final AckSender ack = new AckSender(sender); - - - // increment so we don't get false returns. - ack.increment(); - - try { - - final IncomingDataBatch batch = new IncomingDataBatch(fragmentBatch, (DrillBuf) body, ack); - final int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount(); - - // randomize who gets first transfer (and thus ownership) so memory usage is balanced when we're sharing amongst - // multiple fragments. - final int firstOwner = ThreadLocalRandom.current().nextInt(targetCount); - submit(batch, firstOwner, targetCount); - submit(batch, 0, firstOwner); - - } catch (IOException | FragmentSetupException e) { - logger.error("Failure while getting fragment manager. {}", - QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(), - fragmentBatch.getReceivingMajorFragmentId(), - fragmentBatch.getReceivingMinorFragmentIdList()), e); - ack.clear(); - sender.send(new Response(RpcType.ACK, Acks.FAIL)); - } finally { - - // decrement the extra reference we grabbed at the top. - ack.sendOk(); - } - } - - private class ProxyCloseHandler implements GenericFutureListener<ChannelFuture> { private volatile GenericFutureListener<ChannelFuture> handler; @@ -191,7 +107,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { } @Override - public OutOfMemoryHandler getOutOfMemoryHandler() { + protected OutOfMemoryHandler getOutOfMemoryHandler() { return new OutOfMemoryHandler() { @Override public void handle() { @@ -201,7 +117,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { } @Override - public ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { + protected ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { return new DataProtobufLengthDecoder.Server(allocator, outOfMemoryHandler); } |