diff options
author | Vlad Rozov <vrozov@apache.org> | 2017-09-06 18:29:02 -0700 |
---|---|---|
committer | Paul Rogers <progers@maprtech.com> | 2017-09-16 23:28:51 -0700 |
commit | 75bd1d04b01d23fc14730d6aba20964582990fa3 (patch) | |
tree | a3dae4ab9c99deeac90c6bae7713a4392c4450ce /exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment | |
parent | 6adeb986016a769755fd5e8fc66244ee1e8d18e1 (diff) |
DRILL-3449: When Foreman node dies, the FragmentExecutor still tries to send status updates to Foreman
closes #934
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment')
2 files changed, 23 insertions, 4 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index daa94b700..258e48577 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -300,6 +300,7 @@ public class FragmentExecutor implements Runnable { } else { statusReporter.stateChanged(outcome); } + statusReporter.close(); } @@ -444,6 +445,7 @@ public class FragmentExecutor implements Runnable { logger.warn("Foreman {} no longer active. Cancelling fragment {}.", foremanEndpoint.getAddress(), QueryIdHelper.getQueryIdentifier(fragmentContext.getHandle())); + statusReporter.close(); FragmentExecutor.this.cancel(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java index 3dd9dc5fa..e37435c24 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentStatusReporter.java @@ -27,19 +27,21 @@ import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.control.ControlTunnel; +import java.util.concurrent.atomic.AtomicReference; + /** * The status reporter is responsible for receiving changes in fragment state and propagating the status back to the * Foreman through a control tunnel. */ -public class FragmentStatusReporter { +public class FragmentStatusReporter implements AutoCloseable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusReporter.class); private final FragmentContext context; - private final ControlTunnel tunnel; + private final AtomicReference<ControlTunnel> tunnel; public FragmentStatusReporter(final FragmentContext context, final ControlTunnel tunnel) { this.context = context; - this.tunnel = tunnel; + this.tunnel = new AtomicReference<>(tunnel); } /** @@ -98,7 +100,12 @@ public class FragmentStatusReporter { } private void sendStatus(final FragmentStatus status) { - tunnel.sendFragmentStatus(status); + final ControlTunnel tunnel = this.tunnel.get(); + if (tunnel != null) { + tunnel.sendFragmentStatus(status); + } else { + logger.warn("{}: State {} is not reported as {} is closed", QueryIdHelper.getQueryIdentifier(context.getHandle()), status.getProfile().getState(), this); + } } /** @@ -113,4 +120,14 @@ public class FragmentStatusReporter { sendStatus(status); } + @Override + public void close() + { + final ControlTunnel tunnel = this.tunnel.getAndSet(null); + if (tunnel != null) { + logger.debug("Closing {}", this); + } else { + logger.warn("{} was already closed", this); + } + } } |