/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.drill.exec.work.foreman; import org.apache.drill.exec.work.filter.RuntimeFilterRouter; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; import io.netty.channel.ChannelFuture; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.OptimizerException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.opt.BasicOptimizer; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.fragment.Fragment; import 
org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor; import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.sql.DirectPlan; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.ExecProtos.ServerPreparedStatementState; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.proto.UserBitShared.QueryResult; import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState; import org.apache.drill.exec.proto.UserProtos.PreparedStatementHandle; import org.apache.drill.exec.proto.UserProtos.RunQuery; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.BaseRpcOutcomeListener; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.FailureUtils; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; import org.apache.drill.exec.util.Pointer; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.WorkManager.WorkerBee; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; import org.apache.drill.exec.work.foreman.rm.QueryResourceManager; import org.codehaus.jackson.map.ObjectMapper; import java.io.IOException; import java.util.Date; import java.util.List; import static org.apache.drill.exec.server.FailureUtils.EXIT_CODE_HEAP_OOM; /** * Foreman manages all the fragments (local and remote) for a single query where this * is the driving/root node. 
* * The flow is as follows: *
Note that completion of this function is not the end of the Foreman's role
* in the query's lifecycle.
*/
/**
 * Plans and submits the query on the Foreman thread: renames the thread for debugging,
 * rejects new work if this Foreman is shutting down, then dispatches on the request type
 * (LOGICAL / PHYSICAL / SQL / EXECUTION / PREPARED_STATEMENT). Failures are routed into
 * the query state machine (via queryStateProcessor) rather than thrown to the caller.
 *
 * Note that completion of this method is not the end of the Foreman's role in the
 * query's lifecycle — see the trailing comment.
 */
@Override
public void run() {
  // rename the thread we're using for debugging purposes
  final Thread currentThread = Thread.currentThread();
  final String originalName = currentThread.getName();
  currentThread.setName(queryIdString + ":foreman");
  try {
    /*
     Check if the foreman is ONLINE. If not don't accept any new queries.
    */
    if (!drillbitContext.isForemanOnline()) {
      throw new ForemanException("Query submission failed since Foreman is shutting down.");
    }
  } catch (ForemanException e) {
    logger.debug("Failure while submitting query", e);
    queryStateProcessor.addToEventQueue(QueryState.FAILED, e);
    // NOTE(review): no return here — planning below still proceeds after the FAILED
    // event is queued. Confirm this fall-through is intentional.
  }
  queryText = queryRequest.getPlan();
  queryStateProcessor.moveToState(QueryState.PLANNING, null);
  try {
    // test hook: controls injection can force a ForemanException at the start of planning
    injector.injectChecked(queryContext.getExecutionControls(), "run-try-beginning", ForemanException.class);
    // convert a run query request into action
    switch (queryRequest.getType()) {
    case LOGICAL:
      parseAndRunLogicalPlan(queryRequest.getPlan());
      break;
    case PHYSICAL:
      parseAndRunPhysicalPlan(queryRequest.getPlan());
      break;
    case SQL:
      final String sql = queryRequest.getPlan();
      // log query id, username and query text before starting any real work. Also, put
      // them together such that it is easy to search based on query id
      logger.info("Query text for query with id {} issued by {}: {}", queryIdString,
          queryContext.getQueryUserName(), sql);
      runSQL(sql);
      break;
    case EXECUTION:
      runFragment(queryRequest.getFragmentsList());
      break;
    case PREPARED_STATEMENT:
      runPreparedStatement(queryRequest.getPreparedStatementHandle());
      break;
    default:
      // unknown RunQuery type: a programming error, not a user error
      throw new IllegalStateException();
    }
    // test hook: mirror of "run-try-beginning" for the end of the planning block
    injector.injectChecked(queryContext.getExecutionControls(), "run-try-end", ForemanException.class);
  } catch (final ForemanException e) {
    queryStateProcessor.moveToState(QueryState.FAILED, e);
  } catch (final OutOfMemoryError | OutOfMemoryException e) {
    if (FailureUtils.isDirectMemoryOOM(e)) {
      // direct-memory OOM is recoverable: fail just this query
      queryStateProcessor.moveToState(QueryState.FAILED, UserException.memoryError(e).build(logger));
    } else {
      /*
       * FragmentExecutors use a DrillbitStatusListener to watch out for the death of their query's Foreman. So, if we
       * die here, they should get notified about that, and cancel themselves; we don't have to attempt to notify
       * them, which might not work under these conditions.
       */
      FailureUtils.unrecoverableFailure(e, "Unable to handle out of memory condition in Foreman.", EXIT_CODE_HEAP_OOM);
    }
  } catch (AssertionError | Exception ex) {
    queryStateProcessor.moveToState(QueryState.FAILED,
        new ForemanException("Unexpected exception during fragment initialization: " + ex.getMessage(), ex));
  } finally {
    // restore the thread's original name
    currentThread.setName(originalName);
  }
  /*
   * Note that despite the run() completing, the Foreman continues to exist, and receives
   * events (indirectly, through the QueryManager's use of stateListener), about fragment
   * completions. It won't go away until everything is completed, failed, or cancelled.
   */
}
/**
 * While fragments were being sent out, some may already have completed or failed. Those
 * events must not be processed until every fragment has been sent; this method releases
 * event processing once fragment delivery is finished.
 */
