diff options
author | Steven Phillips <sphillips@maprtech.com> | 2014-07-22 18:11:44 -0700 |
---|---|---|
committer | Steven Phillips <sphillips@maprtech.com> | 2014-07-25 18:33:49 -0700 |
commit | 5e482c17d20bcc957be50d570d03f1a5fdfca75e (patch) | |
tree | ad92388fb0380c9a86381ee9491877bf35c6afbc /exec/java-exec/src/main | |
parent | 1e9930fbae8279afe3eed9e7f18392fb1a08688a (diff) |
DRILL-939: Add support for query cancellation
Diffstat (limited to 'exec/java-exec/src/main')
26 files changed, 284 insertions, 112 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 6690bf597..36c162abe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserProtos; import org.apache.drill.exec.proto.UserProtos.Property; import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.UserProperties; +import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection; import org.apache.drill.exec.rpc.ChannelClosedException; import org.apache.drill.exec.rpc.DrillRpcFuture; @@ -239,8 +240,9 @@ public class DrillClient implements Closeable, ConnectionThrottle{ return listener.getResults(); } - public void cancelQuery(QueryId id){ - client.send(RpcType.CANCEL_QUERY, id, Ack.class); + public DrillRpcFuture<Ack> cancelQuery(QueryId id){ + logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id)); + return client.send(RpcType.CANCEL_QUERY, id, Ack.class); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java index a5b65ecc0..3302e7cc6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java @@ -41,6 +41,7 @@ public class PrintingResultsListener implements UserResultsListener { int columnWidth; BufferAllocator allocator; volatile Exception exception; + QueryId queryId; public PrintingResultsListener(DrillConfig config, Format format, int columnWidth) { this.allocator = new TopLevelAllocator(config); @@ -97,7 +98,12 @@ public class PrintingResultsListener implements UserResultsListener { return count.get(); } + public QueryId getQueryId() { + return queryId; + } + @Override public void queryIdArrived(QueryId queryId) { + this.queryId = queryId; } }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index c42691894..3401bc767 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -75,6 +75,7 @@ public class FragmentContext implements Closeable { private volatile Throwable failureCause; private volatile boolean failed = false; + private volatile boolean cancelled = false; public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection, FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException { @@ -117,6 +118,10 @@ public class FragmentContext implements Closeable { failureCause = cause; } + public void cancel() { + cancelled = true; + } + public DrillbitContext getDrillbitContext() { return context; } @@ -227,6 +232,10 @@ public class FragmentContext implements Closeable { return failed; } + public boolean isCancelled() { + return cancelled; + } + public FunctionImplementationRegistry getFunctionRegistry() { return funcRegistry; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java index 396f7a2fd..325e315aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java @@ -139,7 +139,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{ @Override public void failed(RpcException ex) { sendCount.decrement(); - context.fail(ex); + if (!context.isCancelled() && !context.isFailed()) { + context.fail(ex); + } stop(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java index ee957d905..313fdecad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java @@ -169,6 +169,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> RawFragmentBatch rawBatch = null; try { rawBatch = getNext(provider); + if (rawBatch == null && context.isCancelled()) { + return IterOutcome.STOP; + } } catch (IOException e) { context.fail(e); return IterOutcome.STOP; @@ -181,6 +184,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } try { while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0); + if (rawBatch == null && context.isCancelled()) { + return IterOutcome.STOP; + } } catch (IOException e) { context.fail(e); return IterOutcome.STOP; @@ -300,6 +306,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> } else { batchLoaders[b].clear(); batchLoaders[b] = null; + if (context.isCancelled()) { + return IterOutcome.STOP; + } } } catch (IOException | SchemaChangeException e) { context.fail(e); @@ -340,6 +349,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP> while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) { nextBatch = getNext(fragProviders[node.batchId]); } + if (nextBatch == null && context.isCancelled()) { + return IterOutcome.STOP; + } } catch (IOException e) { context.fail(e); return IterOutcome.STOP; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 659863f19..69be256ec 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -154,8 +154,8 @@ public class PartitionSenderRootExec extends BaseRootExec { try { partitioner.partitionBatch(incoming); } catch (IOException e) { - incoming.kill(); context.fail(e); + incoming.kill(); return false; } for (VectorWrapper<?> v : incoming) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 120a61193..2dae5023f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -71,7 +71,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { wrapper = queue.take(); logger.debug("Got batch from queue"); } catch (InterruptedException e) { - context.fail(e); + if (!(context.isCancelled() || context.isFailed())) { + context.fail(e); + } return IterOutcome.STOP; } finally { stats.stopWait(); @@ -117,8 +119,11 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { private class Producer implements Runnable { + RecordBatchDataWrapper wrapper; + @Override public void run() { + try { if (stop) return; outer: while (true) { IterOutcome upstream = incoming.next(); @@ -135,14 +140,17 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { case OK_NEW_SCHEMA: case OK: try { - if (!stop) queue.put(new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false)); + if (!stop) { + wrapper = new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false); + queue.put(wrapper); + } } catch (InterruptedException e) { - context.fail(e); - try { - queue.putFirst(new RecordBatchDataWrapper(null, false, true)); - } catch (InterruptedException e1) { - throw new RuntimeException(e1); + if (!(context.isCancelled() || context.isFailed())) { + context.fail(e); } + wrapper.batch.getContainer().zeroVectors(); + incoming.cleanup(); + break outer; } break; default: @@ -152,14 +160,15 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { try { queue.put(new RecordBatchDataWrapper(null, true, false)); } catch (InterruptedException e) { - context.fail(e); - try { - queue.putFirst(new RecordBatchDataWrapper(null, false, true)); - } catch (InterruptedException e1) { - throw new RuntimeException(e1); + if (!(context.isCancelled() || context.isFailed())) { + context.fail(e); } + + } + } finally { + incoming.cleanup(); + logger.debug("Producer thread finished"); } - logger.debug("Producer thread finished"); } } @@ -174,7 +183,13 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { @Override protected void killIncoming() { - incoming.kill(); + producer.interrupt(); + stop = true; + try { + producer.join(); + } catch (InterruptedException e) { + logger.warn("Interrupted while waiting for producer thread"); + } } @Override @@ -182,7 +197,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { stop = true; clearQueue(); super.cleanup(); - incoming.cleanup(); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java index 742487075..79669fae6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java @@ -138,6 +138,9 @@ public class UnorderedReceiverBatch implements RecordBatch { if (batch == null) { batchLoader.clear(); + if (context.isCancelled()) { + return IterOutcome.STOP; + } return IterOutcome.NONE; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java index 3bbe23135..bb12a2244 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java @@ -31,6 +31,11 @@ public class QueryIdHelper { return (new UUID(queryId.getPart1(), queryId.getPart2())).toString(); } + public static QueryId getQueryIdFromString(String queryId) { + UUID uuid = UUID.fromString(queryId); + return QueryId.newBuilder().setPart1(uuid.getMostSignificantBits()).setPart2(uuid.getLeastSignificantBits()).build(); + } + public static String getQueryIdentifier(FragmentHandle h) { return getQueryId(h.getQueryId()) + ":" + h.getMajorFragmentId() + ":" + h.getMinorFragmentId(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java index 4c1f82d03..088b1204c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java @@ -69,6 +69,9 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements IterOutcome next = null; stats.stopProcessing(); try{ + if (context.isCancelled()) { + return IterOutcome.STOP; + } next = b.next(); }finally{ stats.startProcessing(); @@ -82,6 +85,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements stats.batchReceived(inputIndex, b.getRecordCount(), false); break; } + return next; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java index 432acab5b..9a2603965 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java @@ -51,10 +51,9 @@ public class ControlTunnel { manager.runCommand(b); } - public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){ - CancelFragment b = new CancelFragment(handle); + public void cancelFragment(RpcOutcomeListener<Ack> outcomeListener, FragmentHandle handle){ + CancelFragment b = new CancelFragment(outcomeListener, handle); manager.runCommand(b); - return b.getFuture(); } public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){ @@ -85,17 +84,17 @@ public class ControlTunnel { } - public static class CancelFragment extends FutureBitCommand<Ack, ControlConnection> { + public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> { final FragmentHandle handle; - public CancelFragment(FragmentHandle handle) { - super(); + public CancelFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle) { + super(listener); this.handle = handle; } @Override public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) { - connection.send(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class); + connection.sendUnsafe(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java index 28050eb60..cbfa1f943 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java @@ -20,7 +20,11 @@ package org.apache.drill.exec.rpc.control; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.LoadingCache; import org.apache.drill.exec.cache.DistributedMap; import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.proto.BitControl.FragmentStatus; @@ -44,6 +48,10 @@ public class WorkEventBus { private final ConcurrentMap<QueryId, FragmentStatusListener> listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>( 16, 0.75f, 16); private final WorkerBee bee; + private final Cache<FragmentHandle,Void> cancelledFragments = CacheBuilder.newBuilder() + .maximumSize(10000) + .expireAfterWrite(10, TimeUnit.MINUTES) + .build(); public WorkEventBus(WorkerBee bee) { this.bee = bee; @@ -85,7 +93,16 @@ public class WorkEventBus { return managers.get(handle); } + public void cancelFragment(FragmentHandle handle) { + cancelledFragments.put(handle, null); + removeFragmentManager(handle); + } + public FragmentManager getOrCreateFragmentManager(FragmentHandle handle) throws FragmentSetupException{ + if (cancelledFragments.asMap().containsKey(handle)) { + logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle); + return null; + } FragmentManager manager = managers.get(handle); if (manager != null) return manager; DistributedMap<FragmentHandle, PlanFragment> planCache = bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE); @@ -99,7 +116,7 @@ public class WorkEventBus { throw new FragmentSetupException("Received batch where fragment was not in cache."); } - FragmentManager newManager = new NonRootFragmentManager(fragment, bee.getContext()); + FragmentManager newManager = new NonRootFragmentManager(fragment, bee); // since their could be a race condition on the check, we'll use putIfAbsent so we don't have two competing // handlers. diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index a44903244..42dee943b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -107,6 +107,11 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { try { FragmentManager manager = workBus.getOrCreateFragmentManager(fragmentBatch.getHandle()); + if (manager == null) { + if (body != null) { + body.release(); + } + } BufferAllocator allocator = manager.getFragmentContext().getAllocator(); if(body != null){ if(!allocator.takeOwnership((AccountingByteBuf) body.unwrap())){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java index 0c04fecde..be5bb8bc7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java @@ -32,6 +32,7 @@ public class UserRpcConfig { public static RpcConfig MAPPING = RpcConfig.newBuilder("USER-RPC-MAPPING") // .add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit. .add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit + .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) //user to bit .add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user .build(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index aaf3c2de8..9b5e83016 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -27,6 +27,7 @@ import java.io.IOException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake; import org.apache.drill.exec.proto.UserProtos.RequestResults; import org.apache.drill.exec.proto.UserProtos.RpcType; @@ -74,7 +75,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec switch (rpcType) { case RpcType.RUN_QUERY_VALUE: - logger.trace("Received query to run. Returning query handle."); + logger.debug("Received query to run. Returning query handle."); try { RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody)); return new Response(RpcType.QUERY_HANDLE, worker.submitWork(connection, query)); @@ -83,7 +84,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec } case RpcType.REQUEST_RESULTS_VALUE: - logger.trace("Received results requests. Returning empty query result."); + logger.debug("Received results requests. Returning empty query result."); try { RequestResults req = RequestResults.PARSER.parseFrom(new ByteBufInputStream(pBody)); return new Response(RpcType.QUERY_RESULT, worker.getResult(connection, req)); @@ -92,8 +93,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec } case RpcType.CANCEL_QUERY_VALUE: - logger.warn("Cancel requested but not supported yet."); - return new Response(RpcType.ACK, Acks.OK); + try { + QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody)); + return new Response(RpcType.ACK, worker.cancelQuery(queryId)); + } catch (InvalidProtocolBufferException e) { + throw new RpcException("Failure while decoding QueryId body.", e); + } default: throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType)); @@ -125,7 +130,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){ logger.trace("Sending result to client with {}", result); - send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers()); + send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, false, result.getBuffers()); + } + + public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result, boolean allowInEventThread){ + logger.trace("Sending result to client with {}", result); + send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, allowInEventThread, result.getBuffers()); } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java index 5ae42676a..9cbc2e70b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java @@ -35,6 +35,7 @@ import javax.xml.bind.annotation.XmlRootElement; import org.apache.drill.exec.proto.UserBitShared.QueryProfile; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.work.WorkManager; import org.apache.drill.exec.work.foreman.QueryStatus; @@ -169,4 +170,20 @@ public class ProfileResources { return new Viewable("/rest/profile/profile.ftl", wrapper); } + + @GET + @Path("/profiles/cancel/{queryid}") + @Produces(MediaType.TEXT_PLAIN) + public String cancelQuery(@PathParam("queryid") String queryId) throws IOException { + PStore<QueryProfile> profiles = work.getContext().getPersistentStoreProvider().getPStore(QueryStatus.QUERY_PROFILE); + QueryProfile profile = profiles.get(queryId); + if (profile != null && (profile.getState() == QueryState.RUNNING || profile.getState() == QueryState.PENDING)) { + work.getUserWorker().cancelQuery(QueryIdHelper.getQueryIdFromString(queryId)); + return "Cancelled query " + queryId; + } + if (profile == null) { + return "No such query: " + queryId; + } + return "Query " + queryId + " not running"; + } }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java index f92e3c563..706f9a380 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java @@ -29,6 +29,7 @@ import org.apache.drill.exec.proto.UserBitShared.StreamProfile; import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Collections2; +import org.apache.drill.exec.proto.helper.QueryIdHelper; import java.text.DateFormat; import java.text.NumberFormat; @@ -44,15 +45,21 @@ import java.util.TreeMap; public class ProfileWrapper { public QueryProfile profile; + public String id; public ProfileWrapper(QueryProfile profile) { this.profile = profile; + this.id = QueryIdHelper.getQueryId(profile.getId()); } public QueryProfile getProfile() { return profile; } + public String getId() { + return id; + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index 7d5d37ff4..0407361b2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -187,6 +187,10 @@ public class WorkManager implements Closeable { return runningFragments.get(handle); } + public void removeFragment(FragmentHandle handle) { + runningFragments.remove(handle); + } + public Foreman getForemanForQueryId(QueryId queryId) { return queries.get(queryId); } @@ -212,9 +216,13 @@ public class WorkManager implements Closeable { try { while (true) { // logger.debug("Polling for pending work tasks."); - Runnable r = pendingTasks.take(); + RunnableWrapper r = pendingTasks.take(); if (r != null) { logger.debug("Starting pending task {}", r); + if (r.inner instanceof FragmentExecutor) { + FragmentExecutor fragmentExecutor = (FragmentExecutor) r.inner; + runningFragments.put(fragmentExecutor.getContext().getHandle(), fragmentExecutor); + } executor.execute(r); } @@ -228,7 +236,7 @@ public class WorkManager implements Closeable { private class RunnableWrapper implements Runnable { - private final Runnable inner; + final Runnable inner; private final String id; public RunnableWrapper(Runnable r, String id){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java index ee51f3b3d..afd3fa266 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java @@ -125,7 +125,7 @@ public class ControlHandlerImpl implements ControlMessageHandler { NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel); try{ FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson()); - FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener); + FragmentExecutor fr = new FragmentExecutor(context, bee, rootOperator, listener); bee.addFragmentRunner(fr); } catch (Exception e) { listener.fail(fragment.getHandle(), "Failure due to uncaught exception", e); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index 41d70a5a2..bb56e1046 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -76,14 +76,14 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ @Override public void cleanup() { - if (!finished) { + if (!finished && !context.isCancelled()) { IllegalStateException e = new IllegalStateException("Cleanup before finished"); context.fail(e); throw e; } if (!buffer.isEmpty()) { - if (!context.isFailed()) { + if (!context.isFailed() && !context.isCancelled()) { context.fail(new IllegalStateException("Batches still in queue during cleanup")); logger.error("{} Batches in queue.", buffer.size()); RawFragmentBatch batch; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 4cc3e63f1..b1ed8a5ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -185,7 +185,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{ void cleanupAndSendResult(QueryResult result){ bee.retireForeman(this); - initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result)); + initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result), true); state.updateState(QueryState.RUNNING, QueryState.COMPLETED); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index ed1a42816..4e1ca2236 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -19,6 +19,8 @@ package org.apache.drill.exec.work.foreman; import io.netty.buffer.ByteBuf; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -37,6 +39,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.control.ControlTunnel; import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.rpc.control.WorkEventBus; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; @@ -63,6 +66,9 @@ public class QueryManager implements FragmentStatusListener{ private QueryId queryId; private FragmentExecutor rootRunner; private RunQuery query; + private volatile boolean running = false; + private volatile boolean cancelled = false; + private volatile boolean stopped = false; public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, ForemanManagerListener foremanManagerListener, Controller controller, Foreman foreman) { super(); @@ -101,7 +107,7 @@ public class QueryManager implements FragmentStatusListener{ // add fragment to local node. status.add(new FragmentData(rootFragment.getHandle(), null, true)); logger.debug("Fragment added to local node."); - rootRunner = new FragmentExecutor(rootContext, rootOperator, new RootStatusHandler(rootContext, rootFragment)); + rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, new RootStatusHandler(rootContext, rootFragment)); RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner); if(buffers.isDone()){ @@ -127,6 +133,10 @@ public class QueryManager implements FragmentStatusListener{ } logger.debug("Fragment runs setup is complete."); + running = true; + if (cancelled && !stopped) { + stopQuery(); + } } private void sendRemoteFragment(PlanFragment fragment){ @@ -193,26 +203,37 @@ public class QueryManager implements FragmentStatusListener{ private void stopQuery(){ workBus.removeFragmentStatusListener(queryId); // Stop all queries with a currently active status. -// for(FragmentData data: map.values()){ -// FragmentHandle handle = data.getStatus().getHandle(); -// switch(data.getStatus().getState()){ -// case SENDING: -// case AWAITING_ALLOCATION: -// case RUNNING: -// if(data.isLocal()){ -// rootRunner.cancel(); -// }else{ -// tun.get(data.getEndpoint()).cancelFragment(handle).addLightListener(new CancelListener(data.endpoint, handle)); -// } -// break; -// default: -// break; -// } -// } + List<FragmentData> fragments = status.getFragmentData(); + Collections.sort(fragments, new Comparator<FragmentData>() { + @Override + public int compare(FragmentData o1, FragmentData o2) { + return o2.getHandle().getMajorFragmentId() - o1.getHandle().getMajorFragmentId(); + } + }); + for(FragmentData data: fragments){ + FragmentHandle handle = data.getStatus().getHandle(); + switch(data.getStatus().getProfile().getState()){ + case SENDING: + case AWAITING_ALLOCATION: + case RUNNING: + if(data.isLocal()){ + rootRunner.cancel(); + }else{ + controller.getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle); + } + break; + default: + break; + } + } } public void cancel(){ - stopQuery(); + cancelled = true; + if (running) { + stopQuery(); + stopped = true; + } } private class CancelListener extends EndpointListener<Ack, FragmentHandle>{ @@ -234,7 +255,7 @@ public class QueryManager implements FragmentStatusListener{ // do nothing. } - }; + } public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, PlanFragment value){ return new FragmentSubmitListener(endpoint, value); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java index 70de95817..62293fc93 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.work.foreman; +import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.cache.DistributedCache; import org.apache.drill.exec.cache.DistributedCache.CacheConfig; @@ -38,6 +39,9 @@ import org.apache.drill.exec.store.sys.PStoreProvider; import org.apache.drill.exec.work.foreman.Foreman.ForemanManagerListener; import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; public class QueryStatus { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStatus.class); @@ -48,6 +52,7 @@ public class QueryStatus { // doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then accessed by multiple threads for reads only. private IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> fragmentDataMap = new IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>>(); + private List<FragmentData> fragmentDataSet = Lists.newArrayList(); private final String queryId; private final QueryId id; @@ -73,6 +78,10 @@ public class QueryStatus { this.foreman = foreman; } + public List<FragmentData> getFragmentData() { + return fragmentDataSet; + } + public void setPlanText(String planText){ this.planText = planText; updateCache(); @@ -106,6 +115,7 @@ public class QueryStatus { } minorMap.put(minorFragmentId, data); + fragmentDataSet.add(data); } void update(FragmentStatus status, boolean updateCache){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index 51421a77e..735e66332 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -32,6 +32,7 @@ import org.apache.drill.exec.work.CancelableQuery; import org.apache.drill.exec.work.StatusProvider; import com.codahale.metrics.Timer; +import org.apache.drill.exec.work.WorkManager.WorkerBee; /** * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation @@ -44,10 +45,13 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid private final FragmentRoot rootOperator; private RootExec root; private final FragmentContext context; + private final WorkerBee bee; private final StatusReporter listener; + private Thread executionThread; - public FragmentExecutor(FragmentContext context, FragmentRoot rootOperator, StatusReporter listener){ + public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener){ this.context = context; + this.bee = bee; this.rootOperator = rootOperator; this.listener = listener; } @@ -60,6 +64,11 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid @Override public void cancel() { updateState(FragmentState.CANCELLED); + logger.debug("Cancelled Fragment {}", context.getHandle()); + context.cancel(); + if (executionThread != null) { + executionThread.interrupt(); + } } public UserClientConnection getClient(){ @@ -68,71 +77,75 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid @Override public void run() { - final String originalThread = Thread.currentThread().getName(); - String newThreadName = String.format("%s:frag:%s:%s", // - QueryIdHelper.getQueryId(context.getHandle().getQueryId()), // - context.getHandle().getMajorFragmentId(), - context.getHandle().getMinorFragmentId() - ); - Thread.currentThread().setName(newThreadName); - - boolean closed = false; try { - root = ImplCreator.getExec(context, rootOperator); - } catch (AssertionError | Exception e) { - context.fail(e); - logger.debug("Failure while initializing operator tree", e); - internalFail(e); - return; - } + final String originalThread = Thread.currentThread().getName(); + String newThreadName = String.format("%s:frag:%s:%s", // + QueryIdHelper.getQueryId(context.getHandle().getQueryId()), // + context.getHandle().getMajorFragmentId(), + context.getHandle().getMinorFragmentId() + ); + Thread.currentThread().setName(newThreadName); + executionThread = Thread.currentThread(); + + boolean closed = false; + try { + root = ImplCreator.getExec(context, rootOperator); + } catch (AssertionError | Exception e) { + context.fail(e); + logger.debug("Failure while initializing operator tree", e); + internalFail(e); + return; + } - logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId()); - if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){ - internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get())))); - return; - } + logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId()); + if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){ + internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get())))); + return; + } - // run the query until root.next returns false. - try{ - while(state.get() == FragmentState.RUNNING_VALUE){ - if(!root.next()){ - if(context.isFailed()){ - updateState(FragmentState.RUNNING, FragmentState.FAILED, false); - }else{ - updateState(FragmentState.RUNNING, FragmentState.FINISHED, false); - } + // run the query until root.next returns false. + try{ + while(state.get() == FragmentState.RUNNING_VALUE){ + if(!root.next()){ + if(context.isFailed()){ + updateState(FragmentState.RUNNING, FragmentState.FAILED, false); + }else{ + updateState(FragmentState.RUNNING, FragmentState.FINISHED, false); + } + } } - } - root.stop(); - if(context.isFailed()) { - internalFail(context.getFailureCause()); - } + root.stop(); + if(context.isFailed()) { + internalFail(context.getFailureCause()); + } - closed = true; - - context.close(); - }catch(AssertionError | Exception ex){ - logger.debug("Caught exception while running fragment", ex); - internalFail(ex); - }finally{ - Thread.currentThread().setName(originalThread); - if(!closed) { - try { - root.stop(); - if(context.isFailed()) { - internalFail(context.getFailureCause()); + closed = true; + + context.close(); + }catch(AssertionError | Exception ex){ + logger.debug("Caught exception while running fragment", ex); + internalFail(ex); + }finally{ + Thread.currentThread().setName(originalThread); + if(!closed) { + try { + if(context.isFailed()) { + internalFail(context.getFailureCause()); + } + context.close(); + } catch (RuntimeException e) { + logger.warn("Failure while closing context in failed state.", e); } - context.close(); - } catch (RuntimeException e) { - logger.warn("Failure while closing context in failed state.", e); } } + } finally { + logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId()); + bee.removeFragment(context.getHandle()); } - logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId()); } private void internalFail(Throwable excep){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java index 51bf81c77..48d14662e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java @@ -36,6 +36,7 @@ import org.apache.drill.exec.record.RawFragmentBatch; import org.apache.drill.exec.rpc.RemoteConnection; import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.batch.IncomingBuffers; /** @@ -48,12 +49,15 @@ public class NonRootFragmentManager implements FragmentManager { private final StatusReporter runnerListener; private volatile FragmentExecutor runner; private volatile boolean cancel = false; + private final WorkerBee bee; private final FragmentContext context; private List<RemoteConnection> connections = new CopyOnWriteArrayList<>(); - public NonRootFragmentManager(PlanFragment fragment, DrillbitContext context) throws FragmentSetupException{ + public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws FragmentSetupException{ try{ this.fragment = fragment; + DrillbitContext context = bee.getContext(); + this.bee = bee; this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson()); this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry()); this.buffers = new IncomingBuffers(root, this.context); @@ -81,7 +85,7 @@ public class NonRootFragmentManager implements FragmentManager { synchronized(this){ if(runner != null) throw new IllegalStateException("Get Runnable can only be run once."); if(cancel) return null; - runner = new FragmentExecutor(context, root, runnerListener); + runner = new FragmentExecutor(context, bee, root, runnerListener); return this.runner; } diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl index 8abb31615..ffe7e46f6 100644 --- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl +++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl @@ -49,6 +49,9 @@ </div> <button type="submit" class="btn btn-default">Re-run query</button> </form> + <form action="/profiles/cancel/${model.id}" method="GET"> + <button type="link" class="btn btn-default">Cancel query</button> + </form> <div class="page-header"> </div> <h3>Visualized Plan</h3> |