/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.work.foreman;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.netty.buffer.ByteBuf;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.store.TransientStore;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile.Builder;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.EndpointListener;

import com.carrotsearch.hppc.IntObjectHashMap;
import com.carrotsearch.hppc.predicates.IntObjectPredicate;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;

/**
 * Each Foreman holds its own QueryManager.  This manages the events associated with execution of a particular query across all fragments.
 */
public class QueryManager implements AutoCloseable {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);

  private final Map<DrillbitEndpoint, NodeTracker> nodeMap = Maps.newHashMap();
  private final QueryId queryId;
  private final String stringQueryId;
  private final RunQuery runQuery;
  private final Foreman foreman;

  /*
   * Doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then
   * accessed by multiple threads for reads only.
   */
  private final IntObjectHashMap<IntObjectHashMap<FragmentData>> fragmentDataMap =
      new IntObjectHashMap<>();
  private final List<FragmentData> fragmentDataSet = Lists.newArrayList();

  private final PersistentStore<QueryProfile> completedProfileStore;
  private final TransientStore<QueryInfo> runningProfileStore;

  // the following mutable variables are used to capture ongoing query status
  private String planText;
  private long startTime = System.currentTimeMillis();
  private long endTime;
  private long planningEndTime;
  private long queueWaitEndTime;

  // How many nodes have finished their execution.  Query is complete when all nodes are complete.
  private final AtomicInteger finishedNodes = new AtomicInteger(0);

  // How many fragments have finished their execution.
  private final AtomicInteger finishedFragments = new AtomicInteger(0);

  // Is the query saved in transient store
  private boolean inTransientStore;

  /**
   * Total query cost. This value is used to place the query into a queue
   * and so has meaning to the user who wants to predict queue placement.
   */
  private double totalCost;

  private String queueName;

  public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider,
      final ClusterCoordinator coordinator, final Foreman foreman) {
    this.queryId = queryId;
    this.runQuery = runQuery;
    this.foreman = foreman;

    stringQueryId = QueryIdHelper.getQueryId(queryId);

    this.completedProfileStore = foreman.getQueryContext().getProfileStoreContext().getCompletedProfileStore();
    this.runningProfileStore = foreman.getQueryContext().getProfileStoreContext().getRunningProfileStore();
  }

  private static boolean isTerminal(final FragmentState state) {
    return state == FragmentState.FAILED
        || state == FragmentState.FINISHED
        || state == FragmentState.CANCELLED;
  }

  private boolean updateFragmentStatus(final FragmentStatus fragmentStatus) {
    final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
    final int majorFragmentId = fragmentHandle.getMajorFragmentId();
    final int minorFragmentId = fragmentHandle.getMinorFragmentId();
    final FragmentData data = fragmentDataMap.get(majorFragmentId).get(minorFragmentId);

    final FragmentState oldState = data.getState();
    final boolean inTerminalState = isTerminal(oldState);
    final FragmentState currentState = fragmentStatus.getProfile().getState();

    if (inTerminalState || (oldState == FragmentState.CANCELLATION_REQUESTED && !isTerminal(currentState))) {
      // Already in a terminal state, or invalid state transition from CANCELLATION_REQUESTED. This shouldn't happen.
      logger.warn("Received status message for fragment {} after fragment was in state {}. New state was {}",
        QueryIdHelper.getQueryIdentifier(fragmentHandle), oldState, currentState);
      return false;
    }

    data.setStatus(fragmentStatus);
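    // Report whether this status message actually moved the fragment to a new state, so the caller
    // (e.g. fragmentDone()) can tell whether the fragment just completed.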
    return oldState != currentState;
  }

  private void fragmentDone(final FragmentStatus status) {
    final boolean stateChanged = updateFragmentStatus(status);

    if (stateChanged) {
      // since we're in the fragment done clause and this was a change from previous
      final NodeTracker node = nodeMap.get(status.getProfile().getEndpoint());
      node.fragmentComplete();
      finishedFragments.incrementAndGet();
    }
  }

  private void addFragment(final FragmentData fragmentData) {
    final FragmentHandle fragmentHandle = fragmentData.getHandle();
    final int majorFragmentId = fragmentHandle.getMajorFragmentId();
    final int minorFragmentId = fragmentHandle.getMinorFragmentId();

    IntObjectHashMap<FragmentData> minorMap = fragmentDataMap.get(majorFragmentId);
    if (minorMap == null) {
      minorMap = new IntObjectHashMap<>();
      fragmentDataMap.put(majorFragmentId, minorMap);
    }
    minorMap.put(minorFragmentId, fragmentData);
    fragmentDataSet.add(fragmentData);
  }

  public String getFragmentStatesAsString() {
    return fragmentDataMap.toString();
  }

  void addFragmentStatusTracker(final PlanFragment fragment, final boolean isRoot) {
    final DrillbitEndpoint assignment = fragment.getAssignment();

    NodeTracker tracker = nodeMap.get(assignment);
    if (tracker == null) {
      tracker = new NodeTracker(assignment);
      nodeMap.put(assignment, tracker);
    }

    tracker.addFragment();
    addFragment(new FragmentData(fragment.getHandle(), assignment, isRoot));
  }

  /**
   * Stop all fragments with currently *known* active status (active as in SENDING, AWAITING_ALLOCATION, RUNNING).
   *
   * For the actual cancel calls for intermediate and leaf fragments, see
   * {@link org.apache.drill.exec.work.batch.ControlMessageHandler#cancelFragment}.
   *
   * (1) Root fragment: pending or running, send the cancel signal through a tunnel.
   * (2) Intermediate fragment: pending or running, send the cancel signal through a tunnel (for local and remote
   *    fragments). The actual cancel is done by delegating the cancel to the work bus.
   * (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
   */
  void cancelExecutingFragments(final DrillbitContext drillbitContext) {
    @SuppressWarnings("resource")
    final Controller controller = drillbitContext.getController();
    for(final FragmentData data : fragmentDataSet) {
      switch(data.getState()) {
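      // Only fragments that are not yet in a terminal state, and not already asked to cancel, receive the signal.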
      case SENDING:
      case AWAITING_ALLOCATION:
      case RUNNING:
        final FragmentHandle handle = data.getHandle();
        final DrillbitEndpoint endpoint = data.getEndpoint();
        // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
        controller.getTunnel(endpoint).cancelFragment(new SignalListener(endpoint, handle,
            SignalListener.Signal.CANCEL), handle);
        break;

      case FINISHED:
      case CANCELLATION_REQUESTED:
      case CANCELLED:
      case FAILED:
        // nothing to do
        break;
      }
    }
  }

  /**
   * Sends a resume signal to all fragments, regardless of their state, since the fragment might have paused before
   * sending any message. Resume all fragments through the control tunnel.
   */
  void unpauseExecutingFragments(final DrillbitContext drillbitContext) {
    @SuppressWarnings("resource")
    final Controller controller = drillbitContext.getController();
    for(final FragmentData data : fragmentDataSet) {
      final DrillbitEndpoint endpoint = data.getEndpoint();
      final FragmentHandle handle = data.getHandle();
      controller.getTunnel(endpoint).unpauseFragment(new SignalListener(endpoint, handle,
        SignalListener.Signal.UNPAUSE), handle);
    }
  }

  @Override
  public void close() throws Exception { }

  /*
   * This assumes that the FragmentStatusListener implementation takes action when it hears
   * that the target fragment has acknowledged the signal. As a result, this listener doesn't do anything
   * but log messages.
   */
  private static class SignalListener extends EndpointListener<Ack, FragmentHandle> {
    /**
     * An enum of possible signals that {@link SignalListener} listens to.
     */
    public enum Signal { CANCEL, UNPAUSE }

    private final Signal signal;

    public SignalListener(final DrillbitEndpoint endpoint, final FragmentHandle handle, final Signal signal) {
      super(endpoint, handle);
      this.signal = signal;
    }

    @Override
    public void failed(final RpcException ex) {
      logger.error("Failure while attempting to {} fragment {} on endpoint {} with {}.", signal, value, endpoint, ex);
    }

    @Override
    public void success(final Ack ack, final ByteBuf buf) {
      if (!ack.getOk()) {
        logger.warn("Remote node {} responded negative on {} request for fragment {} with {}.", endpoint, signal, value,
          ack);
      }
    }

    @Override
    public void interrupted(final InterruptedException ex) {
      logger.error("Interrupted while waiting for RPC outcome of action fragment {}. " +
          "Endpoint {}, Fragment handle {}", signal, endpoint, value, ex);
    }
  }

  void updateEphemeralState(final QueryState queryState) {
    // If the query is already in the zk transient store, ignore the transient state update option.
    // Otherwise, its entry would never be removed from the transient store upon completion.
    if (!inTransientStore && !foreman.getQueryContext().getOptions().getOption(ExecConstants.QUERY_TRANSIENT_STATE_UPDATE)) {
      return;
    }

    switch (queryState) {
      case PREPARING:
      case PLANNING:
      case ENQUEUED:
      case STARTING:
      case RUNNING:
      case CANCELLATION_REQUESTED:
        runningProfileStore.put(stringQueryId, getQueryInfo());  // store as ephemeral query profile.
        inTransientStore = true;
        break;
      case COMPLETED:
      case CANCELED:
      case FAILED:
        try {
          runningProfileStore.remove(stringQueryId);
          inTransientStore = false;
        } catch (final Exception e) {
          logger.warn("Failure while trying to delete the stored profile for the query [{}]", stringQueryId, e);
        }
        break;

      default:
        throw new IllegalStateException("unrecognized queryState " + queryState);
    }
  }

  void writeFinalProfile(UserException ex) {
    try {
      // TODO(DRILL-2362) when do these ever get deleted?
      completedProfileStore.put(stringQueryId, getQueryProfile(ex));
    } catch (Exception e) {
      logger.error("Failure while storing Query Profile", e);
    }
  }

  private QueryInfo getQueryInfo() {
    final String queryText = foreman.getQueryText();
    QueryInfo.Builder queryInfoBuilder = QueryInfo.newBuilder()
        .setState(foreman.getState())
        .setUser(foreman.getQueryContext().getQueryUserName())
        .setForeman(foreman.getQueryContext().getCurrentEndpoint())
        .setStart(startTime)
        .setTotalCost(totalCost)
        .setQueueName(queueName == null ? "-" : queueName)
        .setOptionsJson(getQueryOptionsAsJson());

    if (queryText != null) {
      queryInfoBuilder.setQuery(queryText);
    }

    return queryInfoBuilder.build();
  }

  public QueryProfile getQueryProfile() {
    return getQueryProfile(null);
  }

  private QueryProfile getQueryProfile(UserException ex) {
    final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder()
        .setUser(foreman.getQueryContext().getQueryUserName())
        .setType(runQuery.getType())
        .setId(queryId)
        .setQueryId(QueryIdHelper.getQueryId(queryId))
        .setState(foreman.getState())
        .setForeman(foreman.getQueryContext().getCurrentEndpoint())
        .setStart(startTime)
        .setEnd(endTime)
        .setPlanEnd(planningEndTime)
        .setQueueWaitEnd(queueWaitEndTime)
        .setTotalFragments(fragmentDataSet.size())
        .setFinishedFragments(finishedFragments.get())
        .setTotalCost(totalCost)
        .setQueueName(queueName == null ? "-" : queueName)
        .setOptionsJson(getQueryOptionsAsJson());

    if (ex != null) {
      profileBuilder.setError(ex.getMessage(false));
      profileBuilder.setVerboseError(ex.getVerboseMessage(false));
      profileBuilder.setErrorId(ex.getErrorId());
      if (ex.getErrorLocation() != null) {
        profileBuilder.setErrorNode(ex.getErrorLocation());
      }
    }

    if (planText != null) {
      profileBuilder.setPlan(planText);
    }

    final String queryText = foreman.getQueryText();
    if (queryText != null) {
      profileBuilder.setQuery(queryText);
    }

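    // Roll the per-minor-fragment profiles up into the query profile, grouped by major fragment id.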
    fragmentDataMap.forEach(new OuterIter(profileBuilder));

    return profileBuilder.build();
  }

  private String getQueryOptionsAsJson() {
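    // Serialize the query's option list with the same ObjectMapper used for logical plan persistence.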
    try {
      OptionList optionList = foreman.getQueryContext().getOptions().getOptionList();
      return foreman.getQueryContext().getLpPersistence().getMapper().writeValueAsString(optionList);
    } catch (JsonProcessingException e) {
      throw new DrillRuntimeException("Error while trying to convert option list to json string", e);
    }
  }

  private class OuterIter implements IntObjectPredicate<IntObjectHashMap<FragmentData>> {
    private final QueryProfile.Builder profileBuilder;

    public OuterIter(Builder profileBuilder) {
      this.profileBuilder = profileBuilder;
    }

    @Override
    public boolean apply(final int majorFragmentId, final IntObjectHashMap<FragmentData> minorMap) {
      final MajorFragmentProfile.Builder builder = MajorFragmentProfile.newBuilder().setMajorFragmentId(majorFragmentId);
      minorMap.forEach(new InnerIter(builder));
      profileBuilder.addFragmentProfile(builder);
      return true;
    }
  }

  private class InnerIter implements IntObjectPredicate<FragmentData> {
    private final MajorFragmentProfile.Builder builder;

    public InnerIter(MajorFragmentProfile.Builder fb) {
      this.builder = fb;
    }

    @Override
    public boolean apply(int key, FragmentData data) {
      builder.addMinorFragmentProfile(data.getProfile());
      return true;
    }
  }

  void setPlanText(final String planText) {
    this.planText = planText;
  }

  void markStartTime() {
    startTime = System.currentTimeMillis();
  }

  void markEndTime() {
    endTime = System.currentTimeMillis();
  }

  void markPlanningEndTime() {
    planningEndTime = System.currentTimeMillis();
  }

  void markQueueWaitEndTime() {
    queueWaitEndTime = System.currentTimeMillis();
  }

  public void setTotalCost(double totalCost) {
    this.totalCost = totalCost;
  }

  public void setQueueName(String queueName) {
    this.queueName = queueName;
  }

  /**
   * Internal class used to track the number of pending completion messages required from a particular node. This lets
   * us know, for each node that is part of this query, what portion of its fragments are still outstanding. In the
   * case of a node failure, we can then correctly track how many outstanding messages will never arrive.
   */
  private class NodeTracker {
    private final DrillbitEndpoint endpoint;
    private final AtomicInteger totalFragments = new AtomicInteger(0);
    private final AtomicInteger completedFragments = new AtomicInteger(0);

    public NodeTracker(final DrillbitEndpoint endpoint) {
      this.endpoint = endpoint;
    }

    /**
     * Increments the number of fragments this node is running.
     */
    public void addFragment() {
      totalFragments.incrementAndGet();
    }

    /**
     * Increments the number of fragments completed on this node. Once the number of completed fragments equals the
     * number of fragments running on this node, the node is marked as finished and finishedNodes is incremented.
     *
     * If the number of remaining nodes then drops to zero, the query can move to a completed state.
     */
    public void fragmentComplete() {
      if (totalFragments.get() == completedFragments.incrementAndGet()) {
        nodeComplete();
      }
    }

    /**
     * Increments the number of fragments completed on this node until the node is marked complete. Note that this
     * uses the internal fragmentComplete() method, so whether fragments end in failure or success, the nodeComplete
     * event will only occur once. (Two threads could be incrementing the completed-fragment count at the same time,
     * since this will likely come from an external event.)
     *
     * @return true if the node still had pending (non-terminal) fragments; false if all fragments running on this
     * node had already terminated.
     */
    public boolean nodeDead() {
      if (completedFragments.get() == totalFragments.get()) {
        return false;
      }
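      // The completion messages for the remaining fragments will never arrive, so count them as complete here;
      // fragmentComplete() triggers nodeComplete() once the completed count reaches the total.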
      while (completedFragments.get() < totalFragments.get()) {
        fragmentComplete();
      }
      return true;
    }

  }

  /**
   * Increments the number of completed nodes. If there are no more pending nodes, moves the query to a terminal
   * state.
   */
  private void nodeComplete() {
    final int finishedNodes = this.finishedNodes.incrementAndGet();
    final int totalNodes = nodeMap.size();
    Preconditions.checkArgument(finishedNodes <= totalNodes, "The finished node count exceeds the total node count");
    final int remaining = totalNodes - finishedNodes;
    if (remaining == 0) {
      // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
      foreman.addToEventQueue(QueryState.COMPLETED, null);
    } else {
      logger.debug("Foreman is still waiting for completion message from {} nodes containing {} fragments", remaining,
          this.fragmentDataSet.size() - finishedFragments.get());
    }
  }

  public FragmentStatusListener getFragmentStatusListener(){
    return fragmentStatusListener;
  }

  private final FragmentStatusListener fragmentStatusListener = new FragmentStatusListener() {
    @Override
    public void statusUpdate(final FragmentStatus status) {
      logger.debug("New fragment status was provided to QueryManager of {}", status);
      switch(status.getProfile().getState()) {
      case AWAITING_ALLOCATION:
      case RUNNING:
      case CANCELLATION_REQUESTED:
        updateFragmentStatus(status);
        break;

      case FAILED:
        foreman.addToEventQueue(QueryState.FAILED, new UserRemoteException(status.getProfile().getError()));
        // fall-through.
      case FINISHED:
      case CANCELLED:
        fragmentDone(status);
        break;

      default:
        throw new UnsupportedOperationException(String.format("Received status of %s", status));
      }
    }
  };


  public DrillbitStatusListener getDrillbitStatusListener() {
    return drillbitStatusListener;
  }

  private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener() {

    @Override
    public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
    }

    @Override
    public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
      final StringBuilder failedNodeList = new StringBuilder();
      boolean atLeastOneFailure = false;

      for (final DrillbitEndpoint ep : unregisteredDrillbits) {
        final NodeTracker tracker = nodeMap.get(ep);
        if (tracker == null) {
          continue; // fragments were not assigned to this Drillbit
        }

        // mark node as dead.
        if (!tracker.nodeDead()) {
          continue; // fragments assigned to this Drillbit completed
        }

        // fragments were running on the Drillbit, capture node name for exception or logging message
        if (atLeastOneFailure) {
          failedNodeList.append(", ");
        } else {
          atLeastOneFailure = true;
        }
        failedNodeList.append(ep.getAddress());
        failedNodeList.append(":");
        failedNodeList.append(ep.getUserPort());
      }

      if (atLeastOneFailure) {
        logger.warn("Drillbits [{}] no longer registered in cluster.  Canceling query {}",
            failedNodeList, QueryIdHelper.getQueryId(queryId));
        foreman.addToEventQueue(QueryState.FAILED,
            new ForemanException(String.format("One more more nodes lost connectivity during query.  Identified nodes were [%s].",
                failedNodeList)));
      }

    }
  };
}