diff options
Diffstat (limited to 'exec')
78 files changed, 3285 insertions, 1768 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 93d06f00f..cd0a0a285 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -202,4 +202,8 @@ public interface ExecConstants { public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable"; public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false); + + public static final String DRILLBIT_EXCEPTION_INJECTIONS = "drill.exec.testing.exception-injections"; + public static final OptionValidator DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR = + new StringValidator(DRILLBIT_EXCEPTION_INJECTIONS, ""); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java index 04b955bbb..c3a873ccc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/client/DrillClient.java @@ -98,7 +98,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ public DrillClient(DrillConfig config, ClusterCoordinator coordinator, BufferAllocator allocator) { this.ownsZkConnection = coordinator == null; this.ownsAllocator = allocator == null; - this.allocator = allocator == null ? new TopLevelAllocator(config) : allocator; + this.allocator = ownsAllocator ? new TopLevelAllocator(config) : allocator; this.config = config; this.clusterCoordinator = coordinator; this.reconnectTimes = config.getInt(ExecConstants.BIT_RETRY_TIMES); @@ -131,7 +131,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ /** * Connects the client to a Drillbit server * - * @throws IOException + * @throws RpcException */ public void connect() throws RpcException { connect(null, new Properties()); @@ -176,7 +176,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ connected = true; } - protected EventLoopGroup createEventLoop(int size, String prefix) { + protected static EventLoopGroup createEventLoop(int size, String prefix) { return TransportCheck.createEventLoopGroup(size, prefix); } @@ -204,12 +204,8 @@ public class DrillClient implements Closeable, ConnectionThrottle{ private void connect(DrillbitEndpoint endpoint) throws RpcException { FutureHandler f = new FutureHandler(); - try { - client.connect(f, endpoint, props, getUserCredentials()); - f.checkedGet(); - } catch (InterruptedException e) { - throw new RpcException(e); - } + client.connect(f, endpoint, props, getUserCredentials()); + f.checkedGet(); } public BufferAllocator getAllocator() { @@ -219,6 +215,7 @@ public class DrillClient implements Closeable, ConnectionThrottle{ /** * Closes this client's connection to the server */ + @Override public void close() { if (this.client != null) { this.client.close(); @@ -286,15 +283,13 @@ public class DrillClient implements Closeable, ConnectionThrottle{ * Submits a Logical plan for direct execution (bypasses parsing) * * @param plan the plan to execute - * @return a handle for the query result - * @throws RpcException */ public void runQuery(QueryType type, String plan, UserResultsListener resultsListener) { client.submitQuery(resultsListener, newBuilder().setResultsMode(STREAM_FULL).setType(type).setPlan(plan).build()); } private class ListHoldingResultsListener implements UserResultsListener { - private Vector<QueryResultBatch> results = new Vector<QueryResultBatch>(); + private Vector<QueryResultBatch> results = new Vector<>(); private SettableFuture<List<QueryResultBatch>> future = SettableFuture.create(); private UserProtos.RunQuery query ; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java index 5c657248c..493f6ce8e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/compile/ClassTransformer.java @@ -294,14 +294,11 @@ public class ClassTransformer { if (templateDefinition.getExternalInterface().isAssignableFrom(c)) { logger.debug("Done compiling (bytecode size={}, time:{} millis).", DrillStringUtils.readable(totalBytecodeSize), (System.nanoTime() - t1) / 1000000); return c; - } else { - throw new ClassTransformationException("The requested class did not implement the expected interface."); } + + throw new ClassTransformationException("The requested class did not implement the expected interface."); } catch (CompileException | IOException | ClassNotFoundException e) { throw new ClassTransformationException(String.format("Failure generating transformation classes for value: \n %s", entireClass), e); } - } - } - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java index 035c1aa2c..27868f01a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/local/LocalClusterCoordinator.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.Collection; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -32,10 +33,15 @@ import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import com.google.common.collect.Maps; public class LocalClusterCoordinator extends ClusterCoordinator { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(LocalClusterCoordinator.class); - private volatile Map<RegistrationHandle, DrillbitEndpoint> endpoints = Maps.newConcurrentMap(); - private volatile ConcurrentMap<String, DistributedSemaphore> semaphores = Maps.newConcurrentMap(); + /* + * Since we hand out the endpoints list in {@see #getAvailableEndpoints()}, we use a + * {@see java.util.concurrent.ConcurrentHashMap} because those guarantee not to throw + * ConcurrentModificationException. + */ + private final Map<RegistrationHandle, DrillbitEndpoint> endpoints = new ConcurrentHashMap<>(); + private final ConcurrentMap<String, DistributedSemaphore> semaphores = Maps.newConcurrentMap(); @Override public void close() throws IOException { @@ -43,21 +49,21 @@ public class LocalClusterCoordinator extends ClusterCoordinator { } @Override - public void start(long millis) throws Exception { + public void start(final long millis) throws Exception { logger.debug("Local Cluster Coordinator started."); } @Override - public RegistrationHandle register(DrillbitEndpoint data) { + public RegistrationHandle register(final DrillbitEndpoint data) { logger.debug("Endpoint registered {}.", data); - Handle h = new Handle(); + final Handle h = new Handle(); endpoints.put(h, data); return h; } @Override - public void unregister(RegistrationHandle handle) { - if(handle == null) { + public void unregister(final RegistrationHandle handle) { + if (handle == null) { return; } @@ -69,8 +75,8 @@ public class LocalClusterCoordinator extends ClusterCoordinator { return endpoints.values(); } - private class Handle implements RegistrationHandle{ - UUID id = UUID.randomUUID(); + private class Handle implements RegistrationHandle { + private final UUID id = UUID.randomUUID(); @Override public int hashCode() { @@ -82,7 +88,7 @@ public class LocalClusterCoordinator extends ClusterCoordinator { } @Override - public boolean equals(Object obj) { + public boolean equals(final Object obj) { if (this == obj) { return true; } @@ -92,7 +98,7 @@ public class LocalClusterCoordinator extends ClusterCoordinator { if (getClass() != obj.getClass()) { return false; } - Handle other = (Handle) obj; + final Handle other = (Handle) obj; if (!getOuterType().equals(other.getOuterType())) { return false; } @@ -109,41 +115,38 @@ public class LocalClusterCoordinator extends ClusterCoordinator { private LocalClusterCoordinator getOuterType() { return LocalClusterCoordinator.this; } - } @Override - public DistributedSemaphore getSemaphore(String name, int maximumLeases) { - semaphores.putIfAbsent(name, new LocalSemaphore(maximumLeases)); + public DistributedSemaphore getSemaphore(final String name, final int maximumLeases) { + if (!semaphores.containsKey(name)) { + semaphores.putIfAbsent(name, new LocalSemaphore(maximumLeases)); + } return semaphores.get(name); } - public class LocalSemaphore implements DistributedSemaphore{ - - private final Semaphore inner; - private final LocalLease lease = new LocalLease(); + public class LocalSemaphore implements DistributedSemaphore { + private final Semaphore semaphore; + private final LocalLease localLease = new LocalLease(); - public LocalSemaphore(int size) { - inner = new Semaphore(size); + public LocalSemaphore(final int size) { + semaphore = new Semaphore(size); } @Override - public DistributedLease acquire(long timeout, TimeUnit unit) throws Exception { - if(!inner.tryAcquire(timeout, unit)) { + public DistributedLease acquire(final long timeout, final TimeUnit timeUnit) throws Exception { + if (!semaphore.tryAcquire(timeout, timeUnit)) { return null; - }else{ - return lease; + } else { + return localLease; } } - private class LocalLease implements DistributedLease{ - + private class LocalLease implements DistributedLease { @Override public void close() throws Exception { - inner.release(); + semaphore.release(); } - } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 2a6660e0e..5e31e5cc2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -19,7 +19,6 @@ package org.apache.drill.exec.ops; import io.netty.buffer.DrillBuf; -import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; @@ -27,9 +26,9 @@ import java.util.Map; import net.hydromatic.optiq.SchemaPlus; import net.hydromatic.optiq.jdbc.SimpleOptiqSchema; +import org.apache.drill.common.DeferredException; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.exec.compile.QueryClassLoader; import org.apache.drill.exec.exception.ClassTransformationException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; @@ -39,7 +38,6 @@ import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.UserBitShared.UserCredentials; import org.apache.drill.exec.rpc.control.ControlTunnel; import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; @@ -49,53 +47,52 @@ import org.apache.drill.exec.server.options.OptionList; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.work.batch.IncomingBuffers; -import com.carrotsearch.hppc.LongObjectOpenHashMap; -import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** * Contextual objects required for execution of a particular fragment. */ public class FragmentContext implements AutoCloseable, UdfUtilities { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); - - - private Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap(); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentContext.class); + private final Map<DrillbitEndpoint, DataTunnel> tunnels = Maps.newHashMap(); private final DrillbitContext context; private final UserClientConnection connection; private final FragmentStats stats; private final FunctionImplementationRegistry funcRegistry; - private final QueryClassLoader loader; private final BufferAllocator allocator; private final PlanFragment fragment; - private List<Thread> daemonThreads = Lists.newLinkedList(); - private QueryDateTimeInfo queryDateTimeInfo; + private final QueryDateTimeInfo queryDateTimeInfo; private IncomingBuffers buffers; private final OptionManager fragmentOptions; - private final UserCredentials credentials; private final BufferManager bufferManager; - private volatile Throwable failureCause; + private final DeferredException deferredException = new DeferredException(); private volatile FragmentContextState state = FragmentContextState.OK; + /* + * TODO we need a state that indicates that cancellation has been requested and + * is in progress. Early termination (such as from limit queries) could also use + * this, as the cleanup steps should be exactly the same. + */ private static enum FragmentContextState { OK, FAILED, CANCELED } - public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection, - FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException { - + public FragmentContext(final DrillbitContext dbContext, final PlanFragment fragment, + final UserClientConnection connection, final FunctionImplementationRegistry funcRegistry) + throws ExecutionSetupException { this.context = dbContext; this.connection = connection; this.fragment = fragment; this.funcRegistry = funcRegistry; - this.credentials = fragment.getCredentials(); - this.queryDateTimeInfo = new QueryDateTimeInfo(fragment.getQueryStartTime(), fragment.getTimeZone()); + queryDateTimeInfo = new QueryDateTimeInfo(fragment.getQueryStartTime(), fragment.getTimeZone()); + logger.debug("Getting initial memory allocation of {}", fragment.getMemInitial()); logger.debug("Fragment max allocation: {}", fragment.getMemMax()); + try { OptionList list; if (!fragment.hasOptionsJson() || fragment.getOptionsJson().isEmpty()) { @@ -103,7 +100,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { } else { list = dbContext.getConfig().getMapper().readValue(fragment.getOptionsJson(), OptionList.class); } - this.fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list); + fragmentOptions = new FragmentOptionManager(context.getOptionManager(), list); } catch (Exception e) { throw new ExecutionSetupException("Failure while reading plan options.", e); } @@ -111,15 +108,15 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { // Add the fragment context to the root allocator. // The QueryManager will call the root allocator to recalculate all the memory limits for all the fragments try { - this.allocator = dbContext.getAllocator().getChildAllocator(this, fragment.getMemInitial(), fragment.getMemMax(), true); + allocator = context.getAllocator().getChildAllocator( + this, fragment.getMemInitial(), fragment.getMemMax(), true); assert (allocator != null); - }catch(Throwable e){ + } catch(Throwable e) { throw new ExecutionSetupException("Failure while getting memory allocator for fragment.", e); } - this.stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment()); - this.bufferManager = new BufferManager(this.allocator, this); - this.loader = new QueryClassLoader(dbContext.getConfig(), fragmentOptions); + stats = new FragmentStats(allocator, dbContext.getMetrics(), fragment.getAssignment()); + bufferManager = new BufferManager(this.allocator, this); } public OptionManager getOptions() { @@ -133,7 +130,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { public void fail(Throwable cause) { logger.error("Fragment Context received failure.", cause); setState(FragmentContextState.FAILED); - failureCause = cause; + deferredException.addThrowable(cause); } public void cancel() { @@ -161,11 +158,11 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { fail(new UnsupportedOperationException("Schema tree can only be created in root fragment. " + "This is a non-root fragment.")); return null; - } else { - SchemaPlus root = SimpleOptiqSchema.createRootSchema(false); - context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root); - return root; } + + final SchemaPlus root = SimpleOptiqSchema.createRootSchema(false); + context.getStorage().getSchemaFactory().registerSchemas(connection.getSession(), root); + return root; } /** @@ -177,14 +174,17 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { } public FragmentStats getStats() { - return this.stats; + return stats; } + @Override public QueryDateTimeInfo getQueryDateTimeInfo(){ return this.queryDateTimeInfo; } - public DrillbitEndpoint getForemanEndpoint() {return fragment.getForeman();} + public DrillbitEndpoint getForemanEndpoint() { + return fragment.getForeman(); + } /** * The FragmentHandle for this Fragment @@ -194,31 +194,37 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { return fragment.getHandle(); } + private String getFragIdString() { + final FragmentHandle handle = getHandle(); + final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0"; + return frag; + } + /** * Get this fragment's allocator. - * @return + * @return the allocator */ @Deprecated public BufferAllocator getAllocator() { - if(allocator == null){ - FragmentHandle handle=getHandle(); - String frag=handle!=null?handle.getMajorFragmentId()+":"+handle.getMinorFragmentId():"0:0"; - logger.debug("Fragment:"+frag+" Allocator is NULL"); + if (allocator == null) { + logger.debug("Fragment: " + getFragIdString() + " Allocator is NULL"); } return allocator; } - public BufferAllocator getNewChildAllocator(long initialReservation, - long maximumReservation, - boolean applyFragmentLimit) throws OutOfMemoryException { + public BufferAllocator getNewChildAllocator(final long initialReservation, + final long maximumReservation, + final boolean applyFragmentLimit) throws OutOfMemoryException { return allocator.getChildAllocator(this, initialReservation, maximumReservation, applyFragmentLimit); } - public <T> T getImplementationClass(ClassGenerator<T> cg) throws ClassTransformationException, IOException { + public <T> T getImplementationClass(final ClassGenerator<T> cg) + throws ClassTransformationException, IOException { return getImplementationClass(cg.getCodeGenerator()); } - public <T> T getImplementationClass(CodeGenerator<T> cg) throws ClassTransformationException, IOException { + public <T> T getImplementationClass(final CodeGenerator<T> cg) + throws ClassTransformationException, IOException { return context.getCompiler().getImplementationClass(cg); } @@ -238,7 +244,7 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { return connection; } - public ControlTunnel getControlTunnel(DrillbitEndpoint endpoint) { + public ControlTunnel getControlTunnel(final DrillbitEndpoint endpoint) { return context.getController().getTunnel(endpoint); } @@ -251,24 +257,12 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { return tunnel; } - /** - * Add a new thread to this fragment's context. This thread will likely run for the life of the fragment but should be - * terminated when the fragment completes. When the fragment completes, the threads will be interrupted. - * - * @param thread - */ - public void addDaemonThread(Thread thread) { - daemonThreads.add(thread); - thread.start(); - - } - public IncomingBuffers getBuffers() { return buffers; } public Throwable getFailureCause() { - return failureCause; + return deferredException.getException(); } public boolean isFailed() { @@ -283,43 +277,36 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { return funcRegistry; } - public UserCredentials getCredentials() { - return credentials; - } - - public QueryClassLoader getClassLoader() { - return loader; - } - public DrillConfig getConfig() { return context.getConfig(); } - public void setFragmentLimit(long limit) { - this.allocator.setFragmentLimit(limit); + public void setFragmentLimit(final long limit) { + allocator.setFragmentLimit(limit); + } + + public DeferredException getDeferredException() { + return deferredException; } @Override public void close() throws Exception { - for (Thread thread: daemonThreads) { - thread.interrupt(); - } - bufferManager.close(); - - if (buffers != null) { - buffers.close(); - } + /* + * TODO wait for threads working on this Fragment to terminate (or at least stop working + * on this Fragment's query) + */ + deferredException.suppressingClose(bufferManager); + deferredException.suppressingClose(buffers); + deferredException.suppressingClose(allocator); - FragmentHandle handle=getHandle(); - String frag=handle!=null?handle.getMajorFragmentId()+":"+handle.getMinorFragmentId():"0:0"; - allocator.close(); - logger.debug("Fragment:"+frag+" After close allocator is: "+allocator!=null?"OK":"NULL"); + deferredException.close(); // must be last, as this may throw } public DrillBuf replace(DrillBuf old, int newSize) { return bufferManager.replace(old, newSize); } + @Override public DrillBuf getManagedBuffer() { return bufferManager.getManagedBuffer(); } @@ -327,5 +314,4 @@ public class FragmentContext implements AutoCloseable, UdfUtilities { public DrillBuf getManagedBuffer(int size) { return bufferManager.getManagedBuffer(size); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index fb6b4aa13..3b51a6916 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -24,35 +24,30 @@ import net.hydromatic.optiq.SchemaPlus; import net.hydromatic.optiq.jdbc.SimpleOptiqSchema; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.expr.fn.impl.DateUtility; -import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.planner.sql.DrillOperatorTable; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.proto.UserBitShared.QueryId; -import org.apache.drill.exec.rpc.control.WorkEventBus; -import org.apache.drill.exec.rpc.data.DataConnectionCreator; import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.server.options.QueryOptionManager; import org.apache.drill.exec.store.StoragePluginRegistry; +// TODO except for a couple of tests, this is only created by Foreman +// TODO the many methods that just return drillbitContext.getXxx() should be replaced with getDrillbitContext() // TODO - consider re-name to PlanningContext, as the query execution context actually appears // in fragment contexts -public class QueryContext implements AutoCloseable, UdfUtilities{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class); +public class QueryContext implements AutoCloseable, UdfUtilities { +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class); - private final QueryId queryId; private final DrillbitContext drillbitContext; - private final WorkEventBus workBus; - private UserSession session; - private OptionManager queryOptions; + private final UserSession session; + private final OptionManager queryOptions; private final PlannerSettings plannerSettings; private final DrillOperatorTable table; @@ -62,32 +57,32 @@ public class QueryContext implements AutoCloseable, UdfUtilities{ private static final int INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES = 1024 * 1024; private static final int MAX_OFF_HEAP_ALLOCATION_IN_BYTES = 16 * 1024 * 1024; - // flag to indicate if close has been called, after calling close the first - // time this is set to true and the close method becomes a no-op + /* + * Flag to indicate if close has been called, after calling close the first + * time this is set to true and the close method becomes a no-op. + */ private boolean closed = false; - public QueryContext(final UserSession session, QueryId queryId, final DrillbitContext drllbitContext) { - super(); - this.queryId = queryId; + public QueryContext(final UserSession session, final DrillbitContext drllbitContext) { this.drillbitContext = drllbitContext; - this.workBus = drllbitContext.getWorkBus(); this.session = session; - this.queryOptions = new QueryOptionManager(session.getOptions()); - this.plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry()); + queryOptions = new QueryOptionManager(session.getOptions()); + plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry()); plannerSettings.setNumEndPoints(drillbitContext.getBits().size()); - this.table = new DrillOperatorTable(getFunctionRegistry()); + table = new DrillOperatorTable(getFunctionRegistry()); - long queryStartTime = System.currentTimeMillis(); - int timeZone = DateUtility.getIndex(System.getProperty("user.timezone")); - this.queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone); + final long queryStartTime = System.currentTimeMillis(); + final int timeZone = DateUtility.getIndex(System.getProperty("user.timezone")); + queryDateTimeInfo = new QueryDateTimeInfo(queryStartTime, timeZone); try { - this.allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES, MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false); + allocator = drllbitContext.getAllocator().getChildAllocator(null, INITIAL_OFF_HEAP_ALLOCATION_IN_BYTES, + MAX_OFF_HEAP_ALLOCATION_IN_BYTES, false); } catch (OutOfMemoryException e) { throw new DrillRuntimeException("Error creating off-heap allocator for planning context.",e); } // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available - this.bufferManager = new BufferManager(this.allocator, null); + bufferManager = new BufferManager(this.allocator, null); } @@ -95,7 +90,7 @@ public class QueryContext implements AutoCloseable, UdfUtilities{ return plannerSettings; } - public UserSession getSession(){ + public UserSession getSession() { return session; } @@ -103,18 +98,18 @@ public class QueryContext implements AutoCloseable, UdfUtilities{ return allocator; } - public SchemaPlus getNewDefaultSchema(){ - SchemaPlus rootSchema = getRootSchema(); - SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema); - if(defaultSchema == null){ + public SchemaPlus getNewDefaultSchema() { + final SchemaPlus rootSchema = getRootSchema(); + final SchemaPlus defaultSchema = session.getDefaultSchema(rootSchema); + if (defaultSchema == null) { return rootSchema; - }else{ - return defaultSchema; } + + return defaultSchema; } - public SchemaPlus getRootSchema(){ - SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false); + public SchemaPlus getRootSchema() { + final SchemaPlus rootSchema = SimpleOptiqSchema.createRootSchema(false); drillbitContext.getSchemaFactory().registerSchemas(session, rootSchema); return rootSchema; } @@ -123,43 +118,23 @@ public class QueryContext implements AutoCloseable, UdfUtilities{ return queryOptions; } - public OptionManager getSessionOptions() { - return session.getOptions(); - } - - public DrillbitEndpoint getCurrentEndpoint(){ + public DrillbitEndpoint getCurrentEndpoint() { return drillbitContext.getEndpoint(); } - public QueryId getQueryId() { - return queryId; - } - - public StoragePluginRegistry getStorage(){ + public StoragePluginRegistry getStorage() { return drillbitContext.getStorage(); } - public Collection<DrillbitEndpoint> getActiveEndpoints(){ + public Collection<DrillbitEndpoint> getActiveEndpoints() { return drillbitContext.getBits(); } - public PhysicalPlanReader getPlanReader(){ - return drillbitContext.getPlanReader(); - } - - public DataConnectionCreator getDataConnectionsPool(){ - return drillbitContext.getDataConnectionsPool(); - } - - public DrillConfig getConfig(){ + public DrillConfig getConfig() { return drillbitContext.getConfig(); } - public WorkEventBus getWorkBus(){ - return workBus; - } - - public FunctionImplementationRegistry getFunctionRegistry(){ + public FunctionImplementationRegistry getFunctionRegistry() { return drillbitContext.getFunctionImplementationRegistry(); } @@ -167,10 +142,6 @@ public class QueryContext implements AutoCloseable, UdfUtilities{ return table; } - public ClusterCoordinator getClusterCoordinator() { - return drillbitContext.getClusterCoordinator(); - } - @Override public QueryDateTimeInfo getQueryDateTimeInfo() { return queryDateTimeInfo; @@ -183,10 +154,13 @@ public class QueryContext implements AutoCloseable, UdfUtilities{ @Override public void close() throws Exception { - if (!closed) { - // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available - bufferManager.close(); - allocator.close(); + try { + if (!closed) { + // TODO(DRILL-1942) the new allocator has this capability built-in, so this can be removed once that is available + bufferManager.close(); + allocator.close(); + } + } finally { closed = true; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java index 9f89f249b..b1a71a5d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/BasicOptimizer.java @@ -29,6 +29,7 @@ import org.apache.drill.common.logical.data.Filter; import org.apache.drill.common.logical.data.GroupingAggregate; import org.apache.drill.common.logical.data.Join; import org.apache.drill.common.logical.data.JoinCondition; +import org.apache.drill.common.logical.data.LogicalOperator; import org.apache.drill.common.logical.data.NamedExpression; import org.apache.drill.common.logical.data.Order; import org.apache.drill.common.logical.data.Order.Ordering; @@ -52,10 +53,8 @@ import org.apache.drill.exec.physical.config.SelectionVectorRemover; import org.apache.drill.exec.physical.config.Sort; import org.apache.drill.exec.physical.config.StreamingAggregate; import org.apache.drill.exec.physical.config.WindowPOP; -import org.apache.drill.exec.rpc.user.UserServer; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.store.StoragePlugin; -import org.apache.drill.exec.work.foreman.ForemanException; import org.eigenbase.rel.RelFieldCollation.Direction; import org.eigenbase.rel.RelFieldCollation.NullDirection; @@ -63,87 +62,49 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Iterator; import java.util.List; -public class BasicOptimizer extends Optimizer{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class); +public class BasicOptimizer extends Optimizer { +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicOptimizer.class); - private DrillConfig config; - private QueryContext context; - private UserServer.UserClientConnection userSession; + private final QueryContext queryContext; - public BasicOptimizer(DrillConfig config, QueryContext context, UserServer.UserClientConnection userSession){ - this.config = config; - this.context = context; - this.userSession = userSession; - logCurrentOptionValues(); - } - - private void logCurrentOptionValues(){ -// Iterator<DrillOptionValue> optionVals = userSession.getSessionOptionIterator(); -// DrillOptionValue val = null; -// String output = ""; -// output += "SessionOptions: {\n"; -// for ( ;optionVals.hasNext(); val = optionVals.next()){ -// if (val != null) { -// output += val.getOptionName() + ":" + val.getValue() + ",\n"; -// } -// } -// output += "}"; -// logger.debug(output); - } - - /** - * Get the current value of an option. Session options override global options. - * - * @param name - the name of the option - * @return - value of the option - */ - private Object getOptionValue(String name) { -// Object val = userSession.getSessionLevelOption(name); -// if (val == null) { -// context.getOptionValue(name); -// } -// return val; - return null; + public BasicOptimizer(final QueryContext queryContext) { + this.queryContext = queryContext; } @Override - public void init(DrillConfig config) { - + public void init(final DrillConfig config) { } @Override - public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) throws OptimizerException{ - Object obj = new Object(); - Collection<SinkOperator> roots = plan.getGraph().getRoots(); - List<PhysicalOperator> physOps = new ArrayList<PhysicalOperator>(roots.size()); - LogicalConverter converter = new LogicalConverter(plan); - for ( SinkOperator op : roots){ - PhysicalOperator pop = op.accept(converter, obj); + public PhysicalPlan optimize(final OptimizationContext context, final LogicalPlan plan) + throws OptimizerException { + final Object obj = new Object(); + final Collection<SinkOperator> roots = plan.getGraph().getRoots(); + final List<PhysicalOperator> physOps = new ArrayList<>(roots.size()); + final LogicalConverter converter = new LogicalConverter(plan); + + for (SinkOperator op : roots) { + final PhysicalOperator pop = op.accept(converter, obj); physOps.add(pop); } - PlanProperties props = PlanProperties.builder() + final PlanProperties logicalProperties = plan.getProperties(); + final PlanProperties props = PlanProperties.builder() .type(PlanProperties.PlanType.APACHE_DRILL_PHYSICAL) - .version(plan.getProperties().version) - .generator(plan.getProperties().generator) + .version(logicalProperties.version) + .generator(logicalProperties.generator) .options(new JSONOptions(context.getOptions().getOptionList())).build(); - PhysicalPlan p = new PhysicalPlan(props, physOps); + final PhysicalPlan p = new PhysicalPlan(props, physOps); return p; - //return new PhysicalPlan(props, physOps); - } - - @Override - public void close() { - } public static class BasicOptimizationContext implements OptimizationContext { - - private OptionManager ops; - public BasicOptimizationContext(QueryContext c){ - this.ops = c.getOptions(); + private final OptionManager ops; + public BasicOptimizationContext(final QueryContext c) { + ops = c.getOptions(); } @Override @@ -159,64 +120,65 @@ public class BasicOptimizer extends Optimizer{ private class LogicalConverter extends AbstractLogicalVisitor<PhysicalOperator, Object, OptimizerException> { - // storing a reference to the plan for access to other elements outside of the query graph - // such as the storage engine configs - LogicalPlan logicalPlan; + /* + * Store a reference to the plan for access to other elements outside of the query graph + * such as the storage engine configs. + */ + private final LogicalPlan logicalPlan; - public LogicalConverter(LogicalPlan logicalPlan){ + public LogicalConverter(final LogicalPlan logicalPlan) { this.logicalPlan = logicalPlan; } @Override public PhysicalOperator visitGroupingAggregate(GroupingAggregate groupBy, Object value) throws OptimizerException { - - List<Ordering> orderDefs = Lists.newArrayList(); - - + final List<Ordering> orderDefs = Lists.newArrayList(); PhysicalOperator input = groupBy.getInput().accept(this, value); - if(groupBy.getKeys().length > 0){ - for(NamedExpression e : groupBy.getKeys()){ + if (groupBy.getKeys().length > 0) { + for(NamedExpression e : groupBy.getKeys()) { orderDefs.add(new Ordering(Direction.ASCENDING, e.getExpr(), NullDirection.FIRST)); } input = new Sort(input, orderDefs, false); } - StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f); + final StreamingAggregate sa = new StreamingAggregate(input, groupBy.getKeys(), groupBy.getExprs(), 1.0f); return sa; } @Override - public PhysicalOperator visitWindow(Window window, Object value) throws OptimizerException { + public PhysicalOperator visitWindow(final Window window, final Object value) throws OptimizerException { PhysicalOperator input = window.getInput().accept(this, value); - - List<Ordering> ods = Lists.newArrayList(); + final List<Ordering> ods = Lists.newArrayList(); input = new Sort(input, ods, false); - return new WindowPOP(input, window.getWithins(), window.getAggregations(), window.getOrderings(), window.getStart(), window.getEnd()); + return new WindowPOP(input, window.getWithins(), window.getAggregations(), + window.getOrderings(), window.getStart(), window.getEnd()); } @Override - public PhysicalOperator visitOrder(Order order, Object value) throws OptimizerException { - PhysicalOperator input = order.getInput().accept(this, value); - List<Ordering> ods = Lists.newArrayList(); + public PhysicalOperator visitOrder(final Order order, final Object value) throws OptimizerException { + final PhysicalOperator input = order.getInput().accept(this, value); + final List<Ordering> ods = Lists.newArrayList(); for (Ordering o : order.getOrderings()){ ods.add(o); } + return new SelectionVectorRemover(new Sort(input, ods, false)); } @Override - public PhysicalOperator visitLimit(org.apache.drill.common.logical.data.Limit limit, Object value) throws OptimizerException { - PhysicalOperator input = limit.getInput().accept(this, value); + public PhysicalOperator visitLimit(final org.apache.drill.common.logical.data.Limit limit, + final Object value) throws OptimizerException { + final PhysicalOperator input = limit.getInput().accept(this, value); return new SelectionVectorRemover(new Limit(input, limit.getFirst(), limit.getLast())); } @Override - public PhysicalOperator visitJoin(Join join, Object value) throws OptimizerException { + public PhysicalOperator visitJoin(final Join join, final Object value) throws OptimizerException { PhysicalOperator leftOp = join.getLeft().accept(this, value); - List<Ordering> leftOrderDefs = Lists.newArrayList(); + final List<Ordering> leftOrderDefs = Lists.newArrayList(); for(JoinCondition jc : join.getConditions()){ leftOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getLeft())); } @@ -224,28 +186,28 @@ public class BasicOptimizer extends Optimizer{ leftOp = new SelectionVectorRemover(leftOp); PhysicalOperator rightOp = join.getRight().accept(this, value); - List<Ordering> rightOrderDefs = Lists.newArrayList(); + final List<Ordering> rightOrderDefs = Lists.newArrayList(); for(JoinCondition jc : join.getConditions()){ rightOrderDefs.add(new Ordering(Direction.ASCENDING, jc.getRight())); } rightOp = new Sort(rightOp, rightOrderDefs, false); rightOp = new SelectionVectorRemover(rightOp); - MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), join.getJoinType()); + final MergeJoinPOP mjp = new MergeJoinPOP(leftOp, rightOp, Arrays.asList(join.getConditions()), + join.getJoinType()); return new SelectionVectorRemover(mjp); } - - @Override - public PhysicalOperator visitScan(Scan scan, Object obj) throws OptimizerException { - StoragePluginConfig config = logicalPlan.getStorageEngineConfig(scan.getStorageEngine()); + public PhysicalOperator visitScan(final Scan scan, final Object obj) throws OptimizerException { + final StoragePluginConfig config = logicalPlan.getStorageEngineConfig(scan.getStorageEngine()); if(config == null) { - throw new OptimizerException(String.format("Logical plan referenced the storage engine config %s but the logical plan didn't have that available as a config.", scan.getStorageEngine())); + throw new OptimizerException( + String.format("Logical plan referenced the storage engine config %s but the logical plan didn't have that available as a config.", + scan.getStorageEngine())); } - StoragePlugin storagePlugin; try { - storagePlugin = context.getStorage().getPlugin(config); + final StoragePlugin storagePlugin = queryContext.getStorage().getPlugin(config); return storagePlugin.getPhysicalScan(scan.getSelection()); } catch (IOException | ExecutionSetupException e) { throw new OptimizerException("Failure while attempting to retrieve storage engine.", e); @@ -253,27 +215,27 @@ public class BasicOptimizer extends Optimizer{ } @Override - public PhysicalOperator visitStore(Store store, Object obj) throws OptimizerException { - if (!store.iterator().hasNext()) { + public PhysicalOperator visitStore(final Store store, final Object obj) throws OptimizerException { + final Iterator<LogicalOperator> iterator = store.iterator(); + if (!iterator.hasNext()) { throw new OptimizerException("Store node in logical plan does not have a child."); } - return new Screen(store.iterator().next().accept(this, obj), context.getCurrentEndpoint()); + return new Screen(iterator.next().accept(this, obj), queryContext.getCurrentEndpoint()); } @Override - public PhysicalOperator visitProject(Project project, Object obj) throws OptimizerException { - return new org.apache.drill.exec.physical.config.Project(Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj)); + public PhysicalOperator visitProject(final Project project, final Object obj) throws OptimizerException { + return new org.apache.drill.exec.physical.config.Project( + Arrays.asList(project.getSelections()), project.iterator().next().accept(this, obj)); } @Override - public PhysicalOperator visitFilter(Filter filter, Object obj) throws OptimizerException { - TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType(); + public PhysicalOperator visitFilter(final Filter filter, final Object obj) throws OptimizerException { + final TypeProtos.MajorType.Builder b = TypeProtos.MajorType.getDefaultInstance().newBuilderForType(); b.setMode(DataMode.REQUIRED); b.setMinorType(MinorType.BIGINT); - PhysicalOperator child = filter.iterator().next().accept(this, obj); + final PhysicalOperator child = filter.iterator().next().accept(this, obj); return new SelectionVectorRemover(new org.apache.drill.exec.physical.config.Filter(child, filter.getExpr(), 1.0f)); } - } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java index 979c5e241..9e650b096 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/IdentityOptimizer.java @@ -22,18 +22,14 @@ import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.exec.physical.PhysicalPlan; public class IdentityOptimizer extends Optimizer { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IdentityOptimizer.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IdentityOptimizer.class); @Override - public void init(DrillConfig config) { + public void init(final DrillConfig config) { } @Override - public PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) { + public PhysicalPlan optimize(final OptimizationContext context, final LogicalPlan plan) { return null; } - - @Override - public void close() { - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java index 34d0622c6..aed6299e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/opt/Optimizer.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.opt; -import java.io.Closeable; - import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.DrillConfigurationException; import org.apache.drill.common.logical.LogicalPlan; @@ -26,22 +24,19 @@ import org.apache.drill.exec.exception.OptimizerException; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.server.options.OptionManager; -public abstract class Optimizer implements Closeable{ - +public abstract class Optimizer { public static String OPTIMIZER_IMPL_KEY = "drill.exec.optimizer.implementation"; public abstract void init(DrillConfig config); - public abstract PhysicalPlan optimize(OptimizationContext context, LogicalPlan plan) throws OptimizerException; - public abstract void close(); - public static Optimizer getOptimizer(DrillConfig config) throws DrillConfigurationException{ - Optimizer o = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class); - o.init(config); - return o; + public static Optimizer getOptimizer(final DrillConfig config) throws DrillConfigurationException { + final Optimizer optimizer = config.getInstanceOf(OPTIMIZER_IMPL_KEY, Optimizer.class); + optimizer.init(config); + return optimizer; } - public interface OptimizationContext{ + public interface OptimizationContext { public int getPriority(); public OptionManager getOptions(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java index 412da8535..a00df9d27 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.RecordBatch.IterOutcome; public abstract class BaseRootExec implements RootExec { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRootExec.class); protected OperatorStats stats = null; protected OperatorContext oContext = null; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java index a644c3497..8fd68b224 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/RootExec.java @@ -17,8 +17,6 @@ */ package org.apache.drill.exec.physical.impl; - -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; /** @@ -26,8 +24,6 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; * output nodes and storage nodes. They are there driving force behind the completion of a query. */ public interface RootExec { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootExec.class); - /** * Do the next batch of work. * @return Whether or not additional batches of work are necessary. False means that this fragment is done. @@ -44,5 +40,4 @@ public interface RootExec { * @param handle */ public void receivingFragmentFinished(FragmentHandle handle); - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index d88420058..2d1a1367e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -23,7 +23,6 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; @@ -58,7 +57,7 @@ public class ScreenCreator implements RootCreator<Screen>{ static class ScreenRoot extends BaseRootExec { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ScreenRoot.class); volatile boolean ok = true; private final SendingAccountor sendCount = new SendingAccountor(); @@ -67,7 +66,6 @@ public class ScreenCreator implements RootCreator<Screen>{ final FragmentContext context; final UserClientConnection connection; private RecordMaterializer materializer; - private boolean first = true; public enum Metric implements MetricDef { BYTES_SENT; @@ -146,7 +144,7 @@ public class ScreenCreator implements RootCreator<Screen>{ } case OK_NEW_SCHEMA: materializer = new VectorRecordMaterializer(context, incoming); - // fall through. + //$FALL-THROUGH$ case OK: // context.getStats().batchesCompleted.inc(1); // context.getStats().recordsCompleted.inc(incoming.getRecordCount()); @@ -160,7 +158,6 @@ public class ScreenCreator implements RootCreator<Screen>{ } sendCount.increment(); - first = false; return true; default: throw new UnsupportedOperationException(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java index 3920f9ccd..8794188b5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/SendingAccountor.java @@ -46,6 +46,7 @@ public class SendingAccountor { batchesSent.set(0); } catch (InterruptedException e) { logger.warn("Failure while waiting for send complete.", e); + // TODO InterruptedException } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java index e6c3fba8d..2a59e22aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/materialize/QueryWritableBatch.java @@ -20,31 +20,21 @@ package org.apache.drill.exec.physical.impl.materialize; import io.netty.buffer.ByteBuf; import java.util.Arrays; -import java.util.List; -import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult; -import org.apache.drill.exec.proto.UserBitShared.RecordBatchDef; -import org.apache.drill.exec.proto.UserBitShared.SerializedField; -import org.apache.drill.exec.record.BatchSchema; -import org.apache.drill.exec.record.MaterializedField; - -import com.google.common.collect.Lists; public class QueryWritableBatch { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWritableBatch.class); private final QueryResult header; private final ByteBuf[] buffers; - public QueryWritableBatch(QueryResult header, ByteBuf... buffers) { - super(); this.header = header; this.buffers = buffers; } - public ByteBuf[] getBuffers(){ + public ByteBuf[] getBuffers() { return buffers; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java index 352e7aedf..42b1080bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/orderedpartitioner/OrderedPartitionRecordBatch.java @@ -307,6 +307,7 @@ public class OrderedPartitionRecordBatch extends AbstractRecordBatch<OrderedPart logger.error("Failure while building final partition table.", ex); context.fail(ex); return false; + // TODO InterruptedException } return true; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java index 4c9b33bf5..c50cb8af0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/producer/ProducerConsumerBatch.java @@ -73,6 +73,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { context.fail(e); } return IterOutcome.STOP; + // TODO InterruptedException } finally { stats.stopWait(); } @@ -147,6 +148,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { } } catch (InterruptedException e) { logger.warn("Producer thread is interrupted.", e); + // TODO InterruptedException } finally { if (stop) { try { @@ -154,6 +156,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { queue.put(new RecordBatchDataWrapper(null, true, false)); } catch (InterruptedException e) { logger.error("Unable to enqueue the last batch indicator. Something is broken.", e); + // TODO InterruptedException } } if (wrapper!=null) { @@ -181,6 +184,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { producer.join(); } catch (InterruptedException e) { logger.warn("Interrupted while waiting for producer thread"); + // TODO InterruptedException } } @@ -191,6 +195,7 @@ public class ProducerConsumerBatch extends AbstractRecordBatch { cleanUpLatch.await(); } catch (InterruptedException e) { logger.warn("Interrupted while waiting for producer to clean up first. I will try to clean up now...", e); + // TODO InterruptedException } finally { super.cleanup(); clearQueue(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java index 2436a0ea8..bc9ed74f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java @@ -42,6 +42,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { if (root == null) { root = o; } + // TODO should complain otherwise } public void addSendExchange(Exchange e, Fragment sendingToFragment) throws ForemanSetupException{ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index eebd40ec9..12043cee5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -33,7 +33,6 @@ import com.google.common.collect.Ordering; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.DrillStringUtils; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.expr.fn.impl.DateUtility; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.ops.QueryDateTimeInfo; import org.apache.drill.exec.physical.EndpointAffinity; @@ -145,7 +144,7 @@ public class SimpleParallelizer { * @param planningSet * @return Returns a list of leaf fragments in fragment dependency graph. */ - private Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) { + private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) { // Set up dependency of fragments based on the affinity of exchange that separates the fragments. for(Wrapper currentFragmentWrapper : planningSet) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java index b5d3f4a72..dc63ef95c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/SetOptionHandler.java @@ -31,24 +31,21 @@ import org.eigenbase.sql.SqlLiteral; import org.eigenbase.sql.SqlNode; import org.eigenbase.sql.SqlSetOption; -public class SetOptionHandler extends AbstractSqlHandler{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class); - - QueryContext context; +public class SetOptionHandler extends AbstractSqlHandler { +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SetOptionHandler.class); + private final QueryContext context; public SetOptionHandler(QueryContext context) { - super(); this.context = context; } - @Override public PhysicalPlan getPlan(SqlNode sqlNode) throws ValidationException, RelConversionException, IOException, ForemanSetupException { - SqlSetOption option = unwrap(sqlNode, SqlSetOption.class); - String scope = option.getScope(); - String name = option.getName(); - SqlNode value = option.getValue(); + final SqlSetOption option = unwrap(sqlNode, SqlSetOption.class); + final String scope = option.getScope(); + final String name = option.getName(); + final SqlNode value = option.getValue(); OptionValue.OptionType type; if (value instanceof SqlLiteral) { switch (scope.toLowerCase()) { @@ -70,9 +67,5 @@ public class SetOptionHandler extends AbstractSqlHandler{ } return DirectPlan.createDirectPlan(context, true, String.format("%s updated.", name)); - } - - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java index f358097cb..72ae130ce 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicClient.java @@ -43,7 +43,6 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class); private final Bootstrap b; - private volatile boolean connect = false; protected R connection; private final T handshakeType; private final Class<HANDSHAKE_RESPONSE> responseClass; @@ -81,7 +80,6 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection new InboundHandler(connection), // new RpcExceptionHandler() // ); - connect = true; } }); // @@ -180,7 +178,6 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection try { BasicClient.this.validateHandshake(value); BasicClient.this.finalizeConnection(value, connection); - BasicClient.this.connect = true; l.connectionSucceeded(connection); // logger.debug("Handshake completed succesfully."); } catch (RpcException ex) { @@ -218,6 +215,7 @@ public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection connection.getChannel().close().get(); } catch (InterruptedException | ExecutionException e) { logger.warn("Failure whiel shutting {}", this.getClass().getName(), e); + // TODO InterruptedException } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java index c00df4e6e..0e0398d8f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/BasicServer.java @@ -145,7 +145,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection return null; } - public int bind(final int initialPort, boolean allowPortHunting) throws InterruptedException, DrillbitStartupException { + public int bind(final int initialPort, boolean allowPortHunting) throws DrillbitStartupException { int port = initialPort - 1; while (true) { try { @@ -170,6 +170,7 @@ public abstract class BasicServer<T extends EnumLite, C extends RemoteConnection eventLoopGroup.shutdownGracefully().get(); } catch (InterruptedException | ExecutionException e) { logger.warn("Failure while shutting down {}. ", this.getClass().getName(), e); + // TODO InterruptedException } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java index 2b49579a3..e7580b248 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/NamedThreadFactory.java @@ -20,29 +20,61 @@ package org.apache.drill.exec.rpc; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +/** + * {@link ThreadFactory} for {@link ExecutorServices} that names threads sequentially. + * Creates Threads named with the prefix specified at construction time. Created threads + * have the daemon bit set and priority Thread.MAX_PRIORITY. + * + * <p>An instance creates names with an instance-specific prefix suffixed with sequential + * integers.</p> + * + * <p>Concurrency: See {@link newThread}.</p> + */ public class NamedThreadFactory implements ThreadFactory { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NamedThreadFactory.class); - private final AtomicInteger nextId = new AtomicInteger(); + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(NamedThreadFactory.class); + private final AtomicInteger nextId = new AtomicInteger(); // used to generate unique ids private final String prefix; - public NamedThreadFactory(String prefix) { + /** + * Constructor. + * + * @param prefix the string prefix that will be used to name threads created by this factory + */ + public NamedThreadFactory(final String prefix) { this.prefix = prefix; } + /** + * Creates a sequentially named thread running a given Runnable. + * <p> + * The thread's name will be this instance's prefix concatenated with + * this instance's next<sup><a href="#fn-1">*</a></sup> sequential integer. + * </p> + * <p> + * Concurrency: Thread-safe. + * </p> + * <p> + * (Concurrent calls get different numbers. + * Calls started after other calls complete get later/higher numbers than + * those other calls. + * </p> + * <p> + * <a name="fn-1" />*However, for concurrent calls, the order of numbers + * is not defined.) + */ @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, prefix + nextId.incrementAndGet()); + public Thread newThread(final Runnable runnable) { + final Thread thread = new Thread(runnable, prefix + nextId.incrementAndGet()); + thread.setDaemon(true); + try { - if (t.isDaemon()) { - t.setDaemon(true); - } - if (t.getPriority() != Thread.MAX_PRIORITY) { - t.setPriority(Thread.MAX_PRIORITY); + if (thread.getPriority() != Thread.MAX_PRIORITY) { + thread.setPriority(Thread.MAX_PRIORITY); } } catch (Exception ignored) { // Doesn't matter even if failed to set. + logger.info("ignored exception " + ignored); } - return t; + return thread; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java index f214c4d1e..9948d3e64 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ReconnectingConnection.java @@ -114,6 +114,7 @@ public abstract class ReconnectingConnection<CONNECTION_TYPE extends RemoteConne } } catch (InterruptedException e) { cmd.connectionFailed(FailureType.CONNECTION, e); + // TODO InterruptedException } catch (ExecutionException e) { throw new IllegalStateException(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java index 3a139f8a3..a72dd326c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RemoteConnection.java @@ -54,6 +54,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea return true; }catch(InterruptedException e){ listener.failed(new RpcException(e)); + // TODO InterruptedException return false; } } @@ -109,6 +110,7 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea channel.close().get(); } catch (InterruptedException | ExecutionException e) { logger.warn("Caught exception while closing channel.", e); + // TODO InterruptedException } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java index 5eab16a1a..b97496330 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java @@ -30,7 +30,6 @@ import java.io.Closeable; import java.util.Arrays; import java.util.List; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; import org.apache.drill.exec.proto.UserBitShared.DrillPBError; import org.apache.drill.exec.work.ErrorHelper; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java index 7f84a2b24..af60ff080 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/Controller.java @@ -39,7 +39,7 @@ public interface Controller extends Closeable { */ public ControlTunnel getTunnel(DrillbitEndpoint node) ; - public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException; + public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java index f8f6fd795..631d479f9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/ControllerImpl.java @@ -46,7 +46,7 @@ public class ControllerImpl implements Controller { } @Override - public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException { + public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException { server = new ControlServer(handler, context, connectionRegistry); int port = context.getConfig().getInt(ExecConstants.INITIAL_BIT_PORT); port = server.bind(port, allowPortHunting); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java index d6b86375a..a5a544197 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java @@ -26,8 +26,6 @@ import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.foreman.FragmentStatusListener; import org.apache.drill.exec.work.foreman.ForemanSetupException; import org.apache.drill.exec.work.fragment.FragmentManager; @@ -37,60 +35,53 @@ import com.google.common.cache.CacheBuilder; import com.google.common.collect.Maps; public class WorkEventBus { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkEventBus.class); - + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkEventBus.class); private final ConcurrentMap<FragmentHandle, FragmentManager> managers = Maps.newConcurrentMap(); - private final ConcurrentMap<QueryId, FragmentStatusListener> listeners = new ConcurrentHashMap<QueryId, FragmentStatusListener>( - 16, 0.75f, 16); - private final WorkerBee bee; - private final Cache<FragmentHandle,Integer> recentlyFinishedFragments = CacheBuilder.newBuilder() + private final ConcurrentMap<QueryId, FragmentStatusListener> listeners = + new ConcurrentHashMap<>(16, 0.75f, 16); + private final Cache<FragmentHandle, Integer> recentlyFinishedFragments = CacheBuilder.newBuilder() .maximumSize(10000) - .expireAfterWrite(10, TimeUnit.MINUTES) .build(); - public WorkEventBus(WorkerBee bee) { - this.bee = bee; - } - - public void removeFragmentStatusListener(QueryId queryId) { + public void removeFragmentStatusListener(final QueryId queryId) { logger.debug("Removing fragment status listener for queryId {}.", queryId); listeners.remove(queryId); } - public void setFragmentStatusListener(QueryId queryId, FragmentStatusListener listener) throws ForemanSetupException { + public void addFragmentStatusListener(final QueryId queryId, final FragmentStatusListener listener) + throws ForemanSetupException { logger.debug("Adding fragment status listener for queryId {}.", queryId); - FragmentStatusListener old = listeners.putIfAbsent(queryId, listener); + final FragmentStatusListener old = listeners.putIfAbsent(queryId, listener); if (old != null) { throw new ForemanSetupException ( "Failure. The provided handle already exists in the listener pool. You need to remove one listener before adding another."); } } - public void status(FragmentStatus status) { - FragmentStatusListener l = listeners.get(status.getHandle().getQueryId()); - if (l == null) { + public void statusUpdate(final FragmentStatus status) { + final FragmentStatusListener listener = listeners.get(status.getHandle().getQueryId()); + if (listener == null) { logger.warn("A fragment message arrived but there was no registered listener for that message: {}.", status); } else { - l.statusUpdate(status); + listener.statusUpdate(status); } } - public void setFragmentManager(FragmentManager fragmentManager) { + public void addFragmentManager(final FragmentManager fragmentManager) { logger.debug("Manager created: {}", QueryIdHelper.getQueryIdentifier(fragmentManager.getHandle())); - FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager); + final FragmentManager old = managers.putIfAbsent(fragmentManager.getHandle(), fragmentManager); if (old != null) { throw new IllegalStateException( "Tried to set fragment manager when has already been set for the provided fragment handle."); } } - public FragmentManager getFragmentManagerIfExists(FragmentHandle handle){ + public FragmentManager getFragmentManagerIfExists(final FragmentHandle handle) { return managers.get(handle); - } - public FragmentManager getFragmentManager(FragmentHandle handle) throws FragmentSetupException { + public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException { // check if this was a recently canceled fragment. If so, throw away message. if (recentlyFinishedFragments.asMap().containsKey(handle)) { logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle); @@ -98,17 +89,17 @@ public class WorkEventBus { } // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable. - FragmentManager m = managers.get(handle); + final FragmentManager m = managers.get(handle); if(m != null) { return m; } - throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + QueryIdHelper.getQueryIdentifier(handle)); + throw new FragmentSetupException("Failed to receive plan fragment that was required for id: " + + QueryIdHelper.getQueryIdentifier(handle)); } - public void removeFragmentManager(FragmentHandle handle) { + public void removeFragmentManager(final FragmentHandle handle) { logger.debug("Removing fragment manager: {}", QueryIdHelper.getQueryIdentifier(handle)); recentlyFinishedFragments.put(handle, 1); managers.remove(handle); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java index 33f0d0937..a76d75391 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataConnectionCreator.java @@ -50,7 +50,7 @@ public class DataConnectionCreator implements Closeable { this.allowPortHunting = allowPortHunting; } - public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws InterruptedException, DrillbitStartupException { + public DrillbitEndpoint start(DrillbitEndpoint partialEndpoint) throws DrillbitStartupException { server = new DataServer(context, workBus, dataHandler); int port = server.bind(partialEndpoint.getControlPort() + 1, allowPortHunting); DrillbitEndpoint completeEndpoint = partialEndpoint.toBuilder().setDataPort(port).build(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java index e0392fdd4..33e066511 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java @@ -21,50 +21,43 @@ import io.netty.buffer.DrillBuf; import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; -import org.apache.drill.exec.proto.BitData.RpcType; import org.apache.drill.exec.record.RawFragmentBatch; -import org.apache.drill.exec.rpc.Acks; -import org.apache.drill.exec.rpc.RemoteConnection; -import org.apache.drill.exec.rpc.Response; -import org.apache.drill.exec.rpc.ResponseSender; -import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.fragment.FragmentManager; import java.io.IOException; public class DataResponseHandlerImpl implements DataResponseHandler{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataResponseHandlerImpl.class); - + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DataResponseHandlerImpl.class); private final WorkerBee bee; - public DataResponseHandlerImpl(WorkerBee bee) { - super(); + public DataResponseHandlerImpl(final WorkerBee bee) { this.bee = bee; } - @Override public void informOutOfMemory() { logger.error("Out of memory outside any particular fragment."); } - - public void handle(FragmentManager manager, FragmentRecordBatch fragmentBatch, DrillBuf data, AckSender sender) throws FragmentSetupException, IOException { -// logger.debug("Fragment Batch received {}", fragmentBatch); + @Override + public void handle(final FragmentManager manager, final FragmentRecordBatch fragmentBatch, + final DrillBuf data, final AckSender sender) throws FragmentSetupException, IOException { +// logger.debug("Fragment Batch received {}", fragmentBatch); boolean canRun = manager.handle(new RawFragmentBatch(fragmentBatch, data, sender)); if (canRun) { -// logger.debug("Arriving batch means local batch can run, starting local batch."); - // if we've reached the canRun threshold, we'll proceed. This expects handler.handle() to only return a single - // true. +// logger.debug("Arriving batch means local batch can run, starting local batch."); + /* + * If we've reached the canRun threshold, we'll proceed. This expects handler.handle() to + * only return a single true. + */ bee.startFragmentPendingRemote(manager); } if (fragmentBatch.getIsLastBatch() && !manager.isWaiting()) { -// logger.debug("Removing handler. Is Last Batch {}. Is Waiting for more {}", fragmentBatch.getIsLastBatch(), -// manager.isWaiting()); +// logger.debug("Removing handler. Is Last Batch {}. Is Waiting for more {}", fragmentBatch.getIsLastBatch(), +// manager.isWaiting()); bee.getContext().getWorkBus().removeFragmentManager(manager.getHandle()); } - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java index 5aa4aa67c..11f549672 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java @@ -50,6 +50,7 @@ public class DataTunnel { manager.runCommand(b); }catch(InterruptedException e){ outcomeListener.failed(new RpcException("Interrupted while trying to get sending semaphore.", e)); + // TODO InterruptedException } } @@ -60,6 +61,7 @@ public class DataTunnel { manager.runCommand(b); }catch(InterruptedException e){ b.connectionFailed(FailureType.CONNECTION, new RpcException("Interrupted while trying to get sending semaphore.", e)); + // TODO InterruptedException } return b.getFuture(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java index 4e7fc925e..925154d90 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserClient.java @@ -57,7 +57,7 @@ public class UserClient extends BasicClientWithConnection<RpcType, UserToBitHand } public void connect(RpcConnectionHandler<ServerConnection> handler, DrillbitEndpoint endpoint, UserProperties props, UserBitShared.UserCredentials credentials) - throws RpcException, InterruptedException { + throws RpcException { UserToBitHandshake.Builder hsBuilder = UserToBitHandshake.newBuilder() .setRpcVersion(UserRpcConfig.RPC_VERSION) .setSupportListening(true) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java index dffb9a179..c76d324fe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/UserServer.java @@ -29,7 +29,6 @@ import org.apache.drill.exec.physical.impl.materialize.QueryWritableBatch; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserProtos.BitToUserHandshake; -import org.apache.drill.exec.proto.UserProtos.RequestResults; import org.apache.drill.exec.proto.UserProtos.RpcType; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.UserProtos.UserToBitHandshake; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 6521303d8..b60670723 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -17,8 +17,7 @@ */ package org.apache.drill.exec.server; -import java.io.Closeable; - +import org.apache.drill.common.AutoCloseables; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.coord.ClusterCoordinator; @@ -52,27 +51,32 @@ import com.google.common.io.Closeables; /** * Starts, tracks and stops all the required services for a Drillbit daemon to work. */ -public class Drillbit implements Closeable{ - private static final org.slf4j.Logger logger; +public class Drillbit implements AutoCloseable { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class); static { - logger = org.slf4j.LoggerFactory.getLogger(Drillbit.class); Environment.logEnv("Drillbit environment:.", logger); } - public static Drillbit start(StartupOptions options) throws DrillbitStartupException { - return start(DrillConfig.create(options.getConfigLocation())); + public static Drillbit start(final StartupOptions options) throws DrillbitStartupException { + return start(DrillConfig.create(options.getConfigLocation()), null); + } + + public static Drillbit start(final DrillConfig config) throws DrillbitStartupException { + return start(config, null); } - public static Drillbit start(DrillConfig config) throws DrillbitStartupException { + public static Drillbit start(final DrillConfig config, final RemoteServiceSet remoteServiceSet) + throws DrillbitStartupException { + logger.debug("Setting up Drillbit."); Drillbit bit; try { - logger.debug("Setting up Drillbit."); - bit = new Drillbit(config, null); + bit = new Drillbit(config, remoteServiceSet); } catch (Exception ex) { throw new DrillbitStartupException("Failure while initializing values in Drillbit.", ex); } + + logger.debug("Starting Drillbit."); try { - logger.debug("Starting Drillbit."); bit.run(); } catch (Exception e) { bit.close(); @@ -152,90 +156,94 @@ public class Drillbit implements Closeable{ } } - public static void main(String[] cli) throws DrillbitStartupException { + public static void main(final String[] cli) throws DrillbitStartupException { StartupOptions options = StartupOptions.parse(cli); start(options); } - final ClusterCoordinator coord; - final ServiceEngine engine; - final PStoreProvider storeProvider; - final WorkManager manager; - final BootStrapContext context; - final Server embeddedJetty; - - private volatile RegistrationHandle handle; + private final ClusterCoordinator coord; + private final ServiceEngine engine; + private final PStoreProvider storeProvider; + private final WorkManager manager; + private final BootStrapContext context; + private final Server embeddedJetty; + private RegistrationHandle registrationHandle; public Drillbit(DrillConfig config, RemoteServiceSet serviceSet) throws Exception { - boolean allowPortHunting = serviceSet != null; - boolean enableHttp = config.getBoolean(ExecConstants.HTTP_ENABLE); - this.context = new BootStrapContext(config); - this.manager = new WorkManager(context); - this.engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, manager.getWorkBus(), manager.getDataHandler(), allowPortHunting); - - if(enableHttp) { - this.embeddedJetty = new Server(config.getInt(ExecConstants.HTTP_PORT)); + final boolean allowPortHunting = serviceSet != null; + final boolean enableHttp = config.getBoolean(ExecConstants.HTTP_ENABLE); + context = new BootStrapContext(config); + manager = new WorkManager(context); + engine = new ServiceEngine(manager.getControlMessageHandler(), manager.getUserWorker(), context, + manager.getWorkBus(), manager.getDataHandler(), allowPortHunting); + + if (enableHttp) { + embeddedJetty = new Server(config.getInt(ExecConstants.HTTP_PORT)); } else { - this.embeddedJetty = null; + embeddedJetty = null; } - if(serviceSet != null) { - this.coord = serviceSet.getCoordinator(); - this.storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config)); + if (serviceSet != null) { + coord = serviceSet.getCoordinator(); + storeProvider = new CachingStoreProvider(new LocalPStoreProvider(config)); } else { - Runtime.getRuntime().addShutdownHook(new ShutdownThread(config)); - this.coord = new ZKClusterCoordinator(config); - this.storeProvider = new PStoreRegistry(this.coord, config).newPStoreProvider(); + coord = new ZKClusterCoordinator(config); + storeProvider = new PStoreRegistry(this.coord, config).newPStoreProvider(); } } - private void startJetty() throws Exception{ + private void startJetty() throws Exception { if (embeddedJetty == null) { return; } - ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); - - ErrorHandler errorHandler = new ErrorHandler(); + final ErrorHandler errorHandler = new ErrorHandler(); errorHandler.setShowStacks(true); errorHandler.setShowMessageInTitle(true); - context.setErrorHandler(errorHandler); - context.setContextPath("/"); - embeddedJetty.setHandler(context); - ServletHolder h = new ServletHolder(new ServletContainer(new DrillRestServer(manager))); -// h.setInitParameter(ServerProperties.PROVIDER_PACKAGES, "org.apache.drill.exec.server"); - h.setInitOrder(1); - context.addServlet(h, "/*"); - context.addServlet(new ServletHolder(new MetricsServlet(this.context.getMetrics())), "/status/metrics"); - context.addServlet(new ServletHolder(new ThreadDumpServlet()), "/status/threads"); - - ServletHolder staticHolder = new ServletHolder("static", DefaultServlet.class); + + final ServletContextHandler servletContextHandler = + new ServletContextHandler(ServletContextHandler.NO_SESSIONS); + servletContextHandler.setErrorHandler(errorHandler); + servletContextHandler.setContextPath("/"); + embeddedJetty.setHandler(servletContextHandler); + + final ServletHolder servletHolder = new ServletHolder(new ServletContainer(new DrillRestServer(manager))); +// servletHolder.setInitParameter(ServerProperties.PROVIDER_PACKAGES, "org.apache.drill.exec.server"); + servletHolder.setInitOrder(1); + servletContextHandler.addServlet(servletHolder, "/*"); + + servletContextHandler.addServlet( + new ServletHolder(new MetricsServlet(context.getMetrics())), "/status/metrics"); + servletContextHandler.addServlet(new ServletHolder(new ThreadDumpServlet()), "/status/threads"); + + final ServletHolder staticHolder = new ServletHolder("static", DefaultServlet.class); staticHolder.setInitParameter("resourceBase", Resource.newClassPathResource("/rest/static").toString()); staticHolder.setInitParameter("dirAllowed","false"); staticHolder.setInitParameter("pathInfoOnly","true"); - context.addServlet(staticHolder,"/static/*"); + servletContextHandler.addServlet(staticHolder,"/static/*"); embeddedJetty.start(); - } - - public void run() throws Exception { coord.start(10000); storeProvider.start(); - DrillbitEndpoint md = engine.start(); + final DrillbitEndpoint md = engine.start(); manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider); - manager.getContext().getStorage().init(); - manager.getContext().getOptionManager().init(); + final DrillbitContext drillbitContext = manager.getContext(); + drillbitContext.getStorage().init(); + drillbitContext.getOptionManager().init(); javaPropertiesToSystemOptions(); - handle = coord.register(md); + registrationHandle = coord.register(md); startJetty(); + + Runtime.getRuntime().addShutdownHook(new ShutdownThread()); } + @Override public void close() { - if (coord != null && handle != null) { - coord.unregister(handle); + if (coord != null && registrationHandle != null) { + coord.unregister(registrationHandle); } try { @@ -243,28 +251,27 @@ public class Drillbit implements Closeable{ } catch (InterruptedException e) { logger.warn("Interrupted while sleeping during coordination deregistration."); } - try { - if (embeddedJetty != null) { + + if (embeddedJetty != null) { + try { embeddedJetty.stop(); + } catch (Exception e) { + logger.warn("Failure while shutting down embedded jetty server."); } - } catch (Exception e) { - logger.warn("Failure while shutting down embedded jetty server."); } + Closeables.closeQuietly(engine); - try{ - storeProvider.close(); - }catch(Exception e){ - logger.warn("Failure while closing store provider.", e); - } + AutoCloseables.close(storeProvider, logger); Closeables.closeQuietly(coord); - Closeables.closeQuietly(manager); + AutoCloseables.close(manager, logger); Closeables.closeQuietly(context); + logger.info("Shutdown completed."); } private class ShutdownThread extends Thread { - ShutdownThread(DrillConfig config) { - this.setName("ShutdownHook"); + ShutdownThread() { + setName("Drillbit-ShutdownHook"); } @Override @@ -272,14 +279,9 @@ public class Drillbit implements Closeable{ logger.info("Received shutdown request."); close(); } - - } - public ClusterCoordinator getCoordinator() { - return coord; } public DrillbitContext getContext() { - return this.manager.getContext(); + return manager.getContext(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index 1b35aecd7..dbf3c7426 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -37,6 +37,7 @@ import org.apache.drill.exec.server.options.SystemOptionManager; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.StoragePluginRegistry.DrillSchemaFactory; import org.apache.drill.exec.store.sys.PStoreProvider; +import org.apache.drill.exec.testing.SimulatedExceptions; import com.codahale.metrics.MetricRegistry; import com.google.common.base.Preconditions; @@ -45,8 +46,7 @@ public class DrillbitContext { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitContext.class); private final BootStrapContext context; - - private PhysicalPlanReader reader; + private final PhysicalPlanReader reader; private final ClusterCoordinator coord; private final DataConnectionCreator connectionsPool; private final DrillbitEndpoint endpoint; @@ -59,10 +59,11 @@ public class DrillbitContext { private final PStoreProvider provider; private final CodeCompiler compiler; private final ExecutorService executor; + private final SimulatedExceptions simulatedExceptions = new SimulatedExceptions(); - public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider, + public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, + Controller controller, DataConnectionCreator connectionsPool, WorkEventBus workBus, PStoreProvider provider, ExecutorService executor) { - super(); Preconditions.checkNotNull(endpoint); Preconditions.checkNotNull(context); Preconditions.checkNotNull(controller); @@ -87,7 +88,7 @@ public class DrillbitContext { return functionRegistry; } - public WorkEventBus getWorkBus(){ + public WorkEventBus getWorkBus() { return workBus; } @@ -99,7 +100,7 @@ public class DrillbitContext { return systemOptions; } - public DrillbitEndpoint getEndpoint(){ + public DrillbitEndpoint getEndpoint() { return endpoint; } @@ -107,11 +108,11 @@ public class DrillbitContext { return context.getConfig(); } - public Collection<DrillbitEndpoint> getBits(){ + public Collection<DrillbitEndpoint> getBits() { return coord.getAvailableEndpoints(); } - public BufferAllocator getAllocator(){ + public BufferAllocator getAllocator() { return context.getAllocator(); } @@ -119,35 +120,35 @@ public class DrillbitContext { return operatorCreatorRegistry; } - public StoragePluginRegistry getStorage(){ + public StoragePluginRegistry getStorage() { return this.storagePlugins; } - public EventLoopGroup getBitLoopGroup(){ + public EventLoopGroup getBitLoopGroup() { return context.getBitLoopGroup(); } - public DataConnectionCreator getDataConnectionsPool(){ + public DataConnectionCreator getDataConnectionsPool() { return connectionsPool; } - public Controller getController(){ + public Controller getController() { return controller; } - public MetricRegistry getMetrics(){ + public MetricRegistry getMetrics() { return context.getMetrics(); } - public PhysicalPlanReader getPlanReader(){ + public PhysicalPlanReader getPlanReader() { return reader; } - public PStoreProvider getPersistentStoreProvider(){ + public PStoreProvider getPersistentStoreProvider() { return provider; } - public DrillSchemaFactory getSchemaFactory(){ + public DrillSchemaFactory getSchemaFactory() { return storagePlugins.getSchemaFactory(); } @@ -162,4 +163,8 @@ public class DrillbitContext { public ExecutorService getExecutor() { return executor; } + + public SimulatedExceptions getSimulatedExceptions() { + return simulatedExceptions; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java index 5c2aedee0..34ebf80f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java @@ -72,10 +72,6 @@ public class DrillConfigIterator implements Iterable<OptionValue> { case NULL: throw new IllegalStateException("Config value \"" + name + "\" has NULL type"); -/* TODO(cwestin) - optionValue = OptionValue.createOption(kind, OptionType.BOOT, name, ""); - break; -*/ } return optionValue; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java index 45c0ce80d..c3de19014 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java @@ -19,8 +19,8 @@ package org.apache.drill.exec.server.options; import java.util.concurrent.ConcurrentHashMap; -public class SessionOptionManager extends InMemoryOptionManager{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class); +public class SessionOptionManager extends InMemoryOptionManager { +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SessionOptionManager.class); public SessionOptionManager(OptionManager systemOptions) { super(systemOptions, new ConcurrentHashMap<String, OptionValue>()); @@ -30,5 +30,4 @@ public class SessionOptionManager extends InMemoryOptionManager{ boolean supportsOption(OptionValue value) { return value.type == OptionValue.OptionType.SESSION; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index fa4725d5c..608fac737 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -95,6 +95,7 @@ public class SystemOptionManager implements OptionManager { QueryClassLoader.JAVA_COMPILER_DEBUG, ExecConstants.ENABLE_VERBOSE_ERRORS, ExecConstants.ENABLE_WINDOW_FUNCTIONS_VALIDATOR, + ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS_VALIDATOR, ClassTransformer.SCALAR_REPLACEMENT_VALIDATOR, }; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java index ae04bad56..d22798de9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java @@ -38,14 +38,12 @@ import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryInfo; import org.apache.drill.exec.proto.UserBitShared.QueryProfile; -import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.store.sys.EStore; import org.apache.drill.exec.store.sys.PStore; import org.apache.drill.exec.store.sys.PStoreProvider; import org.apache.drill.exec.work.WorkManager; import org.apache.drill.exec.work.foreman.Foreman; -import org.apache.drill.exec.work.foreman.QueryStatus; +import org.apache.drill.exec.work.foreman.QueryManager; import org.glassfish.jersey.server.mvc.Viewable; import com.google.common.collect.Lists; @@ -137,8 +135,8 @@ public class ProfileResources { PStore<QueryProfile> completed = null; PStore<QueryInfo> running = null; try { - completed = provider().getStore(QueryStatus.QUERY_PROFILE); - running = provider().getStore(QueryStatus.RUNNING_QUERY_INFO); + completed = provider().getStore(QueryManager.QUERY_PROFILE); + running = provider().getStore(QueryManager.RUNNING_QUERY_INFO); } catch (IOException e) { logger.debug("Failed to get profiles from persistent or ephemeral store."); return new QProfiles(new ArrayList<ProfileInfo>(), new ArrayList<ProfileInfo>()); @@ -177,12 +175,12 @@ public class ProfileResources { // first check local running Foreman f = work.getBee().getForemanForQueryId(id); if(f != null){ - return f.getQueryStatus().getAsProfile(); + return f.getQueryManager().getQueryProfile(); } // then check remote running try{ - PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO); + PStore<QueryInfo> runningQueries = provider().getStore(QueryManager.RUNNING_QUERY_INFO); QueryInfo info = runningQueries.get(queryId); return work.getContext().getController().getTunnel(info.getForeman()).requestQueryProfile(id).checkedGet(2, TimeUnit.SECONDS); }catch(Exception e){ @@ -191,7 +189,7 @@ public class ProfileResources { // then check blob store try{ - PStore<QueryProfile> profiles = provider().getStore(QueryStatus.QUERY_PROFILE); + PStore<QueryProfile> profiles = provider().getStore(QueryManager.QUERY_PROFILE); return profiles.get(queryId); }catch(Exception e){ logger.warn("Failure to load query profile for query {}", queryId, e); @@ -208,7 +206,7 @@ public class ProfileResources { @Produces(MediaType.APPLICATION_JSON) public String getProfileJSON(@PathParam("queryid") String queryId) { try { - return new String(QueryStatus.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId))); + return new String(QueryManager.QUERY_PROFILE.getSerializer().serialize(getQueryProfile(queryId))); } catch (IOException e) { logger.debug("Failed to serialize profile for: " + queryId); return ("{ 'message' : 'error (unable to serialize profile)' }"); @@ -242,7 +240,7 @@ public class ProfileResources { // then check remote running try{ - PStore<QueryInfo> runningQueries = provider().getStore(QueryStatus.RUNNING_QUERY_INFO); + PStore<QueryInfo> runningQueries = provider().getStore(QueryManager.RUNNING_QUERY_INFO); QueryInfo info = runningQueries.get(queryId); Ack a = work.getContext().getController().getTunnel(info.getForeman()).requestCancelQuery(id).checkedGet(2, TimeUnit.SECONDS); if(a.getOk()){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java index ff6e13cd0..2efc9a998 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/service/ServiceEngine.java @@ -57,7 +57,7 @@ public class ServiceEngine implements Closeable{ this.allowPortHunting = allowPortHunting; } - public DrillbitEndpoint start() throws DrillbitStartupException, InterruptedException, UnknownHostException{ + public DrillbitEndpoint start() throws DrillbitStartupException, UnknownHostException{ int userPort = userServer.bind(config.getInt(ExecConstants.INITIAL_USER_PORT), allowPortHunting); String address = useIP ? InetAddress.getLocalHost().getHostAddress() : InetAddress.getLocalHost().getCanonicalHostName(); DrillbitEndpoint partialEndpoint = DrillbitEndpoint.newBuilder() diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java index baa998dfa..ebee7a861 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/sys/local/FilePStore.java @@ -56,13 +56,13 @@ public class FilePStore<V> implements PStore<V> { this.fs = fs; try { - mk(basePath); + mkdirs(basePath); } catch (IOException e) { throw new RuntimeException("Failure setting pstore configuration path."); } } - private void mk(Path path) throws IOException{ + private void mkdirs(Path path) throws IOException{ fs.mkdirs(path); } @@ -112,20 +112,20 @@ public class FilePStore<V> implements PStore<V> { } } - private Path p(String name) throws IOException { + private Path makePath(String name) { Preconditions.checkArgument( !name.contains("/") && !name.contains(":") && !name.contains("..")); - Path f = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); + final Path path = new Path(basePath, name + DRILL_SYS_FILE_SUFFIX); // do this to check file name. - return f; + return path; } public V get(String key) { try{ - Path path = p(key); + Path path = makePath(key); if(!fs.exists(path)){ return null; } @@ -133,15 +133,16 @@ public class FilePStore<V> implements PStore<V> { throw new RuntimeException(e); } - try (InputStream is = fs.open(p(key))) { + final Path path = makePath(key); + try (InputStream is = fs.open(path)) { return config.getSerializer().deserialize(IOUtils.toByteArray(is)); } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeException("Unable to deserialize \"" + path + "\"", e); } } public void put(String key, V value) { - try (OutputStream os = fs.create(p(key))) { + try (OutputStream os = fs.create(makePath(key))) { IOUtils.write(config.getSerializer().serialize(value), os); } catch (IOException e) { throw new RuntimeException(e); @@ -151,7 +152,7 @@ public class FilePStore<V> implements PStore<V> { @Override public boolean putIfAbsent(String key, V value) { try { - Path p = p(key); + Path p = makePath(key); if (fs.exists(p)) { return false; } else { @@ -165,7 +166,7 @@ public class FilePStore<V> implements PStore<V> { public void delete(String key) { try { - fs.delete(p(key), false); + fs.delete(makePath(key), false); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java new file mode 100644 index 000000000..68cbf0895 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjection.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.testing; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Injection for a single exception. Specifies how many times to inject it, and how many times to skip + * injecting it before the first injection. This class is used internally for tracking injected + * exceptions; injected exceptions are specified via the + * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_EXCEPTION_INJECTIONS} system option. + */ +public class ExceptionInjection { + private final String desc; // description of the injection site + + private final AtomicInteger nSkip; // the number of times to skip the injection; starts >= 0 + private final AtomicInteger nThrow; // the number of times to do the injection, after any skips; starts > 0 + + private final Class<? extends Throwable> exceptionClass; + + /** + * Constructor. + * + * @param desc description of the injection site; useful for multiple injections in a single class + * @param nSkip non-negative number of times to skip injecting the exception + * @param nFire positive number of times to inject the exception + * @param exceptionClass + */ + public ExceptionInjection(final String desc, final int nSkip, final int nFire, + final Class<? extends Throwable> exceptionClass) { + this.desc = desc; + this.nSkip = new AtomicInteger(nSkip); + this.nThrow = new AtomicInteger(nFire); + this.exceptionClass = exceptionClass; + } + + /** + * Constructs the exception to throw, if it is time to throw it. + * + * @return the exception to throw, or null if it isn't time to throw it + */ + private Throwable constructException() { + final int remainingSkips = nSkip.decrementAndGet(); + if (remainingSkips >= 0) { + return null; + } + + final int remainingFirings = nThrow.decrementAndGet(); + if (remainingFirings < 0) { + return null; + } + + // if we get here, we should throw the specified exception + Constructor<?> constructor; + try { + constructor = exceptionClass.getConstructor(String.class); + } catch(NoSuchMethodException e) { + throw new RuntimeException("No constructor found that takes a single String argument"); + } + + Throwable throwable; + try { + throwable = (Throwable) constructor.newInstance(desc); + } catch(InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException e) { + throw new IllegalStateException("Couldn't construct exception instance", e); + } + + return throwable; + } + + /** + * Throw the unchecked exception specified by this injection. + * + * @throws IllegalStateException if it's time to throw, and the injection specified a checked exception + */ + public void throwUnchecked() { + final Throwable throwable = constructException(); + if (throwable == null) { + return; + } + + if (throwable instanceof RuntimeException) { + final RuntimeException e = (RuntimeException) throwable; + throw e; + } + if (throwable instanceof Error) { + final Error e = (Error) throwable; + throw e; + } + + throw new IllegalStateException("throwable was not an unchecked exception"); + } + + /** + * Throw the checked exception specified by this injection. + * + * @param exceptionClass the class of the exception to throw + * @throws T if it is time to throw the exception + * @throws IllegalStateException if it is time to throw the exception, and the exception's class + * is incompatible with the class specified by the injection + */ + public <T extends Throwable> void throwChecked(final Class<T> exceptionClass) throws T { + final Throwable throwable = constructException(); + if (throwable == null) { + return; + } + + if (exceptionClass.isAssignableFrom(throwable.getClass())) { + final T exception = exceptionClass.cast(throwable); + throw exception; + } + + throw new IllegalStateException("Constructed Throwable(" + throwable.getClass().getName() + + ") is incompatible with exceptionClass("+ exceptionClass.getName() + ")"); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java new file mode 100644 index 000000000..54bc351eb --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/ExceptionInjector.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.testing; + +import org.apache.drill.exec.server.DrillbitContext; + +/** + * Injects exceptions at execution time for testing. Any class that wants to simulate exceptions + * for testing should have it's own private static instance of an injector (similar to the use + * of loggers). + * + * <p>See {@link org.apache.drill.exec.testing.TestExceptionInjection} for examples of + * use. + */ +public class ExceptionInjector { + private final Class<?> clazz; // the class that owns this injector + + /** + * Constructor. Classes should use the static {@link #getInjector()} method to obtain + * their injector. + * + * @param clazz the owning class + */ + private ExceptionInjector(final Class<?> clazz) { + this.clazz = clazz; + } + + /** + * Create an injector. + * + * @param clazz the owning class + * @return the newly created injector + */ + public static ExceptionInjector getInjector(final Class<?> clazz) { + return new ExceptionInjector(clazz); + } + + /** + * Get the injector's owning class. + * + * @return the injector's owning class + */ + public Class<?> getSiteClass() { + return clazz; + } + + /** + * Lookup an injection within this class that matches the site description. + * + * @param drillbitContext + * @param desc the site description + * @return the injection, if there is one; null otherwise + */ + private ExceptionInjection getInjection(final DrillbitContext drillbitContext, final String desc) { + final SimulatedExceptions simulatedExceptions = drillbitContext.getSimulatedExceptions(); + final ExceptionInjection exceptionInjection = simulatedExceptions.lookupInjection(drillbitContext, this, desc); + return exceptionInjection; + } + + /** + * Inject (throw) an unchecked exception at this point, if an injection is specified, and it is time + * for it to be thrown. + * + * <p>Implementors use this in their code at a site where they want to simulate an exception + * during testing. + * + * @param drillbitContext + * @param desc the site description + * throws the exception specified by the injection, if it is time + */ + public void injectUnchecked(final DrillbitContext drillbitContext, final String desc) { + final ExceptionInjection exceptionInjection = getInjection(drillbitContext, desc); + if (exceptionInjection != null) { + exceptionInjection.throwUnchecked(); + } + } + + /** + * Inject (throw) a checked exception at this point, if an injection is specified, and it is time + * for it to be thrown. + * + * <p>Implementors use this in their code at a site where they want to simulate an exception + * during testing. + * + * @param drillbitContext + * @param desc the site description + * @param exceptionClass the expected class of the exception (or a super class of it) + * @throws T the exception specified by the injection, if it is time + */ + public <T extends Throwable> void injectChecked( + final DrillbitContext drillbitContext, final String desc, final Class<T> exceptionClass) throws T { + final ExceptionInjection exceptionInjection = getInjection(drillbitContext, desc); + if (exceptionInjection != null) { + exceptionInjection.throwChecked(exceptionClass); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java new file mode 100644 index 000000000..9e19fdd16 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/InjectionSite.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.testing; + +import com.google.common.base.Preconditions; + +public class InjectionSite { + private final Class<?> clazz; + private final String desc; + + public InjectionSite(final Class<?> clazz, final String desc) { + Preconditions.checkNotNull(clazz); + Preconditions.checkNotNull(desc); + + this.clazz = clazz; + this.desc = desc; + } + + public Class<?> getSiteClass() { + return clazz; + } + + public String getDesc() { + return desc; + } + + @Override + public boolean equals(Object o) { + if (o == null) { + return false; + } + + if (this == o) { + return true; + } + + if (!(o instanceof InjectionSite)) { + return false; + } + + final InjectionSite other = (InjectionSite) o; + if (clazz != other.clazz) { + return false; + } + + if (!desc.equals(other.desc)) { + return false; + } + + return true; + } + + @Override + public int hashCode() { + return (clazz.hashCode() + 13) ^ (1 - desc.hashCode()); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java new file mode 100644 index 000000000..0292c08ce --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/SimulatedExceptions.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.testing; + +import java.io.IOException; +import java.util.HashMap; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.server.options.OptionValue; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Tracks the simulated exceptions that will be injected for testing purposes. + */ +public class SimulatedExceptions { + /** + * Caches the currently specified ExceptionInjections. Updated when + * {@link org.apache.drill.exec.ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS} is noticed + * to have changed. + */ + private HashMap<InjectionSite, ExceptionInjection> exMap = null; + + /** + * The string that was parsed to produce exMap; we keep it as a means to quickly detect whether + * the option string has changed or not between calls to getOption(). + */ + private String exString = null; + + /** + * POJO used to parse JSON-specified exception injection. + */ + public static class InjectionOption { + public String siteClass; + public String desc; + public int nSkip; + public int nFire; + public String exceptionClass; + } + + /** + * POJO used to parse JSON-specified set of exception injections. + */ + public static class InjectionOptions { + public InjectionOption injections[]; + } + + /** + * Look for an exception injection matching the given injector and site description. + * + * @param drillbitContext + * @param injector the injector, which indicates a class + * @param desc the injection site description + * @return the exception injection, if there is one for the injector and site; null otherwise + */ + public synchronized ExceptionInjection lookupInjection( + final DrillbitContext drillbitContext, final ExceptionInjector injector, final String desc) { + // get the option string + final OptionManager optionManager = drillbitContext.getOptionManager(); + final OptionValue optionValue = optionManager.getOption(ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS); + final String opString = optionValue.string_val; + + // if the option string is empty, there's nothing to inject + if ((opString == null) || opString.isEmpty()) { + // clear these in case there used to be something to inject + exMap = null; + exString = null; + return null; + } + + // if the option string is different from before, recreate the injection map + if ((exString == null) || (exString != opString) && !exString.equals(opString)) { + // parse the option string into JSON + final ObjectMapper objectMapper = new ObjectMapper(); + InjectionOptions injectionOptions; + try { + injectionOptions = objectMapper.readValue(opString, InjectionOptions.class); + } catch(IOException e) { + throw new RuntimeException("Couldn't parse exception injections", e); + } + + // create a new map from the option JSON + exMap = new HashMap<>(); + for(InjectionOption injectionOption : injectionOptions.injections) { + addToMap(exMap, injectionOption); + } + + // this is the current set of options in effect + exString = opString; + } + + // lookup the request + final InjectionSite injectionSite = new InjectionSite(injector.getSiteClass(), desc); + final ExceptionInjection injection = exMap.get(injectionSite); + return injection; + } + + /** + * Adds a single exception injection to the injection map + * + * <p>Validates injection options before adding to the map, and throws various exceptions for + * validation failures. + * + * @param exMap the injection map + * @param injectionOption the option to add + */ + private static void addToMap( + final HashMap<InjectionSite, ExceptionInjection> exMap, final InjectionOption injectionOption) { + Class<?> siteClass; + try { + siteClass = Class.forName(injectionOption.siteClass); + } catch(ClassNotFoundException e) { + throw new RuntimeException("Injection siteClass not found", e); + } + + if ((injectionOption.desc == null) || injectionOption.desc.isEmpty()) { + throw new RuntimeException("Injection desc is null or empty"); + } + + if (injectionOption.nSkip < 0) { + throw new RuntimeException("Injection nSkip is not non-negative"); + } + + if (injectionOption.nFire <= 0) { + throw new RuntimeException("Injection nFire is non-positive"); + } + + Class<?> clazz; + try { + clazz = Class.forName(injectionOption.exceptionClass); + } catch(ClassNotFoundException e) { + throw new RuntimeException("Injected exceptionClass not found", e); + } + + if (!Throwable.class.isAssignableFrom(clazz)) { + throw new RuntimeException("Injected exceptionClass is not a Throwable"); + } + + @SuppressWarnings("unchecked") + final Class<? extends Throwable> exceptionClass = (Class<? extends Throwable>) clazz; + + final InjectionSite injectionSite = new InjectionSite(siteClass, injectionOption.desc); + final ExceptionInjection exceptionInjection = new ExceptionInjection( + injectionOption.desc, injectionOption.nSkip, injectionOption.nFire, exceptionClass); + exMap.put(injectionSite, exceptionInjection); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java deleted file mode 100644 index 5b11943ec..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/CancelableQuery.java +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.work; - -public interface CancelableQuery { - public void cancel(); -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java index 9743d6e0c..4f99f859b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java @@ -25,14 +25,13 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment; import com.google.common.base.Preconditions; public class QueryWorkUnit { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class); - + // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class); private final PlanFragment rootFragment; // for local private final FragmentRoot rootOperator; // for local private final List<PlanFragment> fragments; - public QueryWorkUnit(FragmentRoot rootOperator, PlanFragment rootFragment, List<PlanFragment> fragments) { - super(); + public QueryWorkUnit(final FragmentRoot rootOperator, final PlanFragment rootFragment, + final List<PlanFragment> fragments) { Preconditions.checkNotNull(rootFragment); Preconditions.checkNotNull(fragments); Preconditions.checkNotNull(rootOperator); @@ -53,11 +52,4 @@ public class QueryWorkUnit { public FragmentRoot getRootOperator() { return rootOperator; } - - - - - - - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java deleted file mode 100644 index 6086f74a5..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/StatusProvider.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.work; - -import org.apache.drill.exec.proto.BitControl.FragmentStatus; - -public interface StatusProvider { - - /** - * Provides the current status of the FragmentExecutor's work. - * @return Status if currently. Null if in another state. - */ - public FragmentStatus getStatus(); -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index 125c56df0..a08630ab7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -17,25 +17,21 @@ */ package org.apache.drill.exec.work; -import java.io.Closeable; -import java.io.IOException; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.drill.common.SelfCleaningRunnable; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; -import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.DrillRpcFuture; import org.apache.drill.exec.rpc.NamedThreadFactory; import org.apache.drill.exec.rpc.RpcException; @@ -50,7 +46,6 @@ import org.apache.drill.exec.store.sys.PStoreProvider; import org.apache.drill.exec.work.batch.ControlHandlerImpl; import org.apache.drill.exec.work.batch.ControlMessageHandler; import org.apache.drill.exec.work.foreman.Foreman; -import org.apache.drill.exec.work.foreman.QueryStatus; import org.apache.drill.exec.work.fragment.FragmentExecutor; import org.apache.drill.exec.work.fragment.FragmentManager; import org.apache.drill.exec.work.user.UserWorker; @@ -59,23 +54,24 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Queues; -public class WorkManager implements Closeable { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class); - - private Set<FragmentManager> incomingFragments = Collections.newSetFromMap(Maps - .<FragmentManager, Boolean> newConcurrentMap()); - - private LinkedBlockingQueue<RunnableWrapper> pendingTasks = Queues.newLinkedBlockingQueue(); - - private Map<FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap(); +/** + * Manages the running fragments in a Drillbit. Periodically requests run-time stats updates from fragments + * running elsewhere. + */ +public class WorkManager implements AutoCloseable { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class); - private ConcurrentMap<QueryId, Foreman> queries = Maps.newConcurrentMap(); + /* + * We use a {@see java.util.concurrent.ConcurrentHashMap} because it promises never to throw a + * {@see java.util.ConcurrentModificationException}; we need that because the statusThread may + * iterate over the map while other threads add FragmentExecutors via the {@see #WorkerBee}. + */ + private final Map<FragmentHandle, FragmentExecutor> runningFragments = new ConcurrentHashMap<>(); - private ConcurrentMap<QueryId, QueryStatus> status = Maps.newConcurrentMap(); + private final ConcurrentMap<QueryId, Foreman> queries = Maps.newConcurrentMap(); - private BootStrapContext bContext; + private final BootStrapContext bContext; private DrillbitContext dContext; private final ControlMessageHandler controlMessageWorker; @@ -83,48 +79,50 @@ public class WorkManager implements Closeable { private final UserWorker userWorker; private final WorkerBee bee; private final WorkEventBus workBus; - private ExecutorService executor; - private final EventThread eventThread; + private final ExecutorService executor; private final StatusThread statusThread; - private Controller controller; - public WorkManager(BootStrapContext context) { - this.bee = new WorkerBee(); - this.workBus = new WorkEventBus(bee); + /** + * How often the StatusThread collects statistics about running fragments. + */ + private final static int STATUS_PERIOD_SECONDS = 5; + + public WorkManager(final BootStrapContext context) { this.bContext = context; - this.controlMessageWorker = new ControlHandlerImpl(bee); - this.userWorker = new UserWorker(bee); - this.eventThread = new EventThread(); - this.statusThread = new StatusThread(); - this.dataHandler = new DataResponseHandlerImpl(bee); + bee = new WorkerBee(); // TODO should this just be an interface? + workBus = new WorkEventBus(); // TODO should this just be an interface? + + /* + * TODO + * This executor isn't bounded in any way and could create an arbitrarily large number of + * threads, possibly choking the machine. We should really put an upper bound on the number of + * threads that can be created. Ideally, this might be computed based on the number of cores or + * some similar metric; ThreadPoolExecutor can impose an upper bound, and might be a better choice. + */ + executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-")); + + // TODO references to this escape here (via WorkerBee) before construction is done + controlMessageWorker = new ControlHandlerImpl(bee); // TODO getFragmentRunner(), getForemanForQueryId() + userWorker = new UserWorker(bee); // TODO should just be an interface? addNewForeman(), getForemanForQueryId() + statusThread = new StatusThread(); + dataHandler = new DataResponseHandlerImpl(bee); // TODO only uses startFragmentPendingRemote() } - public void start(DrillbitEndpoint endpoint, Controller controller, - DataConnectionCreator data, ClusterCoordinator coord, PStoreProvider provider) { - this.executor = Executors.newCachedThreadPool(new NamedThreadFactory("WorkManager-")); - this.dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider, executor); - // executor = Executors.newFixedThreadPool(dContext.getConfig().getInt(ExecConstants.EXECUTOR_THREADS) - this.controller = controller; - this.eventThread.start(); - this.statusThread.start(); + public void start(final DrillbitEndpoint endpoint, final Controller controller, + final DataConnectionCreator data, final ClusterCoordinator coord, final PStoreProvider provider) { + dContext = new DrillbitContext(endpoint, bContext, coord, controller, data, workBus, provider, executor); + statusThread.start(); + // TODO remove try block once metrics moved from singleton, For now catch to avoid unit test failures try { dContext.getMetrics().register( - MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()), + MetricRegistry.name("drill.exec.work.running_fragments." + dContext.getEndpoint().getUserPort()), new Gauge<Integer>() { @Override public Integer getValue() { return runningFragments.size(); } - }); - dContext.getMetrics().register( - MetricRegistry.name("drill.exec.work.pendingTasks" + dContext.getEndpoint().getUserPort()), - new Gauge<Integer>() { - @Override - public Integer getValue() { - return pendingTasks.size(); - } - }); + }); } catch (IllegalArgumentException e) { logger.warn("Exception while registering metrics", e); } @@ -155,7 +153,7 @@ public class WorkManager implements Closeable { } @Override - public void close() throws IOException { + public void close() throws Exception { try { if (executor != null) { executor.awaitTermination(1, TimeUnit.SECONDS); @@ -169,147 +167,90 @@ public class WorkManager implements Closeable { return dContext; } - private static String getId(FragmentHandle handle){ - return "FragmentExecutor: " + QueryIdHelper.getQueryId(handle.getQueryId()) + ':' + handle.getMajorFragmentId() + ':' + handle.getMinorFragmentId(); - } - - // create this so items can see the data here whether or not they are in this package. + /** + * Narrowed interface to WorkManager that is made available to tasks it is managing. + */ public class WorkerBee { - - - - public void addFragmentRunner(FragmentExecutor runner) { - logger.debug("Adding pending task {}", runner); - RunnableWrapper wrapper = new RunnableWrapper(runner, getId(runner.getContext().getHandle())); - pendingTasks.add(wrapper); - } - - public void addNewForeman(Foreman foreman) { - String id = "Foreman: " + QueryIdHelper.getQueryId(foreman.getQueryId()); - RunnableWrapper wrapper = new RunnableWrapper(foreman, id); - pendingTasks.add(wrapper); + public void addNewForeman(final Foreman foreman) { queries.put(foreman.getQueryId(), foreman); + executor.execute(new SelfCleaningRunnable(foreman) { + @Override + protected void cleanup() { + queries.remove(foreman.getQueryId(), foreman); + } + }); } - public void addFragmentPendingRemote(FragmentManager handler) { - incomingFragments.add(handler); - } - - public void startFragmentPendingRemote(FragmentManager handler) { - incomingFragments.remove(handler); - FragmentExecutor runner = handler.getRunnable(); - RunnableWrapper wrapper = new RunnableWrapper(runner, getId(runner.getContext().getHandle())); - pendingTasks.add(wrapper); - } - - public FragmentExecutor getFragmentRunner(FragmentHandle handle) { - return runningFragments.get(handle); + public Foreman getForemanForQueryId(final QueryId queryId) { + return queries.get(queryId); } - public void removeFragment(FragmentHandle handle) { - runningFragments.remove(handle); + public DrillbitContext getContext() { + return dContext; } - public Foreman getForemanForQueryId(QueryId queryId) { - return queries.get(queryId); + public void startFragmentPendingRemote(final FragmentManager handler) { + executor.execute(handler.getRunnable()); } - public void retireForeman(Foreman foreman) { - queries.remove(foreman.getQueryId(), foreman); + public void addFragmentRunner(final FragmentExecutor fragmentExecutor) { + final FragmentHandle fragmentHandle = fragmentExecutor.getContext().getHandle(); + runningFragments.put(fragmentHandle, fragmentExecutor); + executor.execute(new SelfCleaningRunnable(fragmentExecutor) { + @Override + protected void cleanup() { + runningFragments.remove(fragmentHandle); + } + }); } - public DrillbitContext getContext() { - return dContext; + public FragmentExecutor getFragmentRunner(final FragmentHandle handle) { + return runningFragments.get(handle); } - } + /** + * Periodically gather current statistics. {@link QueryManager} uses a FragmentStatusListener to + * maintain changes to state, and should be current. However, we want to collect current statistics + * about RUNNING queries, such as current memory consumption, number of rows processed, and so on. + * The FragmentStatusListener only tracks changes to state, so the statistics kept there will be + * stale; this thread probes for current values. + */ private class StatusThread extends Thread { public StatusThread() { - this.setDaemon(true); - this.setName("WorkManager Status Reporter"); + setDaemon(true); + setName("WorkManager.StatusThread"); } @Override public void run() { - while(true){ - List<DrillRpcFuture<Ack>> futures = Lists.newArrayList(); - for(FragmentExecutor e : runningFragments.values()){ - FragmentStatus status = e.getStatus(); - if(status == null){ + while(true) { + final List<DrillRpcFuture<Ack>> futures = Lists.newArrayList(); + for(FragmentExecutor fragmentExecutor : runningFragments.values()) { + final FragmentStatus status = fragmentExecutor.getStatus(); + if (status == null) { continue; } - DrillbitEndpoint ep = e.getContext().getForemanEndpoint(); - futures.add(controller.getTunnel(ep).sendFragmentStatus(status)); + + final DrillbitEndpoint ep = fragmentExecutor.getContext().getForemanEndpoint(); + futures.add(dContext.getController().getTunnel(ep).sendFragmentStatus(status)); } - for(DrillRpcFuture<Ack> future : futures){ - try{ + for(DrillRpcFuture<Ack> future : futures) { + try { future.checkedGet(); - }catch(RpcException ex){ + } catch(RpcException ex) { logger.info("Failure while sending intermediate fragment status to Foreman", ex); } } - try{ - Thread.sleep(5000); - }catch(InterruptedException e){ + try { + Thread.sleep(STATUS_PERIOD_SECONDS * 1000); + } catch(InterruptedException e) { // exit status thread on interrupt. break; } } } - - } - - private class EventThread extends Thread { - public EventThread() { - this.setDaemon(true); - this.setName("WorkManager Event Thread"); - } - - @Override - public void run() { - try { - while (true) { - // logger.debug("Polling for pending work tasks."); - RunnableWrapper r = pendingTasks.take(); - if (r != null) { - logger.debug("Starting pending task {}", r); - if (r.inner instanceof FragmentExecutor) { - FragmentExecutor fragmentExecutor = (FragmentExecutor) r.inner; - runningFragments.put(fragmentExecutor.getContext().getHandle(), fragmentExecutor); - } - executor.execute(r); - } - - } - } catch (InterruptedException e) { - logger.info("Work Manager stopping as it was interrupted."); - } - } - } - - private class RunnableWrapper implements Runnable { - - final Runnable inner; - private final String id; - - public RunnableWrapper(Runnable r, String id){ - this.inner = r; - this.id = id; - } - - @Override - public void run() { - try{ - inner.run(); - }catch(Exception | Error e){ - logger.error("Failure while running wrapper [{}]", id, e); - } - } - - } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java index 3228da9a9..3a7123d6c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/ControlHandlerImpl.java @@ -20,8 +20,6 @@ package org.apache.drill.exec.work.batch; import static org.apache.drill.exec.rpc.RpcBus.get; import io.netty.buffer.ByteBuf; -import java.io.IOException; - import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.proto.BitControl.FinishedReceiver; @@ -33,7 +31,6 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryProfile; -import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.Acks; import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.RpcConstants; @@ -42,123 +39,129 @@ import org.apache.drill.exec.rpc.UserRpcException; import org.apache.drill.exec.rpc.control.ControlConnection; import org.apache.drill.exec.rpc.control.ControlTunnel; import org.apache.drill.exec.rpc.data.DataRpcConfig; +import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.foreman.Foreman; -import org.apache.drill.exec.work.foreman.QueryStatus; import org.apache.drill.exec.work.fragment.FragmentExecutor; import org.apache.drill.exec.work.fragment.FragmentManager; import org.apache.drill.exec.work.fragment.NonRootFragmentManager; import org.apache.drill.exec.work.fragment.NonRootStatusReporter; public class ControlHandlerImpl implements ControlMessageHandler { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class); - + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlHandlerImpl.class); private final WorkerBee bee; - public ControlHandlerImpl(WorkerBee bee) { - super(); + public ControlHandlerImpl(final WorkerBee bee) { this.bee = bee; } @Override - public Response handle(ControlConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody) throws RpcException { + public Response handle(final ControlConnection connection, final int rpcType, + final ByteBuf pBody, final ByteBuf dBody) throws RpcException { if (RpcConstants.EXTRA_DEBUGGING) { logger.debug("Received bit com message of type {}", rpcType); } switch (rpcType) { - case RpcType.REQ_CANCEL_FRAGMENT_VALUE: - FragmentHandle handle = get(pBody, FragmentHandle.PARSER); + case RpcType.REQ_CANCEL_FRAGMENT_VALUE: { + final FragmentHandle handle = get(pBody, FragmentHandle.PARSER); cancelFragment(handle); return DataRpcConfig.OK; + } - case RpcType.REQ_RECEIVER_FINISHED_VALUE: - FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER); + case RpcType.REQ_RECEIVER_FINISHED_VALUE: { + final FinishedReceiver finishedReceiver = get(pBody, FinishedReceiver.PARSER); receivingFragmentFinished(finishedReceiver); return DataRpcConfig.OK; + } case RpcType.REQ_FRAGMENT_STATUS_VALUE: - bee.getContext().getWorkBus().status( get(pBody, FragmentStatus.PARSER)); + bee.getContext().getWorkBus().statusUpdate( get(pBody, FragmentStatus.PARSER)); // TODO: Support a type of message that has no response. return DataRpcConfig.OK; - case RpcType.REQ_QUERY_CANCEL_VALUE: - QueryId id = get(pBody, QueryId.PARSER); - Foreman f = bee.getForemanForQueryId(id); - if(f != null){ - f.cancel(); + case RpcType.REQ_QUERY_CANCEL_VALUE: { + final QueryId queryId = get(pBody, QueryId.PARSER); + final Foreman foreman = bee.getForemanForQueryId(queryId); + if (foreman != null) { + foreman.cancel(); return DataRpcConfig.OK; - }else{ + } else { return DataRpcConfig.FAIL; } + } - case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE: - InitializeFragments fragments = get(pBody, InitializeFragments.PARSER); - for(int i =0; i < fragments.getFragmentCount(); i++){ + case RpcType.REQ_INIATILIZE_FRAGMENTS_VALUE: { + final InitializeFragments fragments = get(pBody, InitializeFragments.PARSER); + for(int i = 0; i < fragments.getFragmentCount(); i++) { startNewRemoteFragment(fragments.getFragment(i)); } return DataRpcConfig.OK; + } - case RpcType.REQ_QUERY_STATUS_VALUE: - QueryId queryId = get(pBody, QueryId.PARSER); - Foreman foreman = bee.getForemanForQueryId(queryId); - QueryProfile profile; + case RpcType.REQ_QUERY_STATUS_VALUE: { + final QueryId queryId = get(pBody, QueryId.PARSER); + final Foreman foreman = bee.getForemanForQueryId(queryId); if (foreman == null) { throw new RpcException("Query not running on node."); - } else { - profile = bee.getForemanForQueryId(queryId).getQueryStatus().getAsProfile(); } + final QueryProfile profile = foreman.getQueryManager().getQueryProfile(); return new Response(RpcType.RESP_QUERY_STATUS, profile); + } default: throw new RpcException("Not yet supported."); } - } @Override - public void startNewRemoteFragment(PlanFragment fragment) throws UserRpcException { + public void startNewRemoteFragment(final PlanFragment fragment) throws UserRpcException { logger.debug("Received remote fragment start instruction", fragment); + final DrillbitContext drillbitContext = bee.getContext(); try { // we either need to start the fragment if it is a leaf fragment, or set up a fragment manager if it is non leaf. - if(fragment.getLeafFragment()){ - FragmentContext context = new FragmentContext(bee.getContext(), fragment, null, bee.getContext().getFunctionImplementationRegistry()); - ControlTunnel tunnel = bee.getContext().getController().getTunnel(fragment.getForeman()); - NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel); - FragmentRoot rootOperator = bee.getContext().getPlanReader().readFragmentOperator(fragment.getFragmentJson()); - FragmentExecutor fr = new FragmentExecutor(context, bee, rootOperator, listener); + if (fragment.getLeafFragment()) { + final FragmentContext context = new FragmentContext(drillbitContext, fragment, null, + drillbitContext.getFunctionImplementationRegistry()); + final ControlTunnel tunnel = drillbitContext.getController().getTunnel(fragment.getForeman()); + final NonRootStatusReporter listener = new NonRootStatusReporter(context, tunnel); + final FragmentRoot rootOperator = drillbitContext.getPlanReader().readFragmentOperator( + fragment.getFragmentJson()); + final FragmentExecutor fr = new FragmentExecutor(context, rootOperator, listener); bee.addFragmentRunner(fr); - }else{ // isIntermediate, store for incoming data. - NonRootFragmentManager manager = new NonRootFragmentManager(fragment, bee); - bee.getContext().getWorkBus().setFragmentManager(manager); + } else { + // isIntermediate, store for incoming data. + final NonRootFragmentManager manager = new NonRootFragmentManager(fragment, drillbitContext); + drillbitContext.getWorkBus().addFragmentManager(manager); } } catch (Exception e) { - throw new UserRpcException(bee.getContext().getEndpoint(), "Failure while trying to start remote fragment", e); + throw new UserRpcException(drillbitContext.getEndpoint(), + "Failure while trying to start remote fragment", e); } catch (OutOfMemoryError t) { if (t.getMessage().startsWith("Direct buffer")) { - throw new UserRpcException(bee.getContext().getEndpoint(), "Out of direct memory while trying to start remote fragment", t); + throw new UserRpcException(drillbitContext.getEndpoint(), + "Out of direct memory while trying to start remote fragment", t); } else { throw t; } } - } /* (non-Javadoc) * @see org.apache.drill.exec.work.batch.BitComHandler#cancelFragment(org.apache.drill.exec.proto.ExecProtos.FragmentHandle) */ @Override - public Ack cancelFragment(FragmentHandle handle) { - FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle); + public Ack cancelFragment(final FragmentHandle handle) { + final FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(handle); if (manager != null) { // try remote fragment cancel. manager.cancel(); } else { // then try local cancel. - FragmentExecutor runner = bee.getFragmentRunner(handle); + final FragmentExecutor runner = bee.getFragmentRunner(handle); if (runner != null) { runner.cancel(); } @@ -167,8 +170,9 @@ public class ControlHandlerImpl implements ControlMessageHandler { return Acks.OK; } - public Ack receivingFragmentFinished(FinishedReceiver finishedReceiver) { - FragmentManager manager = bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender()); + private Ack receivingFragmentFinished(final FinishedReceiver finishedReceiver) { + final FragmentManager manager = + bee.getContext().getWorkBus().getFragmentManagerIfExists(finishedReceiver.getSender()); FragmentExecutor executor; if (manager != null) { @@ -184,5 +188,4 @@ public class ControlHandlerImpl implements ControlMessageHandler { return Acks.OK; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java index f0b4983f6..2a79e42c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java @@ -162,6 +162,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer { return batch; } catch (InterruptedException e) { return null; + // TODO InterruptedException } } if (w == null) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java index 3d5b94849..2430e644c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java @@ -162,6 +162,7 @@ public class UnlimitedRawBatchBuffer implements RawBatchBuffer{ b = buffer.take(); } catch (InterruptedException e) { return null; + // TODO InterruptedException } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java index ca52f0cde..80f2ca113 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/DrillbitStatusListener.java @@ -26,7 +26,7 @@ import java.util.Set; * Interface to define the listener to take actions when the set of active drillbits is changed. */ public interface DrillbitStatusListener { - + // TODO this doesn't belong here static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillbitStatusListener.class); /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 8e0780b23..9650ee5d4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -19,27 +19,27 @@ package org.apache.drill.exec.work.foreman; import io.netty.buffer.ByteBuf; -import java.io.Closeable; import java.io.IOException; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import com.google.common.base.Preconditions; + import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.coord.DistributedSemaphore; import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; import org.apache.drill.exec.exception.OptimizerException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.QueryContext; -import org.apache.drill.exec.ops.QueryDateTimeInfo; import org.apache.drill.exec.opt.BasicOptimizer; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.FragmentRoot; @@ -64,10 +64,11 @@ import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.control.Controller; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.testing.ExceptionInjector; import org.apache.drill.exec.util.Pointer; import org.apache.drill.exec.work.EndpointListener; import org.apache.drill.exec.work.ErrorHelper; @@ -76,117 +77,120 @@ import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.batch.IncomingBuffers; import org.apache.drill.exec.work.fragment.FragmentExecutor; import org.apache.drill.exec.work.fragment.RootFragmentManager; +import org.codehaus.jackson.map.ObjectMapper; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.Multimap; /** - * Foreman manages all queries where this is the driving/root node. + * Foreman manages all the fragments (local and remote) for a single query where this + * is the driving/root node. * * The flow is as follows: - * - Foreman is submitted as a runnable. - * - Runnable does query planning. - * - PENDING > RUNNING - * - Runnable sends out starting fragments - * - Status listener are activated - * - Foreman listens for state move messages. - * + * - Foreman is submitted as a runnable. + * - Runnable does query planning. + * - state changes from PENDING to RUNNING + * - Runnable sends out starting fragments + * - Status listener are activated + * - The Runnable's run() completes, but the Foreman stays around + * - Foreman listens for state change messages. + * - state change messages can drive the state to FAILED or CANCELED, in which case + * messages are sent to running fragments to terminate + * - when all fragments complete, state change messages drive the state to COMPLETED */ -public class Foreman implements Runnable, Closeable, Comparable<Object> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class); - - private QueryId queryId; - private RunQuery queryRequest; - private QueryContext context; - private QueryManager queryManager; - private WorkerBee bee; - private UserClientConnection initiatingClient; +public class Foreman implements Runnable { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class); + private final static ExceptionInjector injector = ExceptionInjector.getInjector(Foreman.class); + + private final QueryId queryId; + private final RunQuery queryRequest; + private final QueryContext queryContext; + private final QueryManager queryManager; // handles lower-level details of query execution + private final WorkerBee bee; // provides an interface to submit tasks + private final DrillbitContext drillbitContext; + private final UserClientConnection initiatingClient; // used to send responses private volatile QueryState state; - private final DistributedSemaphore smallSemaphore; - private final DistributedSemaphore largeSemaphore; - private final long queueThreshold; - private final long queueTimeout; - private volatile DistributedLease lease; - private final boolean queuingEnabled; + private volatile DistributedLease lease; // used to limit the number of concurrent queries + + private FragmentExecutor rootRunner; // root Fragment - private FragmentExecutor rootRunner; - private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); - private final StateListener stateListener = new StateListener(); + private final CountDownLatch acceptExternalEvents = new CountDownLatch(1); // gates acceptance of external events + private final StateListener stateListener = new StateListener(); // source of external events private final ResponseSendListener responseListener = new ResponseSendListener(); + private final ForemanResult foremanResult = new ForemanResult(); - public Foreman(WorkerBee bee, DrillbitContext dContext, UserClientConnection connection, QueryId queryId, - RunQuery queryRequest) { + /** + * Constructor. Sets up the Foreman, but does not initiate any execution. + * + * @param bee used to submit additional work + * @param drillbitContext + * @param connection + * @param queryId the id for the query + * @param queryRequest the query to execute + */ + public Foreman(final WorkerBee bee, final DrillbitContext drillbitContext, + final UserClientConnection connection, final QueryId queryId, final RunQuery queryRequest) { + this.bee = bee; this.queryId = queryId; this.queryRequest = queryRequest; - this.context = new QueryContext(connection.getSession(), queryId, dContext); + this.drillbitContext = drillbitContext; - // set up queuing - this.queuingEnabled = context.getOptions().getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val; - if (queuingEnabled) { - int smallQueue = context.getOptions().getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue(); - int largeQueue = context.getOptions().getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue(); - this.largeSemaphore = dContext.getClusterCoordinator().getSemaphore("query.large", largeQueue); - this.smallSemaphore = dContext.getClusterCoordinator().getSemaphore("query.small", smallQueue); - this.queueThreshold = context.getOptions().getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val; - this.queueTimeout = context.getOptions().getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val; - } else { - this.largeSemaphore = null; - this.smallSemaphore = null; - this.queueThreshold = 0; - this.queueTimeout = 0; - } - // end queuing setup. - - this.initiatingClient = connection; - this.queryManager = new QueryManager(queryId, queryRequest, bee.getContext().getPersistentStoreProvider(), - stateListener, this); - this.bee = bee; + initiatingClient = connection; + queryContext = new QueryContext(connection.getSession(), drillbitContext); + queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getPersistentStoreProvider(), + stateListener, this); // TODO reference escapes before ctor is complete via stateListener, this recordNewState(QueryState.PENDING); } - public QueryContext getContext() { - return context; + /** + * Get the QueryContext created for the query. + * + * @return the QueryContext + */ + public QueryContext getQueryContext() { + return queryContext; } - public void cancel() { - stateListener.moveToState(QueryState.CANCELED, null); + /** + * Get the QueryManager created for the query. + * + * @return the QueryManager + */ + public QueryManager getQueryManager() { + return queryManager; } - private void cleanup(QueryResult result) { - logger.info("foreman cleaning up - status: {}", queryManager.getStatus()); - - bee.retireForeman(this); - context.getWorkBus().removeFragmentStatusListener(queryId); - context.getClusterCoordinator().removeDrillbitStatusListener(queryManager); - - try { - try { - context.close(); - } catch (Exception e) { - moveToState(QueryState.FAILED, e); - return; - } - - if (result != null) { - initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true); - } - } finally { - releaseLease(); - } + /** + * Cancel the query. Asynchronous -- it may take some time for all remote fragments to be + * terminated. + */ + public void cancel() { + // Note this can be called from outside of run() on another thread, or after run() completes + stateListener.moveToState(QueryState.CANCELLATION_REQUESTED, null); } /** - * Called by execution pool to do foreman setup. Actual query execution is a separate phase (and can be scheduled). + * Called by execution pool to do query setup, and kick off remote execution. + * + * <p>Note that completion of this function is not the end of the Foreman's role + * in the query's lifecycle. */ + @Override public void run() { + // rename the thread we're using for debugging purposes + final Thread currentThread = Thread.currentThread(); + final String originalName = currentThread.getName(); + currentThread.setName(QueryIdHelper.getQueryId(queryId) + ":foreman"); + + // track how long the query takes + queryManager.markStartTime(); - final String originalThread = Thread.currentThread().getName(); - Thread.currentThread().setName(QueryIdHelper.getQueryId(queryId) + ":foreman"); - getStatus().markStart(); - // convert a run query request into action try { + injector.injectChecked(drillbitContext, "run-try-beginning", ForemanException.class); + + // convert a run query request into action switch (queryRequest.getType()) { case LOGICAL: parseAndRunLogicalPlan(queryRequest.getPlan()); @@ -200,39 +204,71 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> { default: throw new IllegalStateException(); } + injector.injectChecked(drillbitContext, "run-try-end", ForemanException.class); } catch (ForemanException e) { moveToState(QueryState.FAILED, e); - } catch (AssertionError | Exception ex) { - moveToState(QueryState.FAILED, new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex)); - + moveToState(QueryState.FAILED, + new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex)); } catch (OutOfMemoryError e) { + /* + * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman. + * So, if we die here, they should get notified about that, and cancel themselves; we don't have to + * attempt to notify them, which might not work under these conditions. + */ + /* + * TODO this will kill everything in this JVM; why can't we just free all allocation + * associated with this Foreman and allow others to continue? + */ System.out.println("Out of memory, exiting."); e.printStackTrace(); System.out.flush(); System.exit(-1); - } finally { - Thread.currentThread().setName(originalThread); + /* + * Begin accepting external events. + * + * Doing this here in the finally clause will guarantee that it occurs. Otherwise, if there + * is an exception anywhere during setup, it wouldn't occur, and any events that are generated + * as a result of any partial setup that was done (such as the FragmentSubmitListener, + * the ResponseSendListener, or an external call to cancel()), will hang the thread that makes the + * event delivery call. + * + * If we do throw an exception during setup, and have already moved to QueryState.FAILED, we just need to + * make sure that we can't make things any worse as those events are delivered, but allow + * any necessary remaining cleanup to proceed. + */ + acceptExternalEvents.countDown(); + + // restore the thread's original name + currentThread.setName(originalName); } + + /* + * Note that despite the run() completing, the Foreman continues to exist, and receives + * events (indirectly, through the QueryManager's use of stateListener), about fragment + * completions. It won't go away until everything is completed, failed, or cancelled. + */ } private void releaseLease() { - if (lease != null) { + while (lease != null) { try { lease.close(); + lease = null; + } catch (InterruptedException e) { + // if we end up here, the while loop will try again } catch (Exception e) { logger.warn("Failure while releasing lease.", e); + break; } - ; } - } - private void parseAndRunLogicalPlan(String json) throws ExecutionSetupException { + private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException { LogicalPlan logicalPlan; try { - logicalPlan = context.getPlanReader().readLogicalPlan(json); + logicalPlan = drillbitContext.getPlanReader().readLogicalPlan(json); } catch (IOException e) { throw new ForemanException("Failure parsing logical plan.", e); } @@ -244,7 +280,7 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> { log(logicalPlan); - PhysicalPlan physicalPlan = convert(logicalPlan); + final PhysicalPlan physicalPlan = convert(logicalPlan); if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) { returnPhysical(physicalPlan); @@ -252,20 +288,19 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> { } log(physicalPlan); - runPhysicalPlan(physicalPlan); } - private void log(LogicalPlan plan) { + private void log(final LogicalPlan plan) { if (logger.isDebugEnabled()) { - logger.debug("Logical {}", plan.unparse(context.getConfig())); + logger.debug("Logical {}", plan.unparse(queryContext.getConfig())); } } - private void log(PhysicalPlan plan) { + private void log(final PhysicalPlan plan) { if (logger.isDebugEnabled()) { try { - String planText = context.getConfig().getMapper().writeValueAsString(plan); + String planText = queryContext.getConfig().getMapper().writeValueAsString(plan); logger.debug("Physical {}", planText); } catch (IOException e) { logger.warn("Error while attempting to log physical plan.", e); @@ -273,60 +308,54 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> { } } - private void returnPhysical(PhysicalPlan plan) throws ExecutionSetupException { - String jsonPlan = plan.unparse(context.getConfig().getMapper().writer()); - runPhysicalPlan(DirectPlan.createDirectPlan(context, new PhysicalFromLogicalExplain(jsonPlan))); + private void returnPhysical(final PhysicalPlan plan) throws ExecutionSetupException { + final String jsonPlan = plan.unparse(queryContext.getConfig().getMapper().writer()); + runPhysicalPlan(DirectPlan.createDirectPlan(queryContext, new PhysicalFromLogicalExplain(jsonPlan))); } public static class PhysicalFromLogicalExplain { - public String json; + public final String json; - public PhysicalFromLogicalExplain(String json) { - super(); + public PhysicalFromLogicalExplain(final String json) { this.json = json; } - } - private void parseAndRunPhysicalPlan(String json) throws ExecutionSetupException { + private void parseAndRunPhysicalPlan(final String json) throws ExecutionSetupException { try { - PhysicalPlan plan = context.getPlanReader().readPhysicalPlan(json); + final PhysicalPlan plan = drillbitContext.getPlanReader().readPhysicalPlan(json); runPhysicalPlan(plan); } catch (IOException e) { throw new ForemanSetupException("Failure while parsing physical plan.", e); } } - private void runPhysicalPlan(PhysicalPlan plan) throws ExecutionSetupException { - + private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException { validatePlan(plan); setupSortMemoryAllocations(plan); acquireQuerySemaphore(plan); final QueryWorkUnit work = getQueryWorkUnit(plan); - - this.context.getWorkBus().setFragmentStatusListener(work.getRootFragment().getHandle().getQueryId(), queryManager); - this.context.getClusterCoordinator().addDrillbitStatusListener(queryManager); - - logger.debug("Submitting fragments to run."); - + final List<PlanFragment> planFragments = work.getFragments(); final PlanFragment rootPlanFragment = work.getRootFragment(); assert queryId == rootPlanFragment.getHandle().getQueryId(); - queryManager.setup(rootPlanFragment.getHandle(), context.getCurrentEndpoint(), work.getFragments().size()); + drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager); + drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager); + + logger.debug("Submitting fragments to run."); // set up the root fragment first so we'll have incoming buffers available. setupRootFragment(rootPlanFragment, initiatingClient, work.getRootOperator()); - setupNonRootFragments(work.getFragments()); - bee.getContext().getAllocator().resetFragmentLimits(); + setupNonRootFragments(planFragments); + drillbitContext.getAllocator().resetFragmentLimits(); // TODO a global effect for this query?!? moveToState(QueryState.RUNNING, null); logger.debug("Fragments running."); - } - private void validatePlan(PhysicalPlan plan) throws ForemanSetupException{ + private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupException { if (plan.getProperties().resultMode != ResultMode.EXEC) { throw new ForemanSetupException(String.format( "Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC", @@ -334,369 +363,632 @@ public class Foreman implements Runnable, Closeable, Comparable<Object> { } } - private void setupSortMemoryAllocations(PhysicalPlan plan){ - int sortCount = 0; + private void setupSortMemoryAllocations(final PhysicalPlan plan) { + // look for external sorts + final List<ExternalSort> sortList = new LinkedList<>(); for (PhysicalOperator op : plan.getSortedOperators()) { if (op instanceof ExternalSort) { - sortCount++; + sortList.add((ExternalSort) op); } } - if (sortCount > 0) { - long maxWidthPerNode = context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val; + // if there are any sorts, compute the maximum allocation, and set it on them + if (sortList.size() > 0) { + final OptionManager optionManager = queryContext.getOptions(); + final long maxWidthPerNode = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val; long maxAllocPerNode = Math.min(DrillConfig.getMaxDirectMemory(), - context.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)); + queryContext.getConfig().getLong(ExecConstants.TOP_LEVEL_MAX_ALLOC)); maxAllocPerNode = Math.min(maxAllocPerNode, - context.getOptions().getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val); - long maxSortAlloc = maxAllocPerNode / (sortCount * maxWidthPerNode); + optionManager.getOption(ExecConstants.MAX_QUERY_MEMORY_PER_NODE_KEY).num_val); + final long maxSortAlloc = maxAllocPerNode / (sortList.size() * maxWidthPerNode); logger.debug("Max sort alloc: {}", maxSortAlloc); - for (PhysicalOperator op : plan.getSortedOperators()) { - if (op instanceof ExternalSort) { - ((ExternalSort) op).setMaxAllocation(maxSortAlloc); - } + + for(ExternalSort externalSort : sortList) { + externalSort.setMaxAllocation(maxSortAlloc); } } } - private void acquireQuerySemaphore(PhysicalPlan plan) throws ForemanSetupException { - - double size = 0; - for (PhysicalOperator ops : plan.getSortedOperators()) { - size += ops.getCost(); - } - + /** + * This limits the number of "small" and "large" queries that a Drill cluster will run + * simultaneously, if queueing is enabled. If the query is unable to run, this will block + * until it can. Beware that this is called under run(), and so will consume a Thread + * while it waits for the required distributed semaphore. + * + * @param plan the query plan + * @throws ForemanSetupException + */ + private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException { + final OptionManager optionManager = queryContext.getOptions(); + final boolean queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE_KEY).bool_val; if (queuingEnabled) { + final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_KEY).num_val; + double totalCost = 0; + for (PhysicalOperator ops : plan.getSortedOperators()) { + totalCost += ops.getCost(); + } + try { - if (size > this.queueThreshold) { - this.lease = largeSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS); + @SuppressWarnings("resource") + final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator(); + DistributedSemaphore distributedSemaphore; + + // get the appropriate semaphore + if (totalCost > queueThreshold) { + final int largeQueue = optionManager.getOption(ExecConstants.LARGE_QUEUE_KEY).num_val.intValue(); + distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue); } else { - this.lease = smallSemaphore.acquire(this.queueTimeout, TimeUnit.MILLISECONDS); + final int smallQueue = optionManager.getOption(ExecConstants.SMALL_QUEUE_KEY).num_val.intValue(); + distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue); } + + final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT_KEY).num_val; + lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS); } catch (Exception e) { throw new ForemanSetupException("Unable to acquire slot for query.", e); } } } - private QueryWorkUnit getQueryWorkUnit(PhysicalPlan plan) throws ExecutionSetupException { - PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); - Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); + private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException { + final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); + final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); + final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext); + final QueryWorkUnit queryWorkUnit = parallelizer.getFragments( + queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), + queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment, + initiatingClient.getSession(), queryContext.getQueryDateTimeInfo()); + + if (logger.isInfoEnabled()) { + final StringBuilder sb = new StringBuilder(); + sb.append("PlanFragments for query "); + sb.append(queryId); + sb.append('\n'); + + final List<PlanFragment> planFragments = queryWorkUnit.getFragments(); + final int fragmentCount = planFragments.size(); + int fragmentIndex = 0; + for(PlanFragment planFragment : planFragments) { + final FragmentHandle fragmentHandle = planFragment.getHandle(); + sb.append("PlanFragment("); + sb.append(++fragmentIndex); + sb.append('/'); + sb.append(fragmentCount); + sb.append(") major_fragment_id "); + sb.append(fragmentHandle.getMajorFragmentId()); + sb.append(" minor_fragment_id "); + sb.append(fragmentHandle.getMinorFragmentId()); + sb.append('\n'); + + final DrillbitEndpoint endpointAssignment = planFragment.getAssignment(); + sb.append(" DrillbitEndpoint address "); + sb.append(endpointAssignment.getAddress()); + sb.append('\n'); + + String jsonString = "<<malformed JSON>>"; + sb.append(" fragment_json: "); + final ObjectMapper objectMapper = new ObjectMapper(); + try + { + final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class); + jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json); + } catch(Exception e) { + // we've already set jsonString to a fallback value + } + sb.append(jsonString); - SimpleParallelizer parallelizer = new SimpleParallelizer(context); - return parallelizer.getFragments(context.getOptions().getOptionList(), context.getCurrentEndpoint(), - queryId, context.getActiveEndpoints(), context.getPlanReader(), rootFragment, initiatingClient.getSession(), - context.getQueryDateTimeInfo()); + logger.info(sb.toString()); + } + } + + return queryWorkUnit; } /** - * Tells the foreman to move to a new state. Note that - * @param state - * @return + * Manages the end-state processing for Foreman. + * + * End-state processing is tricky, because even if a query appears to succeed, but + * we then encounter a problem during cleanup, we still want to mark the query as + * failed. So we have to construct the successful result we would send, and then + * clean up before we send that result, possibly changing that result if we encounter + * a problem during cleanup. We only send the result when there is nothing left to + * do, so it will account for any possible problems. + * + * The idea here is to make close()ing the ForemanResult do the final cleanup and + * sending. Closing the result must be the last thing that is done by Foreman. */ - private synchronized boolean moveToState(QueryState newState, Exception exception){ - logger.info("State change requested. {} --> {}", state, newState, exception); - outside: switch(state) { + private class ForemanResult implements AutoCloseable { + private QueryState resultState = null; + private Exception resultException = null; + private boolean isClosed = false; + + /** + * Set up the result for a COMPLETED or CANCELED state. + * + * <p>Note that before sending this result, we execute cleanup steps that could + * result in this result still being changed to a FAILED state. + * + * @param queryState one of COMPLETED or CANCELED + */ + public void setCompleted(final QueryState queryState) { + Preconditions.checkArgument((queryState == QueryState.COMPLETED) || (queryState == QueryState.CANCELED)); + Preconditions.checkState(!isClosed); + Preconditions.checkState(resultState == null); + + resultState = queryState; + } + + /** + * Set up the result for a FAILED state. + * + * <p>Failures that occur during cleanup processing will be added as suppressed + * exceptions. + * + * @param exception the exception that led to the FAILED state + */ + public void setFailed(final Exception exception) { + Preconditions.checkArgument(exception != null); + Preconditions.checkState(!isClosed); + Preconditions.checkState(resultState == null); + + resultState = QueryState.FAILED; + resultException = exception; + } + + /** + * Add an exception to the result. All exceptions after the first become suppressed + * exceptions hanging off the first. + * + * @param exception the exception to add + */ + private void addException(final Exception exception) { + Preconditions.checkNotNull(exception); + + if (resultException == null) { + resultException = exception; + } else { + resultException.addSuppressed(exception); + } + } - case PENDING: - // since we're moving out of pending, we can now start accepting other changes in state. - // This guarantees that the first state change is driven by the original thread. - acceptExternalEvents.countDown(); + /** + * Close the given resource, catching and adding any caught exceptions via + * {@link #addException(Exception)}. If an exception is caught, it will change + * the result state to FAILED, regardless of what its current value. + * + * @param autoCloseable the resource to close + */ + private void suppressingClose(final AutoCloseable autoCloseable) { + Preconditions.checkState(!isClosed); + Preconditions.checkState(resultState != null); - if(newState == QueryState.RUNNING){ - recordNewState(QueryState.RUNNING); - return true; + if (autoCloseable == null) { + return; } - // fall through to running behavior. - // - case RUNNING: { + try { + autoCloseable.close(); + } catch(Exception e) { + /* + * Even if the query completed successfully, we'll still report failure if we have + * problems cleaning up. + */ + resultState = QueryState.FAILED; + addException(e); + } + } + + @Override + public void close() { + Preconditions.checkState(!isClosed); + Preconditions.checkState(resultState != null); + + logger.info("foreman cleaning up - status: {}", queryManager.getFragmentStatesAsString()); + + // These are straight forward removals from maps, so they won't throw. + drillbitContext.getWorkBus().removeFragmentStatusListener(queryId); + drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager); + + suppressingClose(queryContext); + + /* + * We do our best to write the latest state, but even that could fail. If it does, we can't write + * the (possibly newly failing) state, so we continue on anyway. + * + * We only need to do this if the resultState differs from the last recorded state + */ + if (resultState != state) { + suppressingClose(new AutoCloseable() { + @Override + public void close() throws Exception { + recordNewState(resultState); + } + }); + } + + /* + * Construct the response based on the latest resultState. The builder shouldn't fail. + */ + final QueryResult.Builder resultBuilder = QueryResult.newBuilder() + .setIsLastChunk(resultState != QueryState.COMPLETED) // TODO(DRILL-2498) temporary + .setQueryId(queryId) + .setQueryState(resultState); + if (resultException != null) { + final DrillPBError error = ErrorHelper.logAndConvertError(queryContext.getCurrentEndpoint(), + ExceptionUtils.getRootCauseMessage(resultException), resultException, logger); + resultBuilder.addError(error); + } + + /* + * If sending the result fails, we don't really have any way to modify the result we tried to send; + * it is possible it got sent but the result came from a later part of the code path. It is also + * possible the connection has gone away, so this is irrelevant because there's nowhere to + * send anything to. + */ + try { + // send whatever result we ended up with + initiatingClient.sendResult(responseListener, new QueryWritableBatch(resultBuilder.build()), true); + } catch(Exception e) { + addException(e); + logger.warn("Exception sending result to client", resultException); + } + + try { + releaseLease(); + } finally { + isClosed = true; + } + } + } + + /** + * Tells the foreman to move to a new state. + * + * @param newState the state to move to + * @param exception if not null, the exception that drove this state transition (usually a failure) + */ + private synchronized void moveToState(final QueryState newState, final Exception exception) { + logger.info("State change requested. {} --> {}", state, newState, exception); + switch(state) { + case PENDING: + if (newState == QueryState.RUNNING) { + recordNewState(QueryState.RUNNING); + return; + } - switch(newState){ + //$FALL-THROUGH$ - case CANCELED: { + case RUNNING: { + /* + * For cases that cancel executing fragments, we have to record the new state first, because + * the cancellation of the local root fragment will cause this to be called recursively. + */ + switch(newState) { + case CANCELLATION_REQUESTED: { assert exception == null; - recordNewState(QueryState.CANCELED); - cancelExecutingFragments(); - QueryResult result = QueryResult.newBuilder() // - .setQueryId(queryId) // - .setQueryState(QueryState.CANCELED) // - .setIsLastChunk(true) // - .build(); - - // immediately notify client that cancellation is taking place, final clean-up happens when foreman reaches to - // a terminal state(completed, failed) - initiatingClient.sendResult(responseListener, new QueryWritableBatch(result), true); - return true; + queryManager.markEndTime(); + recordNewState(QueryState.CANCELLATION_REQUESTED); + queryManager.cancelExecutingFragments(drillbitContext, rootRunner); + foremanResult.setCompleted(QueryState.CANCELED); + /* + * We don't close the foremanResult until we've gotten acknowledgements, which + * happens below in the case for current state == CANCELLATION_REQUESTED. + */ + return; } case COMPLETED: { assert exception == null; + queryManager.markEndTime(); recordNewState(QueryState.COMPLETED); - QueryResult result = QueryResult // - .newBuilder() // - .setQueryState(QueryState.COMPLETED) // - .setQueryId(queryId) // - .build(); - cleanup(result); - return true; + foremanResult.setCompleted(QueryState.COMPLETED); + foremanResult.close(); + return; } - - case FAILED: + case FAILED: { assert exception != null; + queryManager.markEndTime(); recordNewState(QueryState.FAILED); - cancelExecutingFragments(); - DrillPBError error = ErrorHelper.logAndConvertError(context.getCurrentEndpoint(), - ExceptionUtils.getRootCauseMessage(exception), exception, logger); - QueryResult result = QueryResult // - .newBuilder() // - .addError(error) // - .setIsLastChunk(true) // - .setQueryState(QueryState.FAILED) // - .setQueryId(queryId) // - .build(); - cleanup(result); - return true; - default: - break outside; + queryManager.cancelExecutingFragments(drillbitContext, rootRunner); + foremanResult.setFailed(exception); + foremanResult.close(); + return; + } + default: + throw new IllegalStateException("illegal transition from RUNNING to " + newState); } } + case CANCELLATION_REQUESTED: + if ((newState == QueryState.CANCELED) || (newState == QueryState.COMPLETED) + || (newState == QueryState.FAILED)) { + /* + * These amount to a completion of the cancellation requests' cleanup; now we + * can clean up and send the result. + */ + foremanResult.close(); + } + return; + case CANCELED: case COMPLETED: - case FAILED: { - // no op. - logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", newState, state, exception); - return false; - } - - } - - throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", state.name(), newState.name())); - } - - private void cancelExecutingFragments(){ - - // Stop all framgents with a currently active status. - List<FragmentData> fragments = getStatus().getFragmentData(); - Collections.sort(fragments, new Comparator<FragmentData>() { - @Override - public int compare(FragmentData o1, FragmentData o2) { - return o2.getHandle().getMajorFragmentId() - o1.getHandle().getMajorFragmentId(); - } - }); - for(FragmentData data: fragments){ - FragmentHandle handle = data.getStatus().getHandle(); - switch(data.getStatus().getProfile().getState()){ - case SENDING: - case AWAITING_ALLOCATION: - case RUNNING: - if(data.isLocal()){ - if(rootRunner != null){ - rootRunner.cancel(); - } - }else{ - bee.getContext().getController().getTunnel(data.getEndpoint()).cancelFragment(new CancelListener(data.getEndpoint(), handle), handle); - } - break; - default: - break; - } + case FAILED: + logger.warn("Dropping request to move to {} state as query is already at {} state (which is terminal).", + newState, state); + return; } + throw new IllegalStateException(String.format("Failure trying to change states: %s --> %s", + state.name(), newState.name())); } - private QueryStatus getStatus(){ - return queryManager.getStatus(); - } - - private void recordNewState(QueryState newState){ - this.state = newState; - getStatus().updateQueryStateInStore(newState); + private void recordNewState(final QueryState newState) { + state = newState; + queryManager.updateQueryStateInStore(newState); } - private void runSQL(String sql) throws ExecutionSetupException { - DrillSqlWorker sqlWorker = new DrillSqlWorker(context); - Pointer<String> textPlan = new Pointer<>(); - PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan); - getStatus().setPlanText(textPlan.value); + private void runSQL(final String sql) throws ExecutionSetupException { + final DrillSqlWorker sqlWorker = new DrillSqlWorker(queryContext); + final Pointer<String> textPlan = new Pointer<>(); + final PhysicalPlan plan = sqlWorker.getPlan(sql, textPlan); + queryManager.setPlanText(textPlan.value); runPhysicalPlan(plan); } - private PhysicalPlan convert(LogicalPlan plan) throws OptimizerException { + private PhysicalPlan convert(final LogicalPlan plan) throws OptimizerException { if (logger.isDebugEnabled()) { - logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(context.getConfig())); + logger.debug("Converting logical plan {}.", plan.toJsonStringSafe(queryContext.getConfig())); } - return new BasicOptimizer(DrillConfig.create(), context, initiatingClient).optimize( - new BasicOptimizer.BasicOptimizationContext(context), plan); + return new BasicOptimizer(queryContext).optimize( + new BasicOptimizer.BasicOptimizationContext(queryContext), plan); } public QueryId getQueryId() { return queryId; } - @Override - public void close() throws IOException { - } - - public QueryStatus getQueryStatus() { - return this.queryManager.getStatus(); - } - - private void setupRootFragment(PlanFragment rootFragment, UserClientConnection rootClient, FragmentRoot rootOperator) throws ExecutionSetupException { - FragmentContext rootContext = new FragmentContext(bee.getContext(), rootFragment, rootClient, bee.getContext() - .getFunctionImplementationRegistry()); - - IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext); - + /** + * Set up the root fragment (which will run locally), and submit it for execution. + * + * @param rootFragment + * @param rootClient + * @param rootOperator + * @throws ExecutionSetupException + */ + private void setupRootFragment(final PlanFragment rootFragment, final UserClientConnection rootClient, + final FragmentRoot rootOperator) throws ExecutionSetupException { + @SuppressWarnings("resource") + final FragmentContext rootContext = new FragmentContext(drillbitContext, rootFragment, rootClient, + drillbitContext.getFunctionImplementationRegistry()); + @SuppressWarnings("resource") + final IncomingBuffers buffers = new IncomingBuffers(rootOperator, rootContext); rootContext.setBuffers(buffers); - // add fragment to local node. queryManager.addFragmentStatusTracker(rootFragment, true); - this.rootRunner = new FragmentExecutor(rootContext, bee, rootOperator, queryManager.getRootStatusHandler(rootContext, rootFragment)); - RootFragmentManager fragmentManager = new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner); + rootRunner = new FragmentExecutor(rootContext, rootOperator, + queryManager.getRootStatusHandler(rootContext)); + final RootFragmentManager fragmentManager = + new RootFragmentManager(rootFragment.getHandle(), buffers, rootRunner); if (buffers.isDone()) { // if we don't have to wait for any incoming data, start the fragment runner. bee.addFragmentRunner(fragmentManager.getRunnable()); } else { // if we do, record the fragment manager in the workBus. - bee.getContext().getWorkBus().setFragmentManager(fragmentManager); + // TODO aren't we managing our own work? What does this do? It looks like this will never get run + drillbitContext.getWorkBus().addFragmentManager(fragmentManager); } } - private void setupNonRootFragments(Collection<PlanFragment> fragments) throws ForemanException{ - Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create(); - Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create(); + /** + * Set up the non-root fragments for execution. Some may be local, and some may be remote. + * Messages are sent immediately, so they may start returning data even before we complete this. + * + * @param fragments the fragments + * @throws ForemanException + */ + private void setupNonRootFragments(final Collection<PlanFragment> fragments) throws ForemanException { + /* + * We will send a single message to each endpoint, regardless of how many fragments will be + * executed there. We need to start up the intermediate fragments first so that they will be + * ready once the leaf fragments start producing data. To satisfy both of these, we will + * make a pass through the fragments and put them into these two maps according to their + * leaf/intermediate state, as well as their target drillbit. + */ + final Multimap<DrillbitEndpoint, PlanFragment> leafFragmentMap = ArrayListMultimap.create(); + final Multimap<DrillbitEndpoint, PlanFragment> intFragmentMap = ArrayListMultimap.create(); // record all fragments for status purposes. - for (PlanFragment f : fragments) { + for (PlanFragment planFragment : fragments) { // logger.debug("Tracking intermediate remote node {} with data {}", f.getAssignment(), f.getFragmentJson()); - queryManager.addFragmentStatusTracker(f, false); - if (f.getLeafFragment()) { - leafFragmentMap.put(f.getAssignment(), f); + queryManager.addFragmentStatusTracker(planFragment, false); + if (planFragment.getLeafFragment()) { + leafFragmentMap.put(planFragment.getAssignment(), planFragment); } else { - intFragmentMap.put(f.getAssignment(), f); + intFragmentMap.put(planFragment.getAssignment(), planFragment); } } - CountDownLatch latch = new CountDownLatch(intFragmentMap.keySet().size()); + /* + * We need to wait for the intermediates to be sent so that they'll be set up by the time + * the leaves start producing data. We'll use this latch to wait for the responses. + * + * However, in order not to hang the process if any of the RPC requests fails, we always + * count down (see FragmentSubmitFailures), but we count the number of failures so that we'll + * know if any submissions did fail. + */ + final CountDownLatch endpointLatch = new CountDownLatch(intFragmentMap.keySet().size()); + final FragmentSubmitFailures fragmentSubmitFailures = new FragmentSubmitFailures(); // send remote intermediate fragments for (DrillbitEndpoint ep : intFragmentMap.keySet()) { - sendRemoteFragments(ep, intFragmentMap.get(ep), latch); + sendRemoteFragments(ep, intFragmentMap.get(ep), endpointLatch, fragmentSubmitFailures); } - // wait for send complete - try { - latch.await(); - } catch (InterruptedException e) { - throw new ForemanException("Interrupted while waiting to complete send of remote fragments.", e); + // wait for the status of all requests sent above to be known + boolean ready = false; + while(!ready) { + try { + endpointLatch.await(); + ready = true; + } catch (InterruptedException e) { + // if we weren't ready, the while loop will continue to wait + } } - // send remote (leaf) fragments. - for (DrillbitEndpoint ep : leafFragmentMap.keySet()) { - sendRemoteFragments(ep, leafFragmentMap.get(ep), null); + // if any of the intermediate fragment submissions failed, fail the query + final List<FragmentSubmitFailures.SubmissionException> submissionExceptions = + fragmentSubmitFailures.submissionExceptions; + if (submissionExceptions.size() > 0) { + throw new ForemanSetupException("Error setting up remote intermediate fragment execution", + submissionExceptions.get(0).rpcException); + // TODO indicate the failing drillbit? + // TODO report on all the failures? } - } - public RpcOutcomeListener<Ack> getSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch){ - return new FragmentSubmitListener(endpoint, value, latch); + /* + * Send the remote (leaf) fragments; we don't wait for these. Any problems will come in through + * the regular sendListener event delivery. + */ + for (DrillbitEndpoint ep : leafFragmentMap.keySet()) { + sendRemoteFragments(ep, leafFragmentMap.get(ep), null, null); + } } - private void sendRemoteFragments(DrillbitEndpoint assignment, Collection<PlanFragment> fragments, CountDownLatch latch){ - Controller controller = bee.getContext().getController(); - InitializeFragments.Builder fb = InitializeFragments.newBuilder(); - for(PlanFragment f : fragments){ - fb.addFragment(f); + /** + * Send all the remote fragments belonging to a single target drillbit in one request. + * + * @param assignment the drillbit assigned to these fragments + * @param fragments the set of fragments + * @param latch the countdown latch used to track the requests to all endpoints + * @param fragmentSubmitFailures the submission failure counter used to track the requests to all endpoints + */ + private void sendRemoteFragments(final DrillbitEndpoint assignment, final Collection<PlanFragment> fragments, + final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) { + @SuppressWarnings("resource") + final Controller controller = drillbitContext.getController(); + final InitializeFragments.Builder fb = InitializeFragments.newBuilder(); + for(PlanFragment planFragment : fragments) { + fb.addFragment(planFragment); } - InitializeFragments initFrags = fb.build(); + final InitializeFragments initFrags = fb.build(); logger.debug("Sending remote fragments to node {} with data {}", assignment, initFrags); - FragmentSubmitListener listener = new FragmentSubmitListener(assignment, initFrags, latch); + final FragmentSubmitListener listener = + new FragmentSubmitListener(assignment, initFrags, latch, fragmentSubmitFailures); controller.getTunnel(assignment).sendFragments(listener, initFrags); } - public QueryState getState(){ + public QueryState getState() { return state; } - private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments>{ + /** + * Used by {@link FragmentSubmitListener} to track the number of submission failures. + */ + private static class FragmentSubmitFailures { + static class SubmissionException { +// final DrillbitEndpoint drillbitEndpoint; + final RpcException rpcException; + + SubmissionException(@SuppressWarnings("unused") final DrillbitEndpoint drillbitEndpoint, + final RpcException rpcException) { +// this.drillbitEndpoint = drillbitEndpoint; + this.rpcException = rpcException; + } + } + + final List<SubmissionException> submissionExceptions = new LinkedList<>(); + + void addFailure(final DrillbitEndpoint drillbitEndpoint, final RpcException rpcException) { + submissionExceptions.add(new SubmissionException(drillbitEndpoint, rpcException)); + } + } - private CountDownLatch latch; + private class FragmentSubmitListener extends EndpointListener<Ack, InitializeFragments> { + private final CountDownLatch latch; + private final FragmentSubmitFailures fragmentSubmitFailures; - public FragmentSubmitListener(DrillbitEndpoint endpoint, InitializeFragments value, CountDownLatch latch) { + /** + * Constructor. + * + * @param endpoint the endpoint for the submission + * @param value the initialize fragments message + * @param latch the latch to count down when the status is known; may be null + * @param fragmentSubmitFailures the counter to use for failures; must be non-null iff latch is non-null + */ + public FragmentSubmitListener(final DrillbitEndpoint endpoint, final InitializeFragments value, + final CountDownLatch latch, final FragmentSubmitFailures fragmentSubmitFailures) { super(endpoint, value); + Preconditions.checkState((latch == null) == (fragmentSubmitFailures == null)); this.latch = latch; + this.fragmentSubmitFailures = fragmentSubmitFailures; } @Override - public void success(Ack ack, ByteBuf byteBuf) { + public void success(final Ack ack, final ByteBuf byteBuf) { if (latch != null) { latch.countDown(); } } @Override - public void failed(RpcException ex) { - logger.debug("Failure while sending fragment. Stopping query.", ex); - moveToState(QueryState.FAILED, ex); + public void failed(final RpcException ex) { + if (latch != null) { + fragmentSubmitFailures.addFailure(endpoint, ex); + latch.countDown(); + } else { + // since this won't be waited on, we can wait to deliver this event once the Foreman is ready + logger.debug("Failure while sending fragment. Stopping query.", ex); + stateListener.moveToState(QueryState.FAILED, ex); + } } - } - + /** + * Provides gated access to state transitions. + * + * <p>The StateListener waits on a latch before delivery state transitions to the Foreman. The + * latch will be tripped when the Foreman is sufficiently set up that it can receive and process + * external events from other threads. + */ public class StateListener { - public boolean moveToState(QueryState newState, Exception ex){ - try { - acceptExternalEvents.await(); - } catch(InterruptedException e){ - logger.warn("Interrupted while waiting to move state.", e); - return false; + /** + * Move the Foreman to the specified new state. + * + * @param newState the state to move to + * @param ex if moving to a failure state, the exception that led to the failure; used for reporting + * to the user + */ + public void moveToState(final QueryState newState, final Exception ex) { + boolean ready = false; + while(!ready) { + try { + acceptExternalEvents.await(); + ready = true; + } catch(InterruptedException e) { + // if we're still not ready, the while loop will cause us to wait again + logger.warn("Interrupted while waiting to move state.", e); + } } - return Foreman.this.moveToState(newState, ex); + Foreman.this.moveToState(newState, ex); } } - - @Override - public int compareTo(Object o) { - return hashCode() - o.hashCode(); - } - + /** + * Listens for the status of the RPC response sent to the user for the query. + */ private class ResponseSendListener extends BaseRpcOutcomeListener<Ack> { @Override - public void failed(RpcException ex) { - logger - .info( - "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.", - ex); - moveToState(QueryState.FAILED, ex); + public void failed(final RpcException ex) { + logger.info( + "Failure while trying communicate query result to initating client. This would happen if a client is disconnected before response notice can be sent.", + ex); + stateListener.moveToState(QueryState.FAILED, ex); } } - - - private class CancelListener extends EndpointListener<Ack, FragmentHandle>{ - - public CancelListener(DrillbitEndpoint endpoint, FragmentHandle handle) { - super(endpoint, handle); - } - - @Override - public void failed(RpcException ex) { - logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex); - } - - @Override - public void success(Ack value, ByteBuf buf) { - if(!value.getOk()){ - logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value); - } - // do nothing. - } - - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java index 52fd0a92e..433ab260a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentData.java @@ -29,19 +29,21 @@ public class FragmentData { private volatile long lastStatusUpdate = 0; private final DrillbitEndpoint endpoint; - public FragmentData(FragmentHandle handle, DrillbitEndpoint endpoint, boolean isLocal) { - super(); - MinorFragmentProfile f = MinorFragmentProfile.newBuilder() // - .setState(FragmentState.SENDING) // - .setMinorFragmentId(handle.getMinorFragmentId()) // - .setEndpoint(endpoint) // - .build(); - this.status = FragmentStatus.newBuilder().setHandle(handle).setProfile(f).build(); + public FragmentData(final FragmentHandle handle, final DrillbitEndpoint endpoint, final boolean isLocal) { this.endpoint = endpoint; this.isLocal = isLocal; + final MinorFragmentProfile f = MinorFragmentProfile.newBuilder() + .setState(FragmentState.SENDING) + .setMinorFragmentId(handle.getMinorFragmentId()) + .setEndpoint(endpoint) + .build(); + status = FragmentStatus.newBuilder() + .setHandle(handle) + .setProfile(f) + .build(); } - public void setStatus(FragmentStatus status){ + public void setStatus(final FragmentStatus status) { this.status = status; lastStatusUpdate = System.currentTimeMillis(); } @@ -54,15 +56,11 @@ public class FragmentData { return isLocal; } - public long getLastStatusUpdate() { - return lastStatusUpdate; - } - public DrillbitEndpoint getEndpoint() { return endpoint; } - public FragmentHandle getHandle(){ + public FragmentHandle getHandle() { return status.getHandle(); } @@ -71,7 +69,4 @@ public class FragmentData { return "FragmentData [isLocal=" + isLocal + ", status=" + status + ", lastStatusUpdate=" + lastStatusUpdate + ", endpoint=" + endpoint + "]"; } - - - }
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java index 6a719d2a1..b2a40aea2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/FragmentStatusListener.java @@ -20,7 +20,5 @@ package org.apache.drill.exec.work.foreman; import org.apache.drill.exec.proto.BitControl.FragmentStatus; public interface FragmentStatusListener { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentStatusListener.class); - public void statusUpdate(FragmentStatus status); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index 2de3592b7..8626d5b54 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -17,144 +17,353 @@ */ package org.apache.drill.exec.work.foreman; +import io.netty.buffer.ByteBuf; + +import java.io.IOException; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; -import com.google.common.base.Preconditions; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; -import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; +import org.apache.drill.exec.proto.SchemaUserBitShared; +import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile; import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryInfo; +import org.apache.drill.exec.proto.UserBitShared.QueryProfile; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.RemoteRpcException; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.control.Controller; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.store.sys.PStore; +import org.apache.drill.exec.store.sys.PStoreConfig; import org.apache.drill.exec.store.sys.PStoreProvider; +import org.apache.drill.exec.work.EndpointListener; import org.apache.drill.exec.work.foreman.Foreman.StateListener; import org.apache.drill.exec.work.fragment.AbstractStatusReporter; +import org.apache.drill.exec.work.fragment.FragmentExecutor; +import com.carrotsearch.hppc.IntObjectOpenHashMap; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; - /** * Each Foreman holds its own QueryManager. This manages the events associated with execution of a particular query across all fragments. */ -public class QueryManager implements FragmentStatusListener, DrillbitStatusListener{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class); - private final Set<DrillbitEndpoint> includedBits; +public class QueryManager implements FragmentStatusListener, DrillbitStatusListener { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryManager.class); + + public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig. + newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE) + .name("profiles") + .blob() + .max(100) + .build(); - private final QueryStatus status; + public static final PStoreConfig<QueryInfo> RUNNING_QUERY_INFO = PStoreConfig. + newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE) + .name("running") + .ephemeral() + .build(); + + private final Set<DrillbitEndpoint> includedBits; private final StateListener stateListener; - private final AtomicInteger remainingFragmentCount; private final QueryId queryId; + private final String stringQueryId; + private final RunQuery runQuery; + private final Foreman foreman; + + /* + * Doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then + * accessed by multiple threads for reads only. + */ + private final IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> fragmentDataMap = + new IntObjectOpenHashMap<>(); + private final List<FragmentData> fragmentDataSet = Lists.newArrayList(); + + private final PStore<QueryProfile> profilePStore; + private final PStore<QueryInfo> profileEStore; - public QueryManager(QueryId id, RunQuery query, PStoreProvider pStoreProvider, StateListener stateListener, Foreman foreman) { + // the following mutable variables are used to capture ongoing query status + private String planText; + private long startTime; + private long endTime; + private final AtomicInteger finishedFragments = new AtomicInteger(0); + + public QueryManager(final QueryId queryId, final RunQuery runQuery, final PStoreProvider pStoreProvider, + final StateListener stateListener, final Foreman foreman) { + this.queryId = queryId; + this.runQuery = runQuery; this.stateListener = stateListener; - this.queryId = id; - this.remainingFragmentCount = new AtomicInteger(0); - this.status = new QueryStatus(query, id, pStoreProvider, foreman); - this.includedBits = Sets.newHashSet(); - } + this.foreman = foreman; + + stringQueryId = QueryIdHelper.getQueryId(queryId); + try { + profilePStore = pStoreProvider.getStore(QUERY_PROFILE); + profileEStore = pStoreProvider.getStore(RUNNING_QUERY_INFO); + } catch (IOException e) { + throw new DrillRuntimeException(e); + } - public QueryStatus getStatus(){ - return status; + includedBits = Sets.newHashSet(); } @Override - public void drillbitRegistered(Set<DrillbitEndpoint> registeredDrillbits) { + public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) { } @Override - public void drillbitUnregistered(Set<DrillbitEndpoint> unregisteredDrillbits) { - for(DrillbitEndpoint ep : unregisteredDrillbits){ - if(this.includedBits.contains(ep)){ - logger.warn("Drillbit {} no longer registered in cluster. Canceling query {}", ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId)); - this.stateListener.moveToState(QueryState.FAILED, new ForemanException("One more more nodes lost connectivity during query. Identified node was " + ep.getAddress())); + public void drillbitUnregistered(final Set<DrillbitEndpoint> unregisteredDrillbits) { + for(DrillbitEndpoint ep : unregisteredDrillbits) { + if (includedBits.contains(ep)) { + logger.warn("Drillbit {} no longer registered in cluster. Canceling query {}", + ep.getAddress() + ep.getControlPort(), QueryIdHelper.getQueryId(queryId)); + stateListener.moveToState(QueryState.FAILED, + new ForemanException("One more more nodes lost connectivity during query. Identified node was " + + ep.getAddress())); } } } - @Override - public void statusUpdate(FragmentStatus status) { - - logger.debug("New fragment status was provided to Foreman of {}", status); - switch(status.getProfile().getState()){ + public void statusUpdate(final FragmentStatus status) { + logger.debug("New fragment status was provided to QueryManager of {}", status); + switch(status.getProfile().getState()) { case AWAITING_ALLOCATION: + case RUNNING: updateFragmentStatus(status); break; + + case FINISHED: + fragmentDone(status); + break; + case CANCELLED: - //TODO: define a new query state to distinguish the state of early termination from cancellation + /* + * TODO + * This doesn't seem right; shouldn't this be similar to FAILED? + * and this means once all are cancelled we'll get to COMPLETED, even though some weren't? + * + * So, we add it to the finishedFragments if we ourselves we receive a statusUpdate (from where), + * but not if our cancellation listener gets it? + */ + // TODO(DRILL-2370) we might not get these, so we need to take extra care for cleanup fragmentDone(status); break; + case FAILED: stateListener.moveToState(QueryState.FAILED, new RemoteRpcException(status.getProfile().getError())); break; - case FINISHED: - fragmentDone(status); - break; - case RUNNING: - updateFragmentStatus(status); - break; + default: throw new UnsupportedOperationException(String.format("Received status of %s", status)); } } - private void updateFragmentStatus(FragmentStatus status){ - this.status.updateFragmentStatus(status); + private void updateFragmentStatus(final FragmentStatus fragmentStatus) { + final FragmentHandle fragmentHandle = fragmentStatus.getHandle(); + final int majorFragmentId = fragmentHandle.getMajorFragmentId(); + final int minorFragmentId = fragmentHandle.getMinorFragmentId(); + fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus); } - private void fragmentDone(FragmentStatus status){ - this.status.incrementFinishedFragments(); - int remaining = remainingFragmentCount.decrementAndGet(); + private void fragmentDone(final FragmentStatus status) { updateFragmentStatus(status); + + final int finishedFragments = this.finishedFragments.incrementAndGet(); + final int totalFragments = fragmentDataSet.size(); + assert finishedFragments <= totalFragments : "The finished fragment count exceeds the total fragment count"; + final int remaining = totalFragments - finishedFragments; logger.debug("waiting for {} fragments", remaining); - if(remaining == 0){ + if (remaining == 0) { + // this target state may be adjusted in moveToState() based on current FAILURE/CANCELLATION_REQUESTED status stateListener.moveToState(QueryState.COMPLETED, null); } } - public void setup(FragmentHandle rootFragmentHandle, DrillbitEndpoint localIdentity, int countOfNonRootFragments){ - remainingFragmentCount.set(countOfNonRootFragments + 1); - logger.debug("foreman is waiting for {} fragments to finish", countOfNonRootFragments + 1); - status.add(new FragmentData(rootFragmentHandle, localIdentity, true)); - this.status.setTotalFragments(countOfNonRootFragments + 1); + private void addFragment(final FragmentData fragmentData) { + final FragmentHandle fragmentHandle = fragmentData.getHandle(); + final int majorFragmentId = fragmentHandle.getMajorFragmentId(); + final int minorFragmentId = fragmentHandle.getMinorFragmentId(); - List<FragmentData> fragments = status.getFragmentData(); - for (FragmentData fragment : fragments) { - this.includedBits.add(fragment.getEndpoint()); + IntObjectOpenHashMap<FragmentData> minorMap = fragmentDataMap.get(majorFragmentId); + if (minorMap == null) { + minorMap = new IntObjectOpenHashMap<>(); + fragmentDataMap.put(majorFragmentId, minorMap); } + minorMap.put(minorFragmentId, fragmentData); + fragmentDataSet.add(fragmentData); + + // keep track of all the drill bits that are used by this query + includedBits.add(fragmentData.getEndpoint()); + } + + public String getFragmentStatesAsString() { + return fragmentDataMap.toString(); } - public void addFragmentStatusTracker(PlanFragment fragment, boolean isRoot){ - addFragmentStatusTracker(fragment.getHandle(), fragment.getAssignment(), isRoot); + void addFragmentStatusTracker(final PlanFragment fragment, final boolean isRoot) { + addFragment(new FragmentData(fragment.getHandle(), fragment.getAssignment(), isRoot)); } - public void addFragmentStatusTracker(FragmentHandle handle, DrillbitEndpoint node, boolean isRoot){ - status.add(new FragmentData(handle, node, isRoot)); + /** + * Stop all fragments with a currently active status. + */ + void cancelExecutingFragments(final DrillbitContext drillbitContext, final FragmentExecutor rootRunner) { + final Controller controller = drillbitContext.getController(); + for(FragmentData data : fragmentDataSet) { + final FragmentStatus fragmentStatus = data.getStatus(); + switch(fragmentStatus.getProfile().getState()) { + case SENDING: + case AWAITING_ALLOCATION: + case RUNNING: + if (rootRunner != null) { + rootRunner.cancel(); + } else { + final DrillbitEndpoint endpoint = data.getEndpoint(); + final FragmentHandle handle = fragmentStatus.getHandle(); + // TODO is the CancelListener redundant? Does the FragmentStatusListener get notified of the same? + controller.getTunnel(endpoint).cancelFragment(new CancelListener(endpoint, handle), handle); + } + break; + + case FINISHED: + case CANCELLED: + case FAILED: + // nothing to do + break; + } + } } - public RootStatusReporter getRootStatusHandler(FragmentContext context, PlanFragment fragment){ - return new RootStatusReporter(context, fragment); + /* + * This assumes that the FragmentStatusListener implementation takes action when it hears + * that the target fragment has been cancelled. As a result, this listener doesn't do anything + * but log messages. + */ + private class CancelListener extends EndpointListener<Ack, FragmentHandle> { + public CancelListener(final DrillbitEndpoint endpoint, final FragmentHandle handle) { + super(endpoint, handle); + } + + @Override + public void failed(final RpcException ex) { + logger.error("Failure while attempting to cancel fragment {} on endpoint {}.", value, endpoint, ex); + } + + @Override + public void success(final Ack value, final ByteBuf buf) { + if (!value.getOk()) { + logger.warn("Remote node {} responded negative on cancellation request for fragment {}.", endpoint, value); + } + } } - class RootStatusReporter extends AbstractStatusReporter{ + public RootStatusReporter getRootStatusHandler(final FragmentContext context) { + return new RootStatusReporter(context); + } - private RootStatusReporter(FragmentContext context, PlanFragment fragment){ + class RootStatusReporter extends AbstractStatusReporter { + private RootStatusReporter(final FragmentContext context) { super(context); } @Override - protected void statusChange(FragmentHandle handle, FragmentStatus status) { + protected void statusChange(final FragmentHandle handle, final FragmentStatus status) { statusUpdate(status); } + } + + QueryState updateQueryStateInStore(final QueryState queryState) { + switch (queryState) { + case PENDING: + case RUNNING: + case CANCELLATION_REQUESTED: + profileEStore.put(stringQueryId, getQueryInfo()); // store as ephemeral query profile. + break; + + case COMPLETED: + case CANCELED: + case FAILED: + try { + profileEStore.delete(stringQueryId); + } catch(Exception e) { + logger.warn("Failure while trying to delete the estore profile for this query.", e); + } + + // TODO(DRILL-2362) when do these ever get deleted? + profilePStore.put(stringQueryId, getQueryProfile()); + break; + + default: + throw new IllegalStateException("unrecognized queryState " + queryState); + } + + return queryState; + } + + private QueryInfo getQueryInfo() { + return QueryInfo.newBuilder() + .setQuery(runQuery.getPlan()) + .setState(foreman.getState()) + .setForeman(foreman.getQueryContext().getCurrentEndpoint()) + .setStart(startTime) + .build(); + } + + public QueryProfile getQueryProfile() { + final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder() + .setQuery(runQuery.getPlan()) + .setType(runQuery.getType()) + .setId(queryId) + .setState(foreman.getState()) + .setForeman(foreman.getQueryContext().getCurrentEndpoint()) + .setStart(startTime) + .setEnd(endTime) + .setTotalFragments(fragmentDataSet.size()) + .setFinishedFragments(finishedFragments.get()); + + if (planText != null) { + profileBuilder.setPlan(planText); + } + + for (int i = 0; i < fragmentDataMap.allocated.length; i++) { + if (fragmentDataMap.allocated[i]) { + final int majorFragmentId = fragmentDataMap.keys[i]; + final IntObjectOpenHashMap<FragmentData> minorMap = + (IntObjectOpenHashMap<FragmentData>) ((Object[]) fragmentDataMap.values)[i]; + final MajorFragmentProfile.Builder fb = MajorFragmentProfile.newBuilder() + .setMajorFragmentId(majorFragmentId); + for (int v = 0; v < minorMap.allocated.length; v++) { + if (minorMap.allocated[v]) { + final FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v]; + fb.addMinorFragmentProfile(data.getStatus().getProfile()); + } + } + profileBuilder.addFragmentProfile(fb); + } + } + return profileBuilder.build(); + } + + void setPlanText(final String planText) { + this.planText = planText; + } + void markStartTime() { + startTime = System.currentTimeMillis(); } + void markEndTime() { + endTime = System.currentTimeMillis(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java deleted file mode 100644 index 4e18da631..000000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryStatus.java +++ /dev/null @@ -1,255 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.work.foreman; - -import java.io.IOException; -import java.util.List; - -import org.apache.drill.common.exceptions.DrillRuntimeException; -import org.apache.drill.exec.proto.BitControl.FragmentStatus; -import org.apache.drill.exec.proto.SchemaUserBitShared; -import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile; -import org.apache.drill.exec.proto.UserBitShared.QueryId; -import org.apache.drill.exec.proto.UserBitShared.QueryInfo; -import org.apache.drill.exec.proto.UserBitShared.QueryProfile; -import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; -import org.apache.drill.exec.proto.UserProtos.RunQuery; -import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.store.sys.EStore; -import org.apache.drill.exec.store.sys.PStore; -import org.apache.drill.exec.store.sys.PStoreConfig; -import org.apache.drill.exec.store.sys.PStoreProvider; - -import com.carrotsearch.hppc.IntObjectOpenHashMap; -import com.google.common.collect.Lists; - -public class QueryStatus { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryStatus.class); - - public static final PStoreConfig<QueryProfile> QUERY_PROFILE = PStoreConfig. - newProtoBuilder(SchemaUserBitShared.QueryProfile.WRITE, SchemaUserBitShared.QueryProfile.MERGE) // - .name("profiles") // - .blob() // - .max(100) // - .build(); - - public static final PStoreConfig<QueryInfo> RUNNING_QUERY_INFO = PStoreConfig. - newProtoBuilder(SchemaUserBitShared.QueryInfo.WRITE, SchemaUserBitShared.QueryInfo.MERGE).name("running").ephemeral().build(); - - // doesn't need to be thread safe as fragmentDataMap is generated in a single thread and then accessed by multiple threads for reads only. - private IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>> fragmentDataMap = new IntObjectOpenHashMap<IntObjectOpenHashMap<FragmentData>>(); - private List<FragmentData> fragmentDataSet = Lists.newArrayList(); - - private final String queryId; - private final QueryId id; - private RunQuery query; - private String planText; - private Foreman foreman; - private long startTime; - private long endTime; - private int totalFragments; - private int finishedFragments = 0; - - private final PStore<QueryProfile> profilePStore; - private final PStore<QueryInfo> profileEStore; - - public QueryStatus(RunQuery query, QueryId id, PStoreProvider provider, Foreman foreman) { - this.id = id; - this.query = query; - this.queryId = QueryIdHelper.getQueryId(id); - try { - this.profilePStore = provider.getStore(QUERY_PROFILE); - this.profileEStore = provider.getStore(RUNNING_QUERY_INFO); - } catch (IOException e) { - throw new DrillRuntimeException(e); - } - this.foreman = foreman; - } - - public List<FragmentData> getFragmentData() { - return fragmentDataSet; - } - - public void setPlanText(String planText) { - this.planText = planText; - } - - public void markStart() { - this.startTime = System.currentTimeMillis(); - } - - public void setEndTime(long endTime) { - this.endTime = endTime; - } - - public void setTotalFragments(int totalFragments) { - this.totalFragments = totalFragments; - } - - public void incrementFinishedFragments() { - finishedFragments++; - assert finishedFragments <= totalFragments; - } - - void add(FragmentData data) { - int majorFragmentId = data.getHandle().getMajorFragmentId(); - int minorFragmentId = data.getHandle().getMinorFragmentId(); - IntObjectOpenHashMap<FragmentData> minorMap = fragmentDataMap.get(majorFragmentId); - if (minorMap == null) { - minorMap = new IntObjectOpenHashMap<FragmentData>(); - fragmentDataMap.put(majorFragmentId, minorMap); - } - - minorMap.put(minorFragmentId, data); - fragmentDataSet.add(data); - } - - void updateFragmentStatus(FragmentStatus fragmentStatus) { - int majorFragmentId = fragmentStatus.getHandle().getMajorFragmentId(); - int minorFragmentId = fragmentStatus.getHandle().getMinorFragmentId(); - fragmentDataMap.get(majorFragmentId).get(minorFragmentId).setStatus(fragmentStatus); - } - - synchronized QueryState updateQueryStateInStore(QueryState queryState) { - switch (queryState) { - case PENDING: - case RUNNING: - profileEStore.put(queryId, getAsInfo()); // store as ephemeral query profile. - break; - case COMPLETED: - case CANCELED: - case FAILED: - try{ - profileEStore.delete(queryId); - }catch(Exception e){ - logger.warn("Failure while trying to delete the estore profile for this query.", e); - } - - profilePStore.put(queryId, getAsProfile()); - break; - default: - throw new IllegalStateException(); - } - return queryState; - } - - @Override - public String toString() { - return fragmentDataMap.toString(); - } - - public static class FragmentId{ - int major; - int minor; - - public FragmentId(FragmentStatus status) { - this.major = status.getHandle().getMajorFragmentId(); - this.minor = status.getHandle().getMinorFragmentId(); - } - - public FragmentId(FragmentData data) { - this.major = data.getHandle().getMajorFragmentId(); - this.minor = data.getHandle().getMinorFragmentId(); - } - - public FragmentId(int major, int minor) { - super(); - this.major = major; - this.minor = minor; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + major; - result = prime * result + minor; - return result; - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - FragmentId other = (FragmentId) obj; - if (major != other.major) { - return false; - } - if (minor != other.minor) { - return false; - } - return true; - } - - @Override - public String toString() { - return major + ":" + minor; - } - } - - public QueryInfo getAsInfo() { - return QueryInfo.newBuilder() // - .setQuery(query.getPlan()) - .setState(foreman.getState()) - .setForeman(foreman.getContext().getCurrentEndpoint()) - .setStart(startTime) - .build(); - } - - public QueryProfile getAsProfile() { - QueryProfile.Builder b = QueryProfile.newBuilder(); - b.setQuery(query.getPlan()); - b.setType(query.getType()); - if (planText != null) { - b.setPlan(planText); - } - b.setId(id); - for (int i = 0; i < fragmentDataMap.allocated.length; i++) { - if (fragmentDataMap.allocated[i]) { - int majorFragmentId = fragmentDataMap.keys[i]; - IntObjectOpenHashMap<FragmentData> minorMap = (IntObjectOpenHashMap<FragmentData>) ((Object[]) fragmentDataMap.values)[i]; - - MajorFragmentProfile.Builder fb = MajorFragmentProfile.newBuilder(); - fb.setMajorFragmentId(majorFragmentId); - for (int v = 0; v < minorMap.allocated.length; v++) { - if (minorMap.allocated[v]) { - FragmentData data = (FragmentData) ((Object[]) minorMap.values)[v]; - fb.addMinorFragmentProfile(data.getStatus().getProfile()); - } - } - b.addFragmentProfile(fb); - } - } - - b.setState(foreman.getState()); - b.setForeman(foreman.getContext().getCurrentEndpoint()); - b.setStart(startTime); - b.setEnd(endTime); - b.setTotalFragments(totalFragments); - b.setFinishedFragments(finishedFragments); - return b.build(); - } - -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index e2f7bbf8c..b6176db71 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -18,70 +18,65 @@ package org.apache.drill.exec.work.fragment; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.drill.common.DeferredException; +import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.ops.FragmentContext; -import org.apache.drill.exec.ops.FragmentStats; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.impl.ImplCreator; import org.apache.drill.exec.physical.impl.RootExec; import org.apache.drill.exec.proto.BitControl.FragmentStatus; import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.UserBitShared.FragmentState; -import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile; import org.apache.drill.exec.proto.helper.QueryIdHelper; -import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; -import org.apache.drill.exec.work.CancelableQuery; -import org.apache.drill.exec.work.StatusProvider; -import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.foreman.DrillbitStatusListener; /** - * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request and cancellation - * messages. + * Responsible for running a single fragment on a single Drillbit. Listens/responds to status request + * and cancellation messages. */ -public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvider, Comparable<Object>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class); +public class FragmentExecutor implements Runnable { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(FragmentExecutor.class); private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE); private final FragmentRoot rootOperator; - private final FragmentContext context; - private final WorkerBee bee; + private final FragmentContext fragmentContext; private final StatusReporter listener; - private final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener(); - + private volatile boolean closed; private RootExec root; - private boolean closed; - public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener) { - this.context = context; - this.bee = bee; + public FragmentExecutor(final FragmentContext context, final FragmentRoot rootOperator, + final StatusReporter listener) { + this.fragmentContext = context; this.rootOperator = rootOperator; this.listener = listener; } - @Override public FragmentStatus getStatus() { - // If the query is not in a running state, the operator tree is still being constructed and - // there is no reason to poll for intermediate results. - - // Previously the call to get the operator stats with the AbstractStatusReporter was happening - // before this check. This caused a concurrent modification exception as the list of operator - // stats is iterated over while collecting info, and added to while building the operator tree. - if(state.get() != FragmentState.RUNNING_VALUE){ + /* + * If the query is not in a running state, the operator tree is still being constructed and + * there is no reason to poll for intermediate results. + * + * Previously the call to get the operator stats with the AbstractStatusReporter was happening + * before this check. This caused a concurrent modification exception as the list of operator + * stats is iterated over while collecting info, and added to while building the operator tree. + */ + if(state.get() != FragmentState.RUNNING_VALUE) { return null; } - FragmentStatus status = AbstractStatusReporter.getBuilder(context, FragmentState.RUNNING, null, null).build(); + final FragmentStatus status = + AbstractStatusReporter.getBuilder(fragmentContext, FragmentState.RUNNING, null, null).build(); return status; } - @Override public void cancel() { + // Note this will be called outside of run(), from another thread updateState(FragmentState.CANCELLED); - logger.debug("Cancelled Fragment {}", context.getHandle()); - context.cancel(); + logger.debug("Cancelled Fragment {}", fragmentContext.getHandle()); + fragmentContext.cancel(); } public void receivingFragmentFinished(FragmentHandle handle) { @@ -91,98 +86,116 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid } } - public UserClientConnection getClient() { - return context.getConnection(); - } - @Override public void run() { - final String originalThread = Thread.currentThread().getName(); - try { - String newThreadName = String.format("%s:frag:%s:%s", // - QueryIdHelper.getQueryId(context.getHandle().getQueryId()), // - context.getHandle().getMajorFragmentId(), - context.getHandle().getMinorFragmentId() - ); - Thread.currentThread().setName(newThreadName); + final Thread myThread = Thread.currentThread(); + final String originalThreadName = myThread.getName(); + final FragmentHandle fragmentHandle = fragmentContext.getHandle(); + final ClusterCoordinator clusterCoordinator = fragmentContext.getDrillbitContext().getClusterCoordinator(); + final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener(); - root = ImplCreator.getExec(context, rootOperator); + try { + final String newThreadName = String.format("%s:frag:%s:%s", + QueryIdHelper.getQueryId(fragmentHandle.getQueryId()), + fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId()); + myThread.setName(newThreadName); - context.getDrillbitContext().getClusterCoordinator().addDrillbitStatusListener(drillbitStatusListener); + root = ImplCreator.getExec(fragmentContext, rootOperator); + clusterCoordinator.addDrillbitStatusListener(drillbitStatusListener); - logger.debug("Starting fragment runner. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId()); + logger.debug("Starting fragment runner. {}:{}", + fragmentHandle.getMajorFragmentId(), fragmentHandle.getMinorFragmentId()); if (!updateStateOrFail(FragmentState.AWAITING_ALLOCATION, FragmentState.RUNNING)) { logger.warn("Unable to set fragment state to RUNNING. Cancelled or failed?"); return; } - // run the query until root.next returns false. + /* + * Run the query until root.next returns false. + * Note that we closeOutResources() here if we're done. That's because this can also throw + * exceptions that we want to treat as failures of the request, even if the request did fine + * up until this point. Any failures there will be caught in the catch clause below, which + * will be reported to the user. If they were to come from the finally clause, the uncaught + * exception there will simply terminate this thread without alerting the user -- the + * behavior then is to hang. + */ while (state.get() == FragmentState.RUNNING_VALUE) { if (!root.next()) { - if (context.isFailed()) { - internalFail(context.getFailureCause()); - closeOutResources(false); + if (fragmentContext.isFailed()) { + internalFail(fragmentContext.getFailureCause()); + closeOutResources(); } else { - closeOutResources(true); // make sure to close out resources before we report success. + /* + * Close out resources before we report success. We do this so that we'll get an + * error if there's a problem cleaning up, even though the query execution portion + * succeeded. + */ + closeOutResources(); updateStateOrFail(FragmentState.RUNNING, FragmentState.FINISHED); } - break; } } } catch (AssertionError | Exception e) { logger.warn("Error while initializing or executing fragment", e); - context.fail(e); + fragmentContext.fail(e); internalFail(e); } finally { - bee.removeFragment(context.getHandle()); - context.getDrillbitContext().getClusterCoordinator().removeDrillbitStatusListener(drillbitStatusListener); + clusterCoordinator.removeDrillbitStatusListener(drillbitStatusListener); // Final check to make sure RecordBatches are cleaned up. - closeOutResources(false); + closeOutResources(); - Thread.currentThread().setName(originalThread); + myThread.setName(originalThreadName); } } - private void closeOutResources(boolean throwFailure) { + private static final String CLOSE_FAILURE = "Failure while closing out resources"; + + private void closeOutResources() { + /* + * Because of the way this method can be called, it needs to be idempotent; it must + * be safe to call it more than once. We use this flag to bypass the body if it has + * been called before. + */ if (closed) { return; } + final DeferredException deferredException = fragmentContext.getDeferredException(); try { - root.stop(); + root.stop(); // TODO make this an AutoCloseable so we can detect lack of closure } catch (RuntimeException e) { - if (throwFailure) { - throw e; - } - logger.warn("Failure while closing out resources.", e); + logger.warn(CLOSE_FAILURE, e); + deferredException.addException(e); } + closed = true; + + /* + * This must be last, because this may throw deferred exceptions. + * We are forced to wrap the checked exception (if any) so that it will be unchecked. + */ try { - context.close(); - } catch (Exception e) { - if (throwFailure) { - throw new RuntimeException("Error closing fragment context.", e); - } - logger.warn("Failure while closing out resources.", e); + fragmentContext.close(); + } catch(Exception e) { + throw new RuntimeException("Error closing fragment context.", e); } - - closed = true; } - private void internalFail(Throwable excep) { + private void internalFail(final Throwable excep) { state.set(FragmentState.FAILED_VALUE); - listener.fail(context.getHandle(), "Failure while running fragment.", excep); + listener.fail(fragmentContext.getHandle(), "Failure while running fragment.", excep); } /** * Updates the fragment state with the given state + * * @param to target state */ - protected void updateState(FragmentState to) {; + private void updateState(final FragmentState to) { state.set(to.getNumber()); - listener.stateChanged(context.getHandle(), to); + listener.stateChanged(fragmentContext.getHandle(), to); } /** @@ -192,10 +205,10 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid * @param to target state * @return true only if update succeeds */ - protected boolean checkAndUpdateState(FragmentState expected, FragmentState to) { - boolean success = state.compareAndSet(expected.getNumber(), to.getNumber()); + private boolean checkAndUpdateState(final FragmentState expected, final FragmentState to) { + final boolean success = state.compareAndSet(expected.getNumber(), to.getNumber()); if (success) { - listener.stateChanged(context.getHandle(), to); + listener.stateChanged(fragmentContext.getHandle(), to); } else { logger.debug("State change failed. Expected state: {} -- target state: {} -- current state: {}.", expected.name(), to.name(), FragmentState.valueOf(state.get())); @@ -206,7 +219,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid /** * Returns true if the fragment is in a terminal state */ - protected boolean isCompleted() { + private boolean isCompleted() { return state.get() == FragmentState.CANCELLED_VALUE || state.get() == FragmentState.FAILED_VALUE || state.get() == FragmentState.FINISHED_VALUE; @@ -220,38 +233,32 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid * @param to target state * @return true only if update succeeds */ - protected boolean updateStateOrFail(FragmentState expected, FragmentState to) { + private boolean updateStateOrFail(final FragmentState expected, final FragmentState to) { final boolean updated = checkAndUpdateState(expected, to); if (!updated && !isCompleted()) { final String msg = "State was different than expected while attempting to update state from %s to %s however current state was %s."; - internalFail(new StateTransitionException(String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get())))); + internalFail(new StateTransitionException( + String.format(msg, expected.name(), to.name(), FragmentState.valueOf(state.get())))); } return updated; } - - @Override - public int compareTo(Object o) { - return o.hashCode() - this.hashCode(); - } - public FragmentContext getContext() { - return context; + return fragmentContext; } private class FragmentDrillbitStatusListener implements DrillbitStatusListener { - @Override - public void drillbitRegistered(Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) { - // Do nothing. + public void drillbitRegistered(final Set<CoordinationProtos.DrillbitEndpoint> registeredDrillbits) { } @Override - public void drillbitUnregistered(Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) { - if (unregisteredDrillbits.contains(FragmentExecutor.this.context.getForemanEndpoint())) { - logger.warn("Forman : {} no longer active. Cancelling fragment {}.", - FragmentExecutor.this.context.getForemanEndpoint().getAddress(), - QueryIdHelper.getQueryIdentifier(context.getHandle())); + public void drillbitUnregistered(final Set<CoordinationProtos.DrillbitEndpoint> unregisteredDrillbits) { + // if the defunct Drillbit was running our Foreman, then cancel the query + final DrillbitEndpoint foremanEndpoint = FragmentExecutor.this.fragmentContext.getForemanEndpoint(); + if (unregisteredDrillbits.contains(foremanEndpoint)) { + logger.warn("Foreman : {} no longer active. Cancelling fragment {}.", + foremanEndpoint.getAddress(), QueryIdHelper.getQueryIdentifier(fragmentContext.getHandle())); FragmentExecutor.this.cancel(); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java index 3671804e4..41e87cd50 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/NonRootFragmentManager.java @@ -30,13 +30,13 @@ import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.record.RawFragmentBatch; import org.apache.drill.exec.rpc.RemoteConnection; import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.batch.IncomingBuffers; import org.apache.drill.exec.work.foreman.ForemanException; /** * This managers determines when to run a non-root fragment node. */ +// TODO a lot of this is the same as RootFragmentManager public class NonRootFragmentManager implements FragmentManager { private final PlanFragment fragment; private FragmentRoot root; @@ -44,15 +44,13 @@ public class NonRootFragmentManager implements FragmentManager { private final StatusReporter runnerListener; private volatile FragmentExecutor runner; private volatile boolean cancel = false; - private final WorkerBee bee; private final FragmentContext context; private List<RemoteConnection> connections = new CopyOnWriteArrayList<>(); - public NonRootFragmentManager(PlanFragment fragment, WorkerBee bee) throws ExecutionSetupException { + public NonRootFragmentManager(final PlanFragment fragment, final DrillbitContext context) + throws ExecutionSetupException { try { this.fragment = fragment; - DrillbitContext context = bee.getContext(); - this.bee = bee; this.root = context.getPlanReader().readFragmentOperator(fragment.getFragmentJson()); this.context = new FragmentContext(context, fragment, null, context.getFunctionImplementationRegistry()); this.buffers = new IncomingBuffers(root, this.context); @@ -84,8 +82,8 @@ public class NonRootFragmentManager implements FragmentManager { if (cancel) { return null; } - runner = new FragmentExecutor(context, bee, root, runnerListener); - return this.runner; + runner = new FragmentExecutor(context, root, runnerListener); + return runner; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java index 54fc8c41f..84071c3c0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/RootFragmentManager.java @@ -28,6 +28,7 @@ import org.apache.drill.exec.record.RawFragmentBatch; import org.apache.drill.exec.rpc.RemoteConnection; import org.apache.drill.exec.work.batch.IncomingBuffers; +// TODO a lot of this is the same as NonRootFragmentManager public class RootFragmentManager implements FragmentManager{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RootFragmentManager.class); diff --git a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java index 775bccb1c..8281b7f9c 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java +++ b/exec/java-exec/src/test/java/org/apache/drill/BaseTestQuery.java @@ -23,18 +23,13 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import com.google.common.base.Preconditions; import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ExecTest; import org.apache.drill.exec.client.DrillClient; -import org.apache.drill.exec.client.PrintingResultsListener; import org.apache.drill.exec.client.QuerySubmitter; -import org.apache.drill.exec.client.QuerySubmitter.Format; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.TopLevelAllocator; @@ -58,7 +53,7 @@ import org.junit.runner.Description; import com.google.common.base.Charsets; import com.google.common.io.Resources; -public class BaseTestQuery extends ExecTest{ +public class BaseTestQuery extends ExecTest { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseTestQuery.class); /** @@ -129,7 +124,7 @@ public class BaseTestQuery extends ExecTest{ } @BeforeClass - public static void openClient() throws Exception{ + public static void openClient() throws Exception { config = DrillConfig.create(TEST_CONFIGURATIONS); allocator = new TopLevelAllocator(config); if (config.hasPath(ENABLE_FULL_CACHE) && config.getBoolean(ENABLE_FULL_CACHE)) { @@ -139,20 +134,15 @@ public class BaseTestQuery extends ExecTest{ } bits = new Drillbit[drillbitCount]; - for(int i=0; i<drillbitCount; i++) { + for(int i = 0; i < drillbitCount; i++) { bits[i] = new Drillbit(config, serviceSet); bits[i].run(); } - client = new DrillClient(config, serviceSet.getCoordinator()); - client.connect(); - List<QueryResultBatch> results = client.runQuery(QueryType.SQL, String.format("alter session set `%s` = 2", ExecConstants.MAX_WIDTH_PER_NODE_KEY)); - for (QueryResultBatch b : results) { - b.release(); - } + client = QueryTestUtil.createClient(config, serviceSet, 2); } - protected BufferAllocator getAllocator() { + protected static BufferAllocator getAllocator() { return allocator; } @@ -205,27 +195,23 @@ public class BaseTestQuery extends ExecTest{ } public static List<QueryResultBatch> testRunAndReturn(QueryType type, String query) throws Exception{ - query = normalizeQuery(query); + query = QueryTestUtil.normalizeQuery(query); return client.runQuery(type, query); } - public static int testRunAndPrint(QueryType type, String query) throws Exception{ - query = normalizeQuery(query); - PrintingResultsListener resultListener = new PrintingResultsListener(client.getConfig(), Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH); - client.runQuery(type, query, resultListener); - return resultListener.await(); + public static int testRunAndPrint(final QueryType type, final String query) throws Exception { + return QueryTestUtil.testRunAndPrint(client, type, query); } protected static void testWithListener(QueryType type, String query, UserResultsListener resultListener) { - query = normalizeQuery(query); - client.runQuery(type, query, resultListener); + QueryTestUtil.testWithListener(client, type, query, resultListener); } - protected void testNoResult(String query, Object... args) throws Exception { + protected static void testNoResult(String query, Object... args) throws Exception { testNoResult(1, query, args); } - protected void testNoResult(int interation, String query, Object... args) throws Exception { + protected static void testNoResult(int interation, String query, Object... args) throws Exception { query = String.format(query, args); logger.debug("Running query:\n--------------\n"+query); for (int i = 0; i < interation; i++) { @@ -237,27 +223,11 @@ public class BaseTestQuery extends ExecTest{ } public static void test(String query, Object... args) throws Exception { - test(String.format(query, args)); + QueryTestUtil.test(client, String.format(query, args)); } - public static void test(String query) throws Exception{ - query = normalizeQuery(query); - String[] queries = query.split(";"); - for (String q : queries) { - if (q.trim().isEmpty()) { - continue; - } - testRunAndPrint(QueryType.SQL, q); - } - } - - public static String normalizeQuery(String query) { - if (query.contains("${WORKING_PATH}")) { - return query.replaceAll(Pattern.quote("${WORKING_PATH}"), Matcher.quoteReplacement(TestTools.getWorkingPath())); - } else if (query.contains("[WORKING_PATH]")) { - return query.replaceAll(Pattern.quote("[WORKING_PATH]"), Matcher.quoteReplacement(TestTools.getWorkingPath())); - } - return query; + public static void test(final String query) throws Exception { + QueryTestUtil.test(client, query); } protected static int testLogical(String query) throws Exception{ @@ -272,19 +242,19 @@ public class BaseTestQuery extends ExecTest{ return testRunAndPrint(QueryType.SQL, query); } - protected void testPhysicalFromFile(String file) throws Exception{ + protected static void testPhysicalFromFile(String file) throws Exception{ testPhysical(getFile(file)); } - protected List<QueryResultBatch> testPhysicalFromFileWithResults(String file) throws Exception { + protected static List<QueryResultBatch> testPhysicalFromFileWithResults(String file) throws Exception { return testRunAndReturn(QueryType.PHYSICAL, getFile(file)); } - protected void testLogicalFromFile(String file) throws Exception{ + protected static void testLogicalFromFile(String file) throws Exception{ testLogical(getFile(file)); } - protected void testSqlFromFile(String file) throws Exception{ + protected static void testSqlFromFile(String file) throws Exception{ test(getFile(file)); } @@ -358,7 +328,8 @@ public class BaseTestQuery extends ExecTest{ return rowCount; } - protected String getResultString(List<QueryResultBatch> results, String delimiter) throws SchemaChangeException { + protected static String getResultString(List<QueryResultBatch> results, String delimiter) + throws SchemaChangeException { StringBuilder formattedResults = new StringBuilder(); boolean includeHeader = true; RecordBatchLoader loader = new RecordBatchLoader(getAllocator()); diff --git a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java index 36091af9c..80b4d13ff 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/PlanTestBase.java @@ -18,8 +18,6 @@ package org.apache.drill; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.util.List; @@ -28,7 +26,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.user.QueryResultBatch; @@ -38,7 +35,6 @@ import org.eigenbase.sql.SqlExplain.Depth; import org.eigenbase.sql.SqlExplainLevel; import com.google.common.base.Strings; -import org.junit.Test; public class PlanTestBase extends BaseTestQuery { @@ -54,9 +50,9 @@ public class PlanTestBase extends BaseTestQuery { * format. Then check the physical plan against the list expected substrs. * Verify all the expected strings are contained in the physical plan string. */ - public void testPhysicalPlan(String sql, String... expectedSubstrs) + public static void testPhysicalPlan(String sql, String... expectedSubstrs) throws Exception { - sql = "EXPLAIN PLAN for " + normalizeQuery(sql); + sql = "EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(sql); String planStr = getPlanInString(sql, JSON_FORMAT); @@ -79,8 +75,9 @@ public class PlanTestBase extends BaseTestQuery { * @throws Exception - if an inclusion or exclusion check fails, or the * planning process throws an exception */ - public void testPlanMatchingPatterns(String query, String[] expectedPatterns, String[] excludedPatterns) throws Exception { - String plan = getPlanInString("EXPLAIN PLAN for " + normalizeQuery(query), OPTIQ_FORMAT); + public static void testPlanMatchingPatterns(String query, String[] expectedPatterns, String[] excludedPatterns) + throws Exception { + String plan = getPlanInString("EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT); Pattern p; Matcher m; @@ -121,8 +118,9 @@ public class PlanTestBase extends BaseTestQuery { * @throws Exception - if an inclusion or exclusion check fails, or the * planning process throws an exception */ - public void testPlanSubstrPatterns(String query, String[] expectedPatterns, String[] excludedPatterns) throws Exception { - String plan = getPlanInString("EXPLAIN PLAN for " + normalizeQuery(query), OPTIQ_FORMAT); + public static void testPlanSubstrPatterns(String query, String[] expectedPatterns, String[] excludedPatterns) + throws Exception { + final String plan = getPlanInString("EXPLAIN PLAN for " + QueryTestUtil.normalizeQuery(query), OPTIQ_FORMAT); // Check and make sure all expected patterns are in the plan if (expectedPatterns != null) { @@ -139,15 +137,15 @@ public class PlanTestBase extends BaseTestQuery { } } - public void testPlanOneExpectedPatternOneExcluded(String query, String expectedPattern, String excludedPattern) throws Exception { + public static void testPlanOneExpectedPatternOneExcluded(String query, String expectedPattern, String excludedPattern) throws Exception { testPlanMatchingPatterns(query, new String[]{expectedPattern}, new String[]{excludedPattern}); } - public void testPlanOneExpectedPattern(String query, String expectedPattern) throws Exception { + public static void testPlanOneExpectedPattern(String query, String expectedPattern) throws Exception { testPlanMatchingPatterns(query, new String[]{expectedPattern}, new String[]{}); } - public void testPlanOneExcludedPattern(String query, String excludedPattern) throws Exception { + public static void testPlanOneExcludedPattern(String query, String excludedPattern) throws Exception { testPlanMatchingPatterns(query, new String[]{}, new String[]{excludedPattern}); } @@ -157,7 +155,7 @@ public class PlanTestBase extends BaseTestQuery { * substrs. Verify all the expected strings are contained in the physical plan * string. */ - public void testRelLogicalJoinOrder(String sql, String... expectedSubstrs) throws Exception { + public static void testRelLogicalJoinOrder(String sql, String... expectedSubstrs) throws Exception { String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.LOGICAL); String prefixJoinOrder = getLogicalPrefixJoinOrderFromPlan(planStr); @@ -173,7 +171,7 @@ public class PlanTestBase extends BaseTestQuery { * substrs. Verify all the expected strings are contained in the physical plan * string. */ - public void testRelPhysicalJoinOrder(String sql, String... expectedSubstrs) throws Exception { + public static void testRelPhysicalJoinOrder(String sql, String... expectedSubstrs) throws Exception { String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.PHYSICAL); String prefixJoinOrder = getPhysicalPrefixJoinOrderFromPlan(planStr); @@ -189,9 +187,9 @@ public class PlanTestBase extends BaseTestQuery { * expected substrs. Verify all the expected strings are contained in the * physical plan string. */ - public void testRelPhysicalPlanLevDigest(String sql, String... expectedSubstrs) + public static void testRelPhysicalPlanLevDigest(String sql, String... expectedSubstrs) throws Exception { - String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.DIGEST_ATTRIBUTES, Depth.PHYSICAL); + final String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.DIGEST_ATTRIBUTES, Depth.PHYSICAL); for (String substr : expectedSubstrs) { assertTrue(planStr.contains(substr)); @@ -204,9 +202,9 @@ public class PlanTestBase extends BaseTestQuery { * substrs. Verify all the expected strings are contained in the physical plan * string. */ - public void testRelLogicalPlanLevDigest(String sql, String... expectedSubstrs) + public static void testRelLogicalPlanLevDigest(String sql, String... expectedSubstrs) throws Exception { - String planStr = getDrillRelPlanInString(sql, + final String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.DIGEST_ATTRIBUTES, Depth.LOGICAL); for (String substr : expectedSubstrs) { @@ -220,8 +218,8 @@ public class PlanTestBase extends BaseTestQuery { * expected substrs. Verify all the expected strings are contained in the * physical plan string. */ - public void testRelPhysicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception { - String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.PHYSICAL); + public static void testRelPhysicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception { + final String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.PHYSICAL); for (String substr : expectedSubstrs) { assertTrue(planStr.contains(substr)); @@ -234,8 +232,8 @@ public class PlanTestBase extends BaseTestQuery { * substrs. Verify all the expected strings are contained in the physical plan * string. */ - public void testRelLogicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception { - String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.LOGICAL); + public static void testRelLogicalPlanLevExplain(String sql, String... expectedSubstrs) throws Exception { + final String planStr = getDrillRelPlanInString(sql, SqlExplainLevel.EXPPLAN_ATTRIBUTES, Depth.LOGICAL); for (String substr : expectedSubstrs) { assertTrue(planStr.contains(substr)); @@ -246,7 +244,7 @@ public class PlanTestBase extends BaseTestQuery { * This will get the plan (either logical or physical) in Optiq RelNode * format, based on SqlExplainLevel and Depth. */ - private String getDrillRelPlanInString(String sql, SqlExplainLevel level, + private static String getDrillRelPlanInString(String sql, SqlExplainLevel level, Depth depth) throws Exception { String levelStr = " ", depthStr = " "; @@ -279,7 +277,7 @@ public class PlanTestBase extends BaseTestQuery { } sql = "EXPLAIN PLAN " + levelStr + " " + depthStr + " for " - + normalizeQuery(sql); + + QueryTestUtil.normalizeQuery(sql); return getPlanInString(sql, OPTIQ_FORMAT); } @@ -326,22 +324,21 @@ public class PlanTestBase extends BaseTestQuery { return builder.toString(); } - private String getLogicalPrefixJoinOrderFromPlan(String plan) { + private static String getLogicalPrefixJoinOrderFromPlan(String plan) { return getPrefixJoinOrderFromPlan(plan, "DrillJoinRel", "DrillScanRel"); - } - private String getPhysicalPrefixJoinOrderFromPlan(String plan) { + private static String getPhysicalPrefixJoinOrderFromPlan(String plan) { return getPrefixJoinOrderFromPlan(plan, "JoinPrel", "ScanPrel"); } - private String getPrefixJoinOrderFromPlan(String plan, String joinKeyWord, String scanKeyWord) { + private static String getPrefixJoinOrderFromPlan(String plan, String joinKeyWord, String scanKeyWord) { StringBuilder builder = new StringBuilder(); - String[] planLines = plan.split("\n"); + final String[] planLines = plan.split("\n"); int cnt = 0; - Stack<Integer> s = new Stack<Integer>(); + final Stack<Integer> s = new Stack<>(); for (String line : planLines) { if (line.trim().isEmpty()) { @@ -369,5 +366,4 @@ public class PlanTestBase extends BaseTestQuery { return builder.toString(); } - } diff --git a/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java new file mode 100644 index 000000000..3d192292c --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/QueryTestUtil.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill; + +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.util.TestTools; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.client.PrintingResultsListener; +import org.apache.drill.exec.client.QuerySubmitter.Format; +import org.apache.drill.exec.proto.UserBitShared.QueryType; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.rpc.user.UserResultsListener; +import org.apache.drill.exec.server.RemoteServiceSet; +import org.apache.drill.exec.util.VectorUtil; + +/** + * Utilities useful for tests that issue SQL queries. + */ +public class QueryTestUtil { + /** + * Constructor. All methods are static. + */ + private QueryTestUtil() { + } + + /** + * Create a DrillClient that can be used to query a drill cluster. + * + * @param drillConfig + * @param remoteServiceSet remote service set + * @param maxWidth maximum width per node + * @return the newly created client + * @throws RpcException if there is a problem setting up the client + */ + public static DrillClient createClient( + final DrillConfig drillConfig, final RemoteServiceSet remoteServiceSet, final int maxWidth) + throws RpcException { + final DrillClient drillClient = new DrillClient(drillConfig, remoteServiceSet.getCoordinator()); + drillClient.connect(); + + final List<QueryResultBatch> results = drillClient.runQuery( + QueryType.SQL, String.format("alter session set `%s` = %d", + ExecConstants.MAX_WIDTH_PER_NODE_KEY, maxWidth)); + for (QueryResultBatch queryResultBatch : results) { + queryResultBatch.release(); + } + + return drillClient; + } + + /** + * Normalize the query relative to the test environment. + * + * <p>Looks for "${WORKING_PATH}" in the query string, and replaces it the current + * working patch obtained from {@link org.apache.drill.common.util.TestTools#getWorkingPath()}. + * + * @param query the query string + * @return the normalized query string + */ + public static String normalizeQuery(final String query) { + if (query.contains("${WORKING_PATH}")) { + return query.replaceAll(Pattern.quote("${WORKING_PATH}"), Matcher.quoteReplacement(TestTools.getWorkingPath())); + } else if (query.contains("[WORKING_PATH]")) { + return query.replaceAll(Pattern.quote("[WORKING_PATH]"), Matcher.quoteReplacement(TestTools.getWorkingPath())); + } + return query; + } + + /** + * Execute a SQL query, and print the results. + * + * @param drillClient drill client to use + * @param type type of the query + * @param queryString query string + * @return number of rows returned + * @throws Exception + */ + public static int testRunAndPrint( + final DrillClient drillClient, final QueryType type, final String queryString) throws Exception { + final String query = normalizeQuery(queryString); + PrintingResultsListener resultListener = + new PrintingResultsListener(drillClient.getConfig(), Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH); + drillClient.runQuery(type, query, resultListener); + return resultListener.await(); + } + + /** + * Execute one or more queries separated by semicolons, and print the results. + * + * @param drillClient drill client to use + * @param queryString the query string + * @throws Exception + */ + public static void test(final DrillClient drillClient, final String queryString) throws Exception{ + final String query = normalizeQuery(queryString); + String[] queries = query.split(";"); + for (String q : queries) { + final String trimmedQuery = q.trim(); + if (trimmedQuery.isEmpty()) { + continue; + } + testRunAndPrint(drillClient, QueryType.SQL, trimmedQuery); + } + } + + /** + * Execute one or more queries separated by semicolons, and print the results, with the option to + * add formatted arguments to the query string. + * + * @param drillClient drill client to use + * @param query the query string; may contain formatting specifications to be used by + * {@link String#format(String, Object...)}. + * @param args optional args to use in the formatting call for the query string + * @throws Exception + */ + public static void test(final DrillClient drillClient, final String query, Object... args) throws Exception { + test(drillClient, String.format(query, args)); + } + + /** + * Execute a single query with a user supplied result listener. + * + * @param drillClient drill client to use + * @param type type of query + * @param queryString the query string + * @param resultListener the result listener + */ + public static void testWithListener(final DrillClient drillClient, final QueryType type, + final String queryString, final UserResultsListener resultListener) { + final String query = QueryTestUtil.normalizeQuery(queryString); + drillClient.runQuery(type, query, resultListener); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java new file mode 100644 index 000000000..07cb833c0 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/SingleRowListener.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.UserBitShared.QueryResult; +import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.ConnectionThrottle; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.rpc.user.UserResultsListener; + +/** + * Result listener that is set up to receive a single row. Useful for queries + * such with a count(*) or limit 1. The abstract method {@link #rowArrived(QueryResultBatch)} provides + * the means for a derived class to get the expected record's data. + */ +public abstract class SingleRowListener implements UserResultsListener { + private final CountDownLatch latch = new CountDownLatch(1); // used to wait for completion + private final AtomicInteger nRows = new AtomicInteger(0); // counts rows received + private QueryState queryState = null; // last received QueryState, if any + private final List<DrillPBError> errorList = new LinkedList<>(); // all errors ever received + private Exception exception = null; // the exception captured from a submission failure + + @Override + public void queryIdArrived(final QueryId queryId) { + } + + @Override + public void submissionFailed(final RpcException ex) { + exception = ex; + latch.countDown(); + } + + @Override + public void resultArrived(final QueryResultBatch result, final ConnectionThrottle throttle) { + final QueryResult queryResult = result.getHeader(); + if (result.hasData()) { + final int nRows = this.nRows.addAndGet(queryResult.getRowCount()); + if (nRows > 1) { + throw new IllegalStateException("Expected exactly one row, but got " + nRows); + } + + rowArrived(result); + } + + // TODO this appears to never be set + if (queryResult.hasQueryState()) { + queryState = queryResult.getQueryState(); + } + + synchronized(errorList) { + errorList.addAll(queryResult.getErrorList()); + } + + final boolean isLastChunk = queryResult.getIsLastChunk(); + result.release(); + + if (isLastChunk) { + cleanup(); + latch.countDown(); + } + } + + /** + * Get the last known QueryState. + * + * @return the query state; may be null if no query state has been received + */ + public QueryState getQueryState() { + return queryState; + } + + /** + * Get an immutable copy of the list of all errors received so far. + * + * @return list of errors received + */ + public List<DrillPBError> getErrorList() { + synchronized(errorList) { + return Collections.unmodifiableList(errorList); + } + } + + /** + * A record has arrived and is ready for access. + * + * <p>Derived classes provide whatever implementation they require here to access + * the record's data. + * + * @param queryResultBatch result batch holding the row + */ + protected abstract void rowArrived(QueryResultBatch queryResultBatch); + + /** + * Wait for the completion of this query; receiving a record or an error will both cause the + * query to be considered complete + * + * @throws Exception if there was any kind of problem + */ + public void waitForCompletion() throws Exception { + latch.await(); + if (exception != null) { + throw new RuntimeException("Query submission failed", exception); + } + } + + /** + * Clean up any resources used. + * + * <p>Derived classes may use this to free things like allocators or files that were used to + * record data received in resultArrived(). + */ + public void cleanup() { + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java index 978e565c6..13f958c22 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestBuilder.java @@ -27,7 +27,6 @@ import org.apache.drill.common.expression.parser.ExprLexer; import org.apache.drill.common.expression.parser.ExprParser; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.Types; -import org.apache.drill.common.util.TestTools; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.UserBitShared; @@ -36,7 +35,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; @@ -124,7 +122,7 @@ public class TestBuilder { } public TestBuilder sqlQuery(String query) { - this.query = BaseTestQuery.normalizeQuery(query); + this.query = QueryTestUtil.normalizeQuery(query); this.queryType = UserBitShared.QueryType.SQL; return this; } diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java index e0f830dc3..b062b39a4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestTpchDistributed.java @@ -23,8 +23,8 @@ import org.junit.Test; public class TestTpchDistributed extends BaseTestQuery { // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestTpchDistributed.class); - private static void testDistributed(String fileName) throws Exception { - String query = getFile(fileName); + private static void testDistributed(final String fileName) throws Exception { + final String query = getFile(fileName); test("alter session set `planner.slice_target` = 10; " + query); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java index 75ba3a9d6..e63f085af 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/DrillSystemTestBase.java @@ -21,7 +21,6 @@ import static com.google.common.base.Throwables.propagate; import java.util.List; -import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.exception.DrillbitStartupException; import org.apache.drill.exec.server.Drillbit; import org.slf4j.Logger; @@ -33,8 +32,7 @@ import com.google.common.collect.ImmutableList; * Starts one or more Drillbits, an embedded ZooKeeper cluster and provides a configured client for testing. */ public class DrillSystemTestBase extends TestWithZookeeper { - - static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillConfig.class); + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(DrillSystemTestBase.class); private List<Drillbit> servers; @@ -62,9 +60,7 @@ public class DrillSystemTestBase extends TestWithZookeeper { } } - public Drillbit getABit(){ return this.servers.iterator().next(); } - } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java index bb69c9aa5..f65c63879 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestWithZookeeper.java @@ -17,65 +17,27 @@ */ package org.apache.drill.exec; -import static com.google.common.base.Throwables.propagate; - -import java.io.File; -import java.io.IOException; - import org.apache.drill.common.config.DrillConfig; -import org.apache.drill.exec.util.MiniZooKeeperCluster; import org.junit.AfterClass; import org.junit.BeforeClass; public class TestWithZookeeper extends ExecTest { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestWithZookeeper.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestWithZookeeper.class); - private static File testDir = new File("target/test-data"); - private static DrillConfig config; - private static String zkUrl; - private static MiniZooKeeperCluster zkCluster; + private static ZookeeperHelper zkHelper; @BeforeClass public static void setUp() throws Exception { - config = DrillConfig.create(); - zkUrl = config.getString(ExecConstants.ZK_CONNECTION); - setupTestDir(); - startZookeeper(1); + zkHelper = new ZookeeperHelper(); + zkHelper.startZookeeper(1); } @AfterClass public static void tearDown() throws Exception { - stopZookeeper(); + zkHelper.stopZookeeper(); } - private static void setupTestDir() { - if (!testDir.exists()) { - testDir.mkdirs(); - } + public static DrillConfig getConfig() { + return zkHelper.getConfig(); } - - private static void startZookeeper(int numServers) { - try { - zkCluster = new MiniZooKeeperCluster(); - zkCluster.setDefaultClientPort(Integer.parseInt(zkUrl.split(":")[1])); - zkCluster.startup(testDir, numServers); - } catch (IOException e) { - propagate(e); - } catch (InterruptedException e) { - propagate(e); - } - } - - private static void stopZookeeper() { - try { - zkCluster.shutdown(); - } catch (IOException e) { - propagate(e); - } - } - - public static DrillConfig getConfig(){ - return config; - } - } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java new file mode 100644 index 000000000..7fcf4cb30 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec; + +import static com.google.common.base.Throwables.propagate; + +import java.io.File; +import java.io.IOException; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.util.MiniZooKeeperCluster; + +/** + * Test utility for managing a Zookeeper instance. + * + * <p>Tests that need a Zookeeper instance can initialize a static instance of this class in + * their {@link org.junit.BeforeClass} section to set up Zookeeper. + */ +public class ZookeeperHelper { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ZookeeperHelper.class); + + private final File testDir = new File("target/test-data"); + private final DrillConfig config; + private final String zkUrl; + private MiniZooKeeperCluster zkCluster; + + /** + * Constructor. + * + * <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist. + */ + public ZookeeperHelper() { + config = DrillConfig.create(); + zkUrl = config.getString(ExecConstants.ZK_CONNECTION); + + if (!testDir.exists()) { + testDir.mkdirs(); + } + } + + /** + * Start the Zookeeper instance. + * + * <p>This must be used before any operations that depend on the Zookeeper instance being up. + * + * @param numServers how many servers the Zookeeper instance should have + */ + public void startZookeeper(final int numServers) { + if (zkCluster != null) { + throw new IllegalStateException("Zookeeper cluster already running"); + } + + try { + zkCluster = new MiniZooKeeperCluster(); + zkCluster.setDefaultClientPort(Integer.parseInt(zkUrl.split(":")[1])); + zkCluster.startup(testDir, numServers); + } catch (IOException | InterruptedException e) { + propagate(e); + } + } + + /** + * Shut down the Zookeeper instance. + * + * <p>This must be used before the program exits. + */ + public void stopZookeeper() { + try { + zkCluster.shutdown(); + zkCluster = null; + } catch (IOException e) { + // since this is meant to be used in a test's cleanup, we don't propagate the exception + final String message = "Unable to shutdown Zookeeper"; + System.err.println(message + '.'); + logger.warn(message, e); + } + } + + /** + * Get the DrillConfig used for the Zookeeper instance. + * + * @return the DrillConfig used. + */ + public DrillConfig getConfig() { + return config; + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java index 027787629..933417ea9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/SimpleRootExec.java @@ -21,7 +21,6 @@ import java.util.Iterator; import java.util.List; import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.impl.ScreenCreator.ScreenRoot; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; @@ -35,11 +34,12 @@ import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Lists; @Deprecated -public class SimpleRootExec implements RootExec, Iterable<ValueVector>{ - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRootExec.class); +public class SimpleRootExec implements RootExec, Iterable<ValueVector> { +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleRootExec.class); + + private final RecordBatch incoming; + private final ScreenRoot screenRoot; - private RecordBatch incoming; - private ScreenRoot screenRoot; public SimpleRootExec(RootExec e) { if (e instanceof ScreenRoot) { incoming = ((ScreenRoot)e).getIncoming(); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java index 609bc14c3..0f6fd43b9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestComparisonFunctions.java @@ -42,13 +42,12 @@ import com.google.common.base.Charsets; import com.google.common.io.Resources; public class TestComparisonFunctions extends ExecTest { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestComparisonFunctions.class); +// private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestComparisonFunctions.class); - DrillConfig c = DrillConfig.create(); - String COMPARISON_TEST_PHYSICAL_PLAN = "functions/comparisonTest.json"; - PhysicalPlanReader reader; - FunctionImplementationRegistry registry; - FragmentContext context; + private final DrillConfig c = DrillConfig.create(); + private final String COMPARISON_TEST_PHYSICAL_PLAN = "functions/comparisonTest.json"; + private PhysicalPlanReader reader; + private FunctionImplementationRegistry registry; public void runTest(@Injectable final DrillbitContext bitContext, @Injectable UserServer.UserClientConnection connection, String expression, int expectedResults) throws Throwable { @@ -68,21 +67,20 @@ public class TestComparisonFunctions extends ExecTest { if (registry == null) { registry = new FunctionImplementationRegistry(c); } - if(context == null) { - context = new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry); - } + final FragmentContext context = + new FragmentContext(bitContext, PlanFragment.getDefaultInstance(), connection, registry); PhysicalPlan plan = reader.readPhysicalPlan(planString); SimpleRootExec exec = new SimpleRootExec(ImplCreator.getExec(context, (FragmentRoot) plan.getSortedOperators(false).iterator().next())); while(exec.next()) { - assertEquals(String.format("Expression: %s;", expression), expectedResults, exec.getSelectionVector2().getCount()); + assertEquals(String.format("Expression: %s;", expression), expectedResults, + exec.getSelectionVector2().getCount()); // for (ValueVector vv: exec) { // vv.close(); // } } exec.stop(); - context.close(); if (context.getFailureCause() != null) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java index 2a0aedcc0..6bf23ec39 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestOptiqPlans.java @@ -40,7 +40,6 @@ import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; -import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.rpc.control.Controller; @@ -104,11 +103,13 @@ public class TestOptiqPlans extends ExecTest { } }; RemoteServiceSet lss = RemoteServiceSet.getLocalServiceSet(); - DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, com, workBus, new LocalPStoreProvider(DrillConfig.create()), null); - QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), QueryId.getDefaultInstance(), bitContext); + DrillbitContext bitContext = new DrillbitContext(DrillbitEndpoint.getDefaultInstance(), context, coord, controller, + com, workBus, new LocalPStoreProvider(DrillConfig.create()), null); + QueryContext qc = new QueryContext(UserSession.Builder.newBuilder().setSupportComplexTypes(true).build(), + bitContext); PhysicalPlanReader reader = bitContext.getPlanReader(); LogicalPlan plan = reader.readLogicalPlan(Files.toString(FileUtils.getResourceAsFile(file), Charsets.UTF_8)); - PhysicalPlan pp = new BasicOptimizer(DrillConfig.create(), qc, connection).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan); + PhysicalPlan pp = new BasicOptimizer(qc).optimize(new BasicOptimizer.BasicOptimizationContext(qc), plan); FunctionImplementationRegistry registry = new FunctionImplementationRegistry(c); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java new file mode 100644 index 000000000..9bc055238 --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java @@ -0,0 +1,398 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.server; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.drill.QueryTestUtil; +import org.apache.drill.SingleRowListener; +import org.apache.drill.common.AutoCloseables; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.types.TypeProtos.MinorType; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ExecTest; +import org.apache.drill.exec.ZookeeperHelper; +import org.apache.drill.exec.client.DrillClient; +import org.apache.drill.exec.exception.DrillbitStartupException; +import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.proto.UserBitShared.DrillPBError; +import org.apache.drill.exec.proto.UserBitShared.QueryResult; +import org.apache.drill.exec.proto.UserBitShared.QueryType; +import org.apache.drill.exec.record.BatchSchema; +import org.apache.drill.exec.record.MaterializedField; +import org.apache.drill.exec.record.RecordBatchLoader; +import org.apache.drill.exec.record.VectorWrapper; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.user.QueryResultBatch; +import org.apache.drill.exec.testing.ExceptionInjectionUtil; +import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOption; +import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions; +import org.apache.drill.exec.work.foreman.Foreman; +import org.apache.drill.exec.work.foreman.ForemanException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Test how resilient drillbits are to throwing exceptions during various phases of query + * execution by injecting exceptions at various points. + */ +public class TestDrillbitResilience extends ExecTest { + private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class); + + private static ZookeeperHelper zkHelper; + private static RemoteServiceSet remoteServiceSet; + private static final Map<String, Drillbit> drillbits = new HashMap<>(); + private static DrillClient drillClient; + + private static void startDrillbit(final String name, final RemoteServiceSet remoteServiceSet) { + if (drillbits.containsKey(name)) { + throw new IllegalStateException("Drillbit named \"" + name + "\" already exists"); + } + + try { + @SuppressWarnings("resource") + final Drillbit drillbit = Drillbit.start(zkHelper.getConfig(), remoteServiceSet); + drillbits.put(name, drillbit); + } catch(DrillbitStartupException e) { + throw new RuntimeException("Failed to start Drillbit \"" + name + "\"", e); + } + } + + /** + * Shutdown the specified drillbit. + * + * @param name + */ + private static void stopDrillbit(final String name) { + @SuppressWarnings("resource") + final Drillbit drillbit = drillbits.get(name); + if (drillbit == null) { + throw new IllegalStateException("No Drillbit named \"" + name + "\" found"); + } + + try { + drillbit.close(); + } catch(Exception e) { + final String message = "Error shutting down Drillbit \"" + name + "\""; + System.err.println(message + '.'); + logger.warn(message, e); + } + } + + /** + * Shutdown all the drillbits. + */ + private static void stopAllDrillbits() { + for(String name : drillbits.keySet()) { + stopDrillbit(name); + } + } + + /* + * Canned drillbit names. + */ + private final static String DRILLBIT_ALPHA = "alpha"; + private final static String DRILLBIT_BETA = "beta"; + private final static String DRILLBIT_GAMMA = "gamma"; + + @BeforeClass + public static void startSomeDrillbits() throws Exception { + // turn off the HTTP server to avoid port conflicts between the drill bits + System.setProperty(ExecConstants.HTTP_ENABLE, "false"); + + zkHelper = new ZookeeperHelper(); + zkHelper.startZookeeper(1); + + // use a non-null service set so that the drillbits can use port hunting + remoteServiceSet = RemoteServiceSet.getLocalServiceSet(); + + // create name-addressable drillbits + startDrillbit(DRILLBIT_ALPHA, remoteServiceSet); + startDrillbit(DRILLBIT_BETA, remoteServiceSet); + startDrillbit(DRILLBIT_GAMMA, remoteServiceSet); + clearAllInjections(); + + // create a client + final DrillConfig drillConfig = zkHelper.getConfig(); + drillClient = QueryTestUtil.createClient(drillConfig, remoteServiceSet, 1); + } + + @AfterClass + public static void shutdownAllDrillbits() { + if (drillClient != null) { + drillClient.close(); + drillClient = null; + } + + stopAllDrillbits(); + + if (remoteServiceSet != null) { + AutoCloseables.close(remoteServiceSet, logger); + remoteServiceSet = null; + } + + zkHelper.stopZookeeper(); + } + + /** + * Clear all injections from all drillbits. + */ + private static void clearAllInjections() { + for(Drillbit drillbit : drillbits.values()) { + ExceptionInjectionUtil.clearInjections(drillbit); + } + } + + /** + * Check that all the drillbits are ok. + * + * <p>The current implementation does this by counting the number of drillbits using a + * query. + */ + private static void assertDrillbitsOk() { + final SingleRowListener listener = new SingleRowListener() { + private final BufferAllocator bufferAllocator = new TopLevelAllocator(zkHelper.getConfig()); + private final RecordBatchLoader loader = new RecordBatchLoader(bufferAllocator); + + @Override + public void rowArrived(final QueryResultBatch queryResultBatch) { + // load the single record + final QueryResult queryResult = queryResultBatch.getHeader(); + try { + loader.load(queryResult.getDef(), queryResultBatch.getData()); + } catch(SchemaChangeException e) { + fail(e.toString()); + } + assertEquals(1, loader.getRecordCount()); + + // there should only be one column + final BatchSchema batchSchema = loader.getSchema(); + assertEquals(1, batchSchema.getFieldCount()); + + // the column should be an integer + final MaterializedField countField = batchSchema.getColumn(0); + final MinorType fieldType = countField.getType().getMinorType(); + assertEquals(MinorType.BIGINT, fieldType); + + // get the column value + final VectorWrapper<?> vw = loader.iterator().next(); + final Object obj = vw.getValueVector().getAccessor().getObject(0); + assertTrue(obj instanceof Long); + final Long countValue = (Long) obj; + + // assume this means all the drillbits are still ok + assertEquals(drillbits.size(), countValue.intValue()); + + loader.clear(); + } + + @Override + public void cleanup() { + bufferAllocator.close(); + } + }; + + try { + QueryTestUtil.testWithListener( + drillClient, QueryType.SQL, "select count(*) from sys.drillbits", listener); + listener.waitForCompletion(); + } catch(Exception e) { + throw new RuntimeException("Couldn't query active drillbits", e); + } + + final List<DrillPBError> errorList = listener.getErrorList(); + assertTrue(errorList.isEmpty()); + } + + @SuppressWarnings("static-method") + @After + public void checkDrillbits() { + clearAllInjections(); // so that the drillbit check itself doesn't trigger anything + assertDrillbitsOk(); // TODO we need a way to do this without using a query + } + + /** + * Set the given injections on a single named drillbit. + * + * @param bitName + * @param injectionOptions the injections + */ + private static void setInjections(final String bitName, final InjectionOptions injectionOptions) { + @SuppressWarnings("resource") + final Drillbit drillbit = drillbits.get(bitName); + if (drillbit == null) { + throw new IllegalStateException("No Drillbit named \"" + bitName + "\" found"); + } + + ExceptionInjectionUtil.setInjections(drillbit, injectionOptions); + } + + /** + * Set the given injections on all drillbits. + * + * @param injectionOptions the injections + */ + private static void setInjectionsAll(final InjectionOptions injectionOptions) { + for(Drillbit drillbit : drillbits.values()) { + ExceptionInjectionUtil.setInjections(drillbit, injectionOptions); + } + } + + /** + * Create a single exception injection. + * + * @param siteClassName the name of the injection site class + * @param desc the injection site description + * @param exceptionClassName the name of the exception to throw + * @return the created injection options POJO + */ + private static InjectionOptions createSingleInjection( + final String siteClassName, final String desc, final String exceptionClassName) { + final InjectionOption injectionOption = new InjectionOption(); + injectionOption.nFire = 1; + injectionOption.siteClass = siteClassName; + injectionOption.desc = desc; + injectionOption.exceptionClass = exceptionClassName; + + final InjectionOptions injectionOptions = new InjectionOptions(); + injectionOptions.injections = new InjectionOption[1]; + injectionOptions.injections[0] = injectionOption; + + return injectionOptions; + } + + /** + * Create a single exception injection. + * + * @param siteClass the injection site class + * @param desc the injection site description + * @param exceptionClass the class of the exception to throw + * @return the created injection options POJO + */ + private static InjectionOptions createSingleInjection( + final Class<?> siteClass, final String desc, final Class<? extends Throwable> exceptionClass) { + return createSingleInjection(siteClass.getName(), desc, exceptionClass.getName()); + } + + /** + * Check that the injected exception is what we were expecting. + * + * @param caught the exception that was caught (by the test) + * @param exceptionClass the expected exception class + * @param desc the expected exception site description + */ + private static void assertInjected( + final Throwable caught, final Class<? extends Throwable> exceptionClass, final String desc) { + final String cause = caught.getMessage(); + final String[] causeParts = cause.split(":"); + final String causeShortName = causeParts[0].trim(); + final String causeDesc = causeParts[1].trim(); + assertTrue(exceptionClass.getName().endsWith(causeShortName)); + assertEquals(desc, causeDesc); + } + + @Test + public void testSettingNoopInjectionsAndQuery() throws Exception { + final InjectionOptions injectionOptions = + createSingleInjection(getClass(), "noop", RuntimeException.class); + setInjections(DRILLBIT_BETA, injectionOptions); + QueryTestUtil.test(drillClient, "select * from sys.drillbits"); + } + + /** + * Test throwing exceptions from sites within the Foreman class, as specified by the site + * description + * + * @param desc site description + * @throws Exception + */ + private static void testForeman(final String desc) throws Exception { + final InjectionOptions injectionOptions = createSingleInjection(Foreman.class, desc, ForemanException.class); + setInjectionsAll(injectionOptions); + try { + QueryTestUtil.test(drillClient, "select * from sys.drillbits"); + fail(); + } catch(RpcException rpce) { + assertInjected(rpce, ForemanException.class, desc); + } + } + + @SuppressWarnings("static-method") + @Test + public void testForeman_runTryBeginning() throws Exception { + testForeman("run-try-beginning"); + } + + @SuppressWarnings("static-method") + @Test + public void testForeman_setInjectionViaAlterSystem() throws Exception { + final String exceptionDesc = "run-try-beginning"; + final InjectionOptions injectionOptions = + createSingleInjection(Foreman.class, exceptionDesc, ForemanException.class); + final ObjectMapper objectMapper = new ObjectMapper(); + final String jsonString = objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(injectionOptions); + final String alterSession = String.format( + "alter system set `%s`='%s'", + ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS, jsonString); + QueryTestUtil.test(drillClient, alterSession); + try { + QueryTestUtil.test(drillClient, "select * from sys.drillbits"); + fail(); + } catch(RpcException rpce) { + assertInjected(rpce, ForemanException.class, exceptionDesc); + } + } + + /* + * This test doesn't work because worker threads have returned the result to the client before + * Foreman.run() has even finished executing. This might not happen if the results are larger. + * This brings up the question of how we detect failed queries, because here a failure is happening + * after the query starts running, yet apparently the query still succeeds. + * + * TODO I'm beginning to think that Foreman needs to gate output to its client in a similar way + * that it gates input via stateListener. That could be tricky, since some results could be + * queued up before Foreman has gotten through it's run(), and they would all have to be sent + * before the gate is opened. There's also the question of what to do in case we detect failure + * there after some data has been sent. Right now, this test doesn't work because that's + * exactly what happens, and the client believes that the query succeeded, even though an exception + * was thrown after setup completed, but data was asynchronously sent to the client before that. + * This test also revealed that the QueryState never seems to make it to the client, so we can't + * detect the failure that way (see SingleRowListener's getQueryState(), which I originally tried + * to use here to detect query completion). + */ + @SuppressWarnings("static-method") + @Test + @Ignore + public void testForeman_runTryEnd() throws Exception { + testForeman("run-try-end"); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java new file mode 100644 index 000000000..bf93dee4a --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/ExceptionInjectionUtil.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.testing; + +import java.io.IOException; +import java.io.StringWriter; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.OptionManager; +import org.apache.drill.exec.server.options.OptionValue; +import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * Static methods for constructing exception injections for testing purposes. + */ +public class ExceptionInjectionUtil { + /** + * Constructor. Prevent instantiation of static utility class. + */ + private ExceptionInjectionUtil() { + } + + /** + * Add a set of injections to a drillbit. + * + * @param drillbit the drillbit + * @param injections the JSON-specified injections + */ + public static void setInjections(final Drillbit drillbit, final String injections) { + final DrillbitContext drillbitContext = drillbit.getContext(); + final OptionValue stringValue = OptionValue.createString( + OptionValue.OptionType.SYSTEM, ExecConstants.DRILLBIT_EXCEPTION_INJECTIONS, injections); + final OptionManager optionManager = drillbitContext.getOptionManager(); + optionManager.setOption(stringValue); + } + + /** + * Add a set of injections to a drillbit. + * + * @param drillbit the drillbit + * @param injectionOptions the injections, specified using the parsing POJOs + */ + public static void setInjections(final Drillbit drillbit, final InjectionOptions injectionOptions) { + final ObjectMapper objectMapper = new ObjectMapper(); + final StringWriter stringWriter = new StringWriter(); + try { + objectMapper.writeValue(stringWriter, injectionOptions); + } catch(IOException e) { + throw new RuntimeException("Couldn't serialize injectionOptions to JSON", e); + } + + setInjections(drillbit, stringWriter.toString()); + } + + /** + * Clear all injections on a drillbit. + * + * @param drillbit the drillbit + */ + public static void clearInjections(final Drillbit drillbit) { + setInjections(drillbit, ""); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java new file mode 100644 index 000000000..d0c02794e --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestExceptionInjection.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.testing; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.drill.BaseTestQuery; +import org.apache.drill.exec.server.Drillbit; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOption; +import org.apache.drill.exec.testing.SimulatedExceptions.InjectionOptions; +import org.junit.Test; + +public class TestExceptionInjection extends BaseTestQuery { + private final static String NO_THROW_FAIL = "Didn't throw expected exception"; + + /** + * Class whose methods we want to simulate exceptions at run-time for testing + * purposes. + */ + public static class DummyClass { + private final static ExceptionInjector injector = ExceptionInjector.getInjector(DummyClass.class); + private final DrillbitContext drillbitContext; + + public DummyClass(final DrillbitContext drillbitContext) { + this.drillbitContext = drillbitContext; + } + + /** + * Method that injects an unchecked exception with the given site description. + * + * @param desc the injection site description + */ + public void descPassthroughMethod(final String desc) { + // ... code ... + + // simulated unchecked exception + injector.injectUnchecked(drillbitContext, desc); + + // ... code ... + } + + public final static String THROWS_IOEXCEPTION = "<<throwsIOException>>"; + + /** + * Method that injects an IOException with a site description of THROWS_IOEXCEPTION. + * + * @throws IOException + */ + public void throwsIOException() throws IOException { + // ... code ... + + // simulated IOException + injector.injectChecked(drillbitContext, THROWS_IOEXCEPTION, IOException.class); + + // ... code ... + } + } + + @SuppressWarnings("static-method") + @Test + public void testNoInjection() throws Exception { + test("select * from sys.drillbits"); + } + + private static void setInjections(final String jsonInjections) { + for(Drillbit bit : bits) { + ExceptionInjectionUtil.setInjections(bit, jsonInjections); + } + } + + @SuppressWarnings("static-method") + @Test + public void testEmptyInjection() throws Exception { + setInjections("{\"injections\":[]}"); + test("select * from sys.drillbits"); + } + + /** + * Assert that DummyClass.descPassThroughMethod does indeed throw the expected exception. + * + * @param dummyClass the instance of DummyClass + * @param exceptionClassName the expected exception + * @param exceptionDesc the expected exception site description + */ + private static void assertPassthroughThrows( + final DummyClass dummyClass, final String exceptionClassName, final String exceptionDesc) { + try { + dummyClass.descPassthroughMethod(exceptionDesc); + fail(NO_THROW_FAIL); + } catch(Exception e) { + assertEquals(exceptionClassName, e.getClass().getName()); + assertEquals(exceptionDesc, e.getMessage()); + } + } + + @SuppressWarnings("static-method") + @Test + public void testUncheckedStringInjection() { + // set injections via a string + final String exceptionDesc = "<<injected from descPassthroughMethod()>>"; + final String exceptionClassName = "java.lang.RuntimeException"; + final String jsonString = "{\"injections\":[{" + + "\"siteClass\":\"org.apache.drill.exec.testing.TestExceptionInjection$DummyClass\"," + + "\"desc\":\"" + exceptionDesc + "\"," + + "\"nSkip\":0," + + "\"nFire\":1," + + "\"exceptionClass\":\"" + exceptionClassName + "\"" + + "}]}"; + setInjections(jsonString); + + // test that the exception gets thrown + final DummyClass dummyClass = new DummyClass(bits[0].getContext()); + assertPassthroughThrows(dummyClass, exceptionClassName, exceptionDesc); + } + + private static InjectionOptions buildDefaultJson() { + final InjectionOption injectionOption = new InjectionOption(); + injectionOption.siteClass = "org.apache.drill.exec.testing.TestExceptionInjection$DummyClass"; + injectionOption.desc = DummyClass.THROWS_IOEXCEPTION; + injectionOption.nSkip = 0; + injectionOption.nFire = 1; + injectionOption.exceptionClass = "java.io.IOException"; + final InjectionOptions injectionOptions = new InjectionOptions(); + injectionOptions.injections = new InjectionOption[1]; + injectionOptions.injections[0] = injectionOption; + return injectionOptions; + } + + @SuppressWarnings("static-method") + @Test + public void testCheckedJsonInjection() { + // set the injection via the parsing POJOs + final InjectionOptions injectionOptions = buildDefaultJson(); + ExceptionInjectionUtil.setInjections(bits[0], injectionOptions); + + // test that the expected exception (checked) gets thrown + final DummyClass dummyClass = new DummyClass(bits[0].getContext()); + try { + dummyClass.throwsIOException(); + fail(NO_THROW_FAIL); + } catch(IOException e) { + assertEquals(DummyClass.THROWS_IOEXCEPTION, e.getMessage()); + } + } + + @SuppressWarnings("static-method") + @Test + public void testSkipAndLimit() { + final String passthroughDesc = "<<injected from descPassthrough>>"; + final InjectionOptions injectionOptions = buildDefaultJson(); + final InjectionOption injectionOption = injectionOptions.injections[0]; + injectionOption.desc = passthroughDesc; + injectionOption.nSkip = 7; + injectionOption.nFire = 3; + injectionOption.exceptionClass = RuntimeException.class.getName(); + ExceptionInjectionUtil.setInjections(bits[0], injectionOptions); + + final DummyClass dummyClass = new DummyClass(bits[0].getContext()); + + // these shouldn't throw + for(int i = 0; i < injectionOption.nSkip; ++i) { + dummyClass.descPassthroughMethod(passthroughDesc); + } + + // these should throw + for(int i = 0; i < injectionOption.nFire; ++i) { + assertPassthroughThrows(dummyClass, injectionOption.exceptionClass, passthroughDesc); + } + + // this shouldn't throw + dummyClass.descPassthroughMethod(passthroughDesc); + } +} diff --git a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java index 2c51ec08a..f19aab0a3 100644 --- a/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java +++ b/exec/jdbc/src/main/java/org/apache/drill/jdbc/DrillConnectionImpl.java @@ -115,6 +115,7 @@ abstract class DrillConnectionImpl extends AvaticaConnection implements org.apac return allocator; } + @Override public DrillClient getClient() { return client; } diff --git a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java index b88d88055..f7ecf0a90 100644 --- a/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java +++ b/exec/jdbc/src/test/java/org/apache/drill/jdbc/test/JdbcAssert.java @@ -19,6 +19,7 @@ package org.apache.drill.jdbc.test; import java.sql.Connection; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -85,15 +86,14 @@ public class JdbcAssert { } static String toString(ResultSet resultSet, int expectedRecordCount) throws SQLException { - StringBuilder buf = new StringBuilder(); - int total = 0, n; + final StringBuilder buf = new StringBuilder(); while (resultSet.next()) { - n = resultSet.getMetaData().getColumnCount(); - total++; + final ResultSetMetaData metaData = resultSet.getMetaData(); + final int n = metaData.getColumnCount(); String sep = ""; for (int i = 1; i <= n; i++) { buf.append(sep) - .append(resultSet.getMetaData().getColumnLabel(i)) + .append(metaData.getColumnLabel(i)) .append("=") .append(resultSet.getObject(i)); sep = "; "; |