aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main
diff options
context:
space:
mode:
authorSteven Phillips <sphillips@maprtech.com>2014-07-22 18:11:44 -0700
committerSteven Phillips <sphillips@maprtech.com>2014-07-25 18:33:49 -0700
commit5e482c17d20bcc957be50d570d03f1a5fdfca75e (patch)
treead92388fb0380c9a86381ee9491877bf35c6afbc /exec/java-exec/src/main
parent1e9930fbae8279afe3eed9e7f18392fb1a08688a (diff)
DRILL-939: Add support for query cancellation
Diffstat (limited to 'exec/java-exec/src/main')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java6
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java9
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java44
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java19
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java59
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java119
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java8
-rw-r--r--exec/java-exec/src/main/resources/rest/profile/profile.ftl3
26 files changed, 284 insertions, 112 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 6690bf597..36c162abe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -43,6 +43,7 @@ import org.apache.drill.exec.proto.UserProtos;
import org.apache.drill.exec.proto.UserProtos.Property;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.UserProperties;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BasicClientWithConnection.ServerConnection;
import org.apache.drill.exec.rpc.ChannelClosedException;
import org.apache.drill.exec.rpc.DrillRpcFuture;
@@ -239,8 +240,9 @@ public class DrillClient implements Closeable, ConnectionThrottle{
return listener.getResults();
}
- public void cancelQuery(QueryId id){
- client.send(RpcType.CANCEL_QUERY, id, Ack.class);
+ public DrillRpcFuture<Ack> cancelQuery(QueryId id){
+ logger.debug("Cancelling query {}", QueryIdHelper.getQueryId(id));
+ return client.send(RpcType.CANCEL_QUERY, id, Ack.class);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
index a5b65ecc0..3302e7cc6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/PrintingResultsListener.java
@@ -41,6 +41,7 @@ public class PrintingResultsListener implements UserResultsListener {
int columnWidth;
BufferAllocator allocator;
volatile Exception exception;
+ QueryId queryId;
public PrintingResultsListener(DrillConfig config, Format format, int columnWidth) {
this.allocator = new TopLevelAllocator(config);
@@ -97,7 +98,12 @@ public class PrintingResultsListener implements UserResultsListener {
return count.get();
}
+ public QueryId getQueryId() {
+ return queryId;
+ }
+
@Override
public void queryIdArrived(QueryId queryId) {
+ this.queryId = queryId;
}
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index c42691894..3401bc767 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -75,6 +75,7 @@ public class FragmentContext implements Closeable {
private volatile Throwable failureCause;
private volatile boolean failed = false;
+ private volatile boolean cancelled = false;
public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException {
@@ -117,6 +118,10 @@ public class FragmentContext implements Closeable {
failureCause = cause;
}
+ public void cancel() {
+ cancelled = true;
+ }
+
public DrillbitContext getDrillbitContext() {
return context;
}
@@ -227,6 +232,10 @@ public class FragmentContext implements Closeable {
return failed;
}
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
public FunctionImplementationRegistry getFunctionRegistry() {
return funcRegistry;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
index 396f7a2fd..325e315aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SingleSenderCreator.java
@@ -139,7 +139,9 @@ public class SingleSenderCreator implements RootCreator<SingleSender>{
@Override
public void failed(RpcException ex) {
sendCount.decrement();
- context.fail(ex);
+ if (!context.isCancelled() && !context.isFailed()) {
+ context.fail(ex);
+ }
stop();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index ee957d905..313fdecad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -169,6 +169,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
RawFragmentBatch rawBatch = null;
try {
rawBatch = getNext(provider);
+ if (rawBatch == null && context.isCancelled()) {
+ return IterOutcome.STOP;
+ }
} catch (IOException e) {
context.fail(e);
return IterOutcome.STOP;
@@ -181,6 +184,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
}
try {
while ((rawBatch = getNext(provider)) != null && rawBatch.getHeader().getDef().getRecordCount() == 0);
+ if (rawBatch == null && context.isCancelled()) {
+ return IterOutcome.STOP;
+ }
} catch (IOException e) {
context.fail(e);
return IterOutcome.STOP;
@@ -300,6 +306,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
} else {
batchLoaders[b].clear();
batchLoaders[b] = null;
+ if (context.isCancelled()) {
+ return IterOutcome.STOP;
+ }
}
} catch (IOException | SchemaChangeException e) {
context.fail(e);
@@ -340,6 +349,9 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
while (nextBatch != null && nextBatch.getHeader().getDef().getRecordCount() == 0) {
nextBatch = getNext(fragProviders[node.batchId]);
}
+ if (nextBatch == null && context.isCancelled()) {
+ return IterOutcome.STOP;
+ }
} catch (IOException e) {
context.fail(e);
return IterOutcome.STOP;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 659863f19..69be256ec 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -154,8 +154,8 @@ public class PartitionSenderRootExec extends BaseRootExec {
try {
partitioner.partitionBatch(incoming);
} catch (IOException e) {
- incoming.kill();
context.fail(e);
+ incoming.kill();
return false;
}
for (VectorWrapper<?> v : incoming) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 120a61193..2dae5023f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -71,7 +71,9 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
wrapper = queue.take();
logger.debug("Got batch from queue");
} catch (InterruptedException e) {
- context.fail(e);
+ if (!(context.isCancelled() || context.isFailed())) {
+ context.fail(e);
+ }
return IterOutcome.STOP;
} finally {
stats.stopWait();
@@ -117,8 +119,11 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
private class Producer implements Runnable {
+ RecordBatchDataWrapper wrapper;
+
@Override
public void run() {
+ try {
if (stop) return;
outer: while (true) {
IterOutcome upstream = incoming.next();
@@ -135,14 +140,17 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
case OK_NEW_SCHEMA:
case OK:
try {
- if (!stop) queue.put(new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false));
+ if (!stop) {
+ wrapper = new RecordBatchDataWrapper(new RecordBatchData(incoming), false, false);
+ queue.put(wrapper);
+ }
} catch (InterruptedException e) {
- context.fail(e);
- try {
- queue.putFirst(new RecordBatchDataWrapper(null, false, true));
- } catch (InterruptedException e1) {
- throw new RuntimeException(e1);
+ if (!(context.isCancelled() || context.isFailed())) {
+ context.fail(e);
}
+ wrapper.batch.getContainer().zeroVectors();
+ incoming.cleanup();
+ break outer;
}
break;
default:
@@ -152,14 +160,15 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
try {
queue.put(new RecordBatchDataWrapper(null, true, false));
} catch (InterruptedException e) {
- context.fail(e);
- try {
- queue.putFirst(new RecordBatchDataWrapper(null, false, true));
- } catch (InterruptedException e1) {
- throw new RuntimeException(e1);
+ if (!(context.isCancelled() || context.isFailed())) {
+ context.fail(e);
}
+
+ }
+ } finally {
+ incoming.cleanup();
+ logger.debug("Producer thread finished");
}
- logger.debug("Producer thread finished");
}
}
@@ -174,7 +183,13 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
@Override
protected void killIncoming() {
- incoming.kill();
+ producer.interrupt();
+ stop = true;
+ try {
+ producer.join();
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted while waiting for producer thread");
+ }
}
@Override
@@ -182,7 +197,6 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
stop = true;
clearQueue();
super.cleanup();
- incoming.cleanup();
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
index 742487075..79669fae6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/unorderedreceiver/UnorderedReceiverBatch.java
@@ -138,6 +138,9 @@ public class UnorderedReceiverBatch implements RecordBatch {
if (batch == null) {
batchLoader.clear();
+ if (context.isCancelled()) {
+ return IterOutcome.STOP;
+ }
return IterOutcome.NONE;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
index 3bbe23135..bb12a2244 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/proto/helper/QueryIdHelper.java
@@ -31,6 +31,11 @@ public class QueryIdHelper {
return (new UUID(queryId.getPart1(), queryId.getPart2())).toString();
}
+ public static QueryId getQueryIdFromString(String queryId) {
+ UUID uuid = UUID.fromString(queryId);
+ return QueryId.newBuilder().setPart1(uuid.getMostSignificantBits()).setPart2(uuid.getLeastSignificantBits()).build();
+ }
+
public static String getQueryIdentifier(FragmentHandle h) {
return getQueryId(h.getQueryId()) + ":" + h.getMajorFragmentId() + ":" + h.getMinorFragmentId();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
index 4c1f82d03..088b1204c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatch.java
@@ -69,6 +69,9 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
IterOutcome next = null;
stats.stopProcessing();
try{
+ if (context.isCancelled()) {
+ return IterOutcome.STOP;
+ }
next = b.next();
}finally{
stats.startProcessing();
@@ -82,6 +85,7 @@ public abstract class AbstractRecordBatch<T extends PhysicalOperator> implements
stats.batchReceived(inputIndex, b.getRecordCount(), false);
break;
}
+
return next;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
index 432acab5b..9a2603965 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControlTunnel.java
@@ -51,10 +51,9 @@ public class ControlTunnel {
manager.runCommand(b);
}
- public DrillRpcFuture<Ack> cancelFragment(FragmentHandle handle){
- CancelFragment b = new CancelFragment(handle);
+ public void cancelFragment(RpcOutcomeListener<Ack> outcomeListener, FragmentHandle handle){
+ CancelFragment b = new CancelFragment(outcomeListener, handle);
manager.runCommand(b);
- return b.getFuture();
}
public DrillRpcFuture<Ack> sendFragmentStatus(FragmentStatus status){
@@ -85,17 +84,17 @@ public class ControlTunnel {
}
- public static class CancelFragment extends FutureBitCommand<Ack, ControlConnection> {
+ public static class CancelFragment extends ListeningCommand<Ack, ControlConnection> {
final FragmentHandle handle;
- public CancelFragment(FragmentHandle handle) {
- super();
+ public CancelFragment(RpcOutcomeListener<Ack> listener, FragmentHandle handle) {
+ super(listener);
this.handle = handle;
}
@Override
public void doRpcCall(RpcOutcomeListener<Ack> outcomeListener, ControlConnection connection) {
- connection.send(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
+ connection.sendUnsafe(outcomeListener, RpcType.REQ_CANCEL_FRAGMENT, handle, Ack.class);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index 28050eb60..cbfa1f943 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -20,7 +20,11 @@ package org.apache.drill.exec.rpc.control;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.LoadingCache;
import org.apache.drill.exec.cache.DistributedMap;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
@@ -44,6 +48,10 @@ public class WorkEventBus {
private final ConcurrentMap<QueryId, FragmentStatusListener> listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>(
16, 0.75f, 16);
private final WorkerBee bee;
+ private final Cache<FragmentHandle,Void> cancelledFragments = CacheBuilder.newBuilder()
+ .maximumSize(10000)
+ .expireAfterWrite(10, TimeUnit.MINUTES)
+ .build();
public WorkEventBus(WorkerBee bee) {
this.bee = bee;
@@ -85,7 +93,16 @@ public class WorkEventBus {
return managers.get(handle);
}
+ public void cancelFragment(FragmentHandle handle) {
+ cancelledFragments.put(handle, null);
+ removeFragmentManager(handle);
+ }
+
public FragmentManager getOrCreateFragmentManager(FragmentHandle handle) throws FragmentSetupException{
+ if (cancelledFragments.asMap().containsKey(handle)) {
+ logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+ return null;
+ }
FragmentManager manager = managers.get(handle);
if (manager != null) return manager;
DistributedMap<FragmentHandle, PlanFragment> planCache = bee.getContext().getCache().getMap(Foreman.FRAGMENT_CACHE);
@@ -99,7 +116,7 @@ public class WorkEventBus {
throw new FragmentSetupException("Received batch where fragment was not in cache.");
}
- FragmentManager newManager = new NonRootFragmentManager(fragment, bee.getContext());
+ FragmentManager newManager = new NonRootFragmentManager(fragment, bee);
// since their could be a race condition on the check, we'll use putIfAbsent so we don't have two competing
// handlers.
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index a44903244..42dee943b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -107,6 +107,11 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
try {
FragmentManager manager = workBus.getOrCreateFragmentManager(fragmentBatch.getHandle());
+ if (manager == null) {
+ if (body != null) {
+ body.release();
+ }
+ }
BufferAllocator allocator = manager.getFragmentContext().getAllocator();
if(body != null){
if(!allocator.takeOwnership((AccountingByteBuf) body.unwrap())){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
index 0c04fecde..be5bb8bc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserRpcConfig.java
@@ -32,6 +32,7 @@ public class UserRpcConfig {
public static RpcConfig MAPPING = RpcConfig.newBuilder("USER-RPC-MAPPING") //
.add(RpcType.HANDSHAKE, UserToBitHandshake.class, RpcType.HANDSHAKE, BitToUserHandshake.class) //user to bit.
.add(RpcType.RUN_QUERY, RunQuery.class, RpcType.QUERY_HANDLE, QueryId.class) //user to bit
+ .add(RpcType.CANCEL_QUERY, QueryId.class, RpcType.ACK, Ack.class) //user to bit
.add(RpcType.QUERY_RESULT, QueryResult.class, RpcType.ACK, Ack.class) //bit to user
.build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index aaf3c2de8..9b5e83016 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
import org.apache.drill.exec.proto.UserProtos.RequestResults;
import org.apache.drill.exec.proto.UserProtos.RpcType;
@@ -74,7 +75,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
switch (rpcType) {
case RpcType.RUN_QUERY_VALUE:
- logger.trace("Received query to run. Returning query handle.");
+ logger.debug("Received query to run. Returning query handle.");
try {
RunQuery query = RunQuery.PARSER.parseFrom(new ByteBufInputStream(pBody));
return new Response(RpcType.QUERY_HANDLE, worker.submitWork(connection, query));
@@ -83,7 +84,7 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
case RpcType.REQUEST_RESULTS_VALUE:
- logger.trace("Received results requests. Returning empty query result.");
+ logger.debug("Received results requests. Returning empty query result.");
try {
RequestResults req = RequestResults.PARSER.parseFrom(new ByteBufInputStream(pBody));
return new Response(RpcType.QUERY_RESULT, worker.getResult(connection, req));
@@ -92,8 +93,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
}
case RpcType.CANCEL_QUERY_VALUE:
- logger.warn("Cancel requested but not supported yet.");
- return new Response(RpcType.ACK, Acks.OK);
+ try {
+ QueryId queryId = QueryId.PARSER.parseFrom(new ByteBufInputStream(pBody));
+ return new Response(RpcType.ACK, worker.cancelQuery(queryId));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RpcException("Failure while decoding QueryId body.", e);
+ }
default:
throw new UnsupportedOperationException(String.format("UserServer received rpc of unknown type. Type was %d.", rpcType));
@@ -125,7 +130,12 @@ public class UserServer extends BasicServer<RpcType, UserServer.UserClientConnec
public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result){
logger.trace("Sending result to client with {}", result);
- send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, result.getBuffers());
+ send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, false, result.getBuffers());
+ }
+
+ public void sendResult(RpcOutcomeListener<Ack> listener, QueryWritableBatch result, boolean allowInEventThread){
+ logger.trace("Sending result to client with {}", result);
+ send(listener, this, RpcType.QUERY_RESULT, result.getHeader(), Ack.class, allowInEventThread, result.getBuffers());
}
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
index 5ae42676a..9cbc2e70b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileResources.java
@@ -35,6 +35,7 @@ import javax.xml.bind.annotation.XmlRootElement;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.QueryStatus;
@@ -169,4 +170,20 @@ public class ProfileResources {
return new Viewable("/rest/profile/profile.ftl", wrapper);
}
+
+ @GET
+ @Path("/profiles/cancel/{queryid}")
+ @Produces(MediaType.TEXT_PLAIN)
+ public String cancelQuery(@PathParam("queryid") String queryId) throws IOException {
+ PStore<QueryProfile> profiles = work.getContext().getPersistentStoreProvider().getPStore(QueryStatus.QUERY_PROFILE);
+ QueryProfile profile = profiles.get(queryId);
+ if (profile != null && (profile.getState() == QueryState.RUNNING || profile.getState() == QueryState.PENDING)) {
+ work.getUserWorker().cancelQuery(QueryIdHelper.getQueryIdFromString(queryId));
+ return "Cancelled query " + queryId;
+ }
+ if (profile == null) {
+ return "No such query: " + queryId;
+ }
+ return "Query " + queryId + " not running";
+ }
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
index f92e3c563..706f9a380 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/ProfileWrapper.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Collections2;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
import java.text.DateFormat;
import java.text.NumberFormat;
@@ -44,15 +45,21 @@ import java.util.TreeMap;
public class ProfileWrapper {
public QueryProfile profile;
+ public String id;
public ProfileWrapper(QueryProfile profile) {
this.profile = profile;
+ this.id = QueryIdHelper.getQueryId(profile.getId());
}
public QueryProfile getProfile() {
return profile;
}
+ public String getId() {
+ return id;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 7d5d37ff4..0407361b2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -187,6 +187,10 @@ public class WorkManager implements Closeable {
return runningFragments.get(handle);
}
+ public void removeFragment(FragmentHandle handle) {
+ runningFragments.remove(handle);
+ }
+
public Foreman getForemanForQueryId(QueryId queryId) {
return queries.get(queryId);
}
@@ -212,9 +216,13 @@ public class WorkManager implements Closeable {
try {
while (true) {
// logger.debug("Polling for pending work tasks.");
- Runnable r = pendingTasks.take();
+ RunnableWrapper r = pendingTasks.take();
if (r != null) {
logger.debug("Starting pending task {}", r);
+ if (r.inner instanceof FragmentExecutor) {
+ FragmentExecutor fragmentExecutor = (FragmentExecutor) r.inner;
+ runningFragments.put(fragmentExecutor.getContext().getHandle(), fragmentExecutor);
+ }
executor.execute(r);
}
@@ -228,7 +236,7 @@ public class WorkManager implements Closeable {
private class RunnableWrapper implements Runnable {
- private final Runnable inner;
+ final Runnable inner;
private final String id;
public RunnableWrapper(Runnable r, String id){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index ee51f3b3d..afd3fa266 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -125,7 +125,7 @@ public class ControlHandlerImpl implements ControlMessageHandler {
NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
try{
FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
- FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
+ FragmentExecutor fr = new FragmentExecutor(context, bee, rootOperator, listener);
bee.addFragmentRunner(fr);
} catch (Exception e) {
listener.fail(fragment.getHandle(), "Failure due to uncaught exception", e);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 41d70a5a2..bb56e1046 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -76,14 +76,14 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
@Override
public void cleanup() {
- if (!finished) {
+ if (!finished && !context.isCancelled()) {
IllegalStateException e = new IllegalStateException("Cleanup before finished");
context.fail(e);
throw e;
}
if (!buffer.isEmpty()) {
- if (!context.isFailed()) {
+ if (!context.isFailed() && !context.isCancelled()) {
context.fail(new IllegalStateException("Batches still in queue during cleanup"));
logger.error("{} Batches in queue.", buffer.size());
RawFragmentBatch batch;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 4cc3e63f1..b1ed8a5ea 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -185,7 +185,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object>{
void cleanupAndSendResult(QueryResult result){
bee.retireForeman(this);
- initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result));
+ initiatingClient.sendResult(new ResponseSendListener(), new QueryWritableBatch(result), true);
state.updateState(QueryState.RUNNING, QueryState.COMPLETED);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index ed1a42816..4e1ca2236 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -19,6 +19,8 @@ package org.apache.drill.exec.work.foreman;
import io.netty.buffer.ByteBuf;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
@@ -37,6 +39,7 @@ import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
@@ -63,6 +66,9 @@ public class QueryManager implements FragmentStatusListener{
private QueryId queryId;
private FragmentExecutor rootRunner;
private RunQuery query;
+ private volatile boolean running = false;
+ private volatile boolean cancelled = false;
+ private volatile boolean stopped = false;
public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, ForemanManagerListener foremanManagerListener, Controller controller, Foreman foreman) {
super();
@@ -101,7 +107,7 @@ public class QueryManager implements FragmentStatusListener{
// add fragment to local node.
status.add(new FragmentData(rootFragment.getHandle(), null, true));
logger.debug("Fragment added to local node.");
- rootRunner = new FragmentExecutor(rootContext, rootOperator, new RootStatusHandler(rootContext, rootFragment));
+ rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, new RootStatusHandler(rootContext, rootFragment));
RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
if(buffers.isDone()){
@@ -127,6 +133,10 @@ public class QueryManager implements FragmentStatusListener{
}
logger.debug("Fragment runs setup is complete.");
+ running = true;
+ if (cancelled && !stopped) {
+ stopQuery();
+ }
}
private void sendRemoteFragment(PlanFragment fragment){
@@ -193,26 +203,37 @@ public class QueryManager implements FragmentStatusListener{
private void stopQuery(){
workBus.removeFragmentStatusListener(queryId);
// Stop all queries with a currently active status.
-// for(FragmentData data: map.values()){
-// FragmentHandle handle = data.getStatus().getHandle();
-// switch(data.getStatus().getState()){
-// case SENDING:
-// case AWAITING_ALLOCATION:
-// case RUNNING:
-// if(data.isLocal()){
-// rootRunner.cancel();
-// }else{
-// tun.get(data.getEndpoint()).cancelFragment(handle).addLightListener(new CancelListener(data.endpoint, handle));
-// }
-// break;
-// default:
-// break;
-// }
-// }
+ List<FragmentData> fragments = status.getFragmentData();
+ Collections.sort(fragments, new Comparator<FragmentData>() {
+ @Override
+ public int compare(FragmentData o1, FragmentData o2) {
+ return o2.getHandle().getMajorFragmentId() - o1.getHandle().getMajorFragmentId();
+ }
+ });
+ for(FragmentData data: fragments){
+ FragmentHandle handle = data.getStatus().getHandle();
+ switch(data.getStatus().getProfile().getState()){
+ case SENDING:
+ case AWAITING_ALLOCATION:
+ case RUNNING:
+ if(data.isLocal()){
+ rootRunner.cancel();
+ }else{
+ controller.getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle);
+ }
+ break;
+ default:
+ break;
+ }
+ }
}
public void cancel(){
- stopQuery();
+ cancelled = true;
+ if (running) {
+ stopQuery();
+ stopped = true;
+ }
}
private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
@@ -234,7 +255,7 @@ public class QueryManager implements FragmentStatusListener{
// do nothing.
}
- };
+ }
public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, PlanFragment value){
return new FragmentSubmitListener(endpoint, value);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
index 70de95817..62293fc93 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.work.foreman;
+import com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.cache.DistributedCache;
import org.apache.drill.exec.cache.DistributedCache.CacheConfig;
@@ -38,6 +39,9 @@ import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.foreman.Foreman.ForemanManagerListener;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
public class QueryStatus {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStatus.class);
@@ -48,6 +52,7 @@ public class QueryStatus {
// doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then accessed by multiple threads for reads only.
private IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> fragmentDataMap = new IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>>();
+ private List<FragmentData> fragmentDataSet = Lists.newArrayList();
private final String queryId;
private final QueryId id;
@@ -73,6 +78,10 @@ public class QueryStatus {
this.foreman = foreman;
}
+ public List<FragmentData> getFragmentData() {
+ return fragmentDataSet;
+ }
+
public void setPlanText(String planText){
this.planText = planText;
updateCache();
@@ -106,6 +115,7 @@ public class QueryStatus {
}
minorMap.put(minorFragmentId, data);
+ fragmentDataSet.add(data);
}
void update(FragmentStatus status, boolean updateCache){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 51421a77e..735e66332 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -32,6 +32,7 @@ import org.apache.drill.exec.work.CancelableQuery;
import org.apache.drill.exec.work.StatusProvider;
import com.codahale.metrics.Timer;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
/**
* Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation
@@ -44,10 +45,13 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
private final FragmentRoot rootOperator;
private RootExec root;
private final FragmentContext context;
+ private final WorkerBee bee;
private final StatusReporter listener;
+ private Thread executionThread;
- public FragmentExecutor(FragmentContext context, FragmentRoot rootOperator, StatusReporter listener){
+ public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener){
this.context = context;
+ this.bee = bee;
this.rootOperator = rootOperator;
this.listener = listener;
}
@@ -60,6 +64,11 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
@Override
public void cancel() {
updateState(FragmentState.CANCELLED);
+ logger.debug("Cancelled Fragment {}", context.getHandle());
+ context.cancel();
+ if (executionThread != null) {
+ executionThread.interrupt();
+ }
}
public UserClientConnection getClient(){
@@ -68,71 +77,75 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
@Override
public void run() {
- final String originalThread = Thread.currentThread().getName();
- String newThreadName = String.format("%s:frag:%s:%s", //
- QueryIdHelper.getQueryId(context.getHandle().getQueryId()), //
- context.getHandle().getMajorFragmentId(),
- context.getHandle().getMinorFragmentId()
- );
- Thread.currentThread().setName(newThreadName);
-
- boolean closed = false;
try {
- root = ImplCreator.getExec(context, rootOperator);
- } catch (AssertionError | Exception e) {
- context.fail(e);
- logger.debug("Failure while initializing operator tree", e);
- internalFail(e);
- return;
- }
+ final String originalThread = Thread.currentThread().getName();
+ String newThreadName = String.format("%s:frag:%s:%s", //
+ QueryIdHelper.getQueryId(context.getHandle().getQueryId()), //
+ context.getHandle().getMajorFragmentId(),
+ context.getHandle().getMinorFragmentId()
+ );
+ Thread.currentThread().setName(newThreadName);
+ executionThread = Thread.currentThread();
+
+ boolean closed = false;
+ try {
+ root = ImplCreator.getExec(context, rootOperator);
+ } catch (AssertionError | Exception e) {
+ context.fail(e);
+ logger.debug("Failure while initializing operator tree", e);
+ internalFail(e);
+ return;
+ }
- logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
- if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
- internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
- return;
- }
+ logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
+ if(!updateState(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING, false)){
+ internalFail(new RuntimeException(String.format("Run was called when fragment was in %s state. FragmentRunnables should only be started when they are currently in awaiting allocation state.", FragmentState.valueOf(state.get()))));
+ return;
+ }
- // run the query until root.next returns false.
- try{
- while(state.get() == FragmentState.RUNNING_VALUE){
- if(!root.next()){
- if(context.isFailed()){
- updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
- }else{
- updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
- }
+ // run the query until root.next returns false.
+ try{
+ while(state.get() == FragmentState.RUNNING_VALUE){
+ if(!root.next()){
+ if(context.isFailed()){
+ updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
+ }else{
+ updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
+ }
+ }
}
- }
- root.stop();
- if(context.isFailed()) {
- internalFail(context.getFailureCause());
- }
+ root.stop();
+ if(context.isFailed()) {
+ internalFail(context.getFailureCause());
+ }
- closed = true;
-
- context.close();
- }catch(AssertionError | Exception ex){
- logger.debug("Caught exception while running fragment", ex);
- internalFail(ex);
- }finally{
- Thread.currentThread().setName(originalThread);
- if(!closed) {
- try {
- root.stop();
- if(context.isFailed()) {
- internalFail(context.getFailureCause());
+ closed = true;
+
+ context.close();
+ }catch(AssertionError | Exception ex){
+ logger.debug("Caught exception while running fragment", ex);
+ internalFail(ex);
+ }finally{
+ Thread.currentThread().setName(originalThread);
+ if(!closed) {
+ try {
+ if(context.isFailed()) {
+ internalFail(context.getFailureCause());
+ }
+ context.close();
+ } catch (RuntimeException e) {
+ logger.warn("Failure while closing context in failed state.", e);
}
- context.close();
- } catch (RuntimeException e) {
- logger.warn("Failure while closing context in failed state.", e);
}
}
+ } finally {
+ logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
+ bee.removeFragment(context.getHandle());
}
- logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
}
private void internalFail(Throwable excep){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 51bf81c77..48d14662e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -36,6 +36,7 @@ import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.batch.IncomingBuffers;
/**
@@ -48,12 +49,15 @@ public class NonRootFragmentManager implements FragmentManager {
private final StatusReporter runnerListener;
private volatile FragmentExecutor runner;
private volatile boolean cancel = false;
+ private final WorkerBee bee;
private final FragmentContext context;
private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
- public NonRootFragmentManager(PlanFragment fragment, DrillbitContext context) throws FragmentSetupException{
+ public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws FragmentSetupException{
try{
this.fragment = fragment;
+ DrillbitContext context = bee.getContext();
+ this.bee = bee;
this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
this.buffers = new IncomingBuffers(root, this.context);
@@ -81,7 +85,7 @@ public class NonRootFragmentManager implements FragmentManager {
synchronized(this){
if(runner != null) throw new IllegalStateException("Get Runnable can only be run once.");
if(cancel) return null;
- runner = new FragmentExecutor(context, root, runnerListener);
+ runner = new FragmentExecutor(context, bee, root, runnerListener);
return this.runner;
}
diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
index 8abb31615..ffe7e46f6 100644
--- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl
+++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
@@ -49,6 +49,9 @@
</div>
<button type="submit" class="btn btn-default">Re-run query</button>
</form>
+ <form action="/profiles/cancel/${model.id}" method="GET">
+ <button type="link" class="btn btn-default">Cancel query</button>
+ </form>
<div class="page-header">
</div>
<h3>Visualized Plan</h3>