diff options
author | Jacques Nadeau <jacques@apache.org> | 2015-05-14 20:07:29 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2015-05-14 22:14:56 -0700 |
commit | aaf9fb834e02b9a3483758dd6eb475cc781db866 (patch) | |
tree | b261f2b3594d62bc09e26cffa98b00393b24f851 /exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment | |
parent | 4b0b3a67ab5e2db2baf34250bdedb174fce648ad (diff) |
DRILL-3052, DRILL-3066: Improve fragment state management in face of early cancellation.
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java | 106 |
1 files changed, 56 insertions, 50 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index ffb76b10c..e5e070069 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.work.fragment; import java.io.IOException; import java.security.PrivilegedExceptionAction; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.drill.common.DeferredException; @@ -53,6 +54,7 @@ public class FragmentExecutor implements Runnable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class); private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(FragmentExecutor.class); + private final AtomicBoolean hasCloseoutThread = new AtomicBoolean(false); private final String fragmentName; private final FragmentContext fragmentContext; private final StatusReporter listener; @@ -139,25 +141,10 @@ public class FragmentExecutor implements Runnable { * so we need to be careful about the state transitions that can result. */ public void cancel() { - /* - * When cancel() is called before run(), root is not initialized and the executor is not - * ready to accept external events. So do not wait to change the state. - * - * For example, consider the case when the Foreman sets up the root fragment executor which is - * waiting on incoming data, but the Foreman fails to setup non-root fragment executors. The - * run() method on the root executor will never be called, and the executor will never be ready - * to accept external events. This would make the cancelling thread wait forever, if it was waiting on - * acceptExternalEvents. - */ - synchronized (this) { - if (root != null) { - acceptExternalEvents.awaitUninterruptibly(); - } else { - // This fragment may or may not start running. If it doesn't then closeOutResources() will never be called. - // Assuming it's safe to call closeOutResources() multiple times, we call it here explicitly in case this - // fragment will never start running. - closeOutResources(); - } + final boolean thisIsOnlyThread = this.hasCloseoutThread.compareAndSet(false, true); + + if (!thisIsOnlyThread) { + acceptExternalEvents.awaitUninterruptibly(); /* * We set the cancel requested flag but the actual cancellation is managed by the run() loop, if called. @@ -165,14 +152,33 @@ public class FragmentExecutor implements Runnable { updateState(FragmentState.CANCELLATION_REQUESTED); /* - * Interrupt the thread so that it exits from any blocking operation it could be executing currently. + * Interrupt the thread so that it exits from any blocking operation it could be executing currently. We + * synchronize here to ensure we don't accidentally create a race condition where we interrupt the close out + * procedure of the main thread. */ - final Thread myThread = myThreadRef.get(); - if (myThread != null) { - logger.debug("Interrupting fragment thread {}", myThread.getName()); - myThread.interrupt(); + synchronized (myThreadRef) { + final Thread myThread = myThreadRef.get(); + if (myThread != null) { + logger.debug("Interrupting fragment thread {}", myThread.getName()); + myThread.interrupt(); + } } + } else { + updateState(FragmentState.CANCELLATION_REQUESTED); + cleanup(FragmentState.FINISHED); } + + } + + private void cleanup(FragmentState state) { + + closeOutResources(); + + updateState(state); + // send the final state of the fragment. only the main execution thread can send the final state and it can + // only be sent once. + sendFinalState(); + } /** @@ -203,6 +209,11 @@ public class FragmentExecutor implements Runnable { @Override public void run() { + // if a cancel thread has already entered this executor, we have not reason to continue. + if (!hasCloseoutThread.compareAndSet(false, true)) { + return; + } + final Thread myThread = Thread.currentThread(); myThreadRef.set(myThread); final String originalThreadName = myThread.getName(); @@ -216,31 +227,24 @@ public class FragmentExecutor implements Runnable { myThread.setName(newThreadName); - synchronized (this) { - /* - * fragmentState might have changed even before this method is called e.g. cancel() - */ - if (shouldContinue()) { - // if we didn't get the root operator when the executor was created, create it now. - final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator : - drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson()); + // if we didn't get the root operator when the executor was created, create it now. + final FragmentRoot rootOperator = this.rootOperator != null ? this.rootOperator : + drillbitContext.getPlanReader().readFragmentOperator(fragment.getFragmentJson()); root = ImplCreator.getExec(fragmentContext, rootOperator); if (root == null) { return; } - clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener); - updateState(FragmentState.RUNNING); + clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener); + updateState(FragmentState.RUNNING); - acceptExternalEvents.countDown(); + acceptExternalEvents.countDown(); - final DrillbitEndpoint endpoint = drillbitContext.getEndpoint(); - logger.debug("Starting fragment {}:{} on {}:{}", - fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(), - endpoint.getAddress(), endpoint.getUserPort()); - } - } + final DrillbitEndpoint endpoint = drillbitContext.getEndpoint(); + logger.debug("Starting fragment {}:{} on {}:{}", + fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId(), + endpoint.getAddress(), endpoint.getUserPort()); final UserGroupInformation queryUserUgi = fragmentContext.isImpersonationEnabled() ? ImpersonationUtil.createProxyUgi(fragmentContext.getQueryUserName()) : @@ -275,21 +279,23 @@ public class FragmentExecutor implements Runnable { fail(e); } finally { + // no longer allow this thread to be interrupted. We synchronize here to make sure that cancel can't set an + // interruption after we have moved beyond this block. + synchronized (myThreadRef) { + myThreadRef.set(null); + Thread.interrupted(); + } + // We need to sure we countDown at least once. We'll do it here to guarantee that. acceptExternalEvents.countDown(); - closeOutResources(); - - updateState(FragmentState.FINISHED); - // send the final state of the fragment. only the main execution thread can send the final state and it can - // only be sent once. - sendFinalState(); + // here we could be in FAILED, RUNNING, or CANCELLATION_REQUESTED + cleanup(FragmentState.FINISHED); clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener); myThread.setName(originalThreadName); - myThreadRef.set(null); } } @@ -404,10 +410,10 @@ public class FragmentExecutor implements Runnable { errorStateChange(current, target); } - // these should never be requested. + // these should never be requested. + case CANCELLED: case SENDING: case AWAITING_ALLOCATION: - case CANCELLED: default: errorStateChange(current, target); } |