aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment
diff options
context:
space:
mode:
authorVlad Rozov <vrozov@apache.org>2017-09-06 18:29:02 -0700
committerPaul Rogers <progers@maprtech.com>2017-09-16 23:28:51 -0700
commit75bd1d04b01d23fc14730d6aba20964582990fa3 (patch)
treea3dae4ab9c99deeac90c6bae7713a4392c4450ce /exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment
parent6adeb986016a769755fd5e8fc66244ee1e8d18e1 (diff)
DRILL-3449: When Foreman node dies, the FragmentExecutor still tries to send status updates to Foreman
closes #934
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java25
2 files changed, 23 insertions, 4 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index daa94b700..258e48577 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -300,6 +300,7 @@ public class FragmentExecutor implements Runnable {
} else {
statusReporter.stateChanged(outcome);
}
+ statusReporter.close();
}
@@ -444,6 +445,7 @@ public class FragmentExecutor implements Runnable {
logger.warn("Foreman {} no longer active. Cancelling fragment {}.",
foremanEndpoint.getAddress(),
QueryIdHelper.getQueryIdentifier(fragmentContext.getHandle()));
+ statusReporter.close();
FragmentExecutor.this.cancel();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
index 3dd9dc5fa..e37435c24 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java
@@ -27,19 +27,21 @@ import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.control.ControlTunnel;
+import java.util.concurrent.atomic.AtomicReference;
+
/**
* The status reporter is responsible for receiving changes in fragment state and propagating the status back to the
* Foreman through a control tunnel.
*/
-public class FragmentStatusReporter {
+public class FragmentStatusReporter implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporter.class);
private final FragmentContext context;
- private final ControlTunnel tunnel;
+ private final AtomicReference<ControlTunnel> tunnel;
public FragmentStatusReporter(final FragmentContext context, final ControlTunnel tunnel) {
this.context = context;
- this.tunnel = tunnel;
+ this.tunnel = new AtomicReference<>(tunnel);
}
/**
@@ -98,7 +100,12 @@ public class FragmentStatusReporter {
}
private void sendStatus(final FragmentStatus status) {
- tunnel.sendFragmentStatus(status);
+ final ControlTunnel tunnel = this.tunnel.get();
+ if (tunnel != null) {
+ tunnel.sendFragmentStatus(status);
+ } else {
+ logger.warn("{}: State {} is not reported as {} is closed", QueryIdHelper.getQueryIdentifier(context.getHandle()), status.getProfile().getState(), this);
+ }
}
/**
@@ -113,4 +120,14 @@ public class FragmentStatusReporter {
sendStatus(status);
}
+ @Override
+ public void close()
+ {
+ final ControlTunnel tunnel = this.tunnel.getAndSet(null);
+ if (tunnel != null) {
+ logger.debug("Closing {}", this);
+ } else {
+ logger.warn("{} was already closed", this);
+ }
+ }
}