diff options
author | Vlad Rozov <vrozov@apache.org> | 2017-11-14 16:24:01 -0800 |
---|---|---|
committer | Parth Chandra <parthc@apache.org> | 2018-01-11 17:11:41 -0800 |
commit | c5af3aefe79c34d5b76bec8ce55875decca9e617 (patch) | |
tree | 35cda928016c1922da0bc8c7f253fa74f4c88082 /exec/java-exec/src/main/java/org/apache/drill/exec/work | |
parent | cbb79e598705138086c453d69b5706655b070259 (diff) |
DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments
This closes #1041
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work')
3 files changed, 7 insertions, 12 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index e9358197a..d75668c2b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -325,7 +325,9 @@ public class WorkManager implements AutoCloseable { @Override protected void cleanup() { runningFragments.remove(fragmentHandle); - workBus.removeFragmentManager(fragmentHandle); + if (!fragmentManager.isCancelled()) { + workBus.removeFragmentManager(fragmentHandle, false); + } indicateIfSafeToExit(); } }); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java index 2bbaf1bda..972b56a68 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java @@ -193,7 +193,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> // Case 2: Cancel active intermediate fragment. Such a fragment will be in the work bus. Delegate cancel to the // work bus. - final boolean removed = bee.getContext().getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle); + final boolean removed = bee.getContext().getWorkBus().removeFragmentManager(handle, true); if (removed) { return Acks.OK; } @@ -217,7 +217,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> private Ack resumeFragment(final FragmentHandle handle) { // resume a pending fragment - final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle); + final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle); if (manager != null) { manager.unpause(); return Acks.OK; @@ -237,14 +237,12 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection> private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) { - final FragmentManager manager = - bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender()); + final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender()); - FragmentExecutor executor; if (manager != null) { manager.receivingFragmentFinished(finishedReceiver.getReceiver()); } else { - executor = bee.getFragmentRunner(finishedReceiver.getSender()); + final FragmentExecutor executor = bee.getFragmentRunner(finishedReceiver.getSender()); if (executor != null) { executor.receivingFragmentFinished(finishedReceiver.getReceiver()); } else { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java index 7d1585be6..17a5965da 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java @@ -57,9 +57,4 @@ public class NonRootFragmentManager extends AbstractFragmentManager { public void receivingFragmentFinished(final FragmentHandle handle) { fragmentExecutor.receivingFragmentFinished(handle); } - - @Override - public synchronized void cancel() { - super.cancel(); - } } |