aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/rpc
diff options
context:
space:
mode:
authorSorabh Hamirwasia <shamirwasia@maprtech.com>2018-05-08 13:06:20 -0700
committerArina Ielchiieva <arina.yelchiyeva@gmail.com>2018-05-18 15:40:59 +0300
commit5dd8a6f60e006c2dc707f241b7619634e4e82bbd (patch)
tree2255dabe934929d5a39089d4e3a908b3c39b1b6c /exec/java-exec/src/main/java/org/apache/drill/exec/rpc
parent5a3a73ad098f77ad01d9366dc1a0f8f4ac539746 (diff)
DRILL-6255: Drillbit while sending control message to itself creates a connection instead of submitting locally
closes #1253
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/rpc')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java18
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java162
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java236
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java41
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java25
8 files changed, 443 insertions, 70 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java
index c3980336f..0bc01f536 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BitRpcUtility.java
@@ -103,6 +103,24 @@ public final class BitRpcUtility {
}
}
+ /**
+ * Verifies if local and remote Drillbit Endpoint has same control server by using address and control port
+ * information. This method is used instead of equals in {@link DrillbitEndpoint} because DrillbitEndpoint stores
+ * state information in it.
+ * For local Drillbit a reference is stored in {@link org.apache.drill.exec.server.DrillbitContext} as soon as
+ * Drillbit is started in {@link org.apache.drill.exec.service.ServiceEngine#start} with state as STARTUP, but
+ * while planning minor fragment the assignment list is used from active list of Drillbits in which state for local
+ * Drillbit will not be STARTUP
+ * @param local - DrillbitEndpoint instance for local bit
+ * @param remote - DrillbitEndpoint instance for remote bit
+ * @return true if address and control port for local and remote are same.
+ * false - otherwise
+ */
+ public static boolean isLocalControlServer(DrillbitEndpoint local, DrillbitEndpoint remote) {
+ return local.hasAddress() && local.hasControlPort() && remote.hasAddress() && remote.hasControlPort() &&
+ local.getAddress().equals(remote.getAddress()) && local.getControlPort() == remote.getControlPort();
+ }
+
// Suppress default constructor
private BitRpcUtility() {
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
index 800cf3cc8..e27729c1f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ConnectionManagerRegistry.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.collect.Maps;
+import org.apache.drill.exec.rpc.BitRpcUtility;
public class ConnectionManagerRegistry implements Iterable<ControlConnectionManager> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ConnectionManagerRegistry.class);
@@ -40,9 +41,12 @@ public class ConnectionManagerRegistry implements Iterable<ControlConnectionMana
public ControlConnectionManager getConnectionManager(DrillbitEndpoint remoteEndpoint) {
assert localEndpoint != null :
"DrillbitEndpoint must be set before a connection manager can be retrieved";
+
+ final boolean isLocalServer = BitRpcUtility.isLocalControlServer(localEndpoint, remoteEndpoint);
ControlConnectionManager m = registry.get(remoteEndpoint);
if (m == null) {
- m = new ControlConnectionManager(config, localEndpoint, remoteEndpoint);
+ m = (isLocalServer) ? new LocalControlConnectionManager(config, remoteEndpoint)
+ : new RemoteControlConnectionManager(config, localEndpoint, remoteEndpoint);
final ControlConnectionManager m2 = registry.putIfAbsent(remoteEndpoint, m);
if (m2 != null) {
m = m2;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
index b19fb8bb7..f114af44d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionConfig.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.rpc.control;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.drill.exec.exception.DrillbitStartupException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.rpc.BitConnectionConfig;
@@ -24,8 +25,8 @@ import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
// config for bit to bit connection
-// package private
-class ControlConnectionConfig extends BitConnectionConfig {
+@VisibleForTesting
+public class ControlConnectionConfig extends BitConnectionConfig {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionConfig.class);
private final ControlMessageHandler handler;
@@ -41,8 +42,8 @@ class ControlConnectionConfig extends BitConnectionConfig {
return "control"; // unused
}
- ControlMessageHandler getMessageHandler() {
+ @VisibleForTesting
+ public ControlMessageHandler getMessageHandler() {
return handler;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
index 6bfcbd5d6..240421ef7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlConnectionManager.java
@@ -25,14 +25,10 @@ import org.apache.drill.exec.rpc.ReconnectingConnection;
/**
* Maintains connection between two particular bits.
*/
-public class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{
+public abstract class ControlConnectionManager extends ReconnectingConnection<ControlConnection, BitControlHandshake>{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlConnectionManager.class);
- private final ControlConnectionConfig config;
- private final DrillbitEndpoint remoteEndpoint;
-
- public ControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint,
- DrillbitEndpoint remoteEndpoint) {
+ public ControlConnectionManager(DrillbitEndpoint localEndpoint, DrillbitEndpoint remoteEndpoint) {
super(
BitControlHandshake.newBuilder()
.setRpcVersion(ControlRpcConfig.RPC_VERSION)
@@ -40,14 +36,8 @@ public class ControlConnectionManager extends ReconnectingConnection<ControlConn
.build(),
remoteEndpoint.getAddress(),
remoteEndpoint.getControlPort());
-
- this.config = config;
- this.remoteEndpoint = remoteEndpoint;
}
@Override
- protected BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient() {
- return new ControlClient(config, remoteEndpoint, new CloseHandlerCreator());
- }
-
+ protected abstract BasicClient<?, ControlConnection, BitControlHandshake, ?> getNewClient();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 1a4af9054..492d4de62 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -17,11 +17,20 @@
*/
package org.apache.drill.exec.rpc.control;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.MessageLite;
+import com.google.protobuf.Parser;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
-
-import java.util.concurrent.TimeUnit;
-
import org.apache.drill.exec.proto.BitControl.CustomMessage;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -38,19 +47,7 @@ import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.control.Controller.CustomSerDe;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonSerializer;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectReader;
-import com.fasterxml.jackson.databind.ObjectWriter;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.Message;
-import com.google.protobuf.MessageLite;
-import com.google.protobuf.Parser;
-
+import java.util.concurrent.TimeUnit;
public class ControlTunnel {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlTunnel.class);
@@ -99,8 +96,7 @@ public class ControlTunnel {
return b.getFuture();
}
-
- public static class SendFragmentStatus extends FutureBitCommand<Ack, ControlConnection> {
+ public static class SendFragmentStatus extends FutureBitCommand<Ack, ControlConnection, RpcType, FragmentStatus> {
final FragmentStatus status;
public SendFragmentStatus(FragmentStatus status) {
@@ -110,13 +106,22 @@ public class ControlTunnel {
@Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
- connection.sendUnsafe(outcomeListener, RpcType.REQ_FRAGMENT_STATUS, status, Ack.class);
+ connection.sendUnsafe(outcomeListener, getRpcType(), status, Ack.class);
}
- }
+ @Override
+ public RpcType getRpcType() {
+ return RpcType.REQ_FRAGMENT_STATUS;
+ }
+ @Override
+ public FragmentStatus getMessage() {
+ return status;
+ }
- public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection> {
+ }
+
+ public static class ReceiverFinished extends ListeningCommand<Ack, ControlConnection, RpcType, FinishedReceiver> {
final FinishedReceiver finishedReceiver;
public ReceiverFinished(RpcOutcomeListener<Ack> listener, FinishedReceiver finishedReceiver) {
@@ -126,11 +131,21 @@ public class ControlTunnel {
@Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
- connection.send(outcomeListener, RpcType.REQ_RECEIVER_FINISHED, finishedReceiver, Ack.class);
+ connection.send(outcomeListener, getRpcType(), finishedReceiver, Ack.class);
+ }
+
+ @Override
+ public RpcType getRpcType() {
+ return RpcType.REQ_RECEIVER_FINISHED;
+ }
+
+ @Override
+ public FinishedReceiver getMessage() {
+ return finishedReceiver;
}
}
- public static class SignalFragment extends ListeningCommand<Ack, ControlConnection> {
+ public static class SignalFragment extends ListeningCommand<Ack, ControlConnection, RpcType, FragmentHandle> {
final FragmentHandle handle;
final RpcType type;
@@ -145,9 +160,18 @@ public class ControlTunnel {
connection.sendUnsafe(outcomeListener, type, handle, Ack.class);
}
+ @Override
+ public RpcType getRpcType() {
+ return type;
+ }
+
+ @Override
+ public FragmentHandle getMessage() {
+ return handle;
+ }
}
- public static class SendFragment extends ListeningCommand<Ack, ControlConnection> {
+ public static class SendFragment extends ListeningCommand<Ack, ControlConnection, RpcType, InitializeFragments> {
final InitializeFragments fragments;
public SendFragment(RpcOutcomeListener<Ack> listener, InitializeFragments fragments) {
@@ -157,12 +181,21 @@ public class ControlTunnel {
@Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
- connection.send(outcomeListener, RpcType.REQ_INITIALIZE_FRAGMENTS, fragments, Ack.class);
+ connection.send(outcomeListener, getRpcType(), fragments, Ack.class);
+ }
+
+ @Override
+ public RpcType getRpcType() {
+ return RpcType.REQ_INITIALIZE_FRAGMENTS;
}
+ @Override
+ public InitializeFragments getMessage() {
+ return fragments;
+ }
}
- public static class RequestProfile extends FutureBitCommand<QueryProfile, ControlConnection> {
+ public static class RequestProfile extends FutureBitCommand<QueryProfile, ControlConnection, RpcType, QueryId> {
final QueryId queryId;
public RequestProfile(QueryId queryId) {
@@ -172,11 +205,21 @@ public class ControlTunnel {
@Override
public void doRpcCall(RpcOutcomeListener<QueryProfile> outcomeListener, ControlConnection connection) {
- connection.send(outcomeListener, RpcType.REQ_QUERY_STATUS, queryId, QueryProfile.class);
+ connection.send(outcomeListener, getRpcType(), queryId, QueryProfile.class);
+ }
+
+ @Override
+ public RpcType getRpcType() {
+ return RpcType.REQ_QUERY_STATUS;
+ }
+
+ @Override
+ public QueryId getMessage() {
+ return queryId;
}
}
- public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection> {
+ public static class CancelQuery extends FutureBitCommand<Ack, ControlConnection, RpcType, QueryId> {
final QueryId queryId;
public CancelQuery(QueryId queryId) {
@@ -186,7 +229,17 @@ public class ControlTunnel {
@Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
- connection.send(outcomeListener, RpcType.REQ_QUERY_CANCEL, queryId, Ack.class);
+ connection.send(outcomeListener, getRpcType(), queryId, Ack.class);
+ }
+
+ @Override
+ public RpcType getRpcType() {
+ return RpcType.REQ_QUERY_CANCEL;
+ }
+
+ @Override
+ public QueryId getMessage() {
+ return queryId;
}
}
@@ -204,8 +257,8 @@ public class ControlTunnel {
return new CustomTunnel<SEND, RECEIVE>(messageTypeId, send, receive);
}
-
- private static class CustomMessageSender extends ListeningCommand<CustomMessage, ControlConnection> {
+ public static class CustomMessageSender extends
+ ListeningCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> {
private CustomMessage message;
private ByteBuf[] dataBodies;
@@ -218,12 +271,27 @@ public class ControlTunnel {
@Override
public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) {
- connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies);
+ connection.send(outcomeListener, getRpcType(), message, CustomMessage.class, dataBodies);
+ }
+
+ @Override
+ public RpcType getRpcType() {
+ return RpcType.REQ_CUSTOM;
+ }
+
+ @Override
+ public CustomMessage getMessage() {
+ return message;
+ }
+
+ public ByteBuf[] getDataBodies() {
+ return dataBodies;
}
}
- private static class SyncCustomMessageSender extends FutureBitCommand<CustomMessage, ControlConnection> {
+ public static class SyncCustomMessageSender extends
+ FutureBitCommand<CustomMessage, ControlConnection, RpcType, CustomMessage> {
private CustomMessage message;
private ByteBuf[] dataBodies;
@@ -236,7 +304,21 @@ public class ControlTunnel {
@Override
public void doRpcCall(RpcOutcomeListener<CustomMessage> outcomeListener, ControlConnection connection) {
- connection.send(outcomeListener, RpcType.REQ_CUSTOM, message, CustomMessage.class, dataBodies);
+ connection.send(outcomeListener, getRpcType(), message, CustomMessage.class, dataBodies);
+ }
+
+ @Override
+ public RpcType getRpcType() {
+ return RpcType.REQ_CUSTOM;
+ }
+
+ @Override
+ public CustomMessage getMessage() {
+ return message;
+ }
+
+ public ByteBuf[] getDataBodies() {
+ return dataBodies;
}
}
@@ -261,8 +343,7 @@ public class ControlTunnel {
return serde.deserializeReceived(message.getMessage().toByteArray());
}
- public RECEIVE get(long timeout, TimeUnit unit) throws Exception,
- InvalidProtocolBufferException {
+ public RECEIVE get(long timeout, TimeUnit unit) throws Exception, InvalidProtocolBufferException {
CustomMessage message = future.checkedGet(timeout, unit);
return serde.deserializeReceived(message.getMessage().toByteArray());
}
@@ -270,7 +351,6 @@ public class ControlTunnel {
public DrillBuf getBuffer() throws RpcException {
return (DrillBuf) future.getBuffer();
}
-
}
@@ -351,21 +431,15 @@ public class ControlTunnel {
} catch (Exception e) {
innerListener.failed(new RpcException("Failure while parsing message locally.", e));
}
-
}
@Override
public void interrupted(InterruptedException e) {
innerListener.interrupted(e);
}
-
}
-
}
-
-
-
public static class ProtoSerDe<MSG extends MessageLite> implements CustomSerDe<MSG> {
private final Parser<MSG> parser;
@@ -420,7 +494,5 @@ public class ControlTunnel {
public MSG deserializeReceived(byte[] bytes) throws Exception {
return (MSG) reader.readValue(bytes);
}
-
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java
new file mode 100644
index 000000000..fa6a2e9a5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/LocalControlConnectionManager.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.exec.exception.OutOfMemoryException;
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.BasicClient;
+import org.apache.drill.exec.rpc.Response;
+import org.apache.drill.exec.rpc.RpcCommand;
+import org.apache.drill.exec.rpc.RpcConstants;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.work.batch.ControlMessageHandler;
+
+public class LocalControlConnectionManager extends ControlConnectionManager {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalControlConnectionManager.class);
+
+ private final ControlConnectionConfig config;
+
+ public LocalControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint localEndpoint) {
+ super(localEndpoint, localEndpoint);
+ this.config = config;
+ }
+
+ @Override
+ protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, ?> getNewClient() {
+ throw new UnsupportedOperationException("LocalControlConnectionManager doesn't support creating a control client");
+ }
+
+ @Override
+ public void runCommand(RpcCommand cmd) {
+ final int rpcType = cmd.getRpcType().getNumber();
+ final ControlMessageHandler messageHandler = config.getMessageHandler();
+
+ if (RpcConstants.EXTRA_DEBUGGING) {
+ logger.debug("Received bit com message of type {} over local connection manager", rpcType);
+ }
+
+ switch (rpcType) {
+
+ case BitControl.RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
+ final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd);
+ final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener();
+ final Ack ackResponse = messageHandler.cancelFragment(signalFragment.getMessage());
+ outcomeListener.success(ackResponse, null);
+ break;
+ }
+
+ case BitControl.RpcType.REQ_CUSTOM_VALUE: {
+ final ByteBuf[] dataBodies;
+ final RpcOutcomeListener<BitControl.CustomMessage> outcomeListener;
+
+ if (cmd instanceof ControlTunnel.CustomMessageSender) {
+ dataBodies = ((ControlTunnel.CustomMessageSender)cmd).getDataBodies();
+ outcomeListener = ((ControlTunnel.CustomMessageSender)cmd).getOutcomeListener();
+ } else if (cmd instanceof ControlTunnel.SyncCustomMessageSender) {
+ dataBodies = ((ControlTunnel.SyncCustomMessageSender)cmd).getDataBodies();
+ outcomeListener = ((ControlTunnel.SyncCustomMessageSender)cmd).getOutcomeListener();
+ } else {
+ throw new UnsupportedOperationException("Unknown Custom Type control message received");
+ }
+
+ DrillBuf reqDrillBuff;
+ try {
+ reqDrillBuff = convertToByteBuf(dataBodies);
+ } catch (Exception ex) {
+ outcomeListener.failed(new RpcException("Failed to allocate memory while sending request in " +
+ "LocalControlConnectionManager#convertToByteBuff", ex));
+ return;
+ } finally {
+ releaseByteBuf(dataBodies);
+ }
+
+ try {
+ BitControl.CustomMessage message = (BitControl.CustomMessage) cmd.getMessage();
+ final Response response = messageHandler.getHandlerRegistry().handle(message, reqDrillBuff);
+ DrillBuf responseBuffer;
+ try {
+ responseBuffer = convertToByteBuf(response.dBodies);
+ } catch (Exception ex) {
+ outcomeListener.failed(new RpcException("Failed to allocate memory while sending response in " +
+ "LocalControlConnectionManager#convertToByteBuff", ex));
+ return;
+ } finally {
+ releaseByteBuf(response.dBodies);
+ }
+
+ // Passed responseBuffer will be owned by consumer
+ outcomeListener.success((BitControl.CustomMessage)response.pBody, responseBuffer);
+ } catch (RpcException ex) {
+ cmd.getOutcomeListener().failed(ex);
+ } finally {
+ // Release the reqDrillBuff passed into handler
+ releaseByteBuf(reqDrillBuff);
+ }
+ break;
+ }
+
+ case BitControl.RpcType.REQ_RECEIVER_FINISHED_VALUE: {
+ final ControlTunnel.ReceiverFinished receiverFinished = ((ControlTunnel.ReceiverFinished) cmd);
+ final RpcOutcomeListener<Ack> outcomeListener = receiverFinished.getOutcomeListener();
+ final Ack ackResponse = messageHandler.receivingFragmentFinished(receiverFinished.getMessage());
+ outcomeListener.success(ackResponse, null);
+ break;
+ }
+
+ case BitControl.RpcType.REQ_FRAGMENT_STATUS_VALUE: {
+ final ControlTunnel.SendFragmentStatus fragmentStatus = ((ControlTunnel.SendFragmentStatus) cmd);
+ final RpcOutcomeListener<Ack> outcomeListener = fragmentStatus.getOutcomeListener();
+ final Ack ackResponse = messageHandler.requestFragmentStatus(fragmentStatus.getMessage());
+ outcomeListener.success(ackResponse, null);
+ break;
+ }
+
+ case BitControl.RpcType.REQ_QUERY_CANCEL_VALUE: {
+ final ControlTunnel.CancelQuery cancelQuery = ((ControlTunnel.CancelQuery) cmd);
+ final RpcOutcomeListener<Ack> outcomeListener = cancelQuery.getOutcomeListener();
+ final Ack ackResponse = messageHandler.requestQueryCancel(cancelQuery.getMessage());
+ outcomeListener.success(ackResponse, null);
+ break;
+ }
+
+ case BitControl.RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
+ final ControlTunnel.SendFragment sendFragment = ((ControlTunnel.SendFragment) cmd);
+ final RpcOutcomeListener<Ack> outcomeListener = sendFragment.getOutcomeListener();
+
+ try {
+ final Ack ackResponse = messageHandler.initializeFragment(sendFragment.getMessage());
+ outcomeListener.success(ackResponse, null);
+ } catch (RpcException ex) {
+ outcomeListener.failed(ex);
+ }
+ break;
+ }
+
+ case BitControl.RpcType.REQ_QUERY_STATUS_VALUE: {
+ final ControlTunnel.RequestProfile requestProfile = ((ControlTunnel.RequestProfile) cmd);
+ final RpcOutcomeListener<UserBitShared.QueryProfile> outcomeListener = requestProfile.getOutcomeListener();
+
+ try {
+ final UserBitShared.QueryProfile profile = messageHandler.requestQueryStatus(requestProfile.getMessage());
+ outcomeListener.success(profile, null);
+ } catch (RpcException ex) {
+ outcomeListener.failed(ex);
+ }
+ break;
+ }
+
+ case BitControl.RpcType.REQ_UNPAUSE_FRAGMENT_VALUE: {
+ final ControlTunnel.SignalFragment signalFragment = ((ControlTunnel.SignalFragment) cmd);
+ final RpcOutcomeListener<Ack> outcomeListener = signalFragment.getOutcomeListener();
+ final Ack ackResponse = messageHandler.resumeFragment(signalFragment.getMessage());
+ outcomeListener.success(ackResponse, null);
+ break;
+ }
+
+ default:
+ final RpcException rpcException = new RpcException(String.format("Unsupported control request type %s " +
+ "received on LocalControlConnectionManager", rpcType));
+ cmd.getOutcomeListener().failed(rpcException);
+ }
+ }
+
+ /**
+ * Copies ByteBuf in the input array into a single DrillBuf
+ * @param byteBuffArray - input array of ByteBuf's
+ * @return DrillBuf - output Drillbuf with all the input bytes.
+ * @throws OutOfMemoryException
+ */
+ private DrillBuf convertToByteBuf(ByteBuf[] byteBuffArray) throws OutOfMemoryException {
+
+ if (byteBuffArray == null) {
+ return null;
+ }
+
+ int bytesToCopy = 0;
+ for (ByteBuf b : byteBuffArray) {
+ final int validBytes = b.readableBytes();
+ if (0 == validBytes) {
+ b.release();
+ } else {
+ bytesToCopy += validBytes;
+ }
+ }
+ final DrillBuf drillBuff = config.getAllocator().buffer(bytesToCopy);
+
+ for (ByteBuf b : byteBuffArray) {
+ final int validBytes = b.readableBytes();
+ drillBuff.writeBytes(b, 0, validBytes);
+ }
+
+ return drillBuff;
+ }
+
+ /**
+ * Releases all the ByteBuf inside the input ByteBuf array
+ * @param byteBuffArray - input array of ByteBuf's
+ */
+ private void releaseByteBuf(ByteBuf[] byteBuffArray) {
+ if (byteBuffArray != null) {
+ for (ByteBuf b : byteBuffArray) {
+ b.release();
+ }
+ }
+ }
+
+ /**
+ * Releases the input ByteBuf
+ * @param byteBuf - input ByteBuf
+ */
+ private void releaseByteBuf(ByteBuf byteBuf) {
+ if (byteBuf != null) {
+ byteBuf.release();
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java
new file mode 100644
index 000000000..6a4dc21b9
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/RemoteControlConnectionManager.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.rpc.control;
+
+import org.apache.drill.exec.proto.BitControl;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.rpc.BasicClient;
+
+public class RemoteControlConnectionManager extends ControlConnectionManager {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteControlConnectionManager.class);
+
+ private final ControlConnectionConfig config;
+ private final DrillbitEndpoint remoteEndpoint;
+
+ public RemoteControlConnectionManager(ControlConnectionConfig config, DrillbitEndpoint
+ localEndpoint, DrillbitEndpoint remoteEndpoint) {
+ super(localEndpoint, remoteEndpoint);
+ this.config = config;
+ this.remoteEndpoint = remoteEndpoint;
+ }
+
+ @Override
+ protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, ?> getNewClient() {
+ return new ControlClient(config, remoteEndpoint, new CloseHandlerCreator());
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 7e286fa32..7b05b81e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.rpc.data;
+import com.google.protobuf.MessageLite;
import io.netty.buffer.ByteBuf;
import java.util.concurrent.Semaphore;
@@ -43,7 +44,6 @@ public class DataTunnel {
private ExecutionControls testControls;
private org.slf4j.Logger testLogger;
-
public DataTunnel(DataConnectionManager manager) {
this.manager = manager;
}
@@ -69,7 +69,7 @@ public class DataTunnel {
public void sendRecordBatch(RpcOutcomeListener<Ack> outcomeListener, FragmentWritableBatch batch) {
SendBatchAsyncListen b = new SendBatchAsyncListen(outcomeListener, batch);
- try{
+ try {
if (isInjectionControlSet) {
// Wait for interruption if set. Used to simulate the fragment interruption while the fragment is waiting for
// semaphore acquire. We expect the
@@ -78,9 +78,9 @@ public class DataTunnel {
sendingSemaphore.acquire();
manager.runCommand(b);
- }catch(final InterruptedException e){
+ } catch (final InterruptedException e) {
// Release the buffers first before informing the listener about the interrupt.
- for(ByteBuf buffer : batch.getBuffers()) {
+ for (ByteBuf buffer : batch.getBuffers()) {
buffer.release();
}
@@ -119,7 +119,7 @@ public class DataTunnel {
}
}
- private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection> {
+ private class SendBatchAsyncListen extends ListeningCommand<Ack, DataClientConnection, RpcType, MessageLite> {
final FragmentWritableBatch batch;
public SendBatchAsyncListen(RpcOutcomeListener<Ack> listener, FragmentWritableBatch batch) {
@@ -129,7 +129,18 @@ public class DataTunnel {
@Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, DataClientConnection connection) {
- connection.send(new ThrottlingOutcomeListener(outcomeListener), RpcType.REQ_RECORD_BATCH, batch.getHeader(), Ack.class, batch.getBuffers());
+ connection.send(new ThrottlingOutcomeListener(outcomeListener), getRpcType(), batch.getHeader(),
+ Ack.class, batch.getBuffers());
+ }
+
+ @Override
+ public RpcType getRpcType() {
+ return RpcType.REQ_RECORD_BATCH;
+ }
+
+ @Override
+ public MessageLite getMessage() {
+ return batch.getHeader();
}
@Override
@@ -139,7 +150,7 @@ public class DataTunnel {
@Override
public void connectionFailed(FailureType type, Throwable t) {
- for(ByteBuf buffer : batch.getBuffers()) {
+ for (ByteBuf buffer : batch.getBuffers()) {
buffer.release();
}
super.connectionFailed(type, t);