public void startProcessingEvents() {
  queryStateProcessor.startProcessingEvents();

  // A resume signal that arrived before setup finished could not actually resume the
  // fragments. Setup is done now and all fragments have reached their remote nodes,
  // so honor the deferred resume request here.
  if (resume) {
    resume();
  }
}
/**
 * Derive the profile-writing mode for this query from the given options: NONE when
 * profiles are disabled, SYNC in profile-debug mode, ASYNC otherwise.
 *
 * @param options the option manager to consult
 * @return the profile option to use for this query
 */
private ProfileOption setProfileOption(OptionManager options) {
  // Profiles disabled entirely?
  if (!options.getOption(ExecConstants.ENABLE_QUERY_PROFILE_VALIDATOR)) {
    return ProfileOption.NONE;
  }
  // Debug mode writes the profile synchronously before results are sent.
  return options.getOption(ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR)
      ? ProfileOption.SYNC
      : ProfileOption.ASYNC;
}
/**
 * Parse a JSON-encoded logical plan, convert it to a physical plan, and either return
 * the physical plan to the client (result mode PHYSICAL) or execute it.
 *
 * @param json the serialized logical plan
 * @throws ExecutionSetupException if parsing fails, the result mode is LOGICAL, or
 *         execution setup fails
 */
private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException {
  LogicalPlan logicalPlan;
  try {
    logicalPlan = drillbitContext.getPlanReader().readLogicalPlan(json);
  } catch (final IOException e) {
    throw new ForemanException("Failure parsing logical plan.", e);
  }

  // A logical plan cannot be "output as itself"; reject LOGICAL result mode up front.
  // (Fixed grammar in the user-facing message: "you're" -> "your".)
  if (logicalPlan.getProperties().resultMode == ResultMode.LOGICAL) {
    throw new ForemanException(
        "Failure running plan. You requested a result mode of LOGICAL and submitted a logical plan. In this case your output mode must be PHYSICAL or EXEC.");
  }

  log(logicalPlan);
  final PhysicalPlan physicalPlan = convert(logicalPlan);

  // PHYSICAL mode: hand the converted plan back to the client instead of executing it.
  if (logicalPlan.getProperties().resultMode == ResultMode.PHYSICAL) {
    returnPhysical(physicalPlan);
    return;
  }

  log(physicalPlan);
  runPhysicalPlan(physicalPlan);
}
/**
 * Emit the logical plan at debug level, skipping the unparse work entirely when
 * debug logging is disabled.
 */
private void log(final LogicalPlan plan) {
  if (!logger.isDebugEnabled()) {
    return;
  }
  logger.debug("Logical {}", plan.unparse(queryContext.getLpPersistence()));
}
/**
 * Emit the physical plan (as JSON) at debug level. Serialization failures are logged
 * as warnings and otherwise ignored.
 */
private void log(final PhysicalPlan plan) {
  if (!logger.isDebugEnabled()) {
    return;
  }
  try {
    logger.debug("Physical {}",
        queryContext.getLpPersistence().getMapper().writeValueAsString(plan));
  } catch (final IOException e) {
    logger.warn("Error while attempting to log physical plan.", e);
  }
}
/**
 * Instead of executing the physical plan, serialize it to JSON and return it to the
 * client wrapped in a direct plan.
 *
 * @param plan the physical plan to return
 * @throws ExecutionSetupException if running the direct plan fails
 */
