aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment
diff options
context:
space:
mode:
authorJacques Nadeau <jacques@apache.org>2015-05-14 20:07:29 -0700
committerJacques Nadeau <jacques@apache.org>2015-05-14 22:14:56 -0700
commitaaf9fb834e02b9a3483758dd6eb475cc781db866 (patch)
treeb261f2b3594d62bc09e26cffa98b00393b24f851 /exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment
parent4b0b3a67ab5e2db2baf34250bdedb174fce648ad (diff)
DRILL-3052, DRILL-3066: Improve fragment state management in face of early cancellation.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java106
1 files changed, 56 insertions, 50 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index ffb76b10c..e5e070069 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.work.fragment;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.drill.common.DeferredException;
@@ -53,6 +54,7 @@ public class FragmentExecutor implements Runnable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(FragmentExecutor.class);
+ private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false);
private final String fragmentName;
private final FragmentContext fragmentContext;
private final StatusReporter listener;
@@ -139,25 +141,10 @@ public class FragmentExecutor implements Runnable {
* so we need to be careful about the state transitions that can result.
*/
public void cancel() {
- /*
- * When cancel() is called before run(), root is not initialized and the executor is not
- * ready to accept external events. So do not wait to change the state.
- *
- * For example, consider the case when the Foreman sets up the root fragment executor which is
- * waiting on incoming data, but the Foreman fails to setup non-root fragment executors. The
- * run() method on the root executor will never be called, and the executor will never be ready
- * to accept external events. This would make the cancelling thread wait forever, if it was waiting on
- * acceptExternalEvents.
- */
- synchronized (this) {
- if (root != null) {
- acceptExternalEvents.awaitUninterruptibly();
- } else {
- // This fragment may or may not start running. If it doesn't then closeOutResources() will never be called.
- // Assuming it's safe to call closeOutResources() multiple times, we call it here explicitly in case this
- // fragment will never start running.
- closeOutResources();
- }
+ final boolean thisIsOnlyThread = this.hasCloseoutThread.compareAndSet(false, true);
+
+ if (!thisIsOnlyThread) {
+ acceptExternalEvents.awaitUninterruptibly();
/*
* We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called.
@@ -165,14 +152,33 @@ public class FragmentExecutor implements Runnable {
updateState(FragmentState.CANCELLATION_REQUESTED);
/*
- * Interrupt the thread so that it exits from any blocking operation it could be executing currently.
+ * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We
+ * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out
+ * procedure of the main thread.
*/
- final Thread myThread = myThreadRef.get();
- if (myThread != null) {
- logger.debug("Interrupting fragment thread {}", myThread.getName());
- myThread.interrupt();
+ synchronized (myThreadRef) {
+ final Thread myThread = myThreadRef.get();
+ if (myThread != null) {
+ logger.debug("Interrupting fragment thread {}", myThread.getName());
+ myThread.interrupt();
+ }
}
+ } else {
+ updateState(FragmentState.CANCELLATION_REQUESTED);
+ cleanup(FragmentState.FINISHED);
}
+
+ }
+
+ private void cleanup(FragmentState state) {
+
+ closeOutResources();
+
+ updateState(state);
+ // send the final state of the fragment. only the main execution thread can send the final state and it can
+ // only be sent once.
+ sendFinalState();
+
}
/**
@@ -203,6 +209,11 @@ public class FragmentExecutor implements Runnable {
@Override
public void run() {
+ // if a cancel thread has already entered this executor, we have not reason to continue.
+ if (!hasCloseoutThread.compareAndSet(false, true)) {
+ return;
+ }
+
final Thread myThread = Thread.currentThread();
myThreadRef.set(myThread);
final String originalThreadName = myThread.getName();
@@ -216,31 +227,24 @@ public class FragmentExecutor implements Runnable {
myThread.setName(newThreadName);
- synchronized (this) {
- /*
- * fragmentState might have changed even before this method is called e.g. cancel()
- */
- if (shouldContinue()) {
- // if we didn't get the root operator when the executor was created, create it now.
- final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
- drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
+ // if we didn't get the root operator when the executor was created, create it now.
+ final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator :
+ drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
root = ImplCreator.getExec(fragmentContext, rootOperator);
if (root == null) {
return;
}
- clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
- updateState(FragmentState.RUNNING);
+ clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
+ updateState(FragmentState.RUNNING);
- acceptExternalEvents.countDown();
+ acceptExternalEvents.countDown();
- final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
- logger.debug("Starting fragment {}:{} on {}:{}",
- fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(),
- endpoint.getAddress(), endpoint.getUserPort());
- }
- }
+ final DrillbitEndpoint endpoint = drillbitContext.getEndpoint();
+ logger.debug("Starting fragment {}:{} on {}:{}",
+ fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(),
+ endpoint.getAddress(), endpoint.getUserPort());
final UserGroupInformation queryUserUgi = fragmentContext.isImpersonationEnabled() ?
ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) :
@@ -275,21 +279,23 @@ public class FragmentExecutor implements Runnable {
fail(e);
} finally {
+ // no longer allow this thread to be interrupted. We synchronize here to make sure that cancel can't set an
+ // interruption after we have moved beyond this block.
+ synchronized (myThreadRef) {
+ myThreadRef.set(null);
+ Thread.interrupted();
+ }
+
// We need to sure we countDown at least once. We'll do it here to guarantee that.
acceptExternalEvents.countDown();
- closeOutResources();
-
- updateState(FragmentState.FINISHED);
- // send the final state of the fragment. only the main execution thread can send the final state and it can
- // only be sent once.
- sendFinalState();
+ // here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED
+ cleanup(FragmentState.FINISHED);
clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
myThread.setName(originalThreadName);
- myThreadRef.set(null);
}
}
@@ -404,10 +410,10 @@ public class FragmentExecutor implements Runnable {
errorStateChange(current, target);
}
- // these should never be requested.
+ // these should never be requested.
+ case CANCELLED:
case SENDING:
case AWAITING_ALLOCATION:
- case CANCELLED:
default:
errorStateChange(current, target);
}