diff options
author | Sorabh Hamirwasia <shamirwasia@maprtech.com> | 2018-05-08 13:06:20 -0700 |
---|---|---|
committer | Arina Ielchiieva <arina.yelchiyeva@gmail.com> | 2018-05-18 15:40:59 +0300 |
commit | 5dd8a6f60e006c2dc707f241b7619634e4e82bbd (patch) | |
tree | 2255dabe934929d5a39089d4e3a908b3c39b1b6c /exec/java-exec/src/main/java/org/apache/drill/exec/work | |
parent | 5a3a73ad098f77ad01d9366dc1a0f8f4ac539746 (diff) |
DRILL-6255: Drillbit while sending control message to itself creates a connection instead of submitting locally
closes #1253
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work')
3 files changed, 63 insertions, 108 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java index e562b167a..963f53a9d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java @@ -91,14 +91,16 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> } case RpcType.REQ_FRAGMENT_STATUS_VALUE: - bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER)); + final FragmentStatus status = get(pBody, FragmentStatus.PARSER); + requestFragmentStatus(status); // TODO: Support a type of message that has no response. sender.send(ControlRpcConfig.OK); break; case RpcType.REQ_QUERY_CANCEL_VALUE: { final QueryId queryId = get(pBody, QueryId.PARSER); - if (bee.cancelForeman(queryId, null)) { + final Ack cancelStatus = requestQueryCancel(queryId); + if (cancelStatus.getOk()) { sender.send(ControlRpcConfig.OK); } else { sender.send(ControlRpcConfig.FAIL); @@ -108,21 +110,14 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> case RpcType.REQ_INITIALIZE_FRAGMENTS_VALUE: { final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER); - final DrillbitContext drillbitContext = bee.getContext(); - for(int i = 0; i < fragments.getFragmentCount(); i++) { - startNewFragment(fragments.getFragment(i), drillbitContext); - } + initializeFragment(fragments); sender.send(ControlRpcConfig.OK); break; } case RpcType.REQ_QUERY_STATUS_VALUE: { final QueryId queryId = get(pBody, QueryId.PARSER); - final Foreman foreman = bee.getForemanForQueryId(queryId); - if (foreman == null) { - throw new RpcException("Query not running on node."); - } - final QueryProfile profile = foreman.getQueryManager().getQueryProfile(); + final QueryProfile profile = requestQueryStatus(queryId); sender.send(new Response(RpcType.RESP_QUERY_STATUS, profile)); break; } @@ -145,7 +140,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> * @param fragment * @throws UserRpcException */ - private void startNewFragment(final PlanFragment fragment, final DrillbitContext drillbitContext) + public void startNewFragment(final PlanFragment fragment, final DrillbitContext drillbitContext) throws UserRpcException { logger.debug("Received remote fragment start instruction", fragment); @@ -182,7 +177,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> /* (non-Javadoc) * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle) */ - private Ack cancelFragment(final FragmentHandle handle) { + public Ack cancelFragment(final FragmentHandle handle) { /** * For case 1, see {@link org.apache.drill.exec.work.foreman.QueryManager#cancelExecutingFragments}. * In comments below, "active" refers to fragment states: SENDING, AWAITING_ALLOCATION, RUNNING and @@ -213,7 +208,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> return Acks.OK; } - private Ack resumeFragment(final FragmentHandle handle) { + public Ack resumeFragment(final FragmentHandle handle) { // resume a pending fragment final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle); if (manager != null) { @@ -233,7 +228,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> return Acks.OK; } - private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) { + public Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) { final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender()); @@ -254,6 +249,32 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> return Acks.OK; } + public Ack requestFragmentStatus(FragmentStatus status) { + bee.getContext().getWorkBus().statusUpdate( status); + return Acks.OK; + } + + public Ack requestQueryCancel(QueryId queryId) { + return bee.cancelForeman(queryId, null) ? Acks.OK : Acks.FAIL; + } + + public Ack initializeFragment(InitializeFragments fragments) throws RpcException { + final DrillbitContext drillbitContext = bee.getContext(); + for (int i = 0; i < fragments.getFragmentCount(); i++) { + startNewFragment(fragments.getFragment(i), drillbitContext); + } + + return Acks.OK; + } + + public QueryProfile requestQueryStatus(QueryId queryId) throws RpcException { + final Foreman foreman = bee.getForemanForQueryId(queryId); + if (foreman == null) { + throw new RpcException("Query not running on node."); + } + return foreman.getQueryManager().getQueryProfile(); + } + public CustomHandlerRegistry getHandlerRegistry() { return handlerRegistry; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java index 3d051bbdc..91b2a2a41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentsRunner.java @@ -44,11 +44,8 @@ import org.apache.drill.exec.work.EndpointListener; import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.fragment.FragmentExecutor; import org.apache.drill.exec.work.fragment.FragmentStatusReporter; -import org.apache.drill.exec.work.fragment.NonRootFragmentManager; import org.apache.drill.exec.work.fragment.RootFragmentManager; - -import java.util.ArrayList; import java.util.Collection; import java.util.LinkedList; import java.util.List; @@ -141,7 +138,6 @@ public class FragmentsRunner { } } - /** * Set up the non-root fragments for execution. Some may be local, and some may be remote. * Messages are sent immediately, so they may start returning data even before we complete this. @@ -158,17 +154,11 @@ public class FragmentsRunner { * executed there. We need to start up the intermediate fragments first so that they will be * ready once the leaf fragments start producing data. To satisfy both of these, we will * make a pass through the fragments and put them into the remote maps according to their - * leaf/intermediate state, as well as their target drillbit. Also filter the leaf/intermediate - * fragments which are assigned to run on local Drillbit node (or Foreman node) into separate lists. - * - * This will help to schedule local + * leaf/intermediate state, as well as their target drillbit. */ - final Multimap<DrillbitEndpoint, PlanFragment> remoteLeafFragmentMap = ArrayListMultimap.create(); - final List<PlanFragment> localLeafFragmentList = new ArrayList<>(); - final Multimap<DrillbitEndpoint, PlanFragment> remoteIntFragmentMap = ArrayListMultimap.create(); - final List<PlanFragment> localIntFragmentList = new ArrayList<>(); + final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create(); + final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create(); - final DrillbitEndpoint localDrillbitEndpoint = drillbitContext.getEndpoint(); // record all fragments for status purposes. for (final PlanFragment planFragment : fragments) { @@ -180,44 +170,38 @@ public class FragmentsRunner { foreman.getQueryManager().addFragmentStatusTracker(planFragment, false); if (planFragment.getLeafFragment()) { - updateFragmentCollection(planFragment, localDrillbitEndpoint, localLeafFragmentList, remoteLeafFragmentMap); + leafFragmentMap.put(planFragment.getAssignment(), planFragment); } else { - updateFragmentCollection(planFragment, localDrillbitEndpoint, localIntFragmentList, remoteIntFragmentMap); + intFragmentMap.put(planFragment.getAssignment(), planFragment); } } /* * We need to wait for the intermediates to be sent so that they'll be set up by the time - * the leaves start producing data. We'll use this latch to wait for the responses. + * the leaves start producing data. We'll use this latch to wait for the responses. All the local intermediate + * fragments are submitted locally without creating any actual control connection to itself. * * However, in order not to hang the process if any of the RPC requests fails, we always * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll * know if any submissions did fail. */ - scheduleRemoteIntermediateFragments(remoteIntFragmentMap); - - // Setup local intermediate fragments - for (final PlanFragment fragment : localIntFragmentList) { - startLocalFragment(fragment); - } + scheduleIntermediateFragments(intFragmentMap); injector.injectChecked(foreman.getQueryContext().getExecutionControls(), "send-fragments", ForemanException.class); /* * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through - * the regular sendListener event delivery. + * the regular sendListener event delivery˚. All the local leaf fragments are submitted locally without creating + * any actual control connection to itself. */ - for (final DrillbitEndpoint ep : remoteLeafFragmentMap.keySet()) { - sendRemoteFragments(ep, remoteLeafFragmentMap.get(ep), null, null); - } - - // Setup local leaf fragments - for (final PlanFragment fragment : localLeafFragmentList) { - startLocalFragment(fragment); + for (final DrillbitEndpoint ep : leafFragmentMap.keySet()) { + sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null); } } /** - * Send all the remote fragments belonging to a single target drillbit in one request. + * Send all the remote fragments belonging to a single target drillbit in one request. If the assignment + * DrillbitEndpoint is local Drillbit then {@link Controller#getTunnel(DrillbitEndpoint)} takes care of submitting it + * locally without actually creating a Control Connection to itself. * * @param assignment the drillbit assigned to these fragments * @param fragments the set of fragments @@ -241,41 +225,21 @@ public class FragmentsRunner { } /** - * Add planFragment into either of local fragment list or remote fragment map based on assigned Drillbit Endpoint node - * and the local Drillbit Endpoint. + * Send intermediate fragment to the assigned Drillbit node. If the assignment DrillbitEndpoint is local Drillbit + * then {@link Controller#getTunnel(DrillbitEndpoint)} takes care of submitting it locally without actually creating + * a Control Connection to itself. Throws {@link UserException} in case of failure to send the fragment. * - * @param planFragment plan fragment - * @param localEndPoint local endpoint - * @param localFragmentList local fragment list - * @param remoteFragmentMap remote fragment map + * @param intermediateFragmentMap - Map of Drillbit Endpoint to list of intermediate PlanFragment's */ - private void updateFragmentCollection(final PlanFragment planFragment, final DrillbitEndpoint localEndPoint, - final List<PlanFragment> localFragmentList, - final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) { - final DrillbitEndpoint assignedDrillbit = planFragment.getAssignment(); + private void scheduleIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> intermediateFragmentMap) { - if (assignedDrillbit.equals(localEndPoint)) { - localFragmentList.add(planFragment); - } else { - remoteFragmentMap.put(assignedDrillbit, planFragment); - } - } - - /** - * Send remote intermediate fragment to the assigned Drillbit node. - * Throw exception in case of failure to send the fragment. - * - * @param remoteFragmentMap - Map of Drillbit Endpoint to list of PlanFragment's - */ - private void scheduleRemoteIntermediateFragments(final Multimap<DrillbitEndpoint, PlanFragment> remoteFragmentMap) { - - final int numIntFragments = remoteFragmentMap.keySet().size(); + final int numIntFragments = intermediateFragmentMap.keySet().size(); final ExtendedLatch endpointLatch = new ExtendedLatch(numIntFragments); final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures(); // send remote intermediate fragments - for (final DrillbitEndpoint ep : remoteFragmentMap.keySet()) { - sendRemoteFragments(ep, remoteFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures); + for (final DrillbitEndpoint ep : intermediateFragmentMap.keySet()) { + sendRemoteFragments(ep, intermediateFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures); } final long timeout = drillbitContext.getOptionManager().getLong(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT) * numIntFragments; @@ -313,29 +277,6 @@ public class FragmentsRunner { } } - - /** - * Start the locally assigned leaf or intermediate fragment - * - * @param fragment fragment - */ - private void startLocalFragment(final PlanFragment fragment) throws ExecutionSetupException { - logger.debug("Received local fragment start instruction", fragment); - - final FragmentContextImpl fragmentContext = new FragmentContextImpl(drillbitContext, fragment, drillbitContext.getFunctionImplementationRegistry()); - final FragmentStatusReporter statusReporter = new FragmentStatusReporter(fragmentContext); - final FragmentExecutor fragmentExecutor = new FragmentExecutor(fragmentContext, fragment, statusReporter); - - // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf. - if (fragment.getLeafFragment()) { - bee.addFragmentRunner(fragmentExecutor); - } else { - // isIntermediate, store for incoming data. - final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, fragmentExecutor, statusReporter); - drillbitContext.getWorkBus().addFragmentManager(manager); - } - } - /** * Used by {@link FragmentSubmitListener} to track the number of submission failures. */ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java index eb5765868..c0c5b0430 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java @@ -40,12 +40,9 @@ public class FragmentStatusReporter implements AutoCloseable { protected final AtomicReference<DrillbitEndpoint> foremanDrillbit; - protected final DrillbitEndpoint localDrillbit; - public FragmentStatusReporter(final ExecutorFragmentContext context) { this.context = context; this.foremanDrillbit = new AtomicReference<>(context.getForemanEndpoint()); - this.localDrillbit = context.getEndpoint(); } /** @@ -119,14 +116,10 @@ public class FragmentStatusReporter implements AutoCloseable { return; } - if (localDrillbit.equals(foremanNode)) { - // Update the status locally - context.getWorkEventbus().statusUpdate(status); - } else { - // Send the status via Control Tunnel to remote foreman node - final ControlTunnel tunnel = context.getController().getTunnel(foremanNode); - tunnel.sendFragmentStatus(status); - } + // Send status for both local and remote foreman node via Tunnel. For local there won't be any network connection + // created and it will be submitted locally using LocalControlConnectionManager + final ControlTunnel tunnel = context.getController().getTunnel(foremanNode); + tunnel.sendFragmentStatus(status); } /** |