aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java19
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java63
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java144
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java108
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java172
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java56
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java49
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java33
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java158
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java37
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java18
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java23
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java133
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java112
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java72
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java164
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java22
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java29
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java255
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java101
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java1
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java1004
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java29
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java323
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java255
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java199
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java1
61 files changed, 2135 insertions, 1607 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 93d06f00f..cd0a0a285 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -202,4 +202,8 @@ public interface ExecConstants {
public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false);
+
+ public static final String DRILLBIT_EXCEPTION_INJECTIONS = "drill.exec.testing.exception-injections";
+ public static final OptionValidator DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR =
+ new StringValidator(DRILLBIT_EXCEPTION_INJECTIONS, "");
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
index 04b955bbb..c3a873ccc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java
@@ -98,7 +98,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) {
this.ownsZkConnection = coordinator == null;
this.ownsAllocator = allocator == null;
- this.allocator = allocator == null ? new TopLevelAllocator(config) : allocator;
+ this.allocator = ownsAllocator ? new TopLevelAllocator(config) : allocator;
this.config = config;
this.clusterCoordinator = coordinator;
this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES);
@@ -131,7 +131,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
/**
* Connects the client to a Drillbit server
*
- * @throws IOException
+ * @throws RpcException
*/
public void connect() throws RpcException {
connect(null, new Properties());
@@ -176,7 +176,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
connected = true;
}
- protected EventLoopGroup createEventLoop(int size, String prefix) {
+ protected static EventLoopGroup createEventLoop(int size, String prefix) {
return TransportCheck.createEventLoopGroup(size, prefix);
}
@@ -204,12 +204,8 @@ public class DrillClient implements Closeable, ConnectionThrottle{
private void connect(DrillbitEndpoint endpoint) throws RpcException {
FutureHandler f = new FutureHandler();
- try {
- client.connect(f, endpoint, props, getUserCredentials());
- f.checkedGet();
- } catch (InterruptedException e) {
- throw new RpcException(e);
- }
+ client.connect(f, endpoint, props, getUserCredentials());
+ f.checkedGet();
}
public BufferAllocator getAllocator() {
@@ -219,6 +215,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{
/**
* Closes this client's connection to the server
*/
+ @Override
public void close() {
if (this.client != null) {
this.client.close();
@@ -286,15 +283,13 @@ public class DrillClient implements Closeable, ConnectionThrottle{
* Submits a Logical plan for direct execution (bypasses parsing)
*
* @param plan the plan to execute
- * @return a handle for the query result
- * @throws RpcException
*/
public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) {
client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build());
}
private class ListHoldingResultsListener implements UserResultsListener {
- private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>();
+ private Vector<QueryResultBatch> results = new Vector<>();
private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create();
private UserProtos.RunQuery query ;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
index 5c657248c..493f6ce8e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java
@@ -294,14 +294,11 @@ public class ClassTransformer {
if (templateDefinition.getExternalInterface().isAssignableFrom(c)) {
logger.debug("Done compiling (bytecode size={}, time:{} millis).", DrillStringUtils.readable(totalBytecodeSize), (System.nanoTime() - t1) / 1000000);
return c;
- } else {
- throw new ClassTransformationException("The requested class did not implement the expected interface.");
}
+
+ throw new ClassTransformationException("The requested class did not implement the expected interface.");
} catch (CompileException | IOException | ClassNotFoundException e) {
throw new ClassTransformationException(String.format("Failure generating transformation classes for value: \n %s", entireClass), e);
}
-
}
-
}
-
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
index 035c1aa2c..27868f01a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -32,10 +33,15 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.collect.Maps;
public class LocalClusterCoordinator extends ClusterCoordinator {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class);
- private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints = Maps.newConcurrentMap();
- private volatile ConcurrentMap<String, DistributedSemaphore> semaphores = Maps.newConcurrentMap();
+ /*
+ * Since we hand out the endpoints list in {@see #getAvailableEndpoints()}, we use a
+ * {@see java.util.concurrent.ConcurrentHashMap} because those guarantee not to throw
+ * ConcurrentModificationException.
+ */
+ private final Map<RegistrationHandle, DrillbitEndpoint> endpoints = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, DistributedSemaphore> semaphores = Maps.newConcurrentMap();
@Override
public void close() throws IOException {
@@ -43,21 +49,21 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
}
@Override
- public void start(long millis) throws Exception {
+ public void start(final long millis) throws Exception {
logger.debug("Local Cluster Coordinator started.");
}
@Override
- public RegistrationHandle register(DrillbitEndpoint data) {
+ public RegistrationHandle register(final DrillbitEndpoint data) {
logger.debug("Endpoint registered {}.", data);
- Handle h = new Handle();
+ final Handle h = new Handle();
endpoints.put(h, data);
return h;
}
@Override
- public void unregister(RegistrationHandle handle) {
- if(handle == null) {
+ public void unregister(final RegistrationHandle handle) {
+ if (handle == null) {
return;
}
@@ -69,8 +75,8 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
return endpoints.values();
}
- private class Handle implements RegistrationHandle{
- UUID id = UUID.randomUUID();
+ private class Handle implements RegistrationHandle {
+ private final UUID id = UUID.randomUUID();
@Override
public int hashCode() {
@@ -82,7 +88,7 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
}
@Override
- public boolean equals(Object obj) {
+ public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
@@ -92,7 +98,7 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
if (getClass() != obj.getClass()) {
return false;
}
- Handle other = (Handle) obj;
+ final Handle other = (Handle) obj;
if (!getOuterType().equals(other.getOuterType())) {
return false;
}
@@ -109,41 +115,38 @@ public class LocalClusterCoordinator extends ClusterCoordinator {
private LocalClusterCoordinator getOuterType() {
return LocalClusterCoordinator.this;
}
-
}
@Override
- public DistributedSemaphore getSemaphore(String name, int maximumLeases) {
- semaphores.putIfAbsent(name, new LocalSemaphore(maximumLeases));
+ public DistributedSemaphore getSemaphore(final String name, final int maximumLeases) {
+ if (!semaphores.containsKey(name)) {
+ semaphores.putIfAbsent(name, new LocalSemaphore(maximumLeases));
+ }
return semaphores.get(name);
}
- public class LocalSemaphore implements DistributedSemaphore{
-
- private final Semaphore inner;
- private final LocalLease lease = new LocalLease();
+ public class LocalSemaphore implements DistributedSemaphore {
+ private final Semaphore semaphore;
+ private final LocalLease localLease = new LocalLease();
- public LocalSemaphore(int size) {
- inner = new Semaphore(size);
+ public LocalSemaphore(final int size) {
+ semaphore = new Semaphore(size);
}
@Override
- public DistributedLease acquire(long timeout, TimeUnit unit) throws Exception {
- if(!inner.tryAcquire(timeout, unit)) {
+ public DistributedLease acquire(final long timeout, final TimeUnit timeUnit) throws Exception {
+ if (!semaphore.tryAcquire(timeout, timeUnit)) {
return null;
- }else{
- return lease;
+ } else {
+ return localLease;
}
}
- private class LocalLease implements DistributedLease{
-
+ private class LocalLease implements DistributedLease {
@Override
public void close() throws Exception {
- inner.release();
+ semaphore.release();
}
-
}
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 2a6660e0e..5e31e5cc2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.ops;
import io.netty.buffer.DrillBuf;
-import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -27,9 +26,9 @@ import java.util.Map;
import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
+import org.apache.drill.common.DeferredException;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.exec.compile.QueryClassLoader;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
@@ -39,7 +38,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared.UserCredentials;
import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
@@ -49,53 +47,52 @@ import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.work.batch.IncomingBuffers;
-import com.carrotsearch.hppc.LongObjectOpenHashMap;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
* Contextual objects required for execution of a particular fragment.
*/
public class FragmentContext implements AutoCloseable, UdfUtilities {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
-
-
- private Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap();
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class);
+ private final Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap();
private final DrillbitContext context;
private final UserClientConnection connection;
private final FragmentStats stats;
private final FunctionImplementationRegistry funcRegistry;
- private final QueryClassLoader loader;
private final BufferAllocator allocator;
private final PlanFragment fragment;
- private List<Thread> daemonThreads = Lists.newLinkedList();
- private QueryDateTimeInfo queryDateTimeInfo;
+ private final QueryDateTimeInfo queryDateTimeInfo;
private IncomingBuffers buffers;
private final OptionManager fragmentOptions;
- private final UserCredentials credentials;
private final BufferManager bufferManager;
- private volatile Throwable failureCause;
+ private final DeferredException deferredException = new DeferredException();
private volatile FragmentContextState state = FragmentContextState.OK;
+ /*
+ * TODO we need a state that indicates that cancellation has been requested and
+ * is in progress. Early termination (such as from limit queries) could also use
+ * this, as the cleanup steps should be exactly the same.
+ */
private static enum FragmentContextState {
OK,
FAILED,
CANCELED
}
- public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection,
- FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException {
-
+ public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment,
+ final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry)
+ throws ExecutionSetupException {
this.context = dbContext;
this.connection = connection;
this.fragment = fragment;
this.funcRegistry = funcRegistry;
- this.credentials = fragment.getCredentials();
- this.queryDateTimeInfo = new QueryDateTimeInfo(fragment.getQueryStartTime(), fragment.getTimeZone());
+ queryDateTimeInfo = new QueryDateTimeInfo(fragment.getQueryStartTime(), fragment.getTimeZone());
+
logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial());
logger.debug("Fragment max allocation: {}", fragment.getMemMax());
+
try {
OptionList list;
if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) {
@@ -103,7 +100,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
} else {
list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class);
}
- this.fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
+ fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list);
} catch (Exception e) {
throw new ExecutionSetupException("Failure while reading plan options.", e);
}
@@ -111,15 +108,15 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
// Add the fragment context to the root allocator.
// The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments
try {
- this.allocator = dbContext.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true);
+ allocator = context.getAllocator().getChildAllocator(
+ this, fragment.getMemInitial(), fragment.getMemMax(), true);
assert (allocator != null);
- }catch(Throwable e){
+ } catch(Throwable e) {
throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e);
}
- this.stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment());
- this.bufferManager = new BufferManager(this.allocator, this);
- this.loader = new QueryClassLoader(dbContext.getConfig(), fragmentOptions);
+ stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment());
+ bufferManager = new BufferManager(this.allocator, this);
}
public OptionManager getOptions() {
@@ -133,7 +130,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
public void fail(Throwable cause) {
logger.error("Fragment Context received failure.", cause);
setState(FragmentContextState.FAILED);
- failureCause = cause;
+ deferredException.addThrowable(cause);
}
public void cancel() {
@@ -161,11 +158,11 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " +
"This is a non-root fragment."));
return null;
- } else {
- SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
- context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root);
- return root;
}
+
+ final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false);
+ context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root);
+ return root;
}
/**
@@ -177,14 +174,17 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
}
public FragmentStats getStats() {
- return this.stats;
+ return stats;
}
+ @Override
public QueryDateTimeInfo getQueryDateTimeInfo(){
return this.queryDateTimeInfo;
}
- public DrillbitEndpoint getForemanEndpoint() {return fragment.getForeman();}
+ public DrillbitEndpoint getForemanEndpoint() {
+ return fragment.getForeman();
+ }
/**
* The FragmentHandle for this Fragment
@@ -194,31 +194,37 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return fragment.getHandle();
}
+ private String getFragIdString() {
+ final FragmentHandle handle = getHandle();
+ final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0";
+ return frag;
+ }
+
/**
* Get this fragment's allocator.
- * @return
+ * @return the allocator
*/
@Deprecated
public BufferAllocator getAllocator() {
- if(allocator == null){
- FragmentHandle handle=getHandle();
- String frag=handle!=null?handle.getMajorFragmentId()+":"+handle.getMinorFragmentId():"0:0";
- logger.debug("Fragment:"+frag+" Allocator is NULL");
+ if (allocator == null) {
+ logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL");
}
return allocator;
}
- public BufferAllocator getNewChildAllocator(long initialReservation,
- long maximumReservation,
- boolean applyFragmentLimit) throws OutOfMemoryException {
+ public BufferAllocator getNewChildAllocator(final long initialReservation,
+ final long maximumReservation,
+ final boolean applyFragmentLimit) throws OutOfMemoryException {
return allocator.getChildAllocator(this, initialReservation, maximumReservation, applyFragmentLimit);
}
- public <T> T getImplementationClass(ClassGenerator<T> cg) throws ClassTransformationException, IOException {
+ public <T> T getImplementationClass(final ClassGenerator<T> cg)
+ throws ClassTransformationException, IOException {
return getImplementationClass(cg.getCodeGenerator());
}
- public <T> T getImplementationClass(CodeGenerator<T> cg) throws ClassTransformationException, IOException {
+ public <T> T getImplementationClass(final CodeGenerator<T> cg)
+ throws ClassTransformationException, IOException {
return context.getCompiler().getImplementationClass(cg);
}
@@ -238,7 +244,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return connection;
}
- public ControlTunnel getControlTunnel(DrillbitEndpoint endpoint) {
+ public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) {
return context.getController().getTunnel(endpoint);
}
@@ -251,24 +257,12 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return tunnel;
}
- /**
- * Add a new thread to this fragment's context. This thread will likely run for the life of the fragment but should be
- * terminated when the fragment completes. When the fragment completes, the threads will be interrupted.
- *
- * @param thread
- */
- public void addDaemonThread(Thread thread) {
- daemonThreads.add(thread);
- thread.start();
-
- }
-
public IncomingBuffers getBuffers() {
return buffers;
}
public Throwable getFailureCause() {
- return failureCause;
+ return deferredException.getException();
}
public boolean isFailed() {
@@ -283,43 +277,36 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
return funcRegistry;
}
- public UserCredentials getCredentials() {
- return credentials;
- }
-
- public QueryClassLoader getClassLoader() {
- return loader;
- }
-
public DrillConfig getConfig() {
return context.getConfig();
}
- public void setFragmentLimit(long limit) {
- this.allocator.setFragmentLimit(limit);
+ public void setFragmentLimit(final long limit) {
+ allocator.setFragmentLimit(limit);
+ }
+
+ public DeferredException getDeferredException() {
+ return deferredException;
}
@Override
public void close() throws Exception {
- for (Thread thread: daemonThreads) {
- thread.interrupt();
- }
- bufferManager.close();
-
- if (buffers != null) {
- buffers.close();
- }
+ /*
+ * TODO wait for threads working on this Fragment to terminate (or at least stop working
+ * on this Fragment's query)
+ */
+ deferredException.suppressingClose(bufferManager);
+ deferredException.suppressingClose(buffers);
+ deferredException.suppressingClose(allocator);
- FragmentHandle handle=getHandle();
- String frag=handle!=null?handle.getMajorFragmentId()+":"+handle.getMinorFragmentId():"0:0";
- allocator.close();
- logger.debug("Fragment:"+frag+" After close allocator is: "+allocator!=null?"OK":"NULL");
+ deferredException.close(); // must be last, as this may throw
}
public DrillBuf replace(DrillBuf old, int newSize) {
return bufferManager.replace(old, newSize);
}
+ @Override
public DrillBuf getManagedBuffer() {
return bufferManager.getManagedBuffer();
}
@@ -327,5 +314,4 @@ public class FragmentContext implements AutoCloseable, UdfUtilities {
public DrillBuf getManagedBuffer(int size) {
return bufferManager.getManagedBuffer(size);
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index fb6b4aa13..3b51a6916 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -24,35 +24,30 @@ import net.hydromatic.optiq.SchemaPlus;
import net.hydromatic.optiq.jdbc.SimpleOptiqSchema;
import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.fn.impl.DateUtility;
-import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.planner.sql.DrillOperatorTable;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.rpc.control.WorkEventBus;
-import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.server.options.QueryOptionManager;
import org.apache.drill.exec.store.StoragePluginRegistry;
+// TODO except for a couple of tests, this is only created by Foreman
+// TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext()
// TODO - consider re-name to PlanningContext, as the query execution context actually appears
// in fragment contexts
-public class QueryContext implements AutoCloseable, UdfUtilities{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
+public class QueryContext implements AutoCloseable, UdfUtilities {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
- private final QueryId queryId;
private final DrillbitContext drillbitContext;
- private final WorkEventBus workBus;
- private UserSession session;
- private OptionManager queryOptions;
+ private final UserSession session;
+ private final OptionManager queryOptions;
private final PlannerSettings plannerSettings;
private final DrillOperatorTable table;
@@ -62,32 +57,32 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024;
private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024;
- // flag to indicate if close has been called, after calling close the first
- // time this is set to true and the close method becomes a no-op
+ /*
+ * Flag to indicate if close has been called, after calling close the first
+ * time this is set to true and the close method becomes a no-op.
+ */
private boolean closed = false;
- public QueryContext(final UserSession session, QueryId queryId, final DrillbitContext drllbitContext) {
- super();
- this.queryId = queryId;
+ public QueryContext(final UserSession session, final DrillbitContext drllbitContext) {
this.drillbitContext = drllbitContext;
- this.workBus = drllbitContext.getWorkBus();
this.session = session;
- this.queryOptions = new QueryOptionManager(session.getOptions());
- this.plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
+ queryOptions = new QueryOptionManager(session.getOptions());
+ plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
plannerSettings.setNumEndPoints(drillbitContext.getBits().size());
- this.table = new DrillOperatorTable(getFunctionRegistry());
+ table = new DrillOperatorTable(getFunctionRegistry());
- long queryStartTime = System.currentTimeMillis();
- int timeZone = DateUtility.getIndex(System.getProperty("user.timezone"));
- this.queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
+ final long queryStartTime = System.currentTimeMillis();
+ final int timeZone = DateUtility.getIndex(System.getProperty("user.timezone"));
+ queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone);
try {
- this.allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES, MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false);
+ allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES,
+ MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false);
} catch (OutOfMemoryException e) {
throw new DrillRuntimeException("Error creating off-heap allocator for planning context.",e);
}
// TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
- this.bufferManager = new BufferManager(this.allocator, null);
+ bufferManager = new BufferManager(this.allocator, null);
}
@@ -95,7 +90,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
return plannerSettings;
}
- public UserSession getSession(){
+ public UserSession getSession() {
return session;
}
@@ -103,18 +98,18 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
return allocator;
}
- public SchemaPlus getNewDefaultSchema(){
- SchemaPlus rootSchema = getRootSchema();
- SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
- if(defaultSchema == null){
+ public SchemaPlus getNewDefaultSchema() {
+ final SchemaPlus rootSchema = getRootSchema();
+ final SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema);
+ if (defaultSchema == null) {
return rootSchema;
- }else{
- return defaultSchema;
}
+
+ return defaultSchema;
}
- public SchemaPlus getRootSchema(){
- SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
+ public SchemaPlus getRootSchema() {
+ final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false);
drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema);
return rootSchema;
}
@@ -123,43 +118,23 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
return queryOptions;
}
- public OptionManager getSessionOptions() {
- return session.getOptions();
- }
-
- public DrillbitEndpoint getCurrentEndpoint(){
+ public DrillbitEndpoint getCurrentEndpoint() {
return drillbitContext.getEndpoint();
}
- public QueryId getQueryId() {
- return queryId;
- }
-
- public StoragePluginRegistry getStorage(){
+ public StoragePluginRegistry getStorage() {
return drillbitContext.getStorage();
}
- public Collection<DrillbitEndpoint> getActiveEndpoints(){
+ public Collection<DrillbitEndpoint> getActiveEndpoints() {
return drillbitContext.getBits();
}
- public PhysicalPlanReader getPlanReader(){
- return drillbitContext.getPlanReader();
- }
-
- public DataConnectionCreator getDataConnectionsPool(){
- return drillbitContext.getDataConnectionsPool();
- }
-
- public DrillConfig getConfig(){
+ public DrillConfig getConfig() {
return drillbitContext.getConfig();
}
- public WorkEventBus getWorkBus(){
- return workBus;
- }
-
- public FunctionImplementationRegistry getFunctionRegistry(){
+ public FunctionImplementationRegistry getFunctionRegistry() {
return drillbitContext.getFunctionImplementationRegistry();
}
@@ -167,10 +142,6 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
return table;
}
- public ClusterCoordinator getClusterCoordinator() {
- return drillbitContext.getClusterCoordinator();
- }
-
@Override
public QueryDateTimeInfo getQueryDateTimeInfo() {
return queryDateTimeInfo;
@@ -183,10 +154,13 @@ public class QueryContext implements AutoCloseable, UdfUtilities{
@Override
public void close() throws Exception {
- if (!closed) {
- // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
- bufferManager.close();
- allocator.close();
+ try {
+ if (!closed) {
+ // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available
+ bufferManager.close();
+ allocator.close();
+ }
+ } finally {
closed = true;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
index 9f89f249b..b1a71a5d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java
@@ -29,6 +29,7 @@ import org.apache.drill.common.logical.data.Filter;
import org.apache.drill.common.logical.data.GroupingAggregate;
import org.apache.drill.common.logical.data.Join;
import org.apache.drill.common.logical.data.JoinCondition;
+import org.apache.drill.common.logical.data.LogicalOperator;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.common.logical.data.Order.Ordering;
@@ -52,10 +53,8 @@ import org.apache.drill.exec.physical.config.SelectionVectorRemover;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.physical.config.StreamingAggregate;
import org.apache.drill.exec.physical.config.WindowPOP;
-import org.apache.drill.exec.rpc.user.UserServer;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.StoragePlugin;
-import org.apache.drill.exec.work.foreman.ForemanException;
import org.eigenbase.rel.RelFieldCollation.Direction;
import org.eigenbase.rel.RelFieldCollation.NullDirection;
@@ -63,87 +62,49 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
-public class BasicOptimizer extends Optimizer{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
+public class BasicOptimizer extends Optimizer {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class);
- private DrillConfig config;
- private QueryContext context;
- private UserServer.UserClientConnection userSession;
+ private final QueryContext queryContext;
- public BasicOptimizer(DrillConfig config, QueryContext context, UserServer.UserClientConnection userSession){
- this.config = config;
- this.context = context;
- this.userSession = userSession;
- logCurrentOptionValues();
- }
-
- private void logCurrentOptionValues(){
-// Iterator<DrillOptionValue> optionVals = userSession.getSessionOptionIterator();
-// DrillOptionValue val = null;
-// String output = "";
-// output += "SessionOptions: {\n";
-// for ( ;optionVals.hasNext(); val = optionVals.next()){
-// if (val != null) {
-// output += val.getOptionName() + ":" + val.getValue() + ",\n";
-// }
-// }
-// output += "}";
-// logger.debug(output);
- }
-
- /**
- * Get the current value of an option. Session options override global options.
- *
- * @param name - the name of the option
- * @return - value of the option
- */
- private Object getOptionValue(String name) {
-// Object val = userSession.getSessionLevelOption(name);
-// if (val == null) {
-// context.getOptionValue(name);
-// }
-// return val;
- return null;
+ public BasicOptimizer(final QueryContext queryContext) {
+ this.queryContext = queryContext;
}
@Override
- public void init(DrillConfig config) {
-
+ public void init(final DrillConfig config) {
}
@Override
- public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) throws OptimizerException{
- Object obj = new Object();
- Collection<SinkOperator> roots = plan.getGraph().getRoots();
- List<PhysicalOperator> physOps = new ArrayList<PhysicalOperator>(roots.size());
- LogicalConverter converter = new LogicalConverter(plan);
- for ( SinkOperator op : roots){
- PhysicalOperator pop = op.accept(converter, obj);
+ public PhysicalPlan optimize(final OptimizationContext context, final LogicalPlan plan)
+ throws OptimizerException {
+ final Object obj = new Object();
+ final Collection<SinkOperator> roots = plan.getGraph().getRoots();
+ final List<PhysicalOperator> physOps = new ArrayList<>(roots.size());
+ final LogicalConverter converter = new LogicalConverter(plan);
+
+ for (SinkOperator op : roots) {
+ final PhysicalOperator pop = op.accept(converter, obj);
physOps.add(pop);
}
- PlanProperties props = PlanProperties.builder()
+ final PlanProperties logicalProperties = plan.getProperties();
+ final PlanProperties props = PlanProperties.builder()
.type(PlanProperties.PlanType.APACHE_DRILL_PHYSICAL)
- .version(plan.getProperties().version)
- .generator(plan.getProperties().generator)
+ .version(logicalProperties.version)
+ .generator(logicalProperties.generator)
.options(new JSONOptions(context.getOptions().getOptionList())).build();
- PhysicalPlan p = new PhysicalPlan(props, physOps);
+ final PhysicalPlan p = new PhysicalPlan(props, physOps);
return p;
- //return new PhysicalPlan(props, physOps);
- }
-
- @Override
- public void close() {
-
}
public static class BasicOptimizationContext implements OptimizationContext {
-
- private OptionManager ops;
- public BasicOptimizationContext(QueryContext c){
- this.ops = c.getOptions();
+ private final OptionManager ops;
+ public BasicOptimizationContext(final QueryContext c) {
+ ops = c.getOptions();
}
@Override
@@ -159,64 +120,65 @@ public class BasicOptimizer extends Optimizer{
private class LogicalConverter extends AbstractLogicalVisitor<PhysicalOperator, Object, OptimizerException> {
- // storing a reference to the plan for access to other elements outside of the query graph
- // such as the storage engine configs
- LogicalPlan logicalPlan;
+ /*
+ * Store a reference to the plan for access to other elements outside of the query graph
+ * such as the storage engine configs.
+ */
+ private final LogicalPlan logicalPlan;
- public LogicalConverter(LogicalPlan logicalPlan){
+ public LogicalConverter(final LogicalPlan logicalPlan) {
this.logicalPlan = logicalPlan;
}
@Override
public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, Object value) throws OptimizerException {
-
- List<Ordering> orderDefs = Lists.newArrayList();
-
-
+ final List<Ordering> orderDefs = Lists.newArrayList();
PhysicalOperator input = groupBy.getInput().accept(this, value);
- if(groupBy.getKeys().length > 0){
- for(NamedExpression e : groupBy.getKeys()){
+ if (groupBy.getKeys().length > 0) {
+ for(NamedExpression e : groupBy.getKeys()) {
orderDefs.add(new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST));
}
input = new Sort(input, orderDefs, false);
}
- StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
+ final StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f);
return sa;
}
@Override
- public PhysicalOperator visitWindow(Window window, Object value) throws OptimizerException {
+ public PhysicalOperator visitWindow(final Window window, final Object value) throws OptimizerException {
PhysicalOperator input = window.getInput().accept(this, value);
-
- List<Ordering> ods = Lists.newArrayList();
+ final List<Ordering> ods = Lists.newArrayList();
input = new Sort(input, ods, false);
- return new WindowPOP(input, window.getWithins(), window.getAggregations(), window.getOrderings(), window.getStart(), window.getEnd());
+ return new WindowPOP(input, window.getWithins(), window.getAggregations(),
+ window.getOrderings(), window.getStart(), window.getEnd());
}
@Override
- public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerException {
- PhysicalOperator input = order.getInput().accept(this, value);
- List<Ordering> ods = Lists.newArrayList();
+ public PhysicalOperator visitOrder(final Order order, final Object value) throws OptimizerException {
+ final PhysicalOperator input = order.getInput().accept(this, value);
+ final List<Ordering> ods = Lists.newArrayList();
for (Ordering o : order.getOrderings()){
ods.add(o);
}
+
return new SelectionVectorRemover(new Sort(input, ods, false));
}
@Override
- public PhysicalOperator visitLimit(org.apache.drill.common.logical.data.Limit limit, Object value) throws OptimizerException {
- PhysicalOperator input = limit.getInput().accept(this, value);
+ public PhysicalOperator visitLimit(final org.apache.drill.common.logical.data.Limit limit,
+ final Object value) throws OptimizerException {
+ final PhysicalOperator input = limit.getInput().accept(this, value);
return new SelectionVectorRemover(new Limit(input, limit.getFirst(), limit.getLast()));
}
@Override
- public PhysicalOperator visitJoin(Join join, Object value) throws OptimizerException {
+ public PhysicalOperator visitJoin(final Join join, final Object value) throws OptimizerException {
PhysicalOperator leftOp = join.getLeft().accept(this, value);
- List<Ordering> leftOrderDefs = Lists.newArrayList();
+ final List<Ordering> leftOrderDefs = Lists.newArrayList();
for(JoinCondition jc : join.getConditions()){
leftOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getLeft()));
}
@@ -224,28 +186,28 @@ public class BasicOptimizer extends Optimizer{
leftOp = new SelectionVectorRemover(leftOp);
PhysicalOperator rightOp = join.getRight().accept(this, value);
- List<Ordering> rightOrderDefs = Lists.newArrayList();
+ final List<Ordering> rightOrderDefs = Lists.newArrayList();
for(JoinCondition jc : join.getConditions()){
rightOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getRight()));
}
rightOp = new Sort(rightOp, rightOrderDefs, false);
rightOp = new SelectionVectorRemover(rightOp);
- MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJoinType());
+ final MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()),
+ join.getJoinType());
return new SelectionVectorRemover(mjp);
}
-
-
@Override
- public PhysicalOperator visitScan(Scan scan, Object obj) throws OptimizerException {
- StoragePluginConfig config = logicalPlan.getStorageEngineConfig(scan.getStorageEngine());
+ public PhysicalOperator visitScan(final Scan scan, final Object obj) throws OptimizerException {
+ final StoragePluginConfig config = logicalPlan.getStorageEngineConfig(scan.getStorageEngine());
if(config == null) {
- throw new OptimizerException(String.format("Logical plan referenced the storage engine config %s but the logical plan didn't have that available as a config.", scan.getStorageEngine()));
+ throw new OptimizerException(
+ String.format("Logical plan referenced the storage engine config %s but the logical plan didn't have that available as a config.",
+ scan.getStorageEngine()));
}
- StoragePlugin storagePlugin;
try {
- storagePlugin = context.getStorage().getPlugin(config);
+ final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config);
return storagePlugin.getPhysicalScan(scan.getSelection());
} catch (IOException | ExecutionSetupException e) {
throw new OptimizerException("Failure while attempting to retrieve storage engine.", e);
@@ -253,27 +215,27 @@ public class BasicOptimizer extends Optimizer{
}
@Override
- public PhysicalOperator visitStore(Store store, Object obj) throws OptimizerException {
- if (!store.iterator().hasNext()) {
+ public PhysicalOperator visitStore(final Store store, final Object obj) throws OptimizerException {
+ final Iterator<LogicalOperator> iterator = store.iterator();
+ if (!iterator.hasNext()) {
throw new OptimizerException("Store node in logical plan does not have a child.");
}
- return new Screen(store.iterator().next().accept(this, obj), context.getCurrentEndpoint());
+ return new Screen(iterator.next().accept(this, obj), queryContext.getCurrentEndpoint());
}
@Override
- public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException {
- return new org.apache.drill.exec.physical.config.Project(Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
+ public PhysicalOperator visitProject(final Project project, final Object obj) throws OptimizerException {
+ return new org.apache.drill.exec.physical.config.Project(
+ Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj));
}
@Override
- public PhysicalOperator visitFilter(Filter filter, Object obj) throws OptimizerException {
- TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType();
+ public PhysicalOperator visitFilter(final Filter filter, final Object obj) throws OptimizerException {
+ final TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType();
b.setMode(DataMode.REQUIRED);
b.setMinorType(MinorType.BIGINT);
- PhysicalOperator child = filter.iterator().next().accept(this, obj);
+ final PhysicalOperator child = filter.iterator().next().accept(this, obj);
return new SelectionVectorRemover(new org.apache.drill.exec.physical.config.Filter(child, filter.getExpr(), 1.0f));
}
-
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
index 979c5e241..9e650b096 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java
@@ -22,18 +22,14 @@ import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.exec.physical.PhysicalPlan;
public class IdentityOptimizer extends Optimizer {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IdentityOptimizer.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IdentityOptimizer.class);
@Override
- public void init(DrillConfig config) {
+ public void init(final DrillConfig config) {
}
@Override
- public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) {
+ public PhysicalPlan optimize(final OptimizationContext context, final LogicalPlan plan) {
return null;
}
-
- @Override
- public void close() {
- }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
index 34d0622c6..aed6299e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.opt;
-import java.io.Closeable;
-
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.DrillConfigurationException;
import org.apache.drill.common.logical.LogicalPlan;
@@ -26,22 +24,19 @@ import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.server.options.OptionManager;
-public abstract class Optimizer implements Closeable{
-
+public abstract class Optimizer {
public static String OPTIMIZER_IMPL_KEY = "drill.exec.optimizer.implementation";
public abstract void init(DrillConfig config);
-
public abstract PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) throws OptimizerException;
- public abstract void close();
- public static Optimizer getOptimizer(DrillConfig config) throws DrillConfigurationException{
- Optimizer o = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class);
- o.init(config);
- return o;
+ public static Optimizer getOptimizer(final DrillConfig config) throws DrillConfigurationException {
+ final Optimizer optimizer = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class);
+ optimizer.init(config);
+ return optimizer;
}
- public interface OptimizationContext{
+ public interface OptimizationContext {
public int getPriority();
public OptionManager getOptions();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 412da8535..a00df9d27 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
public abstract class BaseRootExec implements RootExec {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRootExec.class);
protected OperatorStats stats = null;
protected OperatorContext oContext = null;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
index a644c3497..8fd68b224 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java
@@ -17,8 +17,6 @@
*/
package org.apache.drill.exec.physical.impl;
-
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
/**
@@ -26,8 +24,6 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
* output nodes and storage nodes. They are there driving force behind the completion of a query.
*/
public interface RootExec {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class);
-
/**
* Do the next batch of work.
* @return Whether or not additional batches of work are necessary. False means that this fragment is done.
@@ -44,5 +40,4 @@ public interface RootExec {
* @param handle
*/
public void receivingFragmentFinished(FragmentHandle handle);
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index d88420058..2d1a1367e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -23,7 +23,6 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
@@ -58,7 +57,7 @@ public class ScreenCreator implements RootCreator<Screen>{
static class ScreenRoot extends BaseRootExec {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class);
volatile boolean ok = true;
private final SendingAccountor sendCount = new SendingAccountor();
@@ -67,7 +66,6 @@ public class ScreenCreator implements RootCreator<Screen>{
final FragmentContext context;
final UserClientConnection connection;
private RecordMaterializer materializer;
- private boolean first = true;
public enum Metric implements MetricDef {
BYTES_SENT;
@@ -146,7 +144,7 @@ public class ScreenCreator implements RootCreator<Screen>{
}
case OK_NEW_SCHEMA:
materializer = new VectorRecordMaterializer(context, incoming);
- // fall through.
+ //$FALL-THROUGH$
case OK:
// context.getStats().batchesCompleted.inc(1);
// context.getStats().recordsCompleted.inc(incoming.getRecordCount());
@@ -160,7 +158,6 @@ public class ScreenCreator implements RootCreator<Screen>{
}
sendCount.increment();
- first = false;
return true;
default:
throw new UnsupportedOperationException();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
index 3920f9ccd..8794188b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java
@@ -46,6 +46,7 @@ public class SendingAccountor {
batchesSent.set(0);
} catch (InterruptedException e) {
logger.warn("Failure while waiting for send complete.", e);
+ // TODO InterruptedException
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
index e6c3fba8d..2a59e22aa 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java
@@ -20,31 +20,21 @@ package org.apache.drill.exec.physical.impl.materialize;
import io.netty.buffer.ByteBuf;
import java.util.Arrays;
-import java.util.List;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryResult;
-import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef;
-import org.apache.drill.exec.proto.UserBitShared.SerializedField;
-import org.apache.drill.exec.record.BatchSchema;
-import org.apache.drill.exec.record.MaterializedField;
-
-import com.google.common.collect.Lists;
public class QueryWritableBatch {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class);
private final QueryResult header;
private final ByteBuf[] buffers;
-
public QueryWritableBatch(QueryResult header, ByteBuf... buffers) {
- super();
this.header = header;
this.buffers = buffers;
}
- public ByteBuf[] getBuffers(){
+ public ByteBuf[] getBuffers() {
return buffers;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
index 352e7aedf..42b1080bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java
@@ -307,6 +307,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart
logger.error("Failure while building final partition table.", ex);
context.fail(ex);
return false;
+ // TODO InterruptedException
}
return true;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
index 4c9b33bf5..c50cb8af0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java
@@ -73,6 +73,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
context.fail(e);
}
return IterOutcome.STOP;
+ // TODO InterruptedException
} finally {
stats.stopWait();
}
@@ -147,6 +148,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
}
} catch (InterruptedException e) {
logger.warn("Producer thread is interrupted.", e);
+ // TODO InterruptedException
} finally {
if (stop) {
try {
@@ -154,6 +156,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
queue.put(new RecordBatchDataWrapper(null, true, false));
} catch (InterruptedException e) {
logger.error("Unable to enqueue the last batch indicator. Something is broken.", e);
+ // TODO InterruptedException
}
}
if (wrapper!=null) {
@@ -181,6 +184,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
producer.join();
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for producer thread");
+ // TODO InterruptedException
}
}
@@ -191,6 +195,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch {
cleanUpLatch.await();
} catch (InterruptedException e) {
logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e);
+ // TODO InterruptedException
} finally {
super.cleanup();
clearQueue();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
index 2436a0ea8..bc9ed74f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
@@ -42,6 +42,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
if (root == null) {
root = o;
}
+ // TODO should complain otherwise
}
public void addSendExchange(Exchange e, Fragment sendingToFragment) throws ForemanSetupException{
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index eebd40ec9..12043cee5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -33,7 +33,6 @@ import com.google.common.collect.Ordering;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.ops.QueryDateTimeInfo;
import org.apache.drill.exec.physical.EndpointAffinity;
@@ -145,7 +144,7 @@ public class SimpleParallelizer {
* @param planningSet
* @return Returns a list of leaf fragments in fragment dependency graph.
*/
- private Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
+ private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
// Set up dependency of fragments based on the affinity of exchange that separates the fragments.
for(Wrapper currentFragmentWrapper : planningSet) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
index b5d3f4a72..dc63ef95c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java
@@ -31,24 +31,21 @@ import org.eigenbase.sql.SqlLiteral;
import org.eigenbase.sql.SqlNode;
import org.eigenbase.sql.SqlSetOption;
-public class SetOptionHandler extends AbstractSqlHandler{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
-
- QueryContext context;
+public class SetOptionHandler extends AbstractSqlHandler {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class);
+ private final QueryContext context;
public SetOptionHandler(QueryContext context) {
- super();
this.context = context;
}
-
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException {
- SqlSetOption option = unwrap(sqlNode, SqlSetOption.class);
- String scope = option.getScope();
- String name = option.getName();
- SqlNode value = option.getValue();
+ final SqlSetOption option = unwrap(sqlNode, SqlSetOption.class);
+ final String scope = option.getScope();
+ final String name = option.getName();
+ final SqlNode value = option.getValue();
OptionValue.OptionType type;
if (value instanceof SqlLiteral) {
switch (scope.toLowerCase()) {
@@ -70,9 +67,5 @@ public class SetOptionHandler extends AbstractSqlHandler{
}
return DirectPlan.createDirectPlan(context, true, String.format("%s updated.", name));
-
}
-
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
index f358097cb..72ae130ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java
@@ -43,7 +43,6 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);
private final Bootstrap b;
- private volatile boolean connect = false;
protected R connection;
private final T handshakeType;
private final Class<HANDSHAKE_RESPONSE> responseClass;
@@ -81,7 +80,6 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
new InboundHandler(connection), //
new RpcExceptionHandler() //
);
- connect = true;
}
}); //
@@ -180,7 +178,6 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
try {
BasicClient.this.validateHandshake(value);
BasicClient.this.finalizeConnection(value, connection);
- BasicClient.this.connect = true;
l.connectionSucceeded(connection);
// logger.debug("Handshake completed succesfully.");
} catch (RpcException ex) {
@@ -218,6 +215,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection
connection.getChannel().close().get();
} catch (InterruptedException | ExecutionException e) {
logger.warn("Failure whiel shutting {}", this.getClass().getName(), e);
+ // TODO InterruptedException
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
index c00df4e6e..0e0398d8f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java
@@ -145,7 +145,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
return null;
}
- public int bind(final int initialPort, boolean allowPortHunting) throws InterruptedException, DrillbitStartupException {
+ public int bind(final int initialPort, boolean allowPortHunting) throws DrillbitStartupException {
int port = initialPort - 1;
while (true) {
try {
@@ -170,6 +170,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection
eventLoopGroup.shutdownGracefully().get();
} catch (InterruptedException | ExecutionException e) {
logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e);
+ // TODO InterruptedException
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
index 2b49579a3..e7580b248 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java
@@ -20,29 +20,61 @@ package org.apache.drill.exec.rpc;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
+/**
+ * {@link ThreadFactory} for {@link ExecutorServices} that names threads sequentially.
+ * Creates Threads named with the prefix specified at construction time. Created threads
+ * have the daemon bit set and priority Thread.MAX_PRIORITY.
+ *
+ * <p>An instance creates names with an instance-specific prefix suffixed with sequential
+ * integers.</p>
+ *
+ * <p>Concurrency: See {@link newThread}.</p>
+ */
public class NamedThreadFactory implements ThreadFactory {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NamedThreadFactory.class);
- private final AtomicInteger nextId = new AtomicInteger();
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NamedThreadFactory.class);
+ private final AtomicInteger nextId = new AtomicInteger(); // used to generate unique ids
private final String prefix;
- public NamedThreadFactory(String prefix) {
+ /**
+ * Constructor.
+ *
+ * @param prefix the string prefix that will be used to name threads created by this factory
+ */
+ public NamedThreadFactory(final String prefix) {
this.prefix = prefix;
}
+ /**
+ * Creates a sequentially named thread running a given Runnable.
+ * <p>
+ * The thread's name will be this instance's prefix concatenated with
+ * this instance's next<sup><a href="#fn-1">*</a></sup> sequential integer.
+ * </p>
+ * <p>
+ * Concurrency: Thread-safe.
+ * </p>
+ * <p>
+ * (Concurrent calls get different numbers.
+ * Calls started after other calls complete get later/higher numbers than
+ * those other calls.
+ * </p>
+ * <p>
+ * <a name="fn-1" />*However, for concurrent calls, the order of numbers
+ * is not defined.)
+ */
@Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, prefix + nextId.incrementAndGet());
+ public Thread newThread(final Runnable runnable) {
+ final Thread thread = new Thread(runnable, prefix + nextId.incrementAndGet());
+ thread.setDaemon(true);
+
try {
- if (t.isDaemon()) {
- t.setDaemon(true);
- }
- if (t.getPriority() != Thread.MAX_PRIORITY) {
- t.setPriority(Thread.MAX_PRIORITY);
+ if (thread.getPriority() != Thread.MAX_PRIORITY) {
+ thread.setPriority(Thread.MAX_PRIORITY);
}
} catch (Exception ignored) {
// Doesn't matter even if failed to set.
+ logger.info("ignored exception " + ignored);
}
- return t;
+ return thread;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
index f214c4d1e..9948d3e64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java
@@ -114,6 +114,7 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne
}
} catch (InterruptedException e) {
cmd.connectionFailed(FailureType.CONNECTION, e);
+ // TODO InterruptedException
} catch (ExecutionException e) {
throw new IllegalStateException();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
index 3a139f8a3..a72dd326c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java
@@ -54,6 +54,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
return true;
}catch(InterruptedException e){
listener.failed(new RpcException(e));
+ // TODO InterruptedException
return false;
}
}
@@ -109,6 +110,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
channel.close().get();
} catch (InterruptedException | ExecutionException e) {
logger.warn("Caught exception while closing channel.", e);
+ // TODO InterruptedException
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
index 5eab16a1a..b97496330 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
@@ -30,7 +30,6 @@ import java.io.Closeable;
import java.util.Arrays;
import java.util.List;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError;
import org.apache.drill.exec.work.ErrorHelper;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
index 7f84a2b24..af60ff080 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java
@@ -39,7 +39,7 @@ public interface Controller extends Closeable {
*/
public ControlTunnel getTunnel(DrillbitEndpoint node) ;
- public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException;
+ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
index f8f6fd795..631d479f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java
@@ -46,7 +46,7 @@ public class ControllerImpl implements Controller {
}
@Override
- public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException {
+ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException {
server = new ControlServer(handler, context, connectionRegistry);
int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT);
port = server.bind(port, allowPortHunting);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index d6b86375a..a5a544197 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -26,8 +26,6 @@ import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.FragmentStatusListener;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.drill.exec.work.fragment.FragmentManager;
@@ -37,60 +35,53 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Maps;
public class WorkEventBus {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkEventBus.class);
-
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkEventBus.class);
private final ConcurrentMap<FragmentHandle, FragmentManager> managers = Maps.newConcurrentMap();
- private final ConcurrentMap<QueryId, FragmentStatusListener> listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>(
- 16, 0.75f, 16);
- private final WorkerBee bee;
- private final Cache<FragmentHandle,Integer> recentlyFinishedFragments = CacheBuilder.newBuilder()
+ private final ConcurrentMap<QueryId, FragmentStatusListener> listeners =
+ new ConcurrentHashMap<>(16, 0.75f, 16);
+ private final Cache<FragmentHandle, Integer> recentlyFinishedFragments = CacheBuilder.newBuilder()
.maximumSize(10000)
-
.expireAfterWrite(10, TimeUnit.MINUTES)
.build();
- public WorkEventBus(WorkerBee bee) {
- this.bee = bee;
- }
-
- public void removeFragmentStatusListener(QueryId queryId) {
+ public void removeFragmentStatusListener(final QueryId queryId) {
logger.debug("Removing fragment status listener for queryId {}.", queryId);
listeners.remove(queryId);
}
- public void setFragmentStatusListener(QueryId queryId, FragmentStatusListener listener) throws ForemanSetupException {
+ public void addFragmentStatusListener(final QueryId queryId, final FragmentStatusListener listener)
+ throws ForemanSetupException {
logger.debug("Adding fragment status listener for queryId {}.", queryId);
- FragmentStatusListener old = listeners.putIfAbsent(queryId, listener);
+ final FragmentStatusListener old = listeners.putIfAbsent(queryId, listener);
if (old != null) {
throw new ForemanSetupException (
"Failure. The provided handle already exists in the listener pool. You need to remove one listener before adding another.");
}
}
- public void status(FragmentStatus status) {
- FragmentStatusListener l = listeners.get(status.getHandle().getQueryId());
- if (l == null) {
+ public void statusUpdate(final FragmentStatus status) {
+ final FragmentStatusListener listener = listeners.get(status.getHandle().getQueryId());
+ if (listener == null) {
logger.warn("A fragment message arrived but there was no registered listener for that message: {}.", status);
} else {
- l.statusUpdate(status);
+ listener.statusUpdate(status);
}
}
- public void setFragmentManager(FragmentManager fragmentManager) {
+ public void addFragmentManager(final FragmentManager fragmentManager) {
logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle()));
- FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
+ final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager);
if (old != null) {
throw new IllegalStateException(
"Tried to set fragment manager when has already been set for the provided fragment handle.");
}
}
- public FragmentManager getFragmentManagerIfExists(FragmentHandle handle){
+ public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) {
return managers.get(handle);
-
}
- public FragmentManager getFragmentManager(FragmentHandle handle) throws FragmentSetupException {
+ public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
// check if this was a recently canceled fragment. If so, throw away message.
if (recentlyFinishedFragments.asMap().containsKey(handle)) {
logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
@@ -98,17 +89,17 @@ public class WorkEventBus {
}
// since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
- FragmentManager m = managers.get(handle);
+ final FragmentManager m = managers.get(handle);
if(m != null) {
return m;
}
- throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle));
+ throw new FragmentSetupException("Failed to receive plan fragment that was required for id: "
+ + QueryIdHelper.getQueryIdentifier(handle));
}
- public void removeFragmentManager(FragmentHandle handle) {
+ public void removeFragmentManager(final FragmentHandle handle) {
logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle));
recentlyFinishedFragments.put(handle, 1);
managers.remove(handle);
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
index 33f0d0937..a76d75391 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java
@@ -50,7 +50,7 @@ public class DataConnectionCreator implements Closeable {
this.allowPortHunting = allowPortHunting;
}
- public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException {
+ public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException {
server = new DataServer(context, workBus, dataHandler);
int port = server.bind(partialEndpoint.getControlPort() + 1, allowPortHunting);
DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setDataPort(port).build();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
index e0392fdd4..33e066511 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
@@ -21,50 +21,43 @@ import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.proto.BitData.RpcType;
import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.rpc.Acks;
-import org.apache.drill.exec.rpc.RemoteConnection;
-import org.apache.drill.exec.rpc.Response;
-import org.apache.drill.exec.rpc.ResponseSender;
-import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.fragment.FragmentManager;
import java.io.IOException;
public class DataResponseHandlerImpl implements DataResponseHandler{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataResponseHandlerImpl.class);
-
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataResponseHandlerImpl.class);
private final WorkerBee bee;
- public DataResponseHandlerImpl(WorkerBee bee) {
- super();
+ public DataResponseHandlerImpl(final WorkerBee bee) {
this.bee = bee;
}
-
@Override
public void informOutOfMemory() {
logger.error("Out of memory outside any particular fragment.");
}
-
- public void handle(FragmentManager manager, FragmentRecordBatch fragmentBatch, DrillBuf data, AckSender sender) throws FragmentSetupException, IOException {
-// logger.debug("Fragment Batch received {}", fragmentBatch);
+ @Override
+ public void handle(final FragmentManager manager, final FragmentRecordBatch fragmentBatch,
+ final DrillBuf data, final AckSender sender) throws FragmentSetupException, IOException {
+// logger.debug("Fragment Batch received {}", fragmentBatch);
boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender));
if (canRun) {
-// logger.debug("Arriving batch means local batch can run, starting local batch.");
- // if we've reached the canRun threshold, we'll proceed. This expects handler.handle() to only return a single
- // true.
+// logger.debug("Arriving batch means local batch can run, starting local batch.");
+ /*
+ * If we've reached the canRun threshold, we'll proceed. This expects handler.handle() to
+ * only return a single true.
+ */
bee.startFragmentPendingRemote(manager);
}
if (fragmentBatch.getIsLastBatch() && !manager.isWaiting()) {
-// logger.debug("Removing handler. Is Last Batch {}. Is Waiting for more {}", fragmentBatch.getIsLastBatch(),
-// manager.isWaiting());
+// logger.debug("Removing handler. Is Last Batch {}. Is Waiting for more {}", fragmentBatch.getIsLastBatch(),
+// manager.isWaiting());
bee.getContext().getWorkBus().removeFragmentManager(manager.getHandle());
}
-
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 5aa4aa67c..11f549672 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -50,6 +50,7 @@ public class DataTunnel {
manager.runCommand(b);
}catch(InterruptedException e){
outcomeListener.failed(new RpcException("Interrupted while trying to get sending semaphore.", e));
+ // TODO InterruptedException
}
}
@@ -60,6 +61,7 @@ public class DataTunnel {
manager.runCommand(b);
}catch(InterruptedException e){
b.connectionFailed(FailureType.CONNECTION, new RpcException("Interrupted while trying to get sending semaphore.", e));
+ // TODO InterruptedException
}
return b.getFuture();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
index 4e7fc925e..925154d90 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java
@@ -57,7 +57,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand
}
public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials credentials)
- throws RpcException, InterruptedException {
+ throws RpcException {
UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder()
.setRpcVersion(UserRpcConfig.RPC_VERSION)
.setSupportListening(true)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
index dffb9a179..c76d324fe 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java
@@ -29,7 +29,6 @@ import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake;
-import org.apache.drill.exec.proto.UserProtos.RequestResults;
import org.apache.drill.exec.proto.UserProtos.RpcType;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 6521303d8..b60670723 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -17,8 +17,7 @@
*/
package org.apache.drill.exec.server;
-import java.io.Closeable;
-
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.coord.ClusterCoordinator;
@@ -52,27 +51,32 @@ import com.google.common.io.Closeables;
/**
* Starts, tracks and stops all the required services for a Drillbit daemon to work.
*/
-public class Drillbit implements Closeable{
- private static final org.slf4j.Logger logger;
+public class Drillbit implements AutoCloseable {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
static {
- logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class);
Environment.logEnv("Drillbit environment:.", logger);
}
- public static Drillbit start(StartupOptions options) throws DrillbitStartupException {
- return start(DrillConfig.create(options.getConfigLocation()));
+ public static Drillbit start(final StartupOptions options) throws DrillbitStartupException {
+ return start(DrillConfig.create(options.getConfigLocation()), null);
+ }
+
+ public static Drillbit start(final DrillConfig config) throws DrillbitStartupException {
+ return start(config, null);
}
- public static Drillbit start(DrillConfig config) throws DrillbitStartupException {
+ public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet)
+ throws DrillbitStartupException {
+ logger.debug("Setting up Drillbit.");
Drillbit bit;
try {
- logger.debug("Setting up Drillbit.");
- bit = new Drillbit(config, null);
+ bit = new Drillbit(config, remoteServiceSet);
} catch (Exception ex) {
throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex);
}
+
+ logger.debug("Starting Drillbit.");
try {
- logger.debug("Starting Drillbit.");
bit.run();
} catch (Exception e) {
bit.close();
@@ -152,90 +156,94 @@ public class Drillbit implements Closeable{
}
}
- public static void main(String[] cli) throws DrillbitStartupException {
+ public static void main(final String[] cli) throws DrillbitStartupException {
StartupOptions options = StartupOptions.parse(cli);
start(options);
}
- final ClusterCoordinator coord;
- final ServiceEngine engine;
- final PStoreProvider storeProvider;
- final WorkManager manager;
- final BootStrapContext context;
- final Server embeddedJetty;
-
- private volatile RegistrationHandle handle;
+ private final ClusterCoordinator coord;
+ private final ServiceEngine engine;
+ private final PStoreProvider storeProvider;
+ private final WorkManager manager;
+ private final BootStrapContext context;
+ private final Server embeddedJetty;
+ private RegistrationHandle registrationHandle;
public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception {
- boolean allowPortHunting = serviceSet != null;
- boolean enableHttp = config.getBoolean(ExecConstants.HTTP_ENABLE);
- this.context = new BootStrapContext(config);
- this.manager = new WorkManager(context);
- this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler(), allowPortHunting);
-
- if(enableHttp) {
- this.embeddedJetty = new Server(config.getInt(ExecConstants.HTTP_PORT));
+ final boolean allowPortHunting = serviceSet != null;
+ final boolean enableHttp = config.getBoolean(ExecConstants.HTTP_ENABLE);
+ context = new BootStrapContext(config);
+ manager = new WorkManager(context);
+ engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context,
+ manager.getWorkBus(), manager.getDataHandler(), allowPortHunting);
+
+ if (enableHttp) {
+ embeddedJetty = new Server(config.getInt(ExecConstants.HTTP_PORT));
} else {
- this.embeddedJetty = null;
+ embeddedJetty = null;
}
- if(serviceSet != null) {
- this.coord = serviceSet.getCoordinator();
- this.storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config));
+ if (serviceSet != null) {
+ coord = serviceSet.getCoordinator();
+ storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config));
} else {
- Runtime.getRuntime().addShutdownHook(new ShutdownThread(config));
- this.coord = new ZKClusterCoordinator(config);
- this.storeProvider = new PStoreRegistry(this.coord, config).newPStoreProvider();
+ coord = new ZKClusterCoordinator(config);
+ storeProvider = new PStoreRegistry(this.coord, config).newPStoreProvider();
}
}
- private void startJetty() throws Exception{
+ private void startJetty() throws Exception {
if (embeddedJetty == null) {
return;
}
- ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
-
- ErrorHandler errorHandler = new ErrorHandler();
+ final ErrorHandler errorHandler = new ErrorHandler();
errorHandler.setShowStacks(true);
errorHandler.setShowMessageInTitle(true);
- context.setErrorHandler(errorHandler);
- context.setContextPath("/");
- embeddedJetty.setHandler(context);
- ServletHolder h = new ServletHolder(new ServletContainer(new DrillRestServer(manager)));
-// h.setInitParameter(ServerProperties.PROVIDER_PACKAGES, "org.apache.drill.exec.server");
- h.setInitOrder(1);
- context.addServlet(h, "/*");
- context.addServlet(new ServletHolder(new MetricsServlet(this.context.getMetrics())), "/status/metrics");
- context.addServlet(new ServletHolder(new ThreadDumpServlet()), "/status/threads");
-
- ServletHolder staticHolder = new ServletHolder("static", DefaultServlet.class);
+
+ final ServletContextHandler servletContextHandler =
+ new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
+ servletContextHandler.setErrorHandler(errorHandler);
+ servletContextHandler.setContextPath("/");
+ embeddedJetty.setHandler(servletContextHandler);
+
+ final ServletHolder servletHolder = new ServletHolder(new ServletContainer(new DrillRestServer(manager)));
+// servletHolder.setInitParameter(ServerProperties.PROVIDER_PACKAGES, "org.apache.drill.exec.server");
+ servletHolder.setInitOrder(1);
+ servletContextHandler.addServlet(servletHolder, "/*");
+
+ servletContextHandler.addServlet(
+ new ServletHolder(new MetricsServlet(context.getMetrics())), "/status/metrics");
+ servletContextHandler.addServlet(new ServletHolder(new ThreadDumpServlet()), "/status/threads");
+
+ final ServletHolder staticHolder = new ServletHolder("static", DefaultServlet.class);
staticHolder.setInitParameter("resourceBase", Resource.newClassPathResource("/rest/static").toString());
staticHolder.setInitParameter("dirAllowed","false");
staticHolder.setInitParameter("pathInfoOnly","true");
- context.addServlet(staticHolder,"/static/*");
+ servletContextHandler.addServlet(staticHolder,"/static/*");
embeddedJetty.start();
-
}
-
-
public void run() throws Exception {
coord.start(10000);
storeProvider.start();
- DrillbitEndpoint md = engine.start();
+ final DrillbitEndpoint md = engine.start();
manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider);
- manager.getContext().getStorage().init();
- manager.getContext().getOptionManager().init();
+ final DrillbitContext drillbitContext = manager.getContext();
+ drillbitContext.getStorage().init();
+ drillbitContext.getOptionManager().init();
javaPropertiesToSystemOptions();
- handle = coord.register(md);
+ registrationHandle = coord.register(md);
startJetty();
+
+ Runtime.getRuntime().addShutdownHook(new ShutdownThread());
}
+ @Override
public void close() {
- if (coord != null && handle != null) {
- coord.unregister(handle);
+ if (coord != null && registrationHandle != null) {
+ coord.unregister(registrationHandle);
}
try {
@@ -243,28 +251,27 @@ public class Drillbit implements Closeable{
} catch (InterruptedException e) {
logger.warn("Interrupted while sleeping during coordination deregistration.");
}
- try {
- if (embeddedJetty != null) {
+
+ if (embeddedJetty != null) {
+ try {
embeddedJetty.stop();
+ } catch (Exception e) {
+ logger.warn("Failure while shutting down embedded jetty server.");
}
- } catch (Exception e) {
- logger.warn("Failure while shutting down embedded jetty server.");
}
+
Closeables.closeQuietly(engine);
- try{
- storeProvider.close();
- }catch(Exception e){
- logger.warn("Failure while closing store provider.", e);
- }
+ AutoCloseables.close(storeProvider, logger);
Closeables.closeQuietly(coord);
- Closeables.closeQuietly(manager);
+ AutoCloseables.close(manager, logger);
Closeables.closeQuietly(context);
+
logger.info("Shutdown completed.");
}
private class ShutdownThread extends Thread {
- ShutdownThread(DrillConfig config) {
- this.setName("ShutdownHook");
+ ShutdownThread() {
+ setName("Drillbit-ShutdownHook");
}
@Override
@@ -272,14 +279,9 @@ public class Drillbit implements Closeable{
logger.info("Received shutdown request.");
close();
}
-
- }
- public ClusterCoordinator getCoordinator() {
- return coord;
}
public DrillbitContext getContext() {
- return this.manager.getContext();
+ return manager.getContext();
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 1b35aecd7..dbf3c7426 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -37,6 +37,7 @@ import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory;
import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.testing.SimulatedExceptions;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
@@ -45,8 +46,7 @@ public class DrillbitContext {
// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class);
private final BootStrapContext context;
-
- private PhysicalPlanReader reader;
+ private final PhysicalPlanReader reader;
private final ClusterCoordinator coord;
private final DataConnectionCreator connectionsPool;
private final DrillbitEndpoint endpoint;
@@ -59,10 +59,11 @@ public class DrillbitContext {
private final PStoreProvider provider;
private final CodeCompiler compiler;
private final ExecutorService executor;
+ private final SimulatedExceptions simulatedExceptions = new SimulatedExceptions();
- public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider,
+ public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord,
+ Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider,
ExecutorService executor) {
- super();
Preconditions.checkNotNull(endpoint);
Preconditions.checkNotNull(context);
Preconditions.checkNotNull(controller);
@@ -87,7 +88,7 @@ public class DrillbitContext {
return functionRegistry;
}
- public WorkEventBus getWorkBus(){
+ public WorkEventBus getWorkBus() {
return workBus;
}
@@ -99,7 +100,7 @@ public class DrillbitContext {
return systemOptions;
}
- public DrillbitEndpoint getEndpoint(){
+ public DrillbitEndpoint getEndpoint() {
return endpoint;
}
@@ -107,11 +108,11 @@ public class DrillbitContext {
return context.getConfig();
}
- public Collection<DrillbitEndpoint> getBits(){
+ public Collection<DrillbitEndpoint> getBits() {
return coord.getAvailableEndpoints();
}
- public BufferAllocator getAllocator(){
+ public BufferAllocator getAllocator() {
return context.getAllocator();
}
@@ -119,35 +120,35 @@ public class DrillbitContext {
return operatorCreatorRegistry;
}
- public StoragePluginRegistry getStorage(){
+ public StoragePluginRegistry getStorage() {
return this.storagePlugins;
}
- public EventLoopGroup getBitLoopGroup(){
+ public EventLoopGroup getBitLoopGroup() {
return context.getBitLoopGroup();
}
- public DataConnectionCreator getDataConnectionsPool(){
+ public DataConnectionCreator getDataConnectionsPool() {
return connectionsPool;
}
- public Controller getController(){
+ public Controller getController() {
return controller;
}
- public MetricRegistry getMetrics(){
+ public MetricRegistry getMetrics() {
return context.getMetrics();
}
- public PhysicalPlanReader getPlanReader(){
+ public PhysicalPlanReader getPlanReader() {
return reader;
}
- public PStoreProvider getPersistentStoreProvider(){
+ public PStoreProvider getPersistentStoreProvider() {
return provider;
}
- public DrillSchemaFactory getSchemaFactory(){
+ public DrillSchemaFactory getSchemaFactory() {
return storagePlugins.getSchemaFactory();
}
@@ -162,4 +163,8 @@ public class DrillbitContext {
public ExecutorService getExecutor() {
return executor;
}
+
+ public SimulatedExceptions getSimulatedExceptions() {
+ return simulatedExceptions;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
index 5c2aedee0..34ebf80f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
@@ -72,10 +72,6 @@ public class DrillConfigIterator implements Iterable<OptionValue> {
case NULL:
throw new IllegalStateException("Config value \"" + name + "\" has NULL type");
-/* TODO(cwestin)
- optionValue = OptionValue.createOption(kind, OptionType.BOOT, name, "");
- break;
-*/
}
return optionValue;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
index 45c0ce80d..c3de19014 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -19,8 +19,8 @@ package org.apache.drill.exec.server.options;
import java.util.concurrent.ConcurrentHashMap;
-public class SessionOptionManager extends InMemoryOptionManager{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
+public class SessionOptionManager extends InMemoryOptionManager {
+// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class);
public SessionOptionManager(OptionManager systemOptions) {
super(systemOptions, new ConcurrentHashMap<String, OptionValue>());
@@ -30,5 +30,4 @@ public class SessionOptionManager extends InMemoryOptionManager{
boolean supportsOption(OptionValue value) {
return value.type == OptionValue.OptionType.SESSION;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index fa4725d5c..608fac737 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -95,6 +95,7 @@ public class SystemOptionManager implements OptionManager {
QueryClassLoader.JAVA_COMPILER_DEBUG,
ExecConstants.ENABLE_VERBOSE_ERRORS,
ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR,
+ ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR,
ClassTransformer.SCALAR_REPLACEMENT_VALIDATOR,
};
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
index ae04bad56..d22798de9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -38,14 +38,12 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.store.sys.EStore;
import org.apache.drill.exec.store.sys.PStore;
import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.foreman.QueryStatus;
+import org.apache.drill.exec.work.foreman.QueryManager;
import org.glassfish.jersey.server.mvc.Viewable;
import com.google.common.collect.Lists;
@@ -137,8 +135,8 @@ public class ProfileResources {
PStore<QueryProfile> completed = null;
PStore<QueryInfo> running = null;
try {
- completed = provider().getStore(QueryStatus.QUERY_PROFILE);
- running = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
+ completed = provider().getStore(QueryManager.QUERY_PROFILE);
+ running = provider().getStore(QueryManager.RUNNING_QUERY_INFO);
} catch (IOException e) {
logger.debug("Failed to get profiles from persistent or ephemeral store.");
return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>());
@@ -177,12 +175,12 @@ public class ProfileResources {
// first check local running
Foreman f = work.getBee().getForemanForQueryId(id);
if(f != null){
- return f.getQueryStatus().getAsProfile();
+ return f.getQueryManager().getQueryProfile();
}
// then check remote running
try{
- PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
+ PStore<QueryInfo> runningQueries = provider().getStore(QueryManager.RUNNING_QUERY_INFO);
QueryInfo info = runningQueries.get(queryId);
return work.getContext().getController().getTunnel(info.getForeman()).requestQueryProfile(id).checkedGet(2, TimeUnit.SECONDS);
}catch(Exception e){
@@ -191,7 +189,7 @@ public class ProfileResources {
// then check blob store
try{
- PStore<QueryProfile> profiles = provider().getStore(QueryStatus.QUERY_PROFILE);
+ PStore<QueryProfile> profiles = provider().getStore(QueryManager.QUERY_PROFILE);
return profiles.get(queryId);
}catch(Exception e){
logger.warn("Failure to load query profile for query {}", queryId, e);
@@ -208,7 +206,7 @@ public class ProfileResources {
@Produces(MediaType.APPLICATION_JSON)
public String getProfileJSON(@PathParam("queryid") String queryId) {
try {
- return new String(QueryStatus.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId)));
+ return new String(QueryManager.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId)));
} catch (IOException e) {
logger.debug("Failed to serialize profile for: " + queryId);
return ("{ 'message' : 'error (unable to serialize profile)' }");
@@ -242,7 +240,7 @@ public class ProfileResources {
// then check remote running
try{
- PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO);
+ PStore<QueryInfo> runningQueries = provider().getStore(QueryManager.RUNNING_QUERY_INFO);
QueryInfo info = runningQueries.get(queryId);
Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS);
if(a.getOk()){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
index ff6e13cd0..2efc9a998 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java
@@ -57,7 +57,7 @@ public class ServiceEngine implements Closeable{
this.allowPortHunting = allowPortHunting;
}
- public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{
+ public DrillbitEndpoint start() throws DrillbitStartupException, UnknownHostException{
int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT), allowPortHunting);
String address = useIP ? InetAddress.getLocalHost().getHostAddress() : InetAddress.getLocalHost().getCanonicalHostName();
DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder()
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
index baa998dfa..ebee7a861 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java
@@ -56,13 +56,13 @@ public class FilePStore<V> implements PStore<V> {
this.fs = fs;
try {
- mk(basePath);
+ mkdirs(basePath);
} catch (IOException e) {
throw new RuntimeException("Failure setting pstore configuration path.");
}
}
- private void mk(Path path) throws IOException{
+ private void mkdirs(Path path) throws IOException{
fs.mkdirs(path);
}
@@ -112,20 +112,20 @@ public class FilePStore<V> implements PStore<V> {
}
}
- private Path p(String name) throws IOException {
+ private Path makePath(String name) {
Preconditions.checkArgument(
!name.contains("/") &&
!name.contains(":") &&
!name.contains(".."));
- Path f = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
+ final Path path = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX);
// do this to check file name.
- return f;
+ return path;
}
public V get(String key) {
try{
- Path path = p(key);
+ Path path = makePath(key);
if(!fs.exists(path)){
return null;
}
@@ -133,15 +133,16 @@ public class FilePStore<V> implements PStore<V> {
throw new RuntimeException(e);
}
- try (InputStream is = fs.open(p(key))) {
+ final Path path = makePath(key);
+ try (InputStream is = fs.open(path)) {
return config.getSerializer().deserialize(IOUtils.toByteArray(is));
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Unable to deserialize \"" + path + "\"", e);
}
}
public void put(String key, V value) {
- try (OutputStream os = fs.create(p(key))) {
+ try (OutputStream os = fs.create(makePath(key))) {
IOUtils.write(config.getSerializer().serialize(value), os);
} catch (IOException e) {
throw new RuntimeException(e);
@@ -151,7 +152,7 @@ public class FilePStore<V> implements PStore<V> {
@Override
public boolean putIfAbsent(String key, V value) {
try {
- Path p = p(key);
+ Path p = makePath(key);
if (fs.exists(p)) {
return false;
} else {
@@ -165,7 +166,7 @@ public class FilePStore<V> implements PStore<V> {
public void delete(String key) {
try {
- fs.delete(p(key), false);
+ fs.delete(makePath(key), false);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
new file mode 100644
index 000000000..68cbf0895
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Injection for a single exception. Specifies how many times to inject it, and how many times to skip
+ * injecting it before the first injection. This class is used internally for tracking injected
+ * exceptions; injected exceptions are specified via the
+ * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_EXCEPTION_INJECTIONS} system option.
+ */
+public class ExceptionInjection {
+ private final String desc; // description of the injection site
+
+ private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0
+ private final AtomicInteger nThrow; // the number of times to do the injection, after any skips; starts > 0
+
+ private final Class<? extends Throwable> exceptionClass;
+
+ /**
+ * Constructor.
+ *
+ * @param desc description of the injection site; useful for multiple injections in a single class
+ * @param nSkip non-negative number of times to skip injecting the exception
+ * @param nFire positive number of times to inject the exception
+ * @param exceptionClass
+ */
+ public ExceptionInjection(final String desc, final int nSkip, final int nFire,
+ final Class<? extends Throwable> exceptionClass) {
+ this.desc = desc;
+ this.nSkip = new AtomicInteger(nSkip);
+ this.nThrow = new AtomicInteger(nFire);
+ this.exceptionClass = exceptionClass;
+ }
+
+ /**
+ * Constructs the exception to throw, if it is time to throw it.
+ *
+ * @return the exception to throw, or null if it isn't time to throw it
+ */
+ private Throwable constructException() {
+ final int remainingSkips = nSkip.decrementAndGet();
+ if (remainingSkips >= 0) {
+ return null;
+ }
+
+ final int remainingFirings = nThrow.decrementAndGet();
+ if (remainingFirings < 0) {
+ return null;
+ }
+
+ // if we get here, we should throw the specified exception
+ Constructor<?> constructor;
+ try {
+ constructor = exceptionClass.getConstructor(String.class);
+ } catch(NoSuchMethodException e) {
+ throw new RuntimeException("No constructor found that takes a single String argument");
+ }
+
+ Throwable throwable;
+ try {
+ throwable = (Throwable) constructor.newInstance(desc);
+ } catch(InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ throw new IllegalStateException("Couldn't construct exception instance", e);
+ }
+
+ return throwable;
+ }
+
+ /**
+ * Throw the unchecked exception specified by this injection.
+ *
+ * @throws IllegalStateException if it's time to throw, and the injection specified a checked exception
+ */
+ public void throwUnchecked() {
+ final Throwable throwable = constructException();
+ if (throwable == null) {
+ return;
+ }
+
+ if (throwable instanceof RuntimeException) {
+ final RuntimeException e = (RuntimeException) throwable;
+ throw e;
+ }
+ if (throwable instanceof Error) {
+ final Error e = (Error) throwable;
+ throw e;
+ }
+
+ throw new IllegalStateException("throwable was not an unchecked exception");
+ }
+
+ /**
+ * Throw the checked exception specified by this injection.
+ *
+ * @param exceptionClass the class of the exception to throw
+ * @throws T if it is time to throw the exception
+ * @throws IllegalStateException if it is time to throw the exception, and the exception's class
+ * is incompatible with the class specified by the injection
+ */
+ public <T extends Throwable> void throwChecked(final Class<T> exceptionClass) throws T {
+ final Throwable throwable = constructException();
+ if (throwable == null) {
+ return;
+ }
+
+ if (exceptionClass.isAssignableFrom(throwable.getClass())) {
+ final T exception = exceptionClass.cast(throwable);
+ throw exception;
+ }
+
+ throw new IllegalStateException("Constructed Throwable(" + throwable.getClass().getName()
+ + ") is incompatible with exceptionClass("+ exceptionClass.getName() + ")");
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
new file mode 100644
index 000000000..54bc351eb
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Injects exceptions at execution time for testing. Any class that wants to simulate exceptions
+ * for testing should have it's own private static instance of an injector (similar to the use
+ * of loggers).
+ *
+ * <p>See {@link org.apache.drill.exec.testing.TestExceptionInjection} for examples of
+ * use.
+ */
+public class ExceptionInjector {
+ private final Class<?> clazz; // the class that owns this injector
+
+ /**
+ * Constructor. Classes should use the static {@link #getInjector()} method to obtain
+ * their injector.
+ *
+ * @param clazz the owning class
+ */
+ private ExceptionInjector(final Class<?> clazz) {
+ this.clazz = clazz;
+ }
+
+ /**
+ * Create an injector.
+ *
+ * @param clazz the owning class
+ * @return the newly created injector
+ */
+ public static ExceptionInjector getInjector(final Class<?> clazz) {
+ return new ExceptionInjector(clazz);
+ }
+
+ /**
+ * Get the injector's owning class.
+ *
+ * @return the injector's owning class
+ */
+ public Class<?> getSiteClass() {
+ return clazz;
+ }
+
+ /**
+ * Lookup an injection within this class that matches the site description.
+ *
+ * @param drillbitContext
+ * @param desc the site description
+ * @return the injection, if there is one; null otherwise
+ */
+ private ExceptionInjection getInjection(final DrillbitContext drillbitContext, final String desc) {
+ final SimulatedExceptions simulatedExceptions = drillbitContext.getSimulatedExceptions();
+ final ExceptionInjection exceptionInjection = simulatedExceptions.lookupInjection(drillbitContext, this, desc);
+ return exceptionInjection;
+ }
+
+ /**
+ * Inject (throw) an unchecked exception at this point, if an injection is specified, and it is time
+ * for it to be thrown.
+ *
+ * <p>Implementors use this in their code at a site where they want to simulate an exception
+ * during testing.
+ *
+ * @param drillbitContext
+ * @param desc the site description
+ * throws the exception specified by the injection, if it is time
+ */
+ public void injectUnchecked(final DrillbitContext drillbitContext, final String desc) {
+ final ExceptionInjection exceptionInjection = getInjection(drillbitContext, desc);
+ if (exceptionInjection != null) {
+ exceptionInjection.throwUnchecked();
+ }
+ }
+
+ /**
+ * Inject (throw) a checked exception at this point, if an injection is specified, and it is time
+ * for it to be thrown.
+ *
+ * <p>Implementors use this in their code at a site where they want to simulate an exception
+ * during testing.
+ *
+ * @param drillbitContext
+ * @param desc the site description
+ * @param exceptionClass the expected class of the exception (or a super class of it)
+ * @throws T the exception specified by the injection, if it is time
+ */
+ public <T extends Throwable> void injectChecked(
+ final DrillbitContext drillbitContext, final String desc, final Class<T> exceptionClass) throws T {
+ final ExceptionInjection exceptionInjection = getInjection(drillbitContext, desc);
+ if (exceptionInjection != null) {
+ exceptionInjection.throwChecked(exceptionClass);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
new file mode 100644
index 000000000..9e19fdd16
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.google.common.base.Preconditions;
+
+public class InjectionSite {
+ private final Class<?> clazz;
+ private final String desc;
+
+ public InjectionSite(final Class<?> clazz, final String desc) {
+ Preconditions.checkNotNull(clazz);
+ Preconditions.checkNotNull(desc);
+
+ this.clazz = clazz;
+ this.desc = desc;
+ }
+
+ public Class<?> getSiteClass() {
+ return clazz;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null) {
+ return false;
+ }
+
+ if (this == o) {
+ return true;
+ }
+
+ if (!(o instanceof InjectionSite)) {
+ return false;
+ }
+
+ final InjectionSite other = (InjectionSite) o;
+ if (clazz != other.clazz) {
+ return false;
+ }
+
+ if (!desc.equals(other.desc)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ return (clazz.hashCode() + 13) ^ (1 - desc.hashCode());
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java
new file mode 100644
index 000000000..0292c08ce
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.server.options.OptionValue;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Tracks the simulated exceptions that will be injected for testing purposes.
+ */
+public class SimulatedExceptions {
+ /**
+ * Caches the currently specified ExceptionInjections. Updated when
+ * {@link org.apache.drill.exec.ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS} is noticed
+ * to have changed.
+ */
+ private HashMap<InjectionSite, ExceptionInjection> exMap = null;
+
+ /**
+ * The string that was parsed to produce exMap; we keep it as a means to quickly detect whether
+ * the option string has changed or not between calls to getOption().
+ */
+ private String exString = null;
+
+ /**
+ * POJO used to parse JSON-specified exception injection.
+ */
+ public static class InjectionOption {
+ public String siteClass;
+ public String desc;
+ public int nSkip;
+ public int nFire;
+ public String exceptionClass;
+ }
+
+ /**
+ * POJO used to parse JSON-specified set of exception injections.
+ */
+ public static class InjectionOptions {
+ public InjectionOption injections[];
+ }
+
+ /**
+ * Look for an exception injection matching the given injector and site description.
+ *
+ * @param drillbitContext
+ * @param injector the injector, which indicates a class
+ * @param desc the injection site description
+ * @return the exception injection, if there is one for the injector and site; null otherwise
+ */
+ public synchronized ExceptionInjection lookupInjection(
+ final DrillbitContext drillbitContext, final ExceptionInjector injector, final String desc) {
+ // get the option string
+ final OptionManager optionManager = drillbitContext.getOptionManager();
+ final OptionValue optionValue = optionManager.getOption(ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS);
+ final String opString = optionValue.string_val;
+
+ // if the option string is empty, there's nothing to inject
+ if ((opString == null) || opString.isEmpty()) {
+ // clear these in case there used to be something to inject
+ exMap = null;
+ exString = null;
+ return null;
+ }
+
+ // if the option string is different from before, recreate the injection map
+ if ((exString == null) || (exString != opString) && !exString.equals(opString)) {
+ // parse the option string into JSON
+ final ObjectMapper objectMapper = new ObjectMapper();
+ InjectionOptions injectionOptions;
+ try {
+ injectionOptions = objectMapper.readValue(opString, InjectionOptions.class);
+ } catch(IOException e) {
+ throw new RuntimeException("Couldn't parse exception injections", e);
+ }
+
+ // create a new map from the option JSON
+ exMap = new HashMap<>();
+ for(InjectionOption injectionOption : injectionOptions.injections) {
+ addToMap(exMap, injectionOption);
+ }
+
+ // this is the current set of options in effect
+ exString = opString;
+ }
+
+ // lookup the request
+ final InjectionSite injectionSite = new InjectionSite(injector.getSiteClass(), desc);
+ final ExceptionInjection injection = exMap.get(injectionSite);
+ return injection;
+ }
+
+ /**
+ * Adds a single exception injection to the injection map
+ *
+ * <p>Validates injection options before adding to the map, and throws various exceptions for
+ * validation failures.
+ *
+ * @param exMap the injection map
+ * @param injectionOption the option to add
+ */
+ private static void addToMap(
+ final HashMap<InjectionSite, ExceptionInjection> exMap, final InjectionOption injectionOption) {
+ Class<?> siteClass;
+ try {
+ siteClass = Class.forName(injectionOption.siteClass);
+ } catch(ClassNotFoundException e) {
+ throw new RuntimeException("Injection siteClass not found", e);
+ }
+
+ if ((injectionOption.desc == null) || injectionOption.desc.isEmpty()) {
+ throw new RuntimeException("Injection desc is null or empty");
+ }
+
+ if (injectionOption.nSkip < 0) {
+ throw new RuntimeException("Injection nSkip is not non-negative");
+ }
+
+ if (injectionOption.nFire <= 0) {
+ throw new RuntimeException("Injection nFire is non-positive");
+ }
+
+ Class<?> clazz;
+ try {
+ clazz = Class.forName(injectionOption.exceptionClass);
+ } catch(ClassNotFoundException e) {
+ throw new RuntimeException("Injected exceptionClass not found", e);
+ }
+
+ if (!Throwable.class.isAssignableFrom(clazz)) {
+ throw new RuntimeException("Injected exceptionClass is not a Throwable");
+ }
+
+ @SuppressWarnings("unchecked")
+ final Class<? extends Throwable> exceptionClass = (Class<? extends Throwable>) clazz;
+
+ final InjectionSite injectionSite = new InjectionSite(siteClass, injectionOption.desc);
+ final ExceptionInjection exceptionInjection = new ExceptionInjection(
+ injectionOption.desc, injectionOption.nSkip, injectionOption.nFire, exceptionClass);
+ exMap.put(injectionSite, exceptionInjection);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java
deleted file mode 100644
index 5b11943ec..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work;
-
-public interface CancelableQuery {
- public void cancel();
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
index 9743d6e0c..4f99f859b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
@@ -25,14 +25,13 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment;
import com.google.common.base.Preconditions;
public class QueryWorkUnit {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class);
-
+ // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class);
private final PlanFragment rootFragment; // for local
private final FragmentRoot rootOperator; // for local
private final List<PlanFragment> fragments;
- public QueryWorkUnit(FragmentRoot rootOperator, PlanFragment rootFragment, List<PlanFragment> fragments) {
- super();
+ public QueryWorkUnit(final FragmentRoot rootOperator, final PlanFragment rootFragment,
+ final List<PlanFragment> fragments) {
Preconditions.checkNotNull(rootFragment);
Preconditions.checkNotNull(fragments);
Preconditions.checkNotNull(rootOperator);
@@ -53,11 +52,4 @@ public class QueryWorkUnit {
public FragmentRoot getRootOperator() {
return rootOperator;
}
-
-
-
-
-
-
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
deleted file mode 100644
index 6086f74a5..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work;
-
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-
-public interface StatusProvider {
-
- /**
- * Provides the current status of the FragmentExecutor's work.
- * @return Status if currently. Null if in another state.
- */
- public FragmentStatus getStatus();
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 125c56df0..a08630ab7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -17,25 +17,21 @@
*/
package org.apache.drill.exec.work;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.drill.common.SelfCleaningRunnable;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.DrillRpcFuture;
import org.apache.drill.exec.rpc.NamedThreadFactory;
import org.apache.drill.exec.rpc.RpcException;
@@ -50,7 +46,6 @@ import org.apache.drill.exec.store.sys.PStoreProvider;
import org.apache.drill.exec.work.batch.ControlHandlerImpl;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.foreman.QueryStatus;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.user.UserWorker;
@@ -59,23 +54,24 @@ import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import com.google.common.collect.Queues;
-public class WorkManager implements Closeable {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class);
-
- private Set<FragmentManager> incomingFragments = Collections.newSetFromMap(Maps
- .<FragmentManager, Boolean> newConcurrentMap());
-
- private LinkedBlockingQueue<RunnableWrapper> pendingTasks = Queues.newLinkedBlockingQueue();
-
- private Map<FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap();
+/**
+ * Manages the running fragments in a Drillbit. Periodically requests run-time stats updates from fragments
+ * running elsewhere.
+ */
+public class WorkManager implements AutoCloseable {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class);
- private ConcurrentMap<QueryId, Foreman> queries = Maps.newConcurrentMap();
+ /*
+ * We use a {@see java.util.concurrent.ConcurrentHashMap} because it promises never to throw a
+ * {@see java.util.ConcurrentModificationException}; we need that because the statusThread may
+ * iterate over the map while other threads add FragmentExecutors via the {@see #WorkerBee}.
+ */
+ private final Map<FragmentHandle, FragmentExecutor> runningFragments = new ConcurrentHashMap<>();
- private ConcurrentMap<QueryId, QueryStatus> status = Maps.newConcurrentMap();
+ private final ConcurrentMap<QueryId, Foreman> queries = Maps.newConcurrentMap();
- private BootStrapContext bContext;
+ private final BootStrapContext bContext;
private DrillbitContext dContext;
private final ControlMessageHandler controlMessageWorker;
@@ -83,48 +79,50 @@ public class WorkManager implements Closeable {
private final UserWorker userWorker;
private final WorkerBee bee;
private final WorkEventBus workBus;
- private ExecutorService executor;
- private final EventThread eventThread;
+ private final ExecutorService executor;
private final StatusThread statusThread;
- private Controller controller;
- public WorkManager(BootStrapContext context) {
- this.bee = new WorkerBee();
- this.workBus = new WorkEventBus(bee);
+ /**
+ * How often the StatusThread collects statistics about running fragments.
+ */
+ private final static int STATUS_PERIOD_SECONDS = 5;
+
+ public WorkManager(final BootStrapContext context) {
this.bContext = context;
- this.controlMessageWorker = new ControlHandlerImpl(bee);
- this.userWorker = new UserWorker(bee);
- this.eventThread = new EventThread();
- this.statusThread = new StatusThread();
- this.dataHandler = new DataResponseHandlerImpl(bee);
+ bee = new WorkerBee(); // TODO should this just be an interface?
+ workBus = new WorkEventBus(); // TODO should this just be an interface?
+
+ /*
+ * TODO
+ * This executor isn't bounded in any way and could create an arbitrarily large number of
+ * threads, possibly choking the machine. We should really put an upper bound on the number of
+ * threads that can be created. Ideally, this might be computed based on the number of cores or
+ * some similar metric; ThreadPoolExecutor can impose an upper bound, and might be a better choice.
+ */
+ executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
+
+ // TODO references to this escape here (via WorkerBee) before construction is done
+ controlMessageWorker = new ControlHandlerImpl(bee); // TODO getFragmentRunner(), getForemanForQueryId()
+ userWorker = new UserWorker(bee); // TODO should just be an interface? addNewForeman(), getForemanForQueryId()
+ statusThread = new StatusThread();
+ dataHandler = new DataResponseHandlerImpl(bee); // TODO only uses startFragmentPendingRemote()
}
- public void start(DrillbitEndpoint endpoint, Controller controller,
- DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider) {
- this.executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-"));
- this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider, executor);
- // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS)
- this.controller = controller;
- this.eventThread.start();
- this.statusThread.start();
+ public void start(final DrillbitEndpoint endpoint, final Controller controller,
+ final DataConnectionCreator data, final ClusterCoordinator coord, final PStoreProvider provider) {
+ dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider, executor);
+ statusThread.start();
+
// TODO remove try block once metrics moved from singleton, For now catch to avoid unit test failures
try {
dContext.getMetrics().register(
- MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()),
+ MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()),
new Gauge<Integer>() {
@Override
public Integer getValue() {
return runningFragments.size();
}
- });
- dContext.getMetrics().register(
- MetricRegistry.name("drill.exec.work.pendingTasks" + dContext.getEndpoint().getUserPort()),
- new Gauge<Integer>() {
- @Override
- public Integer getValue() {
- return pendingTasks.size();
- }
- });
+ });
} catch (IllegalArgumentException e) {
logger.warn("Exception while registering metrics", e);
}
@@ -155,7 +153,7 @@ public class WorkManager implements Closeable {
}
@Override
- public void close() throws IOException {
+ public void close() throws Exception {
try {
if (executor != null) {
executor.awaitTermination(1, TimeUnit.SECONDS);
@@ -169,147 +167,90 @@ public class WorkManager implements Closeable {
return dContext;
}
- private static String getId(FragmentHandle handle){
- return "FragmentExecutor: " + QueryIdHelper.getQueryId(handle.getQueryId()) + ':' + handle.getMajorFragmentId() + ':' + handle.getMinorFragmentId();
- }
-
- // create this so items can see the data here whether or not they are in this package.
+ /**
+ * Narrowed interface to WorkManager that is made available to tasks it is managing.
+ */
public class WorkerBee {
-
-
-
- public void addFragmentRunner(FragmentExecutor runner) {
- logger.debug("Adding pending task {}", runner);
- RunnableWrapper wrapper = new RunnableWrapper(runner, getId(runner.getContext().getHandle()));
- pendingTasks.add(wrapper);
- }
-
- public void addNewForeman(Foreman foreman) {
- String id = "Foreman: " + QueryIdHelper.getQueryId(foreman.getQueryId());
- RunnableWrapper wrapper = new RunnableWrapper(foreman, id);
- pendingTasks.add(wrapper);
+ public void addNewForeman(final Foreman foreman) {
queries.put(foreman.getQueryId(), foreman);
+ executor.execute(new SelfCleaningRunnable(foreman) {
+ @Override
+ protected void cleanup() {
+ queries.remove(foreman.getQueryId(), foreman);
+ }
+ });
}
- public void addFragmentPendingRemote(FragmentManager handler) {
- incomingFragments.add(handler);
- }
-
- public void startFragmentPendingRemote(FragmentManager handler) {
- incomingFragments.remove(handler);
- FragmentExecutor runner = handler.getRunnable();
- RunnableWrapper wrapper = new RunnableWrapper(runner, getId(runner.getContext().getHandle()));
- pendingTasks.add(wrapper);
- }
-
- public FragmentExecutor getFragmentRunner(FragmentHandle handle) {
- return runningFragments.get(handle);
+ public Foreman getForemanForQueryId(final QueryId queryId) {
+ return queries.get(queryId);
}
- public void removeFragment(FragmentHandle handle) {
- runningFragments.remove(handle);
+ public DrillbitContext getContext() {
+ return dContext;
}
- public Foreman getForemanForQueryId(QueryId queryId) {
- return queries.get(queryId);
+ public void startFragmentPendingRemote(final FragmentManager handler) {
+ executor.execute(handler.getRunnable());
}
- public void retireForeman(Foreman foreman) {
- queries.remove(foreman.getQueryId(), foreman);
+ public void addFragmentRunner(final FragmentExecutor fragmentExecutor) {
+ final FragmentHandle fragmentHandle = fragmentExecutor.getContext().getHandle();
+ runningFragments.put(fragmentHandle, fragmentExecutor);
+ executor.execute(new SelfCleaningRunnable(fragmentExecutor) {
+ @Override
+ protected void cleanup() {
+ runningFragments.remove(fragmentHandle);
+ }
+ });
}
- public DrillbitContext getContext() {
- return dContext;
+ public FragmentExecutor getFragmentRunner(final FragmentHandle handle) {
+ return runningFragments.get(handle);
}
-
}
+ /**
+ * Periodically gather current statistics. {@link QueryManager} uses a FragmentStatusListener to
+ * maintain changes to state, and should be current. However, we want to collect current statistics
+ * about RUNNING queries, such as current memory consumption, number of rows processed, and so on.
+ * The FragmentStatusListener only tracks changes to state, so the statistics kept there will be
+ * stale; this thread probes for current values.
+ */
private class StatusThread extends Thread {
public StatusThread() {
- this.setDaemon(true);
- this.setName("WorkManager Status Reporter");
+ setDaemon(true);
+ setName("WorkManager.StatusThread");
}
@Override
public void run() {
- while(true){
- List<DrillRpcFuture<Ack>> futures = Lists.newArrayList();
- for(FragmentExecutor e : runningFragments.values()){
- FragmentStatus status = e.getStatus();
- if(status == null){
+ while(true) {
+ final List<DrillRpcFuture<Ack>> futures = Lists.newArrayList();
+ for(FragmentExecutor fragmentExecutor : runningFragments.values()) {
+ final FragmentStatus status = fragmentExecutor.getStatus();
+ if (status == null) {
continue;
}
- DrillbitEndpoint ep = e.getContext().getForemanEndpoint();
- futures.add(controller.getTunnel(ep).sendFragmentStatus(status));
+
+ final DrillbitEndpoint ep = fragmentExecutor.getContext().getForemanEndpoint();
+ futures.add(dContext.getController().getTunnel(ep).sendFragmentStatus(status));
}
- for(DrillRpcFuture<Ack> future : futures){
- try{
+ for(DrillRpcFuture<Ack> future : futures) {
+ try {
future.checkedGet();
- }catch(RpcException ex){
+ } catch(RpcException ex) {
logger.info("Failure while sending intermediate fragment status to Foreman", ex);
}
}
- try{
- Thread.sleep(5000);
- }catch(InterruptedException e){
+ try {
+ Thread.sleep(STATUS_PERIOD_SECONDS * 1000);
+ } catch(InterruptedException e) {
// exit status thread on interrupt.
break;
}
}
}
-
- }
-
- private class EventThread extends Thread {
- public EventThread() {
- this.setDaemon(true);
- this.setName("WorkManager Event Thread");
- }
-
- @Override
- public void run() {
- try {
- while (true) {
- // logger.debug("Polling for pending work tasks.");
- RunnableWrapper r = pendingTasks.take();
- if (r != null) {
- logger.debug("Starting pending task {}", r);
- if (r.inner instanceof FragmentExecutor) {
- FragmentExecutor fragmentExecutor = (FragmentExecutor) r.inner;
- runningFragments.put(fragmentExecutor.getContext().getHandle(), fragmentExecutor);
- }
- executor.execute(r);
- }
-
- }
- } catch (InterruptedException e) {
- logger.info("Work Manager stopping as it was interrupted.");
- }
- }
-
}
-
- private class RunnableWrapper implements Runnable {
-
- final Runnable inner;
- private final String id;
-
- public RunnableWrapper(Runnable r, String id){
- this.inner = r;
- this.id = id;
- }
-
- @Override
- public void run() {
- try{
- inner.run();
- }catch(Exception | Error e){
- logger.error("Failure while running wrapper [{}]", id, e);
- }
- }
-
- }
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
index 3228da9a9..3a7123d6c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java
@@ -20,8 +20,6 @@ package org.apache.drill.exec.work.batch;
import static org.apache.drill.exec.rpc.RpcBus.get;
import io.netty.buffer.ByteBuf;
-import java.io.IOException;
-
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.proto.BitControl.FinishedReceiver;
@@ -33,7 +31,6 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.Acks;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcConstants;
@@ -42,123 +39,129 @@ import org.apache.drill.exec.rpc.UserRpcException;
import org.apache.drill.exec.rpc.control.ControlConnection;
import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.rpc.data.DataRpcConfig;
+import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.Foreman;
-import org.apache.drill.exec.work.foreman.QueryStatus;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
import org.apache.drill.exec.work.fragment.NonRootStatusReporter;
public class ControlHandlerImpl implements ControlMessageHandler {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
-
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class);
private final WorkerBee bee;
- public ControlHandlerImpl(WorkerBee bee) {
- super();
+ public ControlHandlerImpl(final WorkerBee bee) {
this.bee = bee;
}
@Override
- public Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException {
+ public Response handle(final ControlConnection connection, final int rpcType,
+ final ByteBuf pBody, final ByteBuf dBody) throws RpcException {
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("Received bit com message of type {}", rpcType);
}
switch (rpcType) {
- case RpcType.REQ_CANCEL_FRAGMENT_VALUE:
- FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
+ case RpcType.REQ_CANCEL_FRAGMENT_VALUE: {
+ final FragmentHandle handle = get(pBody, FragmentHandle.PARSER);
cancelFragment(handle);
return DataRpcConfig.OK;
+ }
- case RpcType.REQ_RECEIVER_FINISHED_VALUE:
- FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
+ case RpcType.REQ_RECEIVER_FINISHED_VALUE: {
+ final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER);
receivingFragmentFinished(finishedReceiver);
return DataRpcConfig.OK;
+ }
case RpcType.REQ_FRAGMENT_STATUS_VALUE:
- bee.getContext().getWorkBus().status( get(pBody, FragmentStatus.PARSER));
+ bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER));
// TODO: Support a type of message that has no response.
return DataRpcConfig.OK;
- case RpcType.REQ_QUERY_CANCEL_VALUE:
- QueryId id = get(pBody, QueryId.PARSER);
- Foreman f = bee.getForemanForQueryId(id);
- if(f != null){
- f.cancel();
+ case RpcType.REQ_QUERY_CANCEL_VALUE: {
+ final QueryId queryId = get(pBody, QueryId.PARSER);
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
+ if (foreman != null) {
+ foreman.cancel();
return DataRpcConfig.OK;
- }else{
+ } else {
return DataRpcConfig.FAIL;
}
+ }
- case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE:
- InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
- for(int i =0; i < fragments.getFragmentCount(); i++){
+ case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE: {
+ final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER);
+ for(int i = 0; i < fragments.getFragmentCount(); i++) {
startNewRemoteFragment(fragments.getFragment(i));
}
return DataRpcConfig.OK;
+ }
- case RpcType.REQ_QUERY_STATUS_VALUE:
- QueryId queryId = get(pBody, QueryId.PARSER);
- Foreman foreman = bee.getForemanForQueryId(queryId);
- QueryProfile profile;
+ case RpcType.REQ_QUERY_STATUS_VALUE: {
+ final QueryId queryId = get(pBody, QueryId.PARSER);
+ final Foreman foreman = bee.getForemanForQueryId(queryId);
if (foreman == null) {
throw new RpcException("Query not running on node.");
- } else {
- profile = bee.getForemanForQueryId(queryId).getQueryStatus().getAsProfile();
}
+ final QueryProfile profile = foreman.getQueryManager().getQueryProfile();
return new Response(RpcType.RESP_QUERY_STATUS, profile);
+ }
default:
throw new RpcException("Not yet supported.");
}
-
}
@Override
- public void startNewRemoteFragment(PlanFragment fragment) throws UserRpcException {
+ public void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException {
logger.debug("Received remote fragment start instruction", fragment);
+ final DrillbitContext drillbitContext = bee.getContext();
try {
// we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf.
- if(fragment.getLeafFragment()){
- FragmentContext context = new FragmentContext(bee.getContext(), fragment, null, bee.getContext().getFunctionImplementationRegistry());
- ControlTunnel tunnel = bee.getContext().getController().getTunnel(fragment.getForeman());
- NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
- FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson());
- FragmentExecutor fr = new FragmentExecutor(context, bee, rootOperator, listener);
+ if (fragment.getLeafFragment()) {
+ final FragmentContext context = new FragmentContext(drillbitContext, fragment, null,
+ drillbitContext.getFunctionImplementationRegistry());
+ final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman());
+ final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel);
+ final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator(
+ fragment.getFragmentJson());
+ final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener);
bee.addFragmentRunner(fr);
- }else{ // isIntermediate, store for incoming data.
- NonRootFragmentManager manager = new NonRootFragmentManager(fragment, bee);
- bee.getContext().getWorkBus().setFragmentManager(manager);
+ } else {
+ // isIntermediate, store for incoming data.
+ final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext);
+ drillbitContext.getWorkBus().addFragmentManager(manager);
}
} catch (Exception e) {
- throw new UserRpcException(bee.getContext().getEndpoint(), "Failure while trying to start remote fragment", e);
+ throw new UserRpcException(drillbitContext.getEndpoint(),
+ "Failure while trying to start remote fragment", e);
} catch (OutOfMemoryError t) {
if (t.getMessage().startsWith("Direct buffer")) {
- throw new UserRpcException(bee.getContext().getEndpoint(), "Out of direct memory while trying to start remote fragment", t);
+ throw new UserRpcException(drillbitContext.getEndpoint(),
+ "Out of direct memory while trying to start remote fragment", t);
} else {
throw t;
}
}
-
}
/* (non-Javadoc)
* @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle)
*/
@Override
- public Ack cancelFragment(FragmentHandle handle) {
- FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
+ public Ack cancelFragment(final FragmentHandle handle) {
+ final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle);
if (manager != null) {
// try remote fragment cancel.
manager.cancel();
} else {
// then try local cancel.
- FragmentExecutor runner = bee.getFragmentRunner(handle);
+ final FragmentExecutor runner = bee.getFragmentRunner(handle);
if (runner != null) {
runner.cancel();
}
@@ -167,8 +170,9 @@ public class ControlHandlerImpl implements ControlMessageHandler {
return Acks.OK;
}
- public Ack receivingFragmentFinished(FinishedReceiver finishedReceiver) {
- FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
+ private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) {
+ final FragmentManager manager =
+ bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender());
FragmentExecutor executor;
if (manager != null) {
@@ -184,5 +188,4 @@ public class ControlHandlerImpl implements ControlMessageHandler {
return Acks.OK;
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index f0b4983f6..2a79e42c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -162,6 +162,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
return batch;
} catch (InterruptedException e) {
return null;
+ // TODO InterruptedException
}
}
if (w == null) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 3d5b94849..2430e644c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -162,6 +162,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
b = buffer.take();
} catch (InterruptedException e) {
return null;
+ // TODO InterruptedException
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
index ca52f0cde..80f2ca113 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java
@@ -26,7 +26,7 @@ import java.util.Set;
* Interface to define the listener to take actions when the set of active drillbits is changed.
*/
public interface DrillbitStatusListener {
-
+ // TODO this doesn't belong here
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitStatusListener.class);
/**
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 8e0780b23..9650ee5d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -19,27 +19,27 @@ package org.apache.drill.exec.work.foreman;
import io.netty.buffer.ByteBuf;
-import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import com.google.common.base.Preconditions;
+
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.coord.DistributedSemaphore;
import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.ops.QueryDateTimeInfo;
import org.apache.drill.exec.opt.BasicOptimizer;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.FragmentRoot;
@@ -64,10 +64,11 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.BaseRpcOutcomeListener;
import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.control.Controller;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.testing.ExceptionInjector;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.ErrorHelper;
@@ -76,117 +77,120 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
+import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
/**
- * Foreman manages all queries where this is the driving/root node.
+ * Foreman manages all the fragments (local and remote) for a single query where this
+ * is the driving/root node.
*
* The flow is as follows:
- * - Foreman is submitted as a runnable.
- * - Runnable does query planning.
- * - PENDING > RUNNING
- * - Runnable sends out starting fragments
- * - Status listener are activated
- * - Foreman listens for state move messages.
- *
+ * - Foreman is submitted as a runnable.
+ * - Runnable does query planning.
+ * - state changes from PENDING to RUNNING
+ * - Runnable sends out starting fragments
+ * - Status listener are activated
+ * - The Runnable's run() completes, but the Foreman stays around
+ * - Foreman listens for state change messages.
+ * - state change messages can drive the state to FAILED or CANCELED, in which case
+ * messages are sent to running fragments to terminate
+ * - when all fragments complete, state change messages drive the state to COMPLETED
*/
-public class Foreman implements Runnable, Closeable, Comparable<Object> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
-
- private QueryId queryId;
- private RunQuery queryRequest;
- private QueryContext context;
- private QueryManager queryManager;
- private WorkerBee bee;
- private UserClientConnection initiatingClient;
+public class Foreman implements Runnable {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
+ private final static ExceptionInjector injector = ExceptionInjector.getInjector(Foreman.class);
+
+ private final QueryId queryId;
+ private final RunQuery queryRequest;
+ private final QueryContext queryContext;
+ private final QueryManager queryManager; // handles lower-level details of query execution
+ private final WorkerBee bee; // provides an interface to submit tasks
+ private final DrillbitContext drillbitContext;
+ private final UserClientConnection initiatingClient; // used to send responses
private volatile QueryState state;
- private final DistributedSemaphore smallSemaphore;
- private final DistributedSemaphore largeSemaphore;
- private final long queueThreshold;
- private final long queueTimeout;
- private volatile DistributedLease lease;
- private final boolean queuingEnabled;
+ private volatile DistributedLease lease; // used to limit the number of concurrent queries
+
+ private FragmentExecutor rootRunner; // root Fragment
- private FragmentExecutor rootRunner;
- private final CountDownLatch acceptExternalEvents = new CountDownLatch(1);
- private final StateListener stateListener = new StateListener();
+ private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events
+ private final StateListener stateListener = new StateListener(); // source of external events
private final ResponseSendListener responseListener = new ResponseSendListener();
+ private final ForemanResult foremanResult = new ForemanResult();
- public Foreman(WorkerBee bee, DrillbitContext dContext, UserClientConnection connection, QueryId queryId,
- RunQuery queryRequest) {
+ /**
+ * Constructor. Sets up the Foreman, but does not initiate any execution.
+ *
+ * @param bee used to submit additional work
+ * @param drillbitContext
+ * @param connection
+ * @param queryId the id for the query
+ * @param queryRequest the query to execute
+ */
+ public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext,
+ final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) {
+ this.bee = bee;
this.queryId = queryId;
this.queryRequest = queryRequest;
- this.context = new QueryContext(connection.getSession(), queryId, dContext);
+ this.drillbitContext = drillbitContext;
- // set up queuing
- this.queuingEnabled = context.getOptions().getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val;
- if (queuingEnabled) {
- int smallQueue = context.getOptions().getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue();
- int largeQueue = context.getOptions().getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue();
- this.largeSemaphore = dContext.getClusterCoordinator().getSemaphore("query.large", largeQueue);
- this.smallSemaphore = dContext.getClusterCoordinator().getSemaphore("query.small", smallQueue);
- this.queueThreshold = context.getOptions().getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
- this.queueTimeout = context.getOptions().getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
- } else {
- this.largeSemaphore = null;
- this.smallSemaphore = null;
- this.queueThreshold = 0;
- this.queueTimeout = 0;
- }
- // end queuing setup.
-
- this.initiatingClient = connection;
- this.queryManager = new QueryManager(queryId, queryRequest, bee.getContext().getPersistentStoreProvider(),
- stateListener, this);
- this.bee = bee;
+ initiatingClient = connection;
+ queryContext = new QueryContext(connection.getSession(), drillbitContext);
+ queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(),
+ stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this
recordNewState(QueryState.PENDING);
}
- public QueryContext getContext() {
- return context;
+ /**
+ * Get the QueryContext created for the query.
+ *
+ * @return the QueryContext
+ */
+ public QueryContext getQueryContext() {
+ return queryContext;
}
- public void cancel() {
- stateListener.moveToState(QueryState.CANCELED, null);
+ /**
+ * Get the QueryManager created for the query.
+ *
+ * @return the QueryManager
+ */
+ public QueryManager getQueryManager() {
+ return queryManager;
}
- private void cleanup(QueryResult result) {
- logger.info("foreman cleaning up - status: {}", queryManager.getStatus());
-
- bee.retireForeman(this);
- context.getWorkBus().removeFragmentStatusListener(queryId);
- context.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
-
- try {
- try {
- context.close();
- } catch (Exception e) {
- moveToState(QueryState.FAILED, e);
- return;
- }
-
- if (result != null) {
- initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
- }
- } finally {
- releaseLease();
- }
+ /**
+ * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be
+ * terminated.
+ */
+ public void cancel() {
+ // Note this can be called from outside of run() on another thread, or after run() completes
+ stateListener.moveToState(QueryState.CANCELLATION_REQUESTED, null);
}
/**
- * Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled).
+ * Called by execution pool to do query setup, and kick off remote execution.
+ *
+ * <p>Note that completion of this function is not the end of the Foreman's role
+ * in the query's lifecycle.
*/
+ @Override
public void run() {
+ // rename the thread we're using for debugging purposes
+ final Thread currentThread = Thread.currentThread();
+ final String originalName = currentThread.getName();
+ currentThread.setName(QueryIdHelper.getQueryId(queryId) + ":foreman");
+
+ // track how long the query takes
+ queryManager.markStartTime();
- final String originalThread = Thread.currentThread().getName();
- Thread.currentThread().setName(QueryIdHelper.getQueryId(queryId) + ":foreman");
- getStatus().markStart();
- // convert a run query request into action
try {
+ injector.injectChecked(drillbitContext, "run-try-beginning", ForemanException.class);
+
+ // convert a run query request into action
switch (queryRequest.getType()) {
case LOGICAL:
parseAndRunLogicalPlan(queryRequest.getPlan());
@@ -200,39 +204,71 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
default:
throw new IllegalStateException();
}
+ injector.injectChecked(drillbitContext, "run-try-end", ForemanException.class);
} catch (ForemanException e) {
moveToState(QueryState.FAILED, e);
-
} catch (AssertionError | Exception ex) {
- moveToState(QueryState.FAILED, new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
-
+ moveToState(QueryState.FAILED,
+ new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
} catch (OutOfMemoryError e) {
+ /*
+ * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman.
+ * So, if we die here, they should get notified about that, and cancel themselves; we don't have to
+ * attempt to notify them, which might not work under these conditions.
+ */
+ /*
+ * TODO this will kill everything in this JVM; why can't we just free all allocation
+ * associated with this Foreman and allow others to continue?
+ */
System.out.println("Out of memory, exiting.");
e.printStackTrace();
System.out.flush();
System.exit(-1);
-
} finally {
- Thread.currentThread().setName(originalThread);
+ /*
+ * Begin accepting external events.
+ *
+ * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there
+ * is an exception anywhere during setup, it wouldn't occur, and any events that are generated
+ * as a result of any partial setup that was done (such as the FragmentSubmitListener,
+ * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the
+ * event delivery call.
+ *
+ * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to
+ * make sure that we can't make things any worse as those events are delivered, but allow
+ * any necessary remaining cleanup to proceed.
+ */
+ acceptExternalEvents.countDown();
+
+ // restore the thread's original name
+ currentThread.setName(originalName);
}
+
+ /*
+ * Note that despite the run() completing, the Foreman continues to exist, and receives
+ * events (indirectly, through the QueryManager's use of stateListener), about fragment
+ * completions. It won't go away until everything is completed, failed, or cancelled.
+ */
}
private void releaseLease() {
- if (lease != null) {
+ while (lease != null) {
try {
lease.close();
+ lease = null;
+ } catch (InterruptedException e) {
+ // if we end up here, the while loop will try again
} catch (Exception e) {
logger.warn("Failure while releasing lease.", e);
+ break;
}
- ;
}
-
}
- private void parseAndRunLogicalPlan(String json) throws ExecutionSetupException {
+ private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException {
LogicalPlan logicalPlan;
try {
- logicalPlan = context.getPlanReader().readLogicalPlan(json);
+ logicalPlan = drillbitContext.getPlanReader().readLogicalPlan(json);
} catch (IOException e) {
throw new ForemanException("Failure parsing logical plan.", e);
}
@@ -244,7 +280,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
log(logicalPlan);
- PhysicalPlan physicalPlan = convert(logicalPlan);
+ final PhysicalPlan physicalPlan = convert(logicalPlan);
if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) {
returnPhysical(physicalPlan);
@@ -252,20 +288,19 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
}
log(physicalPlan);
-
runPhysicalPlan(physicalPlan);
}
- private void log(LogicalPlan plan) {
+ private void log(final LogicalPlan plan) {
if (logger.isDebugEnabled()) {
- logger.debug("Logical {}", plan.unparse(context.getConfig()));
+ logger.debug("Logical {}", plan.unparse(queryContext.getConfig()));
}
}
- private void log(PhysicalPlan plan) {
+ private void log(final PhysicalPlan plan) {
if (logger.isDebugEnabled()) {
try {
- String planText = context.getConfig().getMapper().writeValueAsString(plan);
+ String planText = queryContext.getConfig().getMapper().writeValueAsString(plan);
logger.debug("Physical {}", planText);
} catch (IOException e) {
logger.warn("Error while attempting to log physical plan.", e);
@@ -273,60 +308,54 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
}
}
- private void returnPhysical(PhysicalPlan plan) throws ExecutionSetupException {
- String jsonPlan = plan.unparse(context.getConfig().getMapper().writer());
- runPhysicalPlan(DirectPlan.createDirectPlan(context, new PhysicalFromLogicalExplain(jsonPlan)));
+ private void returnPhysical(final PhysicalPlan plan) throws ExecutionSetupException {
+ final String jsonPlan = plan.unparse(queryContext.getConfig().getMapper().writer());
+ runPhysicalPlan(DirectPlan.createDirectPlan(queryContext, new PhysicalFromLogicalExplain(jsonPlan)));
}
public static class PhysicalFromLogicalExplain {
- public String json;
+ public final String json;
- public PhysicalFromLogicalExplain(String json) {
- super();
+ public PhysicalFromLogicalExplain(final String json) {
this.json = json;
}
-
}
- private void parseAndRunPhysicalPlan(String json) throws ExecutionSetupException {
+ private void parseAndRunPhysicalPlan(final String json) throws ExecutionSetupException {
try {
- PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json);
+ final PhysicalPlan plan = drillbitContext.getPlanReader().readPhysicalPlan(json);
runPhysicalPlan(plan);
} catch (IOException e) {
throw new ForemanSetupException("Failure while parsing physical plan.", e);
}
}
- private void runPhysicalPlan(PhysicalPlan plan) throws ExecutionSetupException {
-
+ private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
validatePlan(plan);
setupSortMemoryAllocations(plan);
acquireQuerySemaphore(plan);
final QueryWorkUnit work = getQueryWorkUnit(plan);
-
- this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), queryManager);
- this.context.getClusterCoordinator().addDrillbitStatusListener(queryManager);
-
- logger.debug("Submitting fragments to run.");
-
+ final List<PlanFragment> planFragments = work.getFragments();
final PlanFragment rootPlanFragment = work.getRootFragment();
assert queryId == rootPlanFragment.getHandle().getQueryId();
- queryManager.setup(rootPlanFragment.getHandle(), context.getCurrentEndpoint(), work.getFragments().size());
+ drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager);
+ drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager);
+
+ logger.debug("Submitting fragments to run.");
// set up the root fragment first so we'll have incoming buffers available.
setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator());
- setupNonRootFragments(work.getFragments());
- bee.getContext().getAllocator().resetFragmentLimits();
+ setupNonRootFragments(planFragments);
+ drillbitContext.getAllocator().resetFragmentLimits(); // TODO a global effect for this query?!?
moveToState(QueryState.RUNNING, null);
logger.debug("Fragments running.");
-
}
- private void validatePlan(PhysicalPlan plan) throws ForemanSetupException{
+ private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupException {
if (plan.getProperties().resultMode != ResultMode.EXEC) {
throw new ForemanSetupException(String.format(
"Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC",
@@ -334,369 +363,632 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> {
}
}
- private void setupSortMemoryAllocations(PhysicalPlan plan){
- int sortCount = 0;
+ private void setupSortMemoryAllocations(final PhysicalPlan plan) {
+ // look for external sorts
+ final List<ExternalSort> sortList = new LinkedList<>();
for (PhysicalOperator op : plan.getSortedOperators()) {
if (op instanceof ExternalSort) {
- sortCount++;
+ sortList.add((ExternalSort) op);
}
}
- if (sortCount > 0) {
- long maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
+ // if there are any sorts, compute the maximum allocation, and set it on them
+ if (sortList.size() > 0) {
+ final OptionManager optionManager = queryContext.getOptions();
+ final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val;
long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(),
- context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC));
+ queryContext.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC));
maxAllocPerNode = Math.min(maxAllocPerNode,
- context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
- long maxSortAlloc = maxAllocPerNode / (sortCount * maxWidthPerNode);
+ optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val);
+ final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode);
logger.debug("Max sort alloc: {}", maxSortAlloc);
- for (PhysicalOperator op : plan.getSortedOperators()) {
- if (op instanceof ExternalSort) {
- ((ExternalSort) op).setMaxAllocation(maxSortAlloc);
- }
+
+ for(ExternalSort externalSort : sortList) {
+ externalSort.setMaxAllocation(maxSortAlloc);
}
}
}
- private void acquireQuerySemaphore(PhysicalPlan plan) throws ForemanSetupException {
-
- double size = 0;
- for (PhysicalOperator ops : plan.getSortedOperators()) {
- size += ops.getCost();
- }
-
+ /**
+ * This limits the number of "small" and "large" queries that a Drill cluster will run
+ * simultaneously, if queueing is enabled. If the query is unable to run, this will block
+ * until it can. Beware that this is called under run(), and so will consume a Thread
+ * while it waits for the required distributed semaphore.
+ *
+ * @param plan the query plan
+ * @throws ForemanSetupException
+ */
+ private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException {
+ final OptionManager optionManager = queryContext.getOptions();
+ final boolean queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val;
if (queuingEnabled) {
+ final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val;
+ double totalCost = 0;
+ for (PhysicalOperator ops : plan.getSortedOperators()) {
+ totalCost += ops.getCost();
+ }
+
try {
- if (size > this.queueThreshold) {
- this.lease = largeSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS);
+ @SuppressWarnings("resource")
+ final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
+ DistributedSemaphore distributedSemaphore;
+
+ // get the appropriate semaphore
+ if (totalCost > queueThreshold) {
+ final int largeQueue = optionManager.getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue();
+ distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue);
} else {
- this.lease = smallSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS);
+ final int smallQueue = optionManager.getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue();
+ distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue);
}
+
+ final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val;
+ lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new ForemanSetupException("Unable to acquire slot for query.", e);
}
}
}
- private QueryWorkUnit getQueryWorkUnit(PhysicalPlan plan) throws ExecutionSetupException {
- PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
- Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
+ private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException {
+ final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+ final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
+ final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
+ final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(
+ queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
+ queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment,
+ initiatingClient.getSession(), queryContext.getQueryDateTimeInfo());
+
+ if (logger.isInfoEnabled()) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("PlanFragments for query ");
+ sb.append(queryId);
+ sb.append('\n');
+
+ final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
+ final int fragmentCount = planFragments.size();
+ int fragmentIndex = 0;
+ for(PlanFragment planFragment : planFragments) {
+ final FragmentHandle fragmentHandle = planFragment.getHandle();
+ sb.append("PlanFragment(");
+ sb.append(++fragmentIndex);
+ sb.append('/');
+ sb.append(fragmentCount);
+ sb.append(") major_fragment_id ");
+ sb.append(fragmentHandle.getMajorFragmentId());
+ sb.append(" minor_fragment_id ");
+ sb.append(fragmentHandle.getMinorFragmentId());
+ sb.append('\n');
+
+ final DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
+ sb.append(" DrillbitEndpoint address ");
+ sb.append(endpointAssignment.getAddress());
+ sb.append('\n');
+
+ String jsonString = "<<malformed JSON>>";
+ sb.append(" fragment_json: ");
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
+ jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
+ } catch(Exception e) {
+ // we've already set jsonString to a fallback value
+ }
+ sb.append(jsonString);
- SimpleParallelizer parallelizer = new SimpleParallelizer(context);
- return parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(),
- queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, initiatingClient.getSession(),
- context.getQueryDateTimeInfo());
+ logger.info(sb.toString());
+ }
+ }
+
+ return queryWorkUnit;
}
/**
- * Tells the foreman to move to a new state. Note that
- * @param state
- * @return
+ * Manages the end-state processing for Foreman.
+ *
+ * End-state processing is tricky, because even if a query appears to succeed, but
+ * we then encounter a problem during cleanup, we still want to mark the query as
+ * failed. So we have to construct the successful result we would send, and then
+ * clean up before we send that result, possibly changing that result if we encounter
+ * a problem during cleanup. We only send the result when there is nothing left to
+ * do, so it will account for any possible problems.
+ *
+ * The idea here is to make close()ing the ForemanResult do the final cleanup and
+ * sending. Closing the result must be the last thing that is done by Foreman.
*/
- private synchronized boolean moveToState(QueryState newState, Exception exception){
- logger.info("State change requested. {} --> {}", state, newState, exception);
- outside: switch(state) {
+ private class ForemanResult implements AutoCloseable {
+ private QueryState resultState = null;
+ private Exception resultException = null;
+ private boolean isClosed = false;
+
+ /**
+ * Set up the result for a COMPLETED or CANCELED state.
+ *
+ * <p>Note that before sending this result, we execute cleanup steps that could
+ * result in this result still being changed to a FAILED state.
+ *
+ * @param queryState one of COMPLETED or CANCELED
+ */
+ public void setCompleted(final QueryState queryState) {
+ Preconditions.checkArgument((queryState == QueryState.COMPLETED) || (queryState == QueryState.CANCELED));
+ Preconditions.checkState(!isClosed);
+ Preconditions.checkState(resultState == null);
+
+ resultState = queryState;
+ }
+
+ /**
+ * Set up the result for a FAILED state.
+ *
+ * <p>Failures that occur during cleanup processing will be added as suppressed
+ * exceptions.
+ *
+ * @param exception the exception that led to the FAILED state
+ */
+ public void setFailed(final Exception exception) {
+ Preconditions.checkArgument(exception != null);
+ Preconditions.checkState(!isClosed);
+ Preconditions.checkState(resultState == null);
+
+ resultState = QueryState.FAILED;
+ resultException = exception;
+ }
+
+ /**
+ * Add an exception to the result. All exceptions after the first become suppressed
+ * exceptions hanging off the first.
+ *
+ * @param exception the exception to add
+ */
+ private void addException(final Exception exception) {
+ Preconditions.checkNotNull(exception);
+
+ if (resultException == null) {
+ resultException = exception;
+ } else {
+ resultException.addSuppressed(exception);
+ }
+ }
- case PENDING:
- // since we're moving out of pending, we can now start accepting other changes in state.
- // This guarantees that the first state change is driven by the original thread.
- acceptExternalEvents.countDown();
+ /**
+ * Close the given resource, catching and adding any caught exceptions via
+ * {@link #addException(Exception)}. If an exception is caught, it will change
+ * the result state to FAILED, regardless of what its current value.
+ *
+ * @param autoCloseable the resource to close
+ */
+ private void suppressingClose(final AutoCloseable autoCloseable) {
+ Preconditions.checkState(!isClosed);
+ Preconditions.checkState(resultState != null);
- if(newState == QueryState.RUNNING){
- recordNewState(QueryState.RUNNING);
- return true;
+ if (autoCloseable == null) {
+ return;
}
- // fall through to running behavior.
- //
- case RUNNING: {
+ try {
+ autoCloseable.close();
+ } catch(Exception e) {
+ /*
+ * Even if the query completed successfully, we'll still report failure if we have
+ * problems cleaning up.
+ */
+ resultState = QueryState.FAILED;
+ addException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ Preconditions.checkState(!isClosed);
+ Preconditions.checkState(resultState != null);
+
+ logger.info("foreman cleaning up - status: {}", queryManager.getFragmentStatesAsString());
+
+ // These are straight forward removals from maps, so they won't throw.
+ drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
+ drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager);
+
+ suppressingClose(queryContext);
+
+ /*
+ * We do our best to write the latest state, but even that could fail. If it does, we can't write
+ * the (possibly newly failing) state, so we continue on anyway.
+ *
+ * We only need to do this if the resultState differs from the last recorded state
+ */
+ if (resultState != state) {
+ suppressingClose(new AutoCloseable() {
+ @Override
+ public void close() throws Exception {
+ recordNewState(resultState);
+ }
+ });
+ }
+
+ /*
+ * Construct the response based on the latest resultState. The builder shouldn't fail.
+ */
+ final QueryResult.Builder resultBuilder = QueryResult.newBuilder()
+ .setIsLastChunk(resultState != QueryState.COMPLETED) // TODO(DRILL-2498) temporary
+ .setQueryId(queryId)
+ .setQueryState(resultState);
+ if (resultException != null) {
+ final DrillPBError error = ErrorHelper.logAndConvertError(queryContext.getCurrentEndpoint(),
+ ExceptionUtils.getRootCauseMessage(resultException), resultException, logger);
+ resultBuilder.addError(error);
+ }
+
+ /*
+ * If sending the result fails, we don't really have any way to modify the result we tried to send;
+ * it is possible it got sent but the result came from a later part of the code path. It is also
+ * possible the connection has gone away, so this is irrelevant because there's nowhere to
+ * send anything to.
+ */
+ try {
+ // send whatever result we ended up with
+ initiatingClient.sendResult(responseListener, new QueryWritableBatch(resultBuilder.build()), true);
+ } catch(Exception e) {
+ addException(e);
+ logger.warn("Exception sending result to client", resultException);
+ }
+
+ try {
+ releaseLease();
+ } finally {
+ isClosed = true;
+ }
+ }
+ }
+
+ /**
+ * Tells the foreman to move to a new state.
+ *
+ * @param newState the state to move to
+ * @param exception if not null, the exception that drove this state transition (usually a failure)
+ */
+ private synchronized void moveToState(final QueryState newState, final Exception exception) {
+ logger.info("State change requested. {} --> {}", state, newState, exception);
+ switch(state) {
+ case PENDING:
+ if (newState == QueryState.RUNNING) {
+ recordNewState(QueryState.RUNNING);
+ return;
+ }
- switch(newState){
+ //$FALL-THROUGH$
- case CANCELED: {
+ case RUNNING: {
+ /*
+ * For cases that cancel executing fragments, we have to record the new state first, because
+ * the cancellation of the local root fragment will cause this to be called recursively.
+ */
+ switch(newState) {
+ case CANCELLATION_REQUESTED: {
assert exception == null;
- recordNewState(QueryState.CANCELED);
- cancelExecutingFragments();
- QueryResult result = QueryResult.newBuilder() //
- .setQueryId(queryId) //
- .setQueryState(QueryState.CANCELED) //
- .setIsLastChunk(true) //
- .build();
-
- // immediately notify client that cancellation is taking place, final clean-up happens when foreman reaches to
- // a terminal state(completed, failed)
- initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true);
- return true;
+ queryManager.markEndTime();
+ recordNewState(QueryState.CANCELLATION_REQUESTED);
+ queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ foremanResult.setCompleted(QueryState.CANCELED);
+ /*
+ * We don't close the foremanResult until we've gotten acknowledgements, which
+ * happens below in the case for current state == CANCELLATION_REQUESTED.
+ */
+ return;
}
case COMPLETED: {
assert exception == null;
+ queryManager.markEndTime();
recordNewState(QueryState.COMPLETED);
- QueryResult result = QueryResult //
- .newBuilder() //
- .setQueryState(QueryState.COMPLETED) //
- .setQueryId(queryId) //
- .build();
- cleanup(result);
- return true;
+ foremanResult.setCompleted(QueryState.COMPLETED);
+ foremanResult.close();
+ return;
}
-
- case FAILED:
+ case FAILED: {
assert exception != null;
+ queryManager.markEndTime();
recordNewState(QueryState.FAILED);
- cancelExecutingFragments();
- DrillPBError error = ErrorHelper.logAndConvertError(context.getCurrentEndpoint(),
- ExceptionUtils.getRootCauseMessage(exception), exception, logger);
- QueryResult result = QueryResult //
- .newBuilder() //
- .addError(error) //
- .setIsLastChunk(true) //
- .setQueryState(QueryState.FAILED) //
- .setQueryId(queryId) //
- .build();
- cleanup(result);
- return true;
- default:
- break outside;
+ queryManager.cancelExecutingFragments(drillbitContext, rootRunner);
+ foremanResult.setFailed(exception);
+ foremanResult.close();
+ return;
+ }
+ default:
+ throw new IllegalStateException("illegal transition from RUNNING to " + newState);
}
}
+ case CANCELLATION_REQUESTED:
+ if ((newState == QueryState.CANCELED) || (newState == QueryState.COMPLETED)
+ || (newState == QueryState.FAILED)) {
+ /*
+ * These amount to a completion of the cancellation requests' cleanup; now we
+ * can clean up and send the result.
+ */
+ foremanResult.close();
+ }
+ return;
+
case CANCELED:
case COMPLETED:
- case FAILED: {
- // no op.
- logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception);
- return false;
- }
-
- }
-
- throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name()));
- }
-
- private void cancelExecutingFragments(){
-
- // Stop all framgents with a currently active status.
- List<FragmentData> fragments = getStatus().getFragmentData();
- Collections.sort(fragments, new Comparator<FragmentData>() {
- @Override
- public int compare(FragmentData o1, FragmentData o2) {
- return o2.getHandle().getMajorFragmentId() - o1.getHandle().getMajorFragmentId();
- }
- });
- for(FragmentData data: fragments){
- FragmentHandle handle = data.getStatus().getHandle();
- switch(data.getStatus().getProfile().getState()){
- case SENDING:
- case AWAITING_ALLOCATION:
- case RUNNING:
- if(data.isLocal()){
- if(rootRunner != null){
- rootRunner.cancel();
- }
- }else{
- bee.getContext().getController().getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle);
- }
- break;
- default:
- break;
- }
+ case FAILED:
+ logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).",
+ newState, state);
+ return;
}
+ throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s",
+ state.name(), newState.name()));
}
- private QueryStatus getStatus(){
- return queryManager.getStatus();
- }
-
- private void recordNewState(QueryState newState){
- this.state = newState;
- getStatus().updateQueryStateInStore(newState);
+ private void recordNewState(final QueryState newState) {
+ state = newState;
+ queryManager.updateQueryStateInStore(newState);
}
- private void runSQL(String sql) throws ExecutionSetupException {
- DrillSqlWorker sqlWorker = new DrillSqlWorker(context);
- Pointer<String> textPlan = new Pointer<>();
- PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
- getStatus().setPlanText(textPlan.value);
+ private void runSQL(final String sql) throws ExecutionSetupException {
+ final DrillSqlWorker sqlWorker = new DrillSqlWorker(queryContext);
+ final Pointer<String> textPlan = new Pointer<>();
+ final PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan);
+ queryManager.setPlanText(textPlan.value);
runPhysicalPlan(plan);
}
- private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException {
+ private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException {
if (logger.isDebugEnabled()) {
- logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig()));
+ logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getConfig()));
}
- return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize(
- new BasicOptimizer.BasicOptimizationContext(context), plan);
+ return new BasicOptimizer(queryContext).optimize(
+ new BasicOptimizer.BasicOptimizationContext(queryContext), plan);
}
public QueryId getQueryId() {
return queryId;
}
- @Override
- public void close() throws IOException {
- }
-
- public QueryStatus getQueryStatus() {
- return this.queryManager.getStatus();
- }
-
- private void setupRootFragment(PlanFragment rootFragment, UserClientConnection rootClient, FragmentRoot rootOperator) throws ExecutionSetupException {
- FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext()
- .getFunctionImplementationRegistry());
-
- IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
-
+ /**
+ * Set up the root fragment (which will run locally), and submit it for execution.
+ *
+ * @param rootFragment
+ * @param rootClient
+ * @param rootOperator
+ * @throws ExecutionSetupException
+ */
+ private void setupRootFragment(final PlanFragment rootFragment, final UserClientConnection rootClient,
+ final FragmentRoot rootOperator) throws ExecutionSetupException {
+ @SuppressWarnings("resource")
+ final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, rootClient,
+ drillbitContext.getFunctionImplementationRegistry());
+ @SuppressWarnings("resource")
+ final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext);
rootContext.setBuffers(buffers);
- // add fragment to local node.
queryManager.addFragmentStatusTracker(rootFragment, true);
- this.rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, queryManager.getRootStatusHandler(rootContext, rootFragment));
- RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
+ rootRunner = new FragmentExecutor(rootContext, rootOperator,
+ queryManager.getRootStatusHandler(rootContext));
+ final RootFragmentManager fragmentManager =
+ new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner);
if (buffers.isDone()) {
// if we don't have to wait for any incoming data, start the fragment runner.
bee.addFragmentRunner(fragmentManager.getRunnable());
} else {
// if we do, record the fragment manager in the workBus.
- bee.getContext().getWorkBus().setFragmentManager(fragmentManager);
+ // TODO aren't we managing our own work? What does this do? It looks like this will never get run
+ drillbitContext.getWorkBus().addFragmentManager(fragmentManager);
}
}
- private void setupNonRootFragments(Collection<PlanFragment> fragments) throws ForemanException{
- Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
- Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
+ /**
+ * Set up the non-root fragments for execution. Some may be local, and some may be remote.
+ * Messages are sent immediately, so they may start returning data even before we complete this.
+ *
+ * @param fragments the fragments
+ * @throws ForemanException
+ */
+ private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ForemanException {
+ /*
+ * We will send a single message to each endpoint, regardless of how many fragments will be
+ * executed there. We need to start up the intermediate fragments first so that they will be
+ * ready once the leaf fragments start producing data. To satisfy both of these, we will
+ * make a pass through the fragments and put them into these two maps according to their
+ * leaf/intermediate state, as well as their target drillbit.
+ */
+ final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create();
+ final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create();
// record all fragments for status purposes.
- for (PlanFragment f : fragments) {
+ for (PlanFragment planFragment : fragments) {
// logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson());
- queryManager.addFragmentStatusTracker(f, false);
- if (f.getLeafFragment()) {
- leafFragmentMap.put(f.getAssignment(), f);
+ queryManager.addFragmentStatusTracker(planFragment, false);
+ if (planFragment.getLeafFragment()) {
+ leafFragmentMap.put(planFragment.getAssignment(), planFragment);
} else {
- intFragmentMap.put(f.getAssignment(), f);
+ intFragmentMap.put(planFragment.getAssignment(), planFragment);
}
}
- CountDownLatch latch = new CountDownLatch(intFragmentMap.keySet().size());
+ /*
+ * We need to wait for the intermediates to be sent so that they'll be set up by the time
+ * the leaves start producing data. We'll use this latch to wait for the responses.
+ *
+ * However, in order not to hang the process if any of the RPC requests fails, we always
+ * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll
+ * know if any submissions did fail.
+ */
+ final CountDownLatch endpointLatch = new CountDownLatch(intFragmentMap.keySet().size());
+ final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures();
// send remote intermediate fragments
for (DrillbitEndpoint ep : intFragmentMap.keySet()) {
- sendRemoteFragments(ep, intFragmentMap.get(ep), latch);
+ sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures);
}
- // wait for send complete
- try {
- latch.await();
- } catch (InterruptedException e) {
- throw new ForemanException("Interrupted while waiting to complete send of remote fragments.", e);
+ // wait for the status of all requests sent above to be known
+ boolean ready = false;
+ while(!ready) {
+ try {
+ endpointLatch.await();
+ ready = true;
+ } catch (InterruptedException e) {
+ // if we weren't ready, the while loop will continue to wait
+ }
}
- // send remote (leaf) fragments.
- for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
- sendRemoteFragments(ep, leafFragmentMap.get(ep), null);
+ // if any of the intermediate fragment submissions failed, fail the query
+ final List<FragmentSubmitFailures.SubmissionException> submissionExceptions =
+ fragmentSubmitFailures.submissionExceptions;
+ if (submissionExceptions.size() > 0) {
+ throw new ForemanSetupException("Error setting up remote intermediate fragment execution",
+ submissionExceptions.get(0).rpcException);
+ // TODO indicate the failing drillbit?
+ // TODO report on all the failures?
}
- }
- public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch){
- return new FragmentSubmitListener(endpoint, value, latch);
+ /*
+ * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through
+ * the regular sendListener event delivery.
+ */
+ for (DrillbitEndpoint ep : leafFragmentMap.keySet()) {
+ sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null);
+ }
}
- private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments, CountDownLatch latch){
- Controller controller = bee.getContext().getController();
- InitializeFragments.Builder fb = InitializeFragments.newBuilder();
- for(PlanFragment f : fragments){
- fb.addFragment(f);
+ /**
+ * Send all the remote fragments belonging to a single target drillbit in one request.
+ *
+ * @param assignment the drillbit assigned to these fragments
+ * @param fragments the set of fragments
+ * @param latch the countdown latch used to track the requests to all endpoints
+ * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints
+ */
+ private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collection<PlanFragment> fragments,
+ final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
+ @SuppressWarnings("resource")
+ final Controller controller = drillbitContext.getController();
+ final InitializeFragments.Builder fb = InitializeFragments.newBuilder();
+ for(PlanFragment planFragment : fragments) {
+ fb.addFragment(planFragment);
}
- InitializeFragments initFrags = fb.build();
+ final InitializeFragments initFrags = fb.build();
logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags);
- FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch);
+ final FragmentSubmitListener listener =
+ new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures);
controller.getTunnel(assignment).sendFragments(listener, initFrags);
}
- public QueryState getState(){
+ public QueryState getState() {
return state;
}
- private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments>{
+ /**
+ * Used by {@link FragmentSubmitListener} to track the number of submission failures.
+ */
+ private static class FragmentSubmitFailures {
+ static class SubmissionException {
+// final DrillbitEndpoint drillbitEndpoint;
+ final RpcException rpcException;
+
+ SubmissionException(@SuppressWarnings("unused") final DrillbitEndpoint drillbitEndpoint,
+ final RpcException rpcException) {
+// this.drillbitEndpoint = drillbitEndpoint;
+ this.rpcException = rpcException;
+ }
+ }
+
+ final List<SubmissionException> submissionExceptions = new LinkedList<>();
+
+ void addFailure(final DrillbitEndpoint drillbitEndpoint, final RpcException rpcException) {
+ submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException));
+ }
+ }
- private CountDownLatch latch;
+ private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments> {
+ private final CountDownLatch latch;
+ private final FragmentSubmitFailures fragmentSubmitFailures;
- public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch) {
+ /**
+ * Constructor.
+ *
+ * @param endpoint the endpoint for the submission
+ * @param value the initialize fragments message
+ * @param latch the latch to count down when the status is known; may be null
+ * @param fragmentSubmitFailures the counter to use for failures; must be non-null iff latch is non-null
+ */
+ public FragmentSubmitListener(final DrillbitEndpoint endpoint, final InitializeFragments value,
+ final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) {
super(endpoint, value);
+ Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null));
this.latch = latch;
+ this.fragmentSubmitFailures = fragmentSubmitFailures;
}
@Override
- public void success(Ack ack, ByteBuf byteBuf) {
+ public void success(final Ack ack, final ByteBuf byteBuf) {
if (latch != null) {
latch.countDown();
}
}
@Override
- public void failed(RpcException ex) {
- logger.debug("Failure while sending fragment. Stopping query.", ex);
- moveToState(QueryState.FAILED, ex);
+ public void failed(final RpcException ex) {
+ if (latch != null) {
+ fragmentSubmitFailures.addFailure(endpoint, ex);
+ latch.countDown();
+ } else {
+ // since this won't be waited on, we can wait to deliver this event once the Foreman is ready
+ logger.debug("Failure while sending fragment. Stopping query.", ex);
+ stateListener.moveToState(QueryState.FAILED, ex);
+ }
}
-
}
-
+ /**
+ * Provides gated access to state transitions.
+ *
+ * <p>The StateListener waits on a latch before delivery state transitions to the Foreman. The
+ * latch will be tripped when the Foreman is sufficiently set up that it can receive and process
+ * external events from other threads.
+ */
public class StateListener {
- public boolean moveToState(QueryState newState, Exception ex){
- try {
- acceptExternalEvents.await();
- } catch(InterruptedException e){
- logger.warn("Interrupted while waiting to move state.", e);
- return false;
+ /**
+ * Move the Foreman to the specified new state.
+ *
+ * @param newState the state to move to
+ * @param ex if moving to a failure state, the exception that led to the failure; used for reporting
+ * to the user
+ */
+ public void moveToState(final QueryState newState, final Exception ex) {
+ boolean ready = false;
+ while(!ready) {
+ try {
+ acceptExternalEvents.await();
+ ready = true;
+ } catch(InterruptedException e) {
+ // if we're still not ready, the while loop will cause us to wait again
+ logger.warn("Interrupted while waiting to move state.", e);
+ }
}
- return Foreman.this.moveToState(newState, ex);
+ Foreman.this.moveToState(newState, ex);
}
}
-
- @Override
- public int compareTo(Object o) {
- return hashCode() - o.hashCode();
- }
-
+ /**
+ * Listens for the status of the RPC response sent to the user for the query.
+ */
private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> {
@Override
- public void failed(RpcException ex) {
- logger
- .info(
- "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.",
- ex);
- moveToState(QueryState.FAILED, ex);
+ public void failed(final RpcException ex) {
+ logger.info(
+ "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.",
+ ex);
+ stateListener.moveToState(QueryState.FAILED, ex);
}
}
-
-
- private class CancelListener extends EndpointListener<Ack, FragmentHandle>{
-
- public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) {
- super(endpoint, handle);
- }
-
- @Override
- public void failed(RpcException ex) {
- logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
- }
-
- @Override
- public void success(Ack value, ByteBuf buf) {
- if(!value.getOk()){
- logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
- }
- // do nothing.
- }
-
- }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
index 52fd0a92e..433ab260a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java
@@ -29,19 +29,21 @@ public class FragmentData {
private volatile long lastStatusUpdate = 0;
private final DrillbitEndpoint endpoint;
- public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) {
- super();
- MinorFragmentProfile f = MinorFragmentProfile.newBuilder() //
- .setState(FragmentState.SENDING) //
- .setMinorFragmentId(handle.getMinorFragmentId()) //
- .setEndpoint(endpoint) //
- .build();
- this.status = FragmentStatus.newBuilder().setHandle(handle).setProfile(f).build();
+ public FragmentData(final FragmentHandle handle, final DrillbitEndpoint endpoint, final boolean isLocal) {
this.endpoint = endpoint;
this.isLocal = isLocal;
+ final MinorFragmentProfile f = MinorFragmentProfile.newBuilder()
+ .setState(FragmentState.SENDING)
+ .setMinorFragmentId(handle.getMinorFragmentId())
+ .setEndpoint(endpoint)
+ .build();
+ status = FragmentStatus.newBuilder()
+ .setHandle(handle)
+ .setProfile(f)
+ .build();
}
- public void setStatus(FragmentStatus status){
+ public void setStatus(final FragmentStatus status) {
this.status = status;
lastStatusUpdate = System.currentTimeMillis();
}
@@ -54,15 +56,11 @@ public class FragmentData {
return isLocal;
}
- public long getLastStatusUpdate() {
- return lastStatusUpdate;
- }
-
public DrillbitEndpoint getEndpoint() {
return endpoint;
}
- public FragmentHandle getHandle(){
+ public FragmentHandle getHandle() {
return status.getHandle();
}
@@ -71,7 +69,4 @@ public class FragmentData {
return "FragmentData [isLocal=" + isLocal + ", status=" + status + ", lastStatusUpdate=" + lastStatusUpdate
+ ", endpoint=" + endpoint + "]";
}
-
-
-
} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
index 6a719d2a1..b2a40aea2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java
@@ -20,7 +20,5 @@ package org.apache.drill.exec.work.foreman;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
public interface FragmentStatusListener {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusListener.class);
-
public void statusUpdate(FragmentStatus status);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index 2de3592b7..8626d5b54 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -17,144 +17,353 @@
*/
package org.apache.drill.exec.work.foreman;
+import io.netty.buffer.ByteBuf;
+
+import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.base.Preconditions;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
+import org.apache.drill.exec.proto.SchemaUserBitShared;
+import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
+import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.RunQuery;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RemoteRpcException;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.control.Controller;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.store.sys.PStore;
+import org.apache.drill.exec.store.sys.PStoreConfig;
import org.apache.drill.exec.store.sys.PStoreProvider;
+import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.foreman.Foreman.StateListener;
import org.apache.drill.exec.work.fragment.AbstractStatusReporter;
+import org.apache.drill.exec.work.fragment.FragmentExecutor;
+import com.carrotsearch.hppc.IntObjectOpenHashMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
/**
* Each Foreman holds its own QueryManager. This manages the events associated with execution of a particular query across all fragments.
*/
-public class QueryManager implements FragmentStatusListener, DrillbitStatusListener{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
- private final Set<DrillbitEndpoint> includedBits;
+public class QueryManager implements FragmentStatusListener, DrillbitStatusListener {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class);
+
+ public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig.
+ newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE)
+ .name("profiles")
+ .blob()
+ .max(100)
+ .build();
- private final QueryStatus status;
+ public static final PStoreConfig<QueryInfo> RUNNING_QUERY_INFO = PStoreConfig.
+ newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE)
+ .name("running")
+ .ephemeral()
+ .build();
+
+ private final Set<DrillbitEndpoint> includedBits;
private final StateListener stateListener;
- private final AtomicInteger remainingFragmentCount;
private final QueryId queryId;
+ private final String stringQueryId;
+ private final RunQuery runQuery;
+ private final Foreman foreman;
+
+ /*
+ * Doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then
+ * accessed by multiple threads for reads only.
+ */
+ private final IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> fragmentDataMap =
+ new IntObjectOpenHashMap<>();
+ private final List<FragmentData> fragmentDataSet = Lists.newArrayList();
+
+ private final PStore<QueryProfile> profilePStore;
+ private final PStore<QueryInfo> profileEStore;
- public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) {
+ // the following mutable variables are used to capture ongoing query status
+ private String planText;
+ private long startTime;
+ private long endTime;
+ private final AtomicInteger finishedFragments = new AtomicInteger(0);
+
+ public QueryManager(final QueryId queryId, final RunQuery runQuery, final PStoreProvider pStoreProvider,
+ final StateListener stateListener, final Foreman foreman) {
+ this.queryId = queryId;
+ this.runQuery = runQuery;
this.stateListener = stateListener;
- this.queryId = id;
- this.remainingFragmentCount = new AtomicInteger(0);
- this.status = new QueryStatus(query, id, pStoreProvider, foreman);
- this.includedBits = Sets.newHashSet();
- }
+ this.foreman = foreman;
+
+ stringQueryId = QueryIdHelper.getQueryId(queryId);
+ try {
+ profilePStore = pStoreProvider.getStore(QUERY_PROFILE);
+ profileEStore = pStoreProvider.getStore(RUNNING_QUERY_INFO);
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
- public QueryStatus getStatus(){
- return status;
+ includedBits = Sets.newHashSet();
}
@Override
- public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) {
+ public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
}
@Override
- public void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredDrillbits) {
- for(DrillbitEndpoint ep : unregisteredDrillbits){
- if(this.includedBits.contains(ep)){
- logger.warn("Drillbit {} no longer registered in cluster. Canceling query {}", ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId));
- this.stateListener.moveToState(QueryState.FAILED, new ForemanException("One more more nodes lost connectivity during query. Identified node was " + ep.getAddress()));
+ public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) {
+ for(DrillbitEndpoint ep : unregisteredDrillbits) {
+ if (includedBits.contains(ep)) {
+ logger.warn("Drillbit {} no longer registered in cluster. Canceling query {}",
+ ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId));
+ stateListener.moveToState(QueryState.FAILED,
+ new ForemanException("One more more nodes lost connectivity during query. Identified node was "
+ + ep.getAddress()));
}
}
}
-
@Override
- public void statusUpdate(FragmentStatus status) {
-
- logger.debug("New fragment status was provided to Foreman of {}", status);
- switch(status.getProfile().getState()){
+ public void statusUpdate(final FragmentStatus status) {
+ logger.debug("New fragment status was provided to QueryManager of {}", status);
+ switch(status.getProfile().getState()) {
case AWAITING_ALLOCATION:
+ case RUNNING:
updateFragmentStatus(status);
break;
+
+ case FINISHED:
+ fragmentDone(status);
+ break;
+
case CANCELLED:
- //TODO: define a new query state to distinguish the state of early termination from cancellation
+ /*
+ * TODO
+ * This doesn't seem right; shouldn't this be similar to FAILED?
+ * and this means once all are cancelled we'll get to COMPLETED, even though some weren't?
+ *
+ * So, we add it to the finishedFragments if we ourselves we receive a statusUpdate (from where),
+ * but not if our cancellation listener gets it?
+ */
+ // TODO(DRILL-2370) we might not get these, so we need to take extra care for cleanup
fragmentDone(status);
break;
+
case FAILED:
stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError()));
break;
- case FINISHED:
- fragmentDone(status);
- break;
- case RUNNING:
- updateFragmentStatus(status);
- break;
+
default:
throw new UnsupportedOperationException(String.format("Received status of %s", status));
}
}
- private void updateFragmentStatus(FragmentStatus status){
- this.status.updateFragmentStatus(status);
+ private void updateFragmentStatus(final FragmentStatus fragmentStatus) {
+ final FragmentHandle fragmentHandle = fragmentStatus.getHandle();
+ final int majorFragmentId = fragmentHandle.getMajorFragmentId();
+ final int minorFragmentId = fragmentHandle.getMinorFragmentId();
+ fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus);
}
- private void fragmentDone(FragmentStatus status){
- this.status.incrementFinishedFragments();
- int remaining = remainingFragmentCount.decrementAndGet();
+ private void fragmentDone(final FragmentStatus status) {
updateFragmentStatus(status);
+
+ final int finishedFragments = this.finishedFragments.incrementAndGet();
+ final int totalFragments = fragmentDataSet.size();
+ assert finishedFragments <= totalFragments : "The finished fragment count exceeds the total fragment count";
+ final int remaining = totalFragments - finishedFragments;
logger.debug("waiting for {} fragments", remaining);
- if(remaining == 0){
+ if (remaining == 0) {
+ // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status
stateListener.moveToState(QueryState.COMPLETED, null);
}
}
- public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){
- remainingFragmentCount.set(countOfNonRootFragments + 1);
- logger.debug("foreman is waiting for {} fragments to finish", countOfNonRootFragments + 1);
- status.add(new FragmentData(rootFragmentHandle, localIdentity, true));
- this.status.setTotalFragments(countOfNonRootFragments + 1);
+ private void addFragment(final FragmentData fragmentData) {
+ final FragmentHandle fragmentHandle = fragmentData.getHandle();
+ final int majorFragmentId = fragmentHandle.getMajorFragmentId();
+ final int minorFragmentId = fragmentHandle.getMinorFragmentId();
- List<FragmentData> fragments = status.getFragmentData();
- for (FragmentData fragment : fragments) {
- this.includedBits.add(fragment.getEndpoint());
+ IntObjectOpenHashMap<FragmentData> minorMap = fragmentDataMap.get(majorFragmentId);
+ if (minorMap == null) {
+ minorMap = new IntObjectOpenHashMap<>();
+ fragmentDataMap.put(majorFragmentId, minorMap);
}
+ minorMap.put(minorFragmentId, fragmentData);
+ fragmentDataSet.add(fragmentData);
+
+ // keep track of all the drill bits that are used by this query
+ includedBits.add(fragmentData.getEndpoint());
+ }
+
+ public String getFragmentStatesAsString() {
+ return fragmentDataMap.toString();
}
- public void addFragmentStatusTracker(PlanFragment fragment, boolean isRoot){
- addFragmentStatusTracker(fragment.getHandle(), fragment.getAssignment(), isRoot);
+ void addFragmentStatusTracker(final PlanFragment fragment, final boolean isRoot) {
+ addFragment(new FragmentData(fragment.getHandle(), fragment.getAssignment(), isRoot));
}
- public void addFragmentStatusTracker(FragmentHandle handle, DrillbitEndpoint node, boolean isRoot){
- status.add(new FragmentData(handle, node, isRoot));
+ /**
+ * Stop all fragments with a currently active status.
+ */
+ void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) {
+ final Controller controller = drillbitContext.getController();
+ for(FragmentData data : fragmentDataSet) {
+ final FragmentStatus fragmentStatus = data.getStatus();
+ switch(fragmentStatus.getProfile().getState()) {
+ case SENDING:
+ case AWAITING_ALLOCATION:
+ case RUNNING:
+ if (rootRunner != null) {
+ rootRunner.cancel();
+ } else {
+ final DrillbitEndpoint endpoint = data.getEndpoint();
+ final FragmentHandle handle = fragmentStatus.getHandle();
+ // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same?
+ controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle);
+ }
+ break;
+
+ case FINISHED:
+ case CANCELLED:
+ case FAILED:
+ // nothing to do
+ break;
+ }
+ }
}
- public RootStatusReporter getRootStatusHandler(FragmentContext context, PlanFragment fragment){
- return new RootStatusReporter(context, fragment);
+ /*
+ * This assumes that the FragmentStatusListener implementation takes action when it hears
+ * that the target fragment has been cancelled. As a result, this listener doesn't do anything
+ * but log messages.
+ */
+ private class CancelListener extends EndpointListener<Ack, FragmentHandle> {
+ public CancelListener(final DrillbitEndpoint endpoint, final FragmentHandle handle) {
+ super(endpoint, handle);
+ }
+
+ @Override
+ public void failed(final RpcException ex) {
+ logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex);
+ }
+
+ @Override
+ public void success(final Ack value, final ByteBuf buf) {
+ if (!value.getOk()) {
+ logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value);
+ }
+ }
}
- class RootStatusReporter extends AbstractStatusReporter{
+ public RootStatusReporter getRootStatusHandler(final FragmentContext context) {
+ return new RootStatusReporter(context);
+ }
- private RootStatusReporter(FragmentContext context, PlanFragment fragment){
+ class RootStatusReporter extends AbstractStatusReporter {
+ private RootStatusReporter(final FragmentContext context) {
super(context);
}
@Override
- protected void statusChange(FragmentHandle handle, FragmentStatus status) {
+ protected void statusChange(final FragmentHandle handle, final FragmentStatus status) {
statusUpdate(status);
}
+ }
+
+ QueryState updateQueryStateInStore(final QueryState queryState) {
+ switch (queryState) {
+ case PENDING:
+ case RUNNING:
+ case CANCELLATION_REQUESTED:
+ profileEStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile.
+ break;
+
+ case COMPLETED:
+ case CANCELED:
+ case FAILED:
+ try {
+ profileEStore.delete(stringQueryId);
+ } catch(Exception e) {
+ logger.warn("Failure while trying to delete the estore profile for this query.", e);
+ }
+
+ // TODO(DRILL-2362) when do these ever get deleted?
+ profilePStore.put(stringQueryId, getQueryProfile());
+ break;
+
+ default:
+ throw new IllegalStateException("unrecognized queryState " + queryState);
+ }
+
+ return queryState;
+ }
+
+ private QueryInfo getQueryInfo() {
+ return QueryInfo.newBuilder()
+ .setQuery(runQuery.getPlan())
+ .setState(foreman.getState())
+ .setForeman(foreman.getQueryContext().getCurrentEndpoint())
+ .setStart(startTime)
+ .build();
+ }
+
+ public QueryProfile getQueryProfile() {
+ final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder()
+ .setQuery(runQuery.getPlan())
+ .setType(runQuery.getType())
+ .setId(queryId)
+ .setState(foreman.getState())
+ .setForeman(foreman.getQueryContext().getCurrentEndpoint())
+ .setStart(startTime)
+ .setEnd(endTime)
+ .setTotalFragments(fragmentDataSet.size())
+ .setFinishedFragments(finishedFragments.get());
+
+ if (planText != null) {
+ profileBuilder.setPlan(planText);
+ }
+
+ for (int i = 0; i < fragmentDataMap.allocated.length; i++) {
+ if (fragmentDataMap.allocated[i]) {
+ final int majorFragmentId = fragmentDataMap.keys[i];
+ final IntObjectOpenHashMap<FragmentData> minorMap =
+ (IntObjectOpenHashMap<FragmentData>) ((Object[]) fragmentDataMap.values)[i];
+ final MajorFragmentProfile.Builder fb = MajorFragmentProfile.newBuilder()
+ .setMajorFragmentId(majorFragmentId);
+ for (int v = 0; v < minorMap.allocated.length; v++) {
+ if (minorMap.allocated[v]) {
+ final FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v];
+ fb.addMinorFragmentProfile(data.getStatus().getProfile());
+ }
+ }
+ profileBuilder.addFragmentProfile(fb);
+ }
+ }
+ return profileBuilder.build();
+ }
+
+ void setPlanText(final String planText) {
+ this.planText = planText;
+ }
+ void markStartTime() {
+ startTime = System.currentTimeMillis();
}
+ void markEndTime() {
+ endTime = System.currentTimeMillis();
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
deleted file mode 100644
index 4e18da631..000000000
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.foreman;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.drill.common.exceptions.DrillRuntimeException;
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
-import org.apache.drill.exec.proto.SchemaUserBitShared;
-import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
-import org.apache.drill.exec.proto.UserBitShared.QueryId;
-import org.apache.drill.exec.proto.UserBitShared.QueryInfo;
-import org.apache.drill.exec.proto.UserBitShared.QueryProfile;
-import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
-import org.apache.drill.exec.proto.UserProtos.RunQuery;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.store.sys.EStore;
-import org.apache.drill.exec.store.sys.PStore;
-import org.apache.drill.exec.store.sys.PStoreConfig;
-import org.apache.drill.exec.store.sys.PStoreProvider;
-
-import com.carrotsearch.hppc.IntObjectOpenHashMap;
-import com.google.common.collect.Lists;
-
-public class QueryStatus {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStatus.class);
-
- public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig.
- newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE) //
- .name("profiles") //
- .blob() //
- .max(100) //
- .build();
-
- public static final PStoreConfig<QueryInfo> RUNNING_QUERY_INFO = PStoreConfig.
- newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE).name("running").ephemeral().build();
-
- // doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then accessed by multiple threads for reads only.
- private IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> fragmentDataMap = new IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>>();
- private List<FragmentData> fragmentDataSet = Lists.newArrayList();
-
- private final String queryId;
- private final QueryId id;
- private RunQuery query;
- private String planText;
- private Foreman foreman;
- private long startTime;
- private long endTime;
- private int totalFragments;
- private int finishedFragments = 0;
-
- private final PStore<QueryProfile> profilePStore;
- private final PStore<QueryInfo> profileEStore;
-
- public QueryStatus(RunQuery query, QueryId id, PStoreProvider provider, Foreman foreman) {
- this.id = id;
- this.query = query;
- this.queryId = QueryIdHelper.getQueryId(id);
- try {
- this.profilePStore = provider.getStore(QUERY_PROFILE);
- this.profileEStore = provider.getStore(RUNNING_QUERY_INFO);
- } catch (IOException e) {
- throw new DrillRuntimeException(e);
- }
- this.foreman = foreman;
- }
-
- public List<FragmentData> getFragmentData() {
- return fragmentDataSet;
- }
-
- public void setPlanText(String planText) {
- this.planText = planText;
- }
-
- public void markStart() {
- this.startTime = System.currentTimeMillis();
- }
-
- public void setEndTime(long endTime) {
- this.endTime = endTime;
- }
-
- public void setTotalFragments(int totalFragments) {
- this.totalFragments = totalFragments;
- }
-
- public void incrementFinishedFragments() {
- finishedFragments++;
- assert finishedFragments <= totalFragments;
- }
-
- void add(FragmentData data) {
- int majorFragmentId = data.getHandle().getMajorFragmentId();
- int minorFragmentId = data.getHandle().getMinorFragmentId();
- IntObjectOpenHashMap<FragmentData> minorMap = fragmentDataMap.get(majorFragmentId);
- if (minorMap == null) {
- minorMap = new IntObjectOpenHashMap<FragmentData>();
- fragmentDataMap.put(majorFragmentId, minorMap);
- }
-
- minorMap.put(minorFragmentId, data);
- fragmentDataSet.add(data);
- }
-
- void updateFragmentStatus(FragmentStatus fragmentStatus) {
- int majorFragmentId = fragmentStatus.getHandle().getMajorFragmentId();
- int minorFragmentId = fragmentStatus.getHandle().getMinorFragmentId();
- fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus);
- }
-
- synchronized QueryState updateQueryStateInStore(QueryState queryState) {
- switch (queryState) {
- case PENDING:
- case RUNNING:
- profileEStore.put(queryId, getAsInfo()); // store as ephemeral query profile.
- break;
- case COMPLETED:
- case CANCELED:
- case FAILED:
- try{
- profileEStore.delete(queryId);
- }catch(Exception e){
- logger.warn("Failure while trying to delete the estore profile for this query.", e);
- }
-
- profilePStore.put(queryId, getAsProfile());
- break;
- default:
- throw new IllegalStateException();
- }
- return queryState;
- }
-
- @Override
- public String toString() {
- return fragmentDataMap.toString();
- }
-
- public static class FragmentId{
- int major;
- int minor;
-
- public FragmentId(FragmentStatus status) {
- this.major = status.getHandle().getMajorFragmentId();
- this.minor = status.getHandle().getMinorFragmentId();
- }
-
- public FragmentId(FragmentData data) {
- this.major = data.getHandle().getMajorFragmentId();
- this.minor = data.getHandle().getMinorFragmentId();
- }
-
- public FragmentId(int major, int minor) {
- super();
- this.major = major;
- this.minor = minor;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + major;
- result = prime * result + minor;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- FragmentId other = (FragmentId) obj;
- if (major != other.major) {
- return false;
- }
- if (minor != other.minor) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return major + ":" + minor;
- }
- }
-
- public QueryInfo getAsInfo() {
- return QueryInfo.newBuilder() //
- .setQuery(query.getPlan())
- .setState(foreman.getState())
- .setForeman(foreman.getContext().getCurrentEndpoint())
- .setStart(startTime)
- .build();
- }
-
- public QueryProfile getAsProfile() {
- QueryProfile.Builder b = QueryProfile.newBuilder();
- b.setQuery(query.getPlan());
- b.setType(query.getType());
- if (planText != null) {
- b.setPlan(planText);
- }
- b.setId(id);
- for (int i = 0; i < fragmentDataMap.allocated.length; i++) {
- if (fragmentDataMap.allocated[i]) {
- int majorFragmentId = fragmentDataMap.keys[i];
- IntObjectOpenHashMap<FragmentData> minorMap = (IntObjectOpenHashMap<FragmentData>) ((Object[]) fragmentDataMap.values)[i];
-
- MajorFragmentProfile.Builder fb = MajorFragmentProfile.newBuilder();
- fb.setMajorFragmentId(majorFragmentId);
- for (int v = 0; v < minorMap.allocated.length; v++) {
- if (minorMap.allocated[v]) {
- FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v];
- fb.addMinorFragmentProfile(data.getStatus().getProfile());
- }
- }
- b.addFragmentProfile(fb);
- }
- }
-
- b.setState(foreman.getState());
- b.setForeman(foreman.getContext().getCurrentEndpoint());
- b.setStart(startTime);
- b.setEnd(endTime);
- b.setTotalFragments(totalFragments);
- b.setFinishedFragments(finishedFragments);
- return b.build();
- }
-
-}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index e2f7bbf8c..b6176db71 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -18,70 +18,65 @@
package org.apache.drill.exec.work.fragment;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.drill.common.DeferredException;
+import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.FragmentStats;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
-import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
-import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
-import org.apache.drill.exec.work.CancelableQuery;
-import org.apache.drill.exec.work.StatusProvider;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.foreman.DrillbitStatusListener;
/**
- * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation
- * messages.
+ * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request
+ * and cancellation messages.
*/
-public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvider, Comparable<Object>{
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
+public class FragmentExecutor implements Runnable {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class);
private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
private final FragmentRoot rootOperator;
- private final FragmentContext context;
- private final WorkerBee bee;
+ private final FragmentContext fragmentContext;
private final StatusReporter listener;
- private final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
-
+ private volatile boolean closed;
private RootExec root;
- private boolean closed;
- public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener) {
- this.context = context;
- this.bee = bee;
+ public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator,
+ final StatusReporter listener) {
+ this.fragmentContext = context;
this.rootOperator = rootOperator;
this.listener = listener;
}
- @Override
public FragmentStatus getStatus() {
- // If the query is not in a running state, the operator tree is still being constructed and
- // there is no reason to poll for intermediate results.
-
- // Previously the call to get the operator stats with the AbstractStatusReporter was happening
- // before this check. This caused a concurrent modification exception as the list of operator
- // stats is iterated over while collecting info, and added to while building the operator tree.
- if(state.get() != FragmentState.RUNNING_VALUE){
+ /*
+ * If the query is not in a running state, the operator tree is still being constructed and
+ * there is no reason to poll for intermediate results.
+ *
+ * Previously the call to get the operator stats with the AbstractStatusReporter was happening
+ * before this check. This caused a concurrent modification exception as the list of operator
+ * stats is iterated over while collecting info, and added to while building the operator tree.
+ */
+ if(state.get() != FragmentState.RUNNING_VALUE) {
return null;
}
- FragmentStatus status = AbstractStatusReporter.getBuilder(context, FragmentState.RUNNING, null, null).build();
+ final FragmentStatus status =
+ AbstractStatusReporter.getBuilder(fragmentContext, FragmentState.RUNNING, null, null).build();
return status;
}
- @Override
public void cancel() {
+ // Note this will be called outside of run(), from another thread
updateState(FragmentState.CANCELLED);
- logger.debug("Cancelled Fragment {}", context.getHandle());
- context.cancel();
+ logger.debug("Cancelled Fragment {}", fragmentContext.getHandle());
+ fragmentContext.cancel();
}
public void receivingFragmentFinished(FragmentHandle handle) {
@@ -91,98 +86,116 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
}
}
- public UserClientConnection getClient() {
- return context.getConnection();
- }
-
@Override
public void run() {
- final String originalThread = Thread.currentThread().getName();
- try {
- String newThreadName = String.format("%s:frag:%s:%s", //
- QueryIdHelper.getQueryId(context.getHandle().getQueryId()), //
- context.getHandle().getMajorFragmentId(),
- context.getHandle().getMinorFragmentId()
- );
- Thread.currentThread().setName(newThreadName);
+ final Thread myThread = Thread.currentThread();
+ final String originalThreadName = myThread.getName();
+ final FragmentHandle fragmentHandle = fragmentContext.getHandle();
+ final ClusterCoordinator clusterCoordinator = fragmentContext.getDrillbitContext().getClusterCoordinator();
+ final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
- root = ImplCreator.getExec(context, rootOperator);
+ try {
+ final String newThreadName = String.format("%s:frag:%s:%s",
+ QueryIdHelper.getQueryId(fragmentHandle.getQueryId()),
+ fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
+ myThread.setName(newThreadName);
- context.getDrillbitContext().getClusterCoordinator().addDrillbitStatusListener(drillbitStatusListener);
+ root = ImplCreator.getExec(fragmentContext, rootOperator);
+ clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener);
- logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
+ logger.debug("Starting fragment runner. {}:{}",
+ fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId());
if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) {
logger.warn("Unable to set fragment state to RUNNING. Cancelled or failed?");
return;
}
- // run the query until root.next returns false.
+ /*
+ * Run the query until root.next returns false.
+ * Note that we closeOutResources() here if we're done. That's because this can also throw
+ * exceptions that we want to treat as failures of the request, even if the request did fine
+ * up until this point. Any failures there will be caught in the catch clause below, which
+ * will be reported to the user. If they were to come from the finally clause, the uncaught
+ * exception there will simply terminate this thread without alerting the user -- the
+ * behavior then is to hang.
+ */
while (state.get() == FragmentState.RUNNING_VALUE) {
if (!root.next()) {
- if (context.isFailed()) {
- internalFail(context.getFailureCause());
- closeOutResources(false);
+ if (fragmentContext.isFailed()) {
+ internalFail(fragmentContext.getFailureCause());
+ closeOutResources();
} else {
- closeOutResources(true); // make sure to close out resources before we report success.
+ /*
+ * Close out resources before we report success. We do this so that we'll get an
+ * error if there's a problem cleaning up, even though the query execution portion
+ * succeeded.
+ */
+ closeOutResources();
updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED);
}
-
break;
}
}
} catch (AssertionError | Exception e) {
logger.warn("Error while initializing or executing fragment", e);
- context.fail(e);
+ fragmentContext.fail(e);
internalFail(e);
} finally {
- bee.removeFragment(context.getHandle());
- context.getDrillbitContext().getClusterCoordinator().removeDrillbitStatusListener(drillbitStatusListener);
+ clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener);
// Final check to make sure RecordBatches are cleaned up.
- closeOutResources(false);
+ closeOutResources();
- Thread.currentThread().setName(originalThread);
+ myThread.setName(originalThreadName);
}
}
- private void closeOutResources(boolean throwFailure) {
+ private static final String CLOSE_FAILURE = "Failure while closing out resources";
+
+ private void closeOutResources() {
+ /*
+ * Because of the way this method can be called, it needs to be idempotent; it must
+ * be safe to call it more than once. We use this flag to bypass the body if it has
+ * been called before.
+ */
if (closed) {
return;
}
+ final DeferredException deferredException = fragmentContext.getDeferredException();
try {
- root.stop();
+ root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure
} catch (RuntimeException e) {
- if (throwFailure) {
- throw e;
- }
- logger.warn("Failure while closing out resources.", e);
+ logger.warn(CLOSE_FAILURE, e);
+ deferredException.addException(e);
}
+ closed = true;
+
+ /*
+ * This must be last, because this may throw deferred exceptions.
+ * We are forced to wrap the checked exception (if any) so that it will be unchecked.
+ */
try {
- context.close();
- } catch (Exception e) {
- if (throwFailure) {
- throw new RuntimeException("Error closing fragment context.", e);
- }
- logger.warn("Failure while closing out resources.", e);
+ fragmentContext.close();
+ } catch(Exception e) {
+ throw new RuntimeException("Error closing fragment context.", e);
}
-
- closed = true;
}
- private void internalFail(Throwable excep) {
+ private void internalFail(final Throwable excep) {
state.set(FragmentState.FAILED_VALUE);
- listener.fail(context.getHandle(), "Failure while running fragment.", excep);
+ listener.fail(fragmentContext.getHandle(), "Failure while running fragment.", excep);
}
/**
* Updates the fragment state with the given state
+ *
* @param to target state
*/
- protected void updateState(FragmentState to) {;
+ private void updateState(final FragmentState to) {
state.set(to.getNumber());
- listener.stateChanged(context.getHandle(), to);
+ listener.stateChanged(fragmentContext.getHandle(), to);
}
/**
@@ -192,10 +205,10 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
* @param to target state
* @return true only if update succeeds
*/
- protected boolean checkAndUpdateState(FragmentState expected, FragmentState to) {
- boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
+ private boolean checkAndUpdateState(final FragmentState expected, final FragmentState to) {
+ final boolean success = state.compareAndSet(expected.getNumber(), to.getNumber());
if (success) {
- listener.stateChanged(context.getHandle(), to);
+ listener.stateChanged(fragmentContext.getHandle(), to);
} else {
logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.",
expected.name(), to.name(), FragmentState.valueOf(state.get()));
@@ -206,7 +219,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
/**
* Returns true if the fragment is in a terminal state
*/
- protected boolean isCompleted() {
+ private boolean isCompleted() {
return state.get() == FragmentState.CANCELLED_VALUE
|| state.get() == FragmentState.FAILED_VALUE
|| state.get() == FragmentState.FINISHED_VALUE;
@@ -220,38 +233,32 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
* @param to target state
* @return true only if update succeeds
*/
- protected boolean updateStateOrFail(FragmentState expected, FragmentState to) {
+ private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) {
final boolean updated = checkAndUpdateState(expected, to);
if (!updated && !isCompleted()) {
final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s.";
- internalFail(new StateTransitionException(String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
+ internalFail(new StateTransitionException(
+ String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get()))));
}
return updated;
}
-
- @Override
- public int compareTo(Object o) {
- return o.hashCode() - this.hashCode();
- }
-
public FragmentContext getContext() {
- return context;
+ return fragmentContext;
}
private class FragmentDrillbitStatusListener implements DrillbitStatusListener {
-
@Override
- public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {
- // Do nothing.
+ public void drillbitRegistered(final Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) {
}
@Override
- public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) {
- if (unregisteredDrillbits.contains(FragmentExecutor.this.context.getForemanEndpoint())) {
- logger.warn("Forman : {} no longer active. Cancelling fragment {}.",
- FragmentExecutor.this.context.getForemanEndpoint().getAddress(),
- QueryIdHelper.getQueryIdentifier(context.getHandle()));
+ public void drillbitUnregistered(final Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) {
+ // if the defunct Drillbit was running our Foreman, then cancel the query
+ final DrillbitEndpoint foremanEndpoint = FragmentExecutor.this.fragmentContext.getForemanEndpoint();
+ if (unregisteredDrillbits.contains(foremanEndpoint)) {
+ logger.warn("Foreman : {} no longer active. Cancelling fragment {}.",
+ foremanEndpoint.getAddress(), QueryIdHelper.getQueryIdentifier(fragmentContext.getHandle()));
FragmentExecutor.this.cancel();
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
index 3671804e4..41e87cd50 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java
@@ -30,13 +30,13 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.work.WorkManager.WorkerBee;
import org.apache.drill.exec.work.batch.IncomingBuffers;
import org.apache.drill.exec.work.foreman.ForemanException;
/**
* This managers determines when to run a non-root fragment node.
*/
+// TODO a lot of this is the same as RootFragmentManager
public class NonRootFragmentManager implements FragmentManager {
private final PlanFragment fragment;
private FragmentRoot root;
@@ -44,15 +44,13 @@ public class NonRootFragmentManager implements FragmentManager {
private final StatusReporter runnerListener;
private volatile FragmentExecutor runner;
private volatile boolean cancel = false;
- private final WorkerBee bee;
private final FragmentContext context;
private List<RemoteConnection> connections = new CopyOnWriteArrayList<>();
- public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws ExecutionSetupException {
+ public NonRootFragmentManager(final PlanFragment fragment, final DrillbitContext context)
+ throws ExecutionSetupException {
try {
this.fragment = fragment;
- DrillbitContext context = bee.getContext();
- this.bee = bee;
this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson());
this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry());
this.buffers = new IncomingBuffers(root, this.context);
@@ -84,8 +82,8 @@ public class NonRootFragmentManager implements FragmentManager {
if (cancel) {
return null;
}
- runner = new FragmentExecutor(context, bee, root, runnerListener);
- return this.runner;
+ runner = new FragmentExecutor(context, root, runnerListener);
+ return runner;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
index 54fc8c41f..84071c3c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java
@@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.work.batch.IncomingBuffers;
+// TODO a lot of this is the same as NonRootFragmentManager
public class RootFragmentManager implements FragmentManager{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class);