diff options
author | Sorabh Hamirwasia <shamirwasia@maprtech.com> | 2018-05-08 13:06:20 -0700 |
---|---|---|
committer | Arina Ielchiieva <arina.yelchiyeva@gmail.com> | 2018-05-18 15:40:59 +0300 |
commit | 5dd8a6f60e006c2dc707f241b7619634e4e82bbd (patch) | |
tree | 2255dabe934929d5a39089d4e3a908b3c39b1b6c /exec/java-exec/src/main/java/org/apache/drill/exec/rpc | |
parent | 5a3a73ad098f77ad01d9366dc1a0f8f4ac539746 (diff) |
DRILL-6255: Drillbit while sending control message to itself creates a connection instead of submitting locally
closes #1253
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc')
8 files changed, 443 insertions, 70 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java index c3980336f..0bc01f536 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java @@ -103,6 +103,24 @@ public final class BitRpcUtility { } } + /** + * Verifies if local and remote Drillbit Endpoint has same control server by using address and control port + * information. This method is used instead of equals in {@link DrillbitEndpoint} because DrillbitEndpoint stores + * state information in it. + * For local Drillbit a reference is stored in {@link org.apache.drill.exec.server.DrillbitContext} as soon as + * Drillbit is started in {@link org.apache.drill.exec.service.ServiceEngine#start} with state as STARTUP, but + * while planning minor fragment the assignment list is used from active list of Drillbits in which state for local + * Drillbit will not be STARTUP + * @param local - DrillbitEndpoint instance for local bit + * @param remote - DrillbitEndpoint instance for remote bit + * @return true if address and control port for local and remote are same. + * false - otherwise + */ + public static boolean isLocalControlServer(DrillbitEndpoint local, DrillbitEndpoint remote) { + return local.hasAddress() && local.hasControlPort() && remote.hasAddress() && remote.hasControlPort() && + local.getAddress().equals(remote.getAddress()) && local.getControlPort() == remote.getControlPort(); + } + // Suppress default constructor private BitRpcUtility() { } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java index 800cf3cc8..e27729c1f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.google.common.collect.Maps; +import org.apache.drill.exec.rpc.BitRpcUtility; public class ConnectionManagerRegistry implements Iterable<ControlConnectionManager> { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class); @@ -40,9 +41,12 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana public ControlConnectionManager getConnectionManager(DrillbitEndpoint remoteEndpoint) { assert localEndpoint != null : "DrillbitEndpoint must be set before a connection manager can be retrieved"; + + final boolean isLocalServer = BitRpcUtility.isLocalControlServer(localEndpoint, remoteEndpoint); ControlConnectionManager m = registry.get(remoteEndpoint); if (m == null) { - m = new ControlConnectionManager(config, localEndpoint, remoteEndpoint); + m = (isLocalServer) ? new LocalControlConnectionManager(config, remoteEndpoint) + : new RemoteControlConnectionManager(config, localEndpoint, remoteEndpoint); final ControlConnectionManager m2 = registry.putIfAbsent(remoteEndpoint, m); if (m2 != null) { m = m2; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java index b19fb8bb7..f114af44d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.control; +import com.google.common.annotations.VisibleForTesting; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.rpc.BitConnectionConfig; @@ -24,8 +25,8 @@ import org.apache.drill.exec.server.BootStrapContext; import org.apache.drill.exec.work.batch.ControlMessageHandler; // config for bit to bit connection -// package private -class ControlConnectionConfig extends BitConnectionConfig { +@VisibleForTesting +public class ControlConnectionConfig extends BitConnectionConfig { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionConfig.class); private final ControlMessageHandler handler; @@ -41,8 +42,8 @@ class ControlConnectionConfig extends BitConnectionConfig { return "control"; // unused } - ControlMessageHandler getMessageHandler() { + @VisibleForTesting + public ControlMessageHandler getMessageHandler() { return handler; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java index 6bfcbd5d6..240421ef7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java @@ -25,14 +25,10 @@ import org.apache.drill.exec.rpc.ReconnectingConnection; /** * Maintains connection between two particular bits. */ -public class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{ +public abstract class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class); - private final ControlConnectionConfig config; - private final DrillbitEndpoint remoteEndpoint; - - public ControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint, - DrillbitEndpoint remoteEndpoint) { + public ControlConnectionManager(DrillbitEndpoint localEndpoint, DrillbitEndpoint remoteEndpoint) { super( BitControlHandshake.newBuilder() .setRpcVersion(ControlRpcConfig.RPC_VERSION) @@ -40,14 +36,8 @@ public class ControlConnectionManager extends ReconnectingConnection<ControlConn .build(), remoteEndpoint.getAddress(), remoteEndpoint.getControlPort()); - - this.config = config; - this.remoteEndpoint = remoteEndpoint; } @Override - protected BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient() { - return new ControlClient(config, remoteEndpoint, new CloseHandlerCreator()); - } - + protected abstract BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java index 1a4af9054..492d4de62 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java @@ -17,11 +17,20 @@ */ package org.apache.drill.exec.rpc.control; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonDeserializer; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.protobuf.ByteString; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.Message; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; import io.netty.buffer.ByteBuf; import io.netty.buffer.DrillBuf; - -import java.util.concurrent.TimeUnit; - import org.apache.drill.exec.proto.BitControl.CustomMessage; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; import org.apache.drill.exec.proto.BitControl.FragmentStatus; @@ -38,19 +47,7 @@ import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.control.Controller.CustomSerDe; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.JsonDeserializer; -import com.fasterxml.jackson.databind.JsonSerializer; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; -import com.fasterxml.jackson.databind.ObjectWriter; -import com.fasterxml.jackson.databind.module.SimpleModule; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import com.google.protobuf.MessageLite; -import com.google.protobuf.Parser; - +import java.util.concurrent.TimeUnit; public class ControlTunnel { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlTunnel.class); @@ -99,8 +96,7 @@ public class ControlTunnel { return b.getFuture(); } - - public static class SendFragmentStatus extends FutureBitCommand<Ack, ControlConnection> { + public static class SendFragmentStatus extends FutureBitCommand<Ack, ControlConnection, RpcType, FragmentStatus> { final FragmentStatus status; public SendFragmentStatus(FragmentStatus status) { @@ -110,13 +106,22 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.sendUnsafe(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class); + connection.sendUnsafe(outcomeListener, getRpcType(), status, Ack.class); } - } + @Override + public RpcType getRpcType() { + return RpcType.REQ_FRAGMENT_STATUS; + } + @Override + public FragmentStatus getMessage() { + return status; + } - public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection> { + } + + public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection, RpcType, FinishedReceiver> { final FinishedReceiver finishedReceiver; public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver finishedReceiver) { @@ -126,11 +131,21 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_RECEIVER_FINISHED, finishedReceiver, Ack.class); + connection.send(outcomeListener, getRpcType(), finishedReceiver, Ack.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_RECEIVER_FINISHED; + } + + @Override + public FinishedReceiver getMessage() { + return finishedReceiver; } } - public static class SignalFragment extends ListeningCommand<Ack, ControlConnection> { + public static class SignalFragment extends ListeningCommand<Ack, ControlConnection, RpcType, FragmentHandle> { final FragmentHandle handle; final RpcType type; @@ -145,9 +160,18 @@ public class ControlTunnel { connection.sendUnsafe(outcomeListener, type, handle, Ack.class); } + @Override + public RpcType getRpcType() { + return type; + } + + @Override + public FragmentHandle getMessage() { + return handle; + } } - public static class SendFragment extends ListeningCommand<Ack, ControlConnection> { + public static class SendFragment extends ListeningCommand<Ack, ControlConnection, RpcType, InitializeFragments> { final InitializeFragments fragments; public SendFragment(RpcOutcomeListener<Ack> listener, InitializeFragments fragments) { @@ -157,12 +181,21 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, fragments, Ack.class); + connection.send(outcomeListener, getRpcType(), fragments, Ack.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_INITIALIZE_FRAGMENTS; } + @Override + public InitializeFragments getMessage() { + return fragments; + } } - public static class RequestProfile extends FutureBitCommand<QueryProfile, ControlConnection> { + public static class RequestProfile extends FutureBitCommand<QueryProfile, ControlConnection, RpcType, QueryId> { final QueryId queryId; public RequestProfile(QueryId queryId) { @@ -172,11 +205,21 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<QueryProfile> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, QueryProfile.class); + connection.send(outcomeListener, getRpcType(), queryId, QueryProfile.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_QUERY_STATUS; + } + + @Override + public QueryId getMessage() { + return queryId; } } - public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection> { + public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection, RpcType, QueryId> { final QueryId queryId; public CancelQuery(QueryId queryId) { @@ -186,7 +229,17 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class); + connection.send(outcomeListener, getRpcType(), queryId, Ack.class); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_QUERY_CANCEL; + } + + @Override + public QueryId getMessage() { + return queryId; } } @@ -204,8 +257,8 @@ public class ControlTunnel { return new CustomTunnel<SEND, RECEIVE>(messageTypeId, send, receive); } - - private static class CustomMessageSender extends ListeningCommand<CustomMessage, ControlConnection> { + public static class CustomMessageSender extends + ListeningCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> { private CustomMessage message; private ByteBuf[] dataBodies; @@ -218,12 +271,27 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies); + connection.send(outcomeListener, getRpcType(), message, CustomMessage.class, dataBodies); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_CUSTOM; + } + + @Override + public CustomMessage getMessage() { + return message; + } + + public ByteBuf[] getDataBodies() { + return dataBodies; } } - private static class SyncCustomMessageSender extends FutureBitCommand<CustomMessage, ControlConnection> { + public static class SyncCustomMessageSender extends + FutureBitCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> { private CustomMessage message; private ByteBuf[] dataBodies; @@ -236,7 +304,21 @@ public class ControlTunnel { @Override public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies); + connection.send(outcomeListener, getRpcType(), message, CustomMessage.class, dataBodies); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_CUSTOM; + } + + @Override + public CustomMessage getMessage() { + return message; + } + + public ByteBuf[] getDataBodies() { + return dataBodies; } } @@ -261,8 +343,7 @@ public class ControlTunnel { return serde.deserializeReceived(message.getMessage().toByteArray()); } - public RECEIVE get(long timeout, TimeUnit unit) throws Exception, - InvalidProtocolBufferException { + public RECEIVE get(long timeout, TimeUnit unit) throws Exception, InvalidProtocolBufferException { CustomMessage message = future.checkedGet(timeout, unit); return serde.deserializeReceived(message.getMessage().toByteArray()); } @@ -270,7 +351,6 @@ public class ControlTunnel { public DrillBuf getBuffer() throws RpcException { return (DrillBuf) future.getBuffer(); } - } @@ -351,21 +431,15 @@ public class ControlTunnel { } catch (Exception e) { innerListener.failed(new RpcException("Failure while parsing message locally.", e)); } - } @Override public void interrupted(InterruptedException e) { innerListener.interrupted(e); } - } - } - - - public static class ProtoSerDe<MSG extends MessageLite> implements CustomSerDe<MSG> { private final Parser<MSG> parser; @@ -420,7 +494,5 @@ public class ControlTunnel { public MSG deserializeReceived(byte[] bytes) throws Exception { return (MSG) reader.readValue(bytes); } - } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java new file mode 100644 index 000000000..fa6a2e9a5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.control; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; +import org.apache.drill.exec.exception.OutOfMemoryException; +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.BasicClient; +import org.apache.drill.exec.rpc.Response; +import org.apache.drill.exec.rpc.RpcCommand; +import org.apache.drill.exec.rpc.RpcConstants; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.work.batch.ControlMessageHandler; + +public class LocalControlConnectionManager extends ControlConnectionManager { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalControlConnectionManager.class); + + private final ControlConnectionConfig config; + + public LocalControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint) { + super(localEndpoint, localEndpoint); + this.config = config; + } + + @Override + protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, ?> getNewClient() { + throw new UnsupportedOperationException("LocalControlConnectionManager doesn't support creating a control client"); + } + + @Override + public void runCommand(RpcCommand cmd) { + final int rpcType = cmd.getRpcType().getNumber(); + final ControlMessageHandler messageHandler = config.getMessageHandler(); + + if (RpcConstants.EXTRA_DEBUGGING) { + logger.debug("Received bit com message of type {} over local connection manager", rpcType); + } + + switch (rpcType) { + + case BitControl.RpcType.REQ_CANCEL_FRAGMENT_VALUE: { + final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd); + final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener(); + final Ack ackResponse = messageHandler.cancelFragment(signalFragment.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_CUSTOM_VALUE: { + final ByteBuf[] dataBodies; + final RpcOutcomeListener<BitControl.CustomMessage> outcomeListener; + + if (cmd instanceof ControlTunnel.CustomMessageSender) { + dataBodies = ((ControlTunnel.CustomMessageSender)cmd).getDataBodies(); + outcomeListener = ((ControlTunnel.CustomMessageSender)cmd).getOutcomeListener(); + } else if (cmd instanceof ControlTunnel.SyncCustomMessageSender) { + dataBodies = ((ControlTunnel.SyncCustomMessageSender)cmd).getDataBodies(); + outcomeListener = ((ControlTunnel.SyncCustomMessageSender)cmd).getOutcomeListener(); + } else { + throw new UnsupportedOperationException("Unknown Custom Type control message received"); + } + + DrillBuf reqDrillBuff; + try { + reqDrillBuff = convertToByteBuf(dataBodies); + } catch (Exception ex) { + outcomeListener.failed(new RpcException("Failed to allocate memory while sending request in " + + "LocalControlConnectionManager#convertToByteBuff", ex)); + return; + } finally { + releaseByteBuf(dataBodies); + } + + try { + BitControl.CustomMessage message = (BitControl.CustomMessage) cmd.getMessage(); + final Response response = messageHandler.getHandlerRegistry().handle(message, reqDrillBuff); + DrillBuf responseBuffer; + try { + responseBuffer = convertToByteBuf(response.dBodies); + } catch (Exception ex) { + outcomeListener.failed(new RpcException("Failed to allocate memory while sending response in " + + "LocalControlConnectionManager#convertToByteBuff", ex)); + return; + } finally { + releaseByteBuf(response.dBodies); + } + + // Passed responseBuffer will be owned by consumer + outcomeListener.success((BitControl.CustomMessage)response.pBody, responseBuffer); + } catch (RpcException ex) { + cmd.getOutcomeListener().failed(ex); + } finally { + // Release the reqDrillBuff passed into handler + releaseByteBuf(reqDrillBuff); + } + break; + } + + case BitControl.RpcType.REQ_RECEIVER_FINISHED_VALUE: { + final ControlTunnel.ReceiverFinished receiverFinished = ((ControlTunnel.ReceiverFinished) cmd); + final RpcOutcomeListener<Ack> outcomeListener = receiverFinished.getOutcomeListener(); + final Ack ackResponse = messageHandler.receivingFragmentFinished(receiverFinished.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_FRAGMENT_STATUS_VALUE: { + final ControlTunnel.SendFragmentStatus fragmentStatus = ((ControlTunnel.SendFragmentStatus) cmd); + final RpcOutcomeListener<Ack> outcomeListener = fragmentStatus.getOutcomeListener(); + final Ack ackResponse = messageHandler.requestFragmentStatus(fragmentStatus.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_QUERY_CANCEL_VALUE: { + final ControlTunnel.CancelQuery cancelQuery = ((ControlTunnel.CancelQuery) cmd); + final RpcOutcomeListener<Ack> outcomeListener = cancelQuery.getOutcomeListener(); + final Ack ackResponse = messageHandler.requestQueryCancel(cancelQuery.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + case BitControl.RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: { + final ControlTunnel.SendFragment sendFragment = ((ControlTunnel.SendFragment) cmd); + final RpcOutcomeListener<Ack> outcomeListener = sendFragment.getOutcomeListener(); + + try { + final Ack ackResponse = messageHandler.initializeFragment(sendFragment.getMessage()); + outcomeListener.success(ackResponse, null); + } catch (RpcException ex) { + outcomeListener.failed(ex); + } + break; + } + + case BitControl.RpcType.REQ_QUERY_STATUS_VALUE: { + final ControlTunnel.RequestProfile requestProfile = ((ControlTunnel.RequestProfile) cmd); + final RpcOutcomeListener<UserBitShared.QueryProfile> outcomeListener = requestProfile.getOutcomeListener(); + + try { + final UserBitShared.QueryProfile profile = messageHandler.requestQueryStatus(requestProfile.getMessage()); + outcomeListener.success(profile, null); + } catch (RpcException ex) { + outcomeListener.failed(ex); + } + break; + } + + case BitControl.RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: { + final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd); + final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener(); + final Ack ackResponse = messageHandler.resumeFragment(signalFragment.getMessage()); + outcomeListener.success(ackResponse, null); + break; + } + + default: + final RpcException rpcException = new RpcException(String.format("Unsupported control request type %s " + + "received on LocalControlConnectionManager", rpcType)); + cmd.getOutcomeListener().failed(rpcException); + } + } + + /** + * Copies ByteBuf in the input array into a single DrillBuf + * @param byteBuffArray - input array of ByteBuf's + * @return DrillBuf - output Drillbuf with all the input bytes. + * @throws OutOfMemoryException + */ + private DrillBuf convertToByteBuf(ByteBuf[] byteBuffArray) throws OutOfMemoryException { + + if (byteBuffArray == null) { + return null; + } + + int bytesToCopy = 0; + for (ByteBuf b : byteBuffArray) { + final int validBytes = b.readableBytes(); + if (0 == validBytes) { + b.release(); + } else { + bytesToCopy += validBytes; + } + } + final DrillBuf drillBuff = config.getAllocator().buffer(bytesToCopy); + + for (ByteBuf b : byteBuffArray) { + final int validBytes = b.readableBytes(); + drillBuff.writeBytes(b, 0, validBytes); + } + + return drillBuff; + } + + /** + * Releases all the ByteBuf inside the input ByteBuf array + * @param byteBuffArray - input array of ByteBuf's + */ + private void releaseByteBuf(ByteBuf[] byteBuffArray) { + if (byteBuffArray != null) { + for (ByteBuf b : byteBuffArray) { + b.release(); + } + } + } + + /** + * Releases the input ByteBuf + * @param byteBuf - input ByteBuf + */ + private void releaseByteBuf(ByteBuf byteBuf) { + if (byteBuf != null) { + byteBuf.release(); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java new file mode 100644 index 000000000..6a4dc21b9 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.rpc.control; + +import org.apache.drill.exec.proto.BitControl; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.rpc.BasicClient; + +public class RemoteControlConnectionManager extends ControlConnectionManager { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteControlConnectionManager.class); + + private final ControlConnectionConfig config; + private final DrillbitEndpoint remoteEndpoint; + + public RemoteControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint + localEndpoint, DrillbitEndpoint remoteEndpoint) { + super(localEndpoint, remoteEndpoint); + this.config = config; + this.remoteEndpoint = remoteEndpoint; + } + + @Override + protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, ?> getNewClient() { + return new ControlClient(config, remoteEndpoint, new CloseHandlerCreator()); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index 7e286fa32..7b05b81e3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.rpc.data; +import com.google.protobuf.MessageLite; import io.netty.buffer.ByteBuf; import java.util.concurrent.Semaphore; @@ -43,7 +44,6 @@ public class DataTunnel { private ExecutionControls testControls; private org.slf4j.Logger testLogger; - public DataTunnel(DataConnectionManager manager) { this.manager = manager; } @@ -69,7 +69,7 @@ public class DataTunnel { public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) { SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch); - try{ + try { if (isInjectionControlSet) { // Wait for interruption if set. Used to simulate the fragment interruption while the fragment is waiting for // semaphore acquire. We expect the @@ -78,9 +78,9 @@ public class DataTunnel { sendingSemaphore.acquire(); manager.runCommand(b); - }catch(final InterruptedException e){ + } catch (final InterruptedException e) { // Release the buffers first before informing the listener about the interrupt. - for(ByteBuf buffer : batch.getBuffers()) { + for (ByteBuf buffer : batch.getBuffers()) { buffer.release(); } @@ -119,7 +119,7 @@ public class DataTunnel { } } - private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> { + private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection, RpcType, MessageLite> { final FragmentWritableBatch batch; public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) { @@ -129,7 +129,18 @@ public class DataTunnel { @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, DataClientConnection connection) { - connection.send(new ThrottlingOutcomeListener(outcomeListener), RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers()); + connection.send(new ThrottlingOutcomeListener(outcomeListener), getRpcType(), batch.getHeader(), + Ack.class, batch.getBuffers()); + } + + @Override + public RpcType getRpcType() { + return RpcType.REQ_RECORD_BATCH; + } + + @Override + public MessageLite getMessage() { + return batch.getHeader(); } @Override @@ -139,7 +150,7 @@ public class DataTunnel { @Override public void connectionFailed(FailureType type, Throwable t) { - for(ByteBuf buffer : batch.getBuffers()) { + for (ByteBuf buffer : batch.getBuffers()) { buffer.release(); } super.connectionFailed(type, t); |