private void returnPhysical(final PhysicalPlan plan) throws ExecutionSetupException {
  final String planJson = plan.unparse(queryContext.getLpPersistence().getMapper().writer());
  runPhysicalPlan(DirectPlan.createDirectPlan(queryContext, new PhysicalFromLogicalExplain(planJson)));
}
/**
 * Simple holder for the JSON rendering of a physical plan that was converted from a
 * logical plan; handed to {@code DirectPlan} so the serialized plan is returned to the
 * client as the query result.
 */
public static class PhysicalFromLogicalExplain {
  // the serialized physical plan, exposed as the result column
  public final String json;

  public PhysicalFromLogicalExplain(final String json) {
    this.json = json;
  }
}
/**
 * Parse a JSON-encoded physical plan and execute it.
 *
 * @param json the serialized physical plan
 * @throws ExecutionSetupException if the plan cannot be parsed or execution setup fails
 */
private void parseAndRunPhysicalPlan(final String json) throws ExecutionSetupException {
  final PhysicalPlan plan;
  try {
    plan = drillbitContext.getPlanReader().readPhysicalPlan(json);
  } catch (final IOException e) {
    throw new ForemanSetupException("Failure while parsing physical plan.", e);
  }
  runPhysicalPlan(plan);
}
/**
 * Execute a physical plan with no session-transformer pointer; convenience overload
 * that delegates to the two-argument form with {@code null}.
 *
 * @param plan the physical plan to execute
 * @throws ExecutionSetupException if execution setup fails
 */
