about | summary | refs | log | tree | commit | diff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/work
diff options
context:
space:
mode:
author: Sorabh Hamirwasia <shamirwasia@maprtech.com> 2018-05-08 13:06:20 -0700
committer: Arina Ielchiieva <arina.yelchiyeva@gmail.com> 2018-05-18 15:40:59 +0300
commit: 5dd8a6f60e006c2dc707f241b7619634e4e82bbd (patch)
tree: 2255dabe934929d5a39089d4e3a908b3c39b1b6c /exec/java-exec/src/main/java/org/apache/drill/exec/work
parent: 5a3a73ad098f77ad01d9366dc1a0f8f4ac539746 (diff)
DRILL-6255: Drillbit while sending control message to itself creates a connection instead of submitting locally
closes #1253
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work')
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java 51
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java 105
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java 15
3 files changed, 63 insertions, 108 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index e562b167a..963f53a9d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -91,14 +91,16 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
}
case RpcType.REQ_FRAGMENT_STATUS_VALUE:
- bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
+ final FragmentStatus status = get(pBody, FragmentStatus.PARSER);
+ requestFragmentStatus(status);
// TODO: Support a type of message that has no response.
sender.send(ControlRpcConfig.OK);
break;
case RpcType.REQ_QUERY_CANCEL_VALUE: {
final QueryId queryId = get(pBody, QueryId.PARSER);
- if (bee.cancelForeman(queryId, null)) {
+ final Ack cancelStatus = requestQueryCancel(queryId);
+ if (cancelStatus.getOk()) {
sender.send(ControlRpcConfig.OK);
} else {
sender.send(ControlRpcConfig.FAIL);
@@ -108,21 +110,14 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: {
final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
- final DrillbitContext drillbitContext = bee.getContext();
- for(int i = 0; i < fragments.getFragmentCount(); i++) {
- startNewFragment(fragments.getFragment(i), drillbitContext);
- }
+ initializeFragment(fragments);
sender.send(ControlRpcConfig.OK);
break;
}
case RpcType.REQ_QUERY_STATUS_VALUE: {
final QueryId queryId = get(pBody, QueryId.PARSER);
- final Foreman foreman = bee.getForemanForQueryId(queryId);
- if (foreman == null) {
- throw new RpcException("Query not running on node.");
- }
- final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
+ final QueryProfile profile = requestQueryStatus(queryId);
sender.send(new Response(RpcType.RESP_QUERY_STATUS, profile));
break;
}
@@ -145,7 +140,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
* @param fragment
* @throws UserRpcException
*/
- private void startNewFragment(final PlanFragment fragment, final DrillbitContext drillbitContext)
+ public void startNewFragment(final PlanFragment fragment, final DrillbitContext drillbitContext)
throws UserRpcException {
logger.debug("Received remote fragment start instruction", fragment);
@@ -182,7 +177,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
/* (non-Javadoc)
* @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
*/
- private Ack cancelFragment(final FragmentHandle handle) {
+ public Ack cancelFragment(final FragmentHandle handle) {
/**
* For case 1, see {@link org.apache.drill.exec.work.foreman.QueryManager#cancelExecutingFragments}.
* In comments below, "active" refers to fragment states: SENDING, AWAITING_ALLOCATION, RUNNING and
@@ -213,7 +208,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
return Acks.OK;
}
- private Ack resumeFragment(final FragmentHandle handle) {
+ public Ack resumeFragment(final FragmentHandle handle) {
// resume a pending fragment
final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle);
if (manager != null) {
@@ -233,7 +228,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
return Acks.OK;
}
- private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+ public Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());
@@ -254,6 +249,32 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
return Acks.OK;
}
+ public Ack requestFragmentStatus(FragmentStatus status) {
+ bee.getContext().getWorkBus().statusUpdate( status);
+ return Acks.OK;
+ }
+
+ public Ack requestQueryCancel(QueryId queryId) {
+ return bee.cancelForeman(queryId, null) ? Acks.OK : Acks.FAIL;
+ }
+
+ public Ack initializeFragment(InitializeFragments fragments) throws RpcException {
+ final DrillbitContext drillbitContext = bee.getContext();
+ for (int i = 0; i < fragments.getFragmentCount(); i++) {
+ startNewFragment(fragments.getFragment(i), drillbitContext);
+ }
+
+ return Acks.OK;
+ }
+
+ public QueryProfile requestQueryStatus(QueryId queryId) throws RpcException {
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
+ if (foreman == null) {
+ throw new RpcException("Query not running on node.");
+ }
+ return foreman.getQueryManager().getQueryProfile();
+ }
+
public CustomHandlerRegistry getHandlerRegistry() {
return handlerRegistry;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
index 3d051bbdc..91b2a2a41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java
@@ -44,11 +44,8 @@ import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
-import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
-
-import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
@@ -141,7 +138,6 @@ public class FragmentsRunner {
}
}
-
/**
* Set up the non-root fragments for execution. Some may be local, and some may be remote.
* Messages are sent immediately, so they may start returning data even before we complete this.
@@ -158,17 +154,11 @@ public class FragmentsRunner {
* executed there. We need to start up the intermediate fragments first so that they will be
* ready once the leaf fragments start producing data. To satisfy both of these, we will
* make a pass through the fragments and put them into the remote maps according to their
- * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate
- * fragments which are assigned to run on local Drillbit node (or Foreman node) into separate lists.
- *
- * This will help to schedule local
+ * leaf/intermediate state, as well as their target drillbit.
*/
- final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create();
- final List<PlanFragment> localLeafFragmentList = new ArrayList<>();
- final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create();
- final List<PlanFragment> localIntFragmentList = new ArrayList<>();
+ final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
+ final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
- final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint();
// record all fragments for status purposes.
for (final PlanFragment planFragment : fragments) {
@@ -180,44 +170,38 @@ public class FragmentsRunner {
foreman.getQueryManager().addFragmentStatusTracker(planFragment, false);
if (planFragment.getLeafFragment()) {
- updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap);
+ leafFragmentMap.put(planFragment.getAssignment(), planFragment);
} else {
- updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap);
+ intFragmentMap.put(planFragment.getAssignment(), planFragment);
}
}
/*
* We need to wait for the intermediates to be sent so that they'll be set up by the time
- * the leaves start producing data. We'll use this latch to wait for the responses.
+ * the leaves start producing data. We'll use this latch to wait for the responses. All the local intermediate
+ * fragments are submitted locally without creating any actual control connection to itself.
*
* However, in order not to hang the process if any of the RPC requests fails, we always
* count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
* know if any submissions did fail.
*/
- scheduleRemoteIntermediateFragments(remoteIntFragmentMap);
-
- // Setup local intermediate fragments
- for (final PlanFragment fragment : localIntFragmentList) {
- startLocalFragment(fragment);
- }
+ scheduleIntermediateFragments(intFragmentMap);
injector.injectChecked(foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class);
/*
* Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
- * the regular sendListener event delivery.
+ * the regular sendListener event delivery. All the local leaf fragments are submitted locally without creating
+ * any actual control connection to itself.
*/
- for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) {
- sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null);
- }
-
- // Setup local leaf fragments
- for (final PlanFragment fragment : localLeafFragmentList) {
- startLocalFragment(fragment);
+ for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+ sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
}
}
/**
- * Send all the remote fragments belonging to a single target drillbit in one request.
+ * Send all the remote fragments belonging to a single target drillbit in one request. If the assignment
+ * DrillbitEndpoint is local Drillbit then {@link Controller#getTunnel(DrillbitEndpoint)} takes care of submitting it
+ * locally without actually creating a Control Connection to itself.
*
* @param assignment the drillbit assigned to these fragments
* @param fragments the set of fragments
@@ -241,41 +225,21 @@ public class FragmentsRunner {
}
/**
- * Add planFragment into either of local fragment list or remote fragment map based on assigned Drillbit Endpoint node
- * and the local Drillbit Endpoint.
+ * Send intermediate fragment to the assigned Drillbit node. If the assignment DrillbitEndpoint is local Drillbit
+ * then {@link Controller#getTunnel(DrillbitEndpoint)} takes care of submitting it locally without actually creating
+ * a Control Connection to itself. Throws {@link UserException} in case of failure to send the fragment.
*
- * @param planFragment plan fragment
- * @param localEndPoint local endpoint
- * @param localFragmentList local fragment list
- * @param remoteFragmentMap remote fragment map
+ * @param intermediateFragmentMap - Map of Drillbit Endpoint to list of intermediate PlanFragment's
*/
- private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint,
- final List<PlanFragment> localFragmentList,
- final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
- final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment();
+ private void scheduleIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> intermediateFragmentMap) {
- if (assignedDrillbit.equals(localEndPoint)) {
- localFragmentList.add(planFragment);
- } else {
- remoteFragmentMap.put(assignedDrillbit, planFragment);
- }
- }
-
- /**
- * Send remote intermediate fragment to the assigned Drillbit node.
- * Throw exception in case of failure to send the fragment.
- *
- * @param remoteFragmentMap - Map of Drillbit Endpoint to list of PlanFragment's
- */
- private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) {
-
- final int numIntFragments = remoteFragmentMap.keySet().size();
+ final int numIntFragments = intermediateFragmentMap.keySet().size();
final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments);
final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
// send remote intermediate fragments
- for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) {
- sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
+ for (final DrillbitEndpoint ep : intermediateFragmentMap.keySet()) {
+ sendRemoteFragments(ep, intermediateFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
}
final long timeout = drillbitContext.getOptionManager().getLong(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT) * numIntFragments;
@@ -313,29 +277,6 @@ public class FragmentsRunner {
}
}
-
- /**
- * Start the locally assigned leaf or intermediate fragment
- *
- * @param fragment fragment
- */
- private void startLocalFragment(final PlanFragment fragment) throws ExecutionSetupException {
- logger.debug("Received local fragment start instruction", fragment);
-
- final FragmentContextImpl fragmentContext = new FragmentContextImpl(drillbitContext, fragment, drillbitContext.getFunctionImplementationRegistry());
- final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext);
- final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter);
-
- // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
- if (fragment.getLeafFragment()) {
- bee.addFragmentRunner(fragmentExecutor);
- } else {
- // isIntermediate, store for incoming data.
- final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter);
- drillbitContext.getWorkBus().addFragmentManager(manager);
- }
- }
-
/**
* Used by {@link FragmentSubmitListener} to track the number of submission failures.
*/
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
index eb5765868..c0c5b0430 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -40,12 +40,9 @@ public class FragmentStatusReporter implements AutoCloseable {
protected final AtomicReference<DrillbitEndpoint> foremanDrillbit;
- protected final DrillbitEndpoint localDrillbit;
-
public FragmentStatusReporter(final ExecutorFragmentContext context) {
this.context = context;
this.foremanDrillbit = new AtomicReference<>(context.getForemanEndpoint());
- this.localDrillbit = context.getEndpoint();
}
/**
@@ -119,14 +116,10 @@ public class FragmentStatusReporter implements AutoCloseable {
return;
}
- if (localDrillbit.equals(foremanNode)) {
- // Update the status locally
- context.getWorkEventbus().statusUpdate(status);
- } else {
- // Send the status via Control Tunnel to remote foreman node
- final ControlTunnel tunnel = context.getController().getTunnel(foremanNode);
- tunnel.sendFragmentStatus(status);
- }
+ // Send status for both local and remote foreman node via Tunnel. For local there won't be any network connection
+ // created and it will be submitted locally using LocalControlConnectionManager
+ final ControlTunnel tunnel = context.getController().getTunnel(foremanNode);
+ tunnel.sendFragmentStatus(status);
}
/**