aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/work
diff options
context:
space:
mode:
authorVlad Rozov <vrozov@apache.org>2017-11-14 16:24:01 -0800
committerParth Chandra <parthc@apache.org>2018-01-11 17:11:41 -0800
commitc5af3aefe79c34d5b76bec8ce55875decca9e617 (patch)
tree35cda928016c1922da0bc8c7f253fa74f4c88082 /exec/java-exec/src/main/java/org/apache/drill/exec/work
parentcbb79e598705138086c453d69b5706655b070259 (diff)
DRILL-5961: For long running queries (> 10 min) Drill may raise FragmentSetupException for completed/cancelled fragments
This closes #1041
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java5
3 files changed, 7 insertions, 12 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index e9358197a..d75668c2b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -325,7 +325,9 @@ public class WorkManager implements AutoCloseable {
@Override
protected void cleanup() {
runningFragments.remove(fragmentHandle);
- workBus.removeFragmentManager(fragmentHandle);
+ if (!fragmentManager.isCancelled()) {
+ workBus.removeFragmentManager(fragmentHandle, false);
+ }
indicateIfSafeToExit();
}
});
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
index 2bbaf1bda..972b56a68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlMessageHandler.java
@@ -193,7 +193,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
// Case 2: Cancel active intermediate fragment. Such a fragment will be in the work bus. Delegate cancel to the
// work bus.
- final boolean removed = bee.getContext().getWorkBus().cancelAndRemoveFragmentManagerIfExists(handle);
+ final boolean removed = bee.getContext().getWorkBus().removeFragmentManager(handle, true);
if (removed) {
return Acks.OK;
}
@@ -217,7 +217,7 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
private Ack resumeFragment(final FragmentHandle handle) {
// resume a pending fragment
- final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+ final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(handle);
if (manager != null) {
manager.unpause();
return Acks.OK;
@@ -237,14 +237,12 @@ public class ControlMessageHandler implements RequestHandler<ControlConnection>
private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
- final FragmentManager manager =
- bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
+ final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManager(finishedReceiver.getSender());
- FragmentExecutor executor;
if (manager != null) {
manager.receivingFragmentFinished(finishedReceiver.getReceiver());
} else {
- executor = bee.getFragmentRunner(finishedReceiver.getSender());
+ final FragmentExecutor executor = bee.getFragmentRunner(finishedReceiver.getSender());
if (executor != null) {
executor.receivingFragmentFinished(finishedReceiver.getReceiver());
} else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 7d1585be6..17a5965da 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -57,9 +57,4 @@ public class NonRootFragmentManager extends AbstractFragmentManager {
public void receivingFragmentFinished(final FragmentHandle handle) {
fragmentExecutor.receivingFragmentFinished(handle);
}
-
- @Override
- public synchronized void cancel() {
- super.cancel();
- }
}