private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
  runPhysicalPlan(plan, null);
}
private void runPhysicalPlan(final PhysicalPlan plan, Pointer Note that before sending this result, we execute cleanup steps that could
* result in this result still being changed to a FAILED state.
*
* @param queryState one of COMPLETED or CANCELED
*/
/**
 * Record the terminal state for a query that finished or was cancelled. Only COMPLETED
 * and CANCELED are accepted; failures go through {@code setFailed}. Cleanup steps run
 * afterwards may still downgrade this result to FAILED.
 *
 * @param queryState one of COMPLETED or CANCELED
 */
public void setCompleted(final QueryState queryState) {
  Preconditions.checkArgument(
      queryState == QueryState.COMPLETED || queryState == QueryState.CANCELED);
  Preconditions.checkState(!isClosed);
  Preconditions.checkState(resultState == null);

  resultState = queryState;
}
/**
 * Record a FAILED terminal state together with its cause. Any further failures raised
 * during cleanup are attached to this exception as suppressed exceptions.
 *
 * @param exception the exception that led to the FAILED state; must not be null
 */
public void setFailed(final Exception exception) {
  Preconditions.checkArgument(exception != null);
  Preconditions.checkState(!isClosed);
  Preconditions.checkState(resultState == null);

  resultException = exception;
  resultState = QueryState.FAILED;
}
/**
 * Overwrite whatever result has been recorded with the given failure, ignoring the
 * usual "result not yet set" precondition.
 * NOTE: intended for testing only — never use in production code paths.
 *
 * @param exception the failure to force; must not be null
 */
public void setForceFailure(final Exception exception) {
  Preconditions.checkArgument(exception != null);
  Preconditions.checkState(!isClosed);

  resultException = exception;
  resultState = QueryState.FAILED;
}
/**
 * Fold an exception into the result: the first one recorded becomes the primary
 * exception, and every later one hangs off it as a suppressed exception.
 *
 * @param exception the exception to record; must not be null
 */
private void addException(final Exception exception) {
  assert exception != null;

  if (resultException != null) {
    resultException.addSuppressed(exception);
  } else {
    resultException = exception;
  }
}
/**
 * Expose the current exception (if it exists). This is useful for secondary reporting
 * to the query profile.
 *
 * @return the current Foreman result exception, or null if none has been recorded
 */
public Exception getException() {
  return resultException;
}
/**
 * Close the given resource, recording any exception via {@link #addException(Exception)}
 * and forcing the result state to FAILED regardless of its current value. A null
 * resource is silently ignored.
 *
 * @param autoCloseable the resource to close, may be null
 */
private void suppressingClose(final AutoCloseable autoCloseable) {
  Preconditions.checkState(!isClosed);
  Preconditions.checkState(resultState != null);

  if (autoCloseable != null) {
    try {
      autoCloseable.close();
    } catch (final Exception e) {
      /*
       * Even if the query completed successfully, we'll still report failure if we have
       * problems cleaning up.
       */
      resultState = QueryState.FAILED;
      addException(e);
    }
  }
}
/**
 * Write a one-line summary of this query (id, default schema, text, start/end times,
 * final state, user, client address) to the dedicated query logger. Best effort: any
 * failure here is logged and otherwise ignored.
 */
private void logQuerySummary() {
  try {
    final LoggedQuery summary = new LoggedQuery(
        queryIdString,
        queryContext.getQueryContextInfo().getDefaultSchemaName(),
        queryText,
        new Date(queryContext.getQueryContextInfo().getQueryStartTime()),
        new Date(System.currentTimeMillis()),
        queryStateProcessor.getState(),
        queryContext.getSession().getCredentials().getUserName(),
        initiatingClient.getRemoteAddress());
    queryLogger.info(MAPPER.writeValueAsString(summary));
  } catch (Exception e) {
    // Losing the summary must not affect query completion.
    logger.error("Failure while recording query information to query log.", e);
  }
}
/**
 * Tear down the query: wait for runtime-filter completion, detach listeners, write the
 * query-log summary, persist the final state, build and send the final QueryResult to
 * the client, write the profile (sync or async per profileOption), and release all
 * per-query resources. Statement order here is significant — cleanup failures must be
 * folded into the result before it is sent.
 */
@Override
public void close() {
  Preconditions.checkState(!isClosed);
  Preconditions.checkState(resultState != null);
  logger.debug(queryIdString + ": cleaning up.");
  // test hook: allows controls injection to pause here
  injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
  // block until the runtime filter routing (if enabled for this query) has finished
  if (enableRuntimeFilter && runtimeFilterRouter != null) {
    runtimeFilterRouter.waitForComplete();
  }
  // remove the channel disconnected listener (doesn't throw)
  closeFuture.removeListener(closeListener);
  // log the query summary
  logQuerySummary();
  // These are straight forward removals from maps, so they won't throw.
  drillbitContext.getWorkBus().removeFragmentStatusListener(queryId);
  drillbitContext.getClusterCoordinator().removeDrillbitStatusListener(queryManager.getDrillbitStatusListener());
  suppressingClose(queryContext);
  /*
   * We do our best to write the latest state, but even that could fail. If it does, we can't write
   * the (possibly newly failing) state, so we continue on anyway.
   *
   * We only need to do this if the resultState differs from the last recorded state
   */
  if (resultState != queryStateProcessor.getState()) {
    suppressingClose(new AutoCloseable() {
      @Override
      public void close() throws Exception {
        queryStateProcessor.recordNewState(resultState);
      }
    });
  }
  // set query end time before writing final profile
  queryStateProcessor.close();
  /*
   * Construct the response based on the latest resultState. The builder shouldn't fail.
   */
  final QueryResult.Builder resultBuilder = QueryResult.newBuilder()
      .setQueryId(queryId)
      .setQueryState(resultState);
  final UserException uex;
  if (resultException != null) {
    // wrap the recorded failure as a system error, honoring the verbose-errors option
    final boolean verbose = queryContext.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
    uex = UserException.systemError(resultException).addIdentity(queryContext.getCurrentEndpoint()).build(logger);
    resultBuilder.addError(uex.getOrCreatePBError(verbose));
  } else {
    uex = null;
  }
  // Debug option: write query profile before sending final results so that
  // the client can be certain the profile exists.
  if (profileOption == ProfileOption.SYNC) {
    queryManager.writeFinalProfile(uex);
  }
  /*
   * If sending the result fails, we don't really have any way to modify the result we tried to send;
   * it is possible it got sent but the result came from a later part of the code path. It is also
   * possible the connection has gone away, so this is irrelevant because there's nowhere to
   * send anything to.
   */
  try {
    // send whatever result we ended up with
    initiatingClient.sendResult(responseListener, resultBuilder.build());
  } catch(final Exception e) {
    addException(e);
    logger.warn("Exception sending result to client", resultException);
  }
  // Store the final result here so we can capture any error/errorId in the
  // profile for later debugging.
  // Normal behavior is to write the query profile AFTER sending results to the user.
  // The observed
  // user behavior is a possible time-lag between query return and appearance
  // of the query profile in persistent storage. Also, the query might
  // succeed, but the profile never appear if the profile write fails. This
  // behavior is acceptable for an eventually-consistent distributed system.
  // The key benefit is that the client does not wait for the persistent
  // storage write; query completion occurs in parallel with profile
  // persistence.
  if (profileOption == ProfileOption.ASYNC) {
    queryManager.writeFinalProfile(uex);
  }
  // Remove the Foreman from the running query list.
  fragmentsRunner.getBee().retireForeman(Foreman.this);
  try {
    queryContext.close();
  } catch (Exception e) {
    logger.error("Unable to close query context for query {}", QueryIdHelper.getQueryId(queryId), e);
  }
  try {
    queryManager.close();
  } catch (final Exception e) {
    logger.warn("unable to close query manager", e);
  }
  // release queue/RM resources last; mark closed even if exit() throws
  try {
    queryRM.exit();
  } finally {
    isClosed = true;
  }
}
}
private class ConnectionClosedListener implements GenericFutureListenerpreparedStatement
and submits as a new query.
*
* @param preparedStatementHandle prepared statement handle
* @throws ExecutionSetupException
*/
/**
 * Recover the SQL text from the server-side prepared statement state embedded in the
 * handle, then submit it as a regular SQL query.
 *
 * @param preparedStatementHandle prepared statement handle previously issued by this server
 * @throws ExecutionSetupException if query setup fails
 */
private void runPreparedStatement(final PreparedStatementHandle preparedStatementHandle)
    throws ExecutionSetupException {
  final ServerPreparedStatementState serverState;
  try {
    // the handle carries an opaque protobuf blob produced at create-prepared-statement time
    serverState =
        ServerPreparedStatementState.PARSER.parseFrom(preparedStatementHandle.getServerInfo());
  } catch (final InvalidProtocolBufferException ex) {
    // a corrupt/foreign handle is a user error, not a system error
    throw UserException.parseError(ex)
        .message("Failed to parse the prepared statement handle. " +
            "Make sure the handle is same as one returned from create prepared statement call.")
        .build(logger);
  }
  queryText = serverState.getSqlQuery();
  logger.info("Prepared statement query for QueryId {} : {}", queryId, queryText);
  runSQL(queryText);
}
/**
 * Ensure a submitted physical plan is marked for execution.
 *
 * @param plan the plan to check
 * @throws ForemanSetupException if the plan's result mode is anything other than EXEC
 */
private static void validatePlan(final PhysicalPlan plan) throws ForemanSetupException {
  final ResultMode mode = plan.getProperties().resultMode;
  if (mode != ResultMode.EXEC) {
    throw new ForemanSetupException(String.format(
        "Failure running plan. You requested a result mode of %s and a physical plan can only be output as EXEC",
        mode));
  }
}
/**
 * Split the physical plan into fragments and parallelize them across the currently
 * online Drillbits, producing the unit of work to be scheduled.
 *
 * @param plan the physical plan to fragment
 * @return the parallelized query work unit
 * @throws ExecutionSetupException if fragmentation or parallelization fails
 */
private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException {
  // first operator of the unsorted-order list is the plan root — TODO confirm ordering contract
  final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
  final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
  final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
  return parallelizer.getFragments(
      queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
      queryId, queryContext.getOnlineEndpoints(), rootFragment,
      initiatingClient.getSession(), queryContext.getQueryContextInfo());
}
/**
 * Dump the plan fragments of the work unit at trace level. The explicit
 * isTraceEnabled guard avoids stringifying the fragments when tracing is off.
 *
 * @param queryWorkUnit the work unit whose fragments should be logged
 */
private void logWorkUnit(QueryWorkUnit queryWorkUnit) {
  if (!logger.isTraceEnabled()) {
    return;
  }
  // Idiomatic SLF4J parameterized logging instead of an eager String.format.
  logger.trace("PlanFragments for query {} \n{}",
      queryId, queryWorkUnit.stringifyFragments());
}
private void runSQL(final String sql) throws ExecutionSetupException {
final Pointer