aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/work
diff options
context:
space:
mode:
authorPaul Rogers <progers@maprtech.com>2017-08-30 14:32:17 -0700
committerPaul Rogers <progers@maprtech.com>2017-10-09 15:58:20 -0700
commitbbc42240483a0658691149aea3c509ccd0db4c79 (patch)
tree471957d30d786e3f384ffb882a088357a6acf97e /exec/java-exec/src/main/java/org/apache/drill/exec/work
parenta03f5429e368cf73286eec6101871f6e61a5b7d1 (diff)
DRILL-5716: Queue-driven memory allocation
* Creates new core resource management and query queue abstractions. * Adds queue information to the Protobuf layer. * Foreman and Planner changes - Abstracts memory management out to the new resource management layer. This means deferring generating the physical plan JSON to later in the process after memory planning. * Web UI changes * Adds queue information to the main page and the profile page to each query. * Also sorts the list of options displayed in the Web UI. - Added memory reserve A new config parameter, exec.queue.memory_reserve_ratio, sets aside a slice of total memory for operators that do not participate in the memory assignment process. The default is 20% testing will tell us if that value should be larger or smaller. * Additional minor fixes - Code cleanup. - Added mechanism to abandon lease release during shutdown. - Log queue configuration only when the config changes, rather than on every query. - Apply Boaz’ option to enforce a minimum memory allocation per operator. - Additional logging to help testers see what is happening. closes #928
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java80
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java4
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java278
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java30
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java68
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java120
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java363
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java146
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java151
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java140
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java56
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java73
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java66
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java78
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java360
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java31
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java14
17 files changed, 1870 insertions, 188 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
index 4f99f859b..a06d46c5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,39 +17,99 @@
*/
package org.apache.drill.exec.work;
+import java.util.ArrayList;
import java.util.List;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
public class QueryWorkUnit {
- // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class);
- private final PlanFragment rootFragment; // for local
+
+ /**
+ * Definition of a minor fragment that contains the (unserialized) fragment operator
+ * tree and the (partially built) fragment. Allows the resource manager to apply
+ * memory allocations before serializing the fragments to JSON.
+ */
+
+ public static class MinorFragmentDefn {
+ private PlanFragment fragment;
+ private final FragmentRoot root;
+ private final OptionList options;
+
+ public MinorFragmentDefn(final PlanFragment fragment, final FragmentRoot root, OptionList options) {
+ this.fragment = fragment;
+ this.root = root;
+ this.options = options;
+ }
+
+ public FragmentRoot root() { return root; }
+ public PlanFragment fragment() { return fragment; }
+ public OptionList options() { return options; }
+
+ public PlanFragment applyPlan(PhysicalPlanReader reader) throws ForemanSetupException {
+ // get plan as JSON
+ try {
+ final String plan = reader.writeJson(root);
+ final String optionsData = reader.writeJson(options);
+ return PlanFragment.newBuilder(fragment)
+ .setFragmentJson(plan)
+ .setOptionsJson(optionsData)
+ .build();
+ } catch (JsonProcessingException e) {
+ throw new ForemanSetupException("Failure while trying to convert fragment into json.", e);
+ }
+ }
+ }
+
+ private PlanFragment rootFragment; // for local
+ private final MinorFragmentDefn rootFragmentDefn;
private final FragmentRoot rootOperator; // for local
- private final List<PlanFragment> fragments;
+ private List<PlanFragment> fragments = new ArrayList<>();
+ private final List<MinorFragmentDefn> minorFragmentDefns;
- public QueryWorkUnit(final FragmentRoot rootOperator, final PlanFragment rootFragment,
- final List<PlanFragment> fragments) {
- Preconditions.checkNotNull(rootFragment);
- Preconditions.checkNotNull(fragments);
+ public QueryWorkUnit(final FragmentRoot rootOperator, final MinorFragmentDefn rootFragmentDefn,
+ final List<MinorFragmentDefn> minorFragmentDefns) {
Preconditions.checkNotNull(rootOperator);
+ Preconditions.checkNotNull(rootFragmentDefn);
+ Preconditions.checkNotNull(minorFragmentDefns);
- this.rootFragment = rootFragment;
- this.fragments = fragments;
+ this.rootFragmentDefn = rootFragmentDefn;
this.rootOperator = rootOperator;
+ this.minorFragmentDefns = minorFragmentDefns;
}
public PlanFragment getRootFragment() {
return rootFragment;
}
+ public MinorFragmentDefn getRootFragmentDefn() {
+ return rootFragmentDefn;
+ }
+
public List<PlanFragment> getFragments() {
return fragments;
}
+ public List<MinorFragmentDefn> getMinorFragmentDefns() {
+ return minorFragmentDefns;
+ }
+
public FragmentRoot getRootOperator() {
return rootOperator;
}
+
+ public void applyPlan(PhysicalPlanReader reader) throws ForemanSetupException {
+ assert rootFragment == null;
+ rootFragment = rootFragmentDefn.applyPlan(reader);
+ assert fragments.isEmpty();
+ for (MinorFragmentDefn defn : minorFragmentDefns) {
+ fragments.add(defn.applyPlan(reader));
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 800d3a7f0..6e560a963 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -149,7 +149,9 @@ public class WorkManager implements AutoCloseable {
}
}
- getContext().close();
+ if (getContext() != null) {
+ getContext().close();
+ }
}
public DrillbitContext getContext() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d01d8fd7d..a1f9d9ffb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,17 +17,15 @@
*/
package org.apache.drill.exec.work.foreman;
-import com.codahale.metrics.Counter;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.drill.common.CatastrophicFailure;
import org.apache.drill.common.EventProcessor;
import org.apache.drill.common.concurrent.ExtendedLatch;
@@ -36,9 +34,6 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.coord.ClusterCoordinator;
-import org.apache.drill.exec.coord.DistributedSemaphore;
-import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.metrics.DrillMetrics;
@@ -73,43 +68,51 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.util.MemoryAllocationUtilities;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
import org.codehaus.jackson.map.ObjectMapper;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import com.codahale.metrics.Counter;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
/**
* Foreman manages all the fragments (local and remote) for a single query where this
* is the driving/root node.
*
* The flow is as follows:
- * - Foreman is submitted as a runnable.
- * - Runnable does query planning.
- * - state changes from PENDING to RUNNING
- * - Runnable sends out starting fragments
- * - Status listener are activated
- * - The Runnable's run() completes, but the Foreman stays around
- * - Foreman listens for state change messages.
- * - state change messages can drive the state to FAILED or CANCELED, in which case
- * messages are sent to running fragments to terminate
- * - when all fragments complete, state change messages drive the state to COMPLETED
+ * <ul>
+ * <li>Foreman is submitted as a runnable.</li>
+ * <li>Runnable does query planning.</li>
+ * <li>state changes from PENDING to RUNNING</li>
+ * <li>Runnable sends out starting fragments</li>
+ * <li>Status listener are activated</li>
+ * <li>The Runnable's run() completes, but the Foreman stays around</li>
+ * <li>Foreman listens for state change messages.</li>
+ * <li>state change messages can drive the state to FAILED or CANCELED, in which case
+ * messages are sent to running fragments to terminate</li>
+ * <li>when all fragments complete, state change messages drive the state to COMPLETED</li>
+ * </ul>
*/
+
public class Foreman implements Runnable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
@@ -136,15 +139,13 @@ public class Foreman implements Runnable {
private boolean resume = false;
private final ProfileOption profileOption;
- private volatile DistributedLease lease; // used to limit the number of concurrent queries
+ private final QueryResourceManager queryRM;
private final ResponseSendListener responseListener = new ResponseSendListener();
private final StateSwitch stateSwitch = new StateSwitch();
private final ForemanResult foremanResult = new ForemanResult();
private final ConnectionClosedListener closeListener = new ConnectionClosedListener();
private final ChannelFuture closeFuture;
- private final boolean queuingEnabled;
-
private String queryText;
@@ -173,12 +174,9 @@ public class Foreman implements Runnable {
queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
drillbitContext.getClusterCoordinator(), this);
- final OptionManager optionManager = queryContext.getOptions();
- queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE);
-
- final QueryState initialState = queuingEnabled ? QueryState.ENQUEUED : QueryState.STARTING;
- recordNewState(initialState);
+ recordNewState(QueryState.ENQUEUED);
enqueuedQueries.inc();
+ queryRM = drillbitContext.getResourceManager().newQueryRM(this);
profileOption = setProfileOption(queryContext.getOptions());
}
@@ -350,20 +348,6 @@ public class Foreman implements Runnable {
*/
}
- private void releaseLease() {
- while (lease != null) {
- try {
- lease.close();
- lease = null;
- } catch (final InterruptedException e) {
- // if we end up here, the while loop will try again
- } catch (final Exception e) {
- logger.warn("Failure while releasing lease.", e);
- break;
- }
- }
- }
-
private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException {
LogicalPlan logicalPlan;
try {
@@ -431,18 +415,17 @@ public class Foreman implements Runnable {
private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
validatePlan(plan);
- MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
- //Marking endTime of Planning
- queryManager.markPlanningEndTime();
-
- if (queuingEnabled) {
- acquireQuerySemaphore(plan);
- moveToState(QueryState.STARTING, null);
- //Marking endTime of Waiting in Queue
- queryManager.markQueueWaitEndTime();
- }
+ queryRM.visitAbstractPlan(plan);
final QueryWorkUnit work = getQueryWorkUnit(plan);
+ queryRM.visitPhysicalPlan(work);
+ queryRM.setCost(plan.totalCost());
+ queryManager.setTotalCost(plan.totalCost());
+ work.applyPlan(drillbitContext.getPlanReader());
+ logWorkUnit(work);
+ admit(work);
+ queryManager.setQueueName(queryRM.queueName());
+
final List<PlanFragment> planFragments = work.getFragments();
final PlanFragment rootPlanFragment = work.getRootFragment();
assert queryId == rootPlanFragment.getHandle().getQueryId();
@@ -461,6 +444,22 @@ public class Foreman implements Runnable {
logger.debug("Fragments running.");
}
+ private void admit(QueryWorkUnit work) throws ForemanSetupException {
+ queryManager.markPlanningEndTime();
+ try {
+ queryRM.admit();
+ } catch (QueueTimeoutException e) {
+ throw UserException
+ .resourceError()
+ .message(e.getMessage())
+ .build(logger);
+ } catch (QueryQueueException e) {
+ throw new ForemanSetupException(e.getMessage(), e);
+ }
+ moveToState(QueryState.STARTING, null);
+ queryManager.markQueueWaitEndTime();
+ }
+
/**
* This is a helper method to run query based on the list of PlanFragment that were planned
* at some point of time
@@ -495,10 +494,8 @@ public class Foreman implements Runnable {
} catch (IOException e) {
throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
}
- if (queuingEnabled) {
- acquireQuerySemaphore(rootOperator.getCost());
- moveToState(QueryState.STARTING, null);
- }
+ queryRM.setCost(rootOperator.getCost());
+ admit(null);
drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
@@ -548,62 +545,6 @@ public class Foreman implements Runnable {
}
}
- /**
- * This limits the number of "small" and "large" queries that a Drill cluster will run
- * simultaneously, if queueing is enabled. If the query is unable to run, this will block
- * until it can. Beware that this is called under run(), and so will consume a Thread
- * while it waits for the required distributed semaphore.
- *
- * @param plan the query plan
- * @throws ForemanSetupException
- */
- private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException {
- double totalCost = 0;
- for (final PhysicalOperator ops : plan.getSortedOperators()) {
- totalCost += ops.getCost();
- }
-
- acquireQuerySemaphore(totalCost);
- return;
- }
-
- private void acquireQuerySemaphore(double totalCost) throws ForemanSetupException {
- final OptionManager optionManager = queryContext.getOptions();
- final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
-
- final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
- final String queueName;
-
- try {
- final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
- final DistributedSemaphore distributedSemaphore;
-
- // get the appropriate semaphore
- if (totalCost > queueThreshold) {
- final int largeQueue = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
- distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue);
- queueName = "large";
- } else {
- final int smallQueue = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
- distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue);
- queueName = "small";
- }
-
- lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
- } catch (final Exception e) {
- throw new ForemanSetupException("Unable to acquire slot for query.", e);
- }
-
- if (lease == null) {
- throw UserException
- .resourceError()
- .message(
- "Unable to acquire queue resources for query within timeout. Timeout for %s queue was set at %d seconds.",
- queueName, queueTimeout / 1000)
- .build(logger);
- }
- }
-
Exception getCurrentException() {
return foremanResult.getException();
}
@@ -612,54 +553,55 @@ public class Foreman implements Runnable {
final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
- final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(
+ return parallelizer.getFragments(
queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
- queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment,
+ queryId, queryContext.getActiveEndpoints(), rootFragment,
initiatingClient.getSession(), queryContext.getQueryContextInfo());
+ }
- if (logger.isTraceEnabled()) {
- final StringBuilder sb = new StringBuilder();
- sb.append("PlanFragments for query ");
- sb.append(queryId);
+ private void logWorkUnit(QueryWorkUnit queryWorkUnit) {
+ if (! logger.isTraceEnabled()) {
+ return;
+ }
+ final StringBuilder sb = new StringBuilder();
+ sb.append("PlanFragments for query ");
+ sb.append(queryId);
+ sb.append('\n');
+
+ final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
+ final int fragmentCount = planFragments.size();
+ int fragmentIndex = 0;
+ for(final PlanFragment planFragment : planFragments) {
+ final FragmentHandle fragmentHandle = planFragment.getHandle();
+ sb.append("PlanFragment(");
+ sb.append(++fragmentIndex);
+ sb.append('/');
+ sb.append(fragmentCount);
+ sb.append(") major_fragment_id ");
+ sb.append(fragmentHandle.getMajorFragmentId());
+ sb.append(" minor_fragment_id ");
+ sb.append(fragmentHandle.getMinorFragmentId());
sb.append('\n');
- final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
- final int fragmentCount = planFragments.size();
- int fragmentIndex = 0;
- for(final PlanFragment planFragment : planFragments) {
- final FragmentHandle fragmentHandle = planFragment.getHandle();
- sb.append("PlanFragment(");
- sb.append(++fragmentIndex);
- sb.append('/');
- sb.append(fragmentCount);
- sb.append(") major_fragment_id ");
- sb.append(fragmentHandle.getMajorFragmentId());
- sb.append(" minor_fragment_id ");
- sb.append(fragmentHandle.getMinorFragmentId());
- sb.append('\n');
-
- final DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
- sb.append(" DrillbitEndpoint address ");
- sb.append(endpointAssignment.getAddress());
- sb.append('\n');
-
- String jsonString = "<<malformed JSON>>";
- sb.append(" fragment_json: ");
- final ObjectMapper objectMapper = new ObjectMapper();
- try
- {
- final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
- jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
- } catch(final Exception e) {
- // we've already set jsonString to a fallback value
- }
- sb.append(jsonString);
+ final DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
+ sb.append(" DrillbitEndpoint address ");
+ sb.append(endpointAssignment.getAddress());
+ sb.append('\n');
- logger.trace(sb.toString());
+ String jsonString = "<<malformed JSON>>";
+ sb.append(" fragment_json: ");
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
+ jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
+ } catch(final Exception e) {
+ // we've already set jsonString to a fallback value
}
- }
+ sb.append(jsonString);
- return queryWorkUnit;
+ logger.trace(sb.toString());
+ }
}
/**
@@ -897,7 +839,7 @@ public class Foreman implements Runnable {
runningQueries.dec();
completedQueries.inc();
try {
- releaseLease();
+ queryRM.exit();
} finally {
isClosed = true;
}
@@ -953,7 +895,7 @@ public class Foreman implements Runnable {
foremanResult.setCompleted(QueryState.CANCELED);
/*
* We don't close the foremanResult until we've gotten
- * acknowledgements, which happens below in the case for current state
+ * acknowledgments, which happens below in the case for current state
* == CANCELLATION_REQUESTED.
*/
return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index ecbccf3c6..216a80df4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -36,7 +36,6 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.SchemaUserBitShared;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -99,6 +98,15 @@ public class QueryManager implements AutoCloseable {
// Is the query saved in transient store
private boolean inTransientStore;
+ /**
+ * Total query cost. This value is used to place the query into a queue
+ * and so has meaning to the user who wants to predict queue placement.
+ */
+
+ private double totalCost;
+
+ private String queueName;
+
public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider,
final ClusterCoordinator coordinator, final Foreman foreman) {
this.queryId = queryId;
@@ -191,6 +199,7 @@ public class QueryManager implements AutoCloseable {
* (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
*/
void cancelExecutingFragments(final DrillbitContext drillbitContext) {
+ @SuppressWarnings("resource")
final Controller controller = drillbitContext.getController();
for(final FragmentData data : fragmentDataSet) {
switch(data.getState()) {
@@ -219,6 +228,7 @@ public class QueryManager implements AutoCloseable {
* sending any message. Resume all fragments through the control tunnel.
*/
void unpauseExecutingFragments(final DrillbitContext drillbitContext) {
+ @SuppressWarnings("resource")
final Controller controller = drillbitContext.getController();
for(final FragmentData data : fragmentDataSet) {
final DrillbitEndpoint endpoint = data.getEndpoint();
@@ -318,6 +328,8 @@ public class QueryManager implements AutoCloseable {
.setUser(foreman.getQueryContext().getQueryUserName())
.setForeman(foreman.getQueryContext().getCurrentEndpoint())
.setStart(startTime)
+ .setTotalCost(totalCost)
+ .setQueueName(queueName == null ? "-" : queueName)
.setOptionsJson(getQueryOptionsAsJson());
if (queryText != null) {
@@ -332,7 +344,6 @@ public class QueryManager implements AutoCloseable {
}
private QueryProfile getQueryProfile(UserException ex) {
- final String queryText = foreman.getQueryText();
final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder()
.setUser(foreman.getQueryContext().getQueryUserName())
.setType(runQuery.getType())
@@ -345,6 +356,8 @@ public class QueryManager implements AutoCloseable {
.setQueueWaitEnd(queueWaitEndTime)
.setTotalFragments(fragmentDataSet.size())
.setFinishedFragments(finishedFragments.get())
+ .setTotalCost(totalCost)
+ .setQueueName(queueName == null ? "-" : queueName)
.setOptionsJson(getQueryOptionsAsJson());
if (ex != null) {
@@ -360,6 +373,7 @@ public class QueryManager implements AutoCloseable {
profileBuilder.setPlan(planText);
}
+ final String queryText = foreman.getQueryText();
if (queryText != null) {
profileBuilder.setQuery(queryText);
}
@@ -392,7 +406,6 @@ public class QueryManager implements AutoCloseable {
profileBuilder.addFragmentProfile(builder);
return true;
}
-
}
private class InnerIter implements IntObjectPredicate<FragmentData> {
@@ -407,7 +420,6 @@ public class QueryManager implements AutoCloseable {
builder.addMinorFragmentProfile(data.getProfile());
return true;
}
-
}
void setPlanText(final String planText) {
@@ -430,6 +442,14 @@ public class QueryManager implements AutoCloseable {
queueWaitEndTime = System.currentTimeMillis();
}
+ public void setTotalCost(double totalCost) {
+ this.totalCost = totalCost;
+ }
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
/**
* Internal class used to track the number of pending completion messages required from particular node. This allows
* to know for each node that is part of this query, what portion of fragments are still outstanding. In the case that
@@ -536,7 +556,7 @@ public class QueryManager implements AutoCloseable {
return drillbitStatusListener;
}
- private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener(){
+ private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener() {
@Override
public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java
new file mode 100644
index 000000000..9bcaddade
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Abstract base class for a resource manager. Handles tasks common to all
+ * resource managers: learning the resources available on this Drillbit.
+ * In the current version, Drillbits must be symmetrical, so that knowing
+ * the resources on one node is sufficient to know resources available on
+ * all nodes.
+ */
+
+public abstract class AbstractResourceManager implements ResourceManager {
+
+ protected final DrillbitContext context;
+ private final long memoryPerNode;
+ private final int cpusPerNode;
+
+ public AbstractResourceManager(final DrillbitContext context) {
+ this.context = context;
+ DrillConfig config = context.getConfig();
+
+ // Normally we use the actual direct memory configured on the JVM command
+ // line. However, if the config param is set, we use that instead (if it is
+ // lower than actual memory). Primarily for testing.
+
+ long memLimit = DrillConfig.getMaxDirectMemory();
+ long configMemoryPerNode = config.getBytes(ExecConstants.MAX_MEMORY_PER_NODE);
+ if (configMemoryPerNode > 0) {
+ memLimit = Math.min(memLimit, configMemoryPerNode);
+ }
+ memoryPerNode = memLimit;
+
+ // Do the same for CPUs.
+
+ int cpuLimit = Runtime.getRuntime().availableProcessors();
+ int configCpusPerNode = config.getInt(ExecConstants.MAX_CPUS_PER_NODE);
+ if (configCpusPerNode > 0) {
+ cpuLimit = Math.min(cpuLimit, configCpusPerNode);
+ }
+ cpusPerNode = cpuLimit;
+ }
+
+ @Override
+ public long memoryPerNode() { return memoryPerNode; }
+
+ @Override
+ public int cpusPerNode() { return cpusPerNode; }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
new file mode 100644
index 000000000..c28fbbb59
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.util.MemoryAllocationUtilities;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.foreman.Foreman;
+
+/**
+ * Represents a default resource manager for clusters that do not provide query
+ * queues. Without queues to provide a hard limit on the query admission rate,
+ * the number of active queries must be estimated and the resulting resource
+ * allocations will be rough estimates.
+ */
+
+public class DefaultResourceManager implements ResourceManager {
+
+ public static class DefaultResourceAllocator implements QueryResourceAllocator {
+
+ private QueryContext queryContext;
+
+ protected DefaultResourceAllocator(QueryContext queryContext) {
+ this.queryContext = queryContext;
+ }
+
+ @Override
+ public void visitAbstractPlan(PhysicalPlan plan) {
+ if (plan == null || plan.getProperties().hasResourcePlan) {
+ return;
+ }
+ MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
+ }
+
+ @Override
+ public void visitPhysicalPlan(QueryWorkUnit work) {
+ }
+ }
+
+ public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager {
+
+ @SuppressWarnings("unused")
+ private final DefaultResourceManager rm;
+
+ public DefaultQueryResourceManager(final DefaultResourceManager rm, final Foreman foreman) {
+ super(foreman.getQueryContext());
+ this.rm = rm;
+ }
+
+ @Override
+ public void setCost(double cost) {
+ // Nothing to do by default.
+ }
+
+ @Override
+ public void admit() {
+ // No queueing by default
+ }
+
+ @Override
+ public void exit() {
+ // No queueing by default
+ }
+
+ @Override
+ public boolean hasQueue() { return false; }
+
+ @Override
+ public String queueName() { return null; }
+ }
+
+ public final long memoryPerNode;
+ public final int cpusPerNode;
+
+ public DefaultResourceManager() {
+ memoryPerNode = DrillConfig.getMaxDirectMemory();
+
+ // Note: CPUs are not yet used, they will be used in a future
+ // enhancement.
+
+ cpusPerNode = Runtime.getRuntime().availableProcessors();
+ }
+
+ @Override
+ public long memoryPerNode() { return memoryPerNode; }
+
+ @Override
+ public int cpusPerNode() { return cpusPerNode; }
+
+ @Override
+ public QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
+ return new DefaultResourceAllocator(queryContext);
+ }
+
+ @Override
+ public QueryResourceManager newQueryRM(final Foreman foreman) {
+ return new DefaultQueryResourceManager(this, foreman);
+ }
+
+ @Override
+ public void close() { }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
new file mode 100644
index 000000000..9a4c78df5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.DistributedSemaphore;
+import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+
+/**
+ * Distributed query queue which uses a Zookeeper distributed semaphore to
+ * control queuing across the cluster. The distributed queue is actually two
+ * queues: one for "small" queries, another for "large" queries. Query size is
+ * determined by the Planner's estimate of query cost.
+ * <p>
+ * This queue is configured using system options:
+ * <dl>
+ * <dt><tt>exec.queue.enable</tt>
+ * <dt>
+ * <dd>Set to true to enable the distributed queue.</dd>
+ * <dt><tt>exec.queue.large</tt>
+ * <dt>
+ * <dd>The maximum number of large queries to admit. Additional
+ * queries wait in the queue.</dd>
+ * <dt><tt>exec.queue.small</tt>
+ * <dt>
+ * <dd>The maximum number of small queries to admit. Additional
+ * queries wait in the queue.</dd>
+ * <dt><tt>exec.queue.threshold</tt>
+ * <dt>
+ * <dd>The cost threshold. Queries below this size are small, at
+ * or above this size are large..</dd>
+ * <dt><tt>exec.queue.timeout_millis</tt>
+ * <dt>
+ * <dd>The maximum time (in milliseconds) a query will wait in the
+ * queue before failing.</dd>
+ * </dl>
+ * <p>
+ * The above values are refreshed every five seconds. This aids performance
+ * a bit in systems with very high query arrival rates.
+ */
+
+public class DistributedQueryQueue implements QueryQueue {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class);
+
+ private class DistributedQueueLease implements QueueLease {
+ private final QueryId queryId;
+ private DistributedLease lease;
+ private final String queueName;
+
+ /**
+ * Memory allocated to the query. Though all queries in the queue use
+ * the same memory allocation rules, those rules can change at any time
+ * as the user changes system options. This value captures the value
+ * calculated at the time that this lease was granted.
+ */
+ private long queryMemory;
+
+ public DistributedQueueLease(QueryId queryId, String queueName,
+ DistributedLease lease, long queryMemory) {
+ this.queryId = queryId;
+ this.queueName = queueName;
+ this.lease = lease;
+ this.queryMemory = queryMemory;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Lease for %s queue to query %s",
+ queueName, QueryIdHelper.getQueryId(queryId));
+ }
+
+ @Override
+ public long queryMemoryPerNode() { return queryMemory; }
+
+ @Override
+ public void release() {
+ DistributedQueryQueue.this.release(this);
+ }
+
+ @Override
+ public String queueName() { return queueName; }
+ }
+
+ /**
+ * Exposes a snapshot of internal state information for use in status
+ * reporting, such as in the UI.
+ */
+
+ @XmlRootElement
+ public static class ZKQueueInfo {
+ public final int smallQueueSize;
+ public final int largeQueueSize;
+ public final double queueThreshold;
+ public final long memoryPerNode;
+ public final long memoryPerSmallQuery;
+ public final long memoryPerLargeQuery;
+
+ public ZKQueueInfo(DistributedQueryQueue queue) {
+ smallQueueSize = queue.configSet.smallQueueSize;
+ largeQueueSize = queue.configSet.largeQueueSize;
+ queueThreshold = queue.configSet.queueThreshold;
+ memoryPerNode = queue.memoryPerNode;
+ memoryPerSmallQuery = queue.memoryPerSmallQuery;
+ memoryPerLargeQuery = queue.memoryPerLargeQuery;
+ }
+ }
+
+ public interface StatusAdapter {
+ boolean inShutDown();
+ }
+
+ /**
+ * Holds runtime configuration options. Allows polling the options
+ * for changes, and easily detecting changes.
+ */
+
+ private static class ConfigSet {
+ private final long queueThreshold;
+ private final long queueTimeout;
+ private final int largeQueueSize;
+ private final int smallQueueSize;
+ private final double largeToSmallRatio;
+ private final double reserveMemoryRatio;
+ private final long minimumOperatorMemory;
+
+ public ConfigSet(SystemOptionManager optionManager) {
+ queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
+ queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
+
+ // Option manager supports only long values, but we do not expect
+ // more than 2 billion active queries, so queue size is stored as
+ // an int.
+ // TODO: Once DRILL-5832 is available, add an getInt() method to
+ // the option system to get the value as an int and do a range
+ // check.
+
+ largeQueueSize = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
+ smallQueueSize = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
+ largeToSmallRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RATIO);
+ reserveMemoryRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RESERVE);
+ minimumOperatorMemory = optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
+ }
+
+ /**
+ * Determine if this config set is the same as another one. Detects
+ * whether the configuration has changed between one sync point and
+ * another.
+ * <p>
+ * Note that we cannot use <tt>equals()</tt> here as, according to
+ * Drill practice, <tt>equals()</tt> is for use in collections and
+ * must be accompanied by a <tt>hashCode()</tt> method. Since this
+ * class will never be used in a collection, and does not need a
+ * hash function, we cannot use <tt>equals()</tt>.
+ *
+ * @param otherSet another snapshot taken at another time
+ * @return true if this configuration is the same as another
+ * (no update between the two snapshots), false if the config has
+ * changed between the snapshots
+ */
+
+ public boolean isSameAs(ConfigSet otherSet) {
+ return queueThreshold == otherSet.queueThreshold &&
+ queueTimeout == otherSet.queueTimeout &&
+ largeQueueSize == otherSet.largeQueueSize &&
+ smallQueueSize == otherSet.smallQueueSize &&
+ largeToSmallRatio == otherSet.largeToSmallRatio &&
+ reserveMemoryRatio == otherSet.reserveMemoryRatio &&
+ minimumOperatorMemory == otherSet.minimumOperatorMemory;
+ }
+ }
+
+ private long memoryPerNode;
+ private SystemOptionManager optionManager;
+ private ConfigSet configSet;
+ private ClusterCoordinator clusterCoordinator;
+ private long nextRefreshTime;
+ private long memoryPerSmallQuery;
+ private long memoryPerLargeQuery;
+ private final StatusAdapter statusAdapter;
+
+ public DistributedQueryQueue(DrillbitContext context, StatusAdapter adapter) {
+ statusAdapter = adapter;
+ optionManager = context.getOptionManager();
+ clusterCoordinator = context.getClusterCoordinator();
+ }
+
+ @Override
+ public void setMemoryPerNode(long memoryPerNode) {
+ this.memoryPerNode = memoryPerNode;
+ refreshConfig();
+ }
+
+ @Override
+ public long defaultQueryMemoryPerNode(double cost) {
+ return (cost < configSet.queueThreshold)
+ ? memoryPerSmallQuery
+ : memoryPerLargeQuery;
+ }
+
+ @Override
+ public long minimumOperatorMemory() { return configSet.minimumOperatorMemory; }
+
+ /**
+ * This limits the number of "small" and "large" queries that a Drill cluster will run
+ * simultaneously, if queuing is enabled. If the query is unable to run, this will block
+ * until it can. Beware that this is called under run(), and so will consume a thread
+ * while it waits for the required distributed semaphore.
+ *
+ * @param queryId query identifier
+ * @param cost the query plan
+ * @throws QueryQueueException if the underlying ZK queuing mechanism fails
+ * @throws QueueTimeoutException if the query waits too long in the
+ * queue
+ */
+
+ @SuppressWarnings("resource")
+ @Override
+ public QueueLease enqueue(QueryId queryId, double cost) throws QueryQueueException, QueueTimeoutException {
+ final String queueName;
+ DistributedLease lease = null;
+ long queryMemory;
+ final DistributedSemaphore distributedSemaphore;
+ try {
+
+ // Only the refresh and queue computation is synchronized.
+
+ synchronized(this) {
+ refreshConfig();
+
+ // get the appropriate semaphore
+ if (cost >= configSet.queueThreshold) {
+ distributedSemaphore = clusterCoordinator.getSemaphore("query.large", configSet.largeQueueSize);
+ queueName = "large";
+ queryMemory = memoryPerLargeQuery;
+ } else {
+ distributedSemaphore = clusterCoordinator.getSemaphore("query.small", configSet.smallQueueSize);
+ queueName = "small";
+ queryMemory = memoryPerSmallQuery;
+ }
+ }
+ logger.debug("Query {} with cost {} placed into the {} queue.",
+ QueryIdHelper.getQueryId(queryId), cost, queueName);
+
+ lease = distributedSemaphore.acquire(configSet.queueTimeout, TimeUnit.MILLISECONDS);
+ } catch (final Exception e) {
+ logger.error("Unable to acquire slot for query " +
+ QueryIdHelper.getQueryId(queryId), e);
+ throw new QueryQueueException("Unable to acquire slot for query.", e);
+ }
+
+ if (lease == null) {
+ int timeoutSecs = (int) Math.round(configSet.queueTimeout/1000.0);
+ logger.warn("Queue timeout: {} after {} seconds.", queueName, timeoutSecs);
+ throw new QueueTimeoutException(queryId, queueName, timeoutSecs);
+ }
+ return new DistributedQueueLease(queryId, queueName, lease, queryMemory);
+ }
+
+ private synchronized void refreshConfig() {
+ long now = System.currentTimeMillis();
+ if (now < nextRefreshTime) {
+ return;
+ }
+ nextRefreshTime = now + 5000;
+
+ // Only update numbers, and log changes, if something
+ // actually changes.
+
+ ConfigSet newSet = new ConfigSet(optionManager);
+ if (newSet.isSameAs(configSet)) {
+ return;
+ }
+
+ configSet = newSet;
+
+ // Divide up memory between queues using admission rate
+ // to give more memory to larger queries and less to
+ // smaller queries. We assume that large queries are
+ // larger than small queries by a factor of
+ // largeToSmallRatio.
+
+ double totalUnits = configSet.largeToSmallRatio * configSet.largeQueueSize + configSet.smallQueueSize;
+ double availableMemory = Math.round(memoryPerNode * (1.0 - configSet.reserveMemoryRatio));
+ double memoryUnit = availableMemory / totalUnits;
+ memoryPerLargeQuery = Math.round(memoryUnit * configSet.largeToSmallRatio);
+ memoryPerSmallQuery = Math.round(memoryUnit);
+
+ logger.debug("Memory config: total memory per node = {}, available: {}, large/small memory ratio = {}",
+ memoryPerNode, availableMemory, configSet.largeToSmallRatio);
+ logger.debug("Reserve memory ratio: {}, bytes: {}",
+ configSet.reserveMemoryRatio, memoryPerNode - availableMemory);
+ logger.debug("Minimum operator memory: {}", configSet.minimumOperatorMemory);
+ logger.debug("Small queue: {} slots, {} bytes per slot",
+ configSet.smallQueueSize, memoryPerSmallQuery);
+ logger.debug("Large queue: {} slots, {} bytes per slot",
+ configSet.largeQueueSize, memoryPerLargeQuery);
+ logger.debug("Cost threshold: {}, timeout: {} ms.",
+ configSet.queueThreshold, configSet.queueTimeout);
+ }
+
+ @Override
+ public boolean enabled() { return true; }
+
+ public synchronized ZKQueueInfo getInfo() {
+ refreshConfig();
+ return new ZKQueueInfo(this);
+ }
+
+ private void release(QueueLease lease) {
+ DistributedQueueLease theLease = (DistributedQueueLease) lease;
+ for (;;) {
+ try {
+ theLease.lease.close();
+ theLease.lease = null;
+ break;
+ } catch (final InterruptedException e) {
+ // if we end up here, the loop will try again
+ } catch (final Exception e) {
+ logger.warn("Failure while releasing lease.", e);
+ break;
+ }
+ if (inShutdown()) {
+ logger.warn("In shutdown mode: abandoning attempt to release lease");
+ }
+ }
+ }
+
+ private boolean inShutdown() {
+ if (statusAdapter == null) {
+ return false;
+ }
+ return statusAdapter.inShutDown();
+ }
+
+ @Override
+ public void close() {
+ // Nothing to do.
+ }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java
new file mode 100644
index 000000000..473401fab
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.StatusAdapter;
+
+/**
+ * Wrapper around the default and/or distributed resource managers
+ * to allow dynamically enabling and disabling queueing.
+ */
+
+public class DynamicResourceManager implements ResourceManager {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicResourceManager.class);
+
+ private final DrillbitContext context;
+ private ResourceManager defaultRm;
+ private ResourceManager queueingRm;
+ private ResourceManager activeRm;
+ public long nextUpdateTime;
+ public final int recheckDelayMs = 5000;
+
+ public DynamicResourceManager(final DrillbitContext context) {
+ this.context = context;
+ refreshRM();
+ }
+
+ public synchronized ResourceManager activeRM() {
+ refreshRM();
+ return activeRm;
+ }
+
+ @Override
+ public long memoryPerNode() {
+ return activeRm.memoryPerNode();
+ }
+
+ @Override
+ public int cpusPerNode() {
+ return activeRm.cpusPerNode();
+ }
+
+ @Override
+ public synchronized QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
+ refreshRM();
+ return activeRm.newResourceAllocator(queryContext);
+ }
+
+ @Override
+ public synchronized QueryResourceManager newQueryRM(Foreman foreman) {
+ refreshRM();
+ return activeRm.newQueryRM(foreman);
+ }
+
+ private void refreshRM() {
+ long now = System.currentTimeMillis();
+ if (now < nextUpdateTime) {
+ return;
+ }
+ nextUpdateTime = now + recheckDelayMs;
+ @SuppressWarnings("resource")
+ SystemOptionManager systemOptions = context.getOptionManager();
+ if (systemOptions.getOption(ExecConstants.ENABLE_QUEUE)) {
+ if (queueingRm == null) {
+ StatusAdapter statusAdapter = new StatusAdapter() {
+ @Override
+ public boolean inShutDown() {
+ // Drill provides no shutdown state at present.
+ // TODO: Once DRILL-4286 (graceful shutdown) is merged, use the
+ // new Drillbit status to determine when the Drillbit
+ // is shutting down.
+ return false;
+ }
+ };
+ queueingRm = new ThrottledResourceManager(context,
+ new DistributedQueryQueue(context, statusAdapter));
+ }
+ if (activeRm != queueingRm) {
+ logger.debug("Enabling ZK-based query queue.");
+ activeRm = queueingRm;
+ }
+ } else {
+ if (defaultRm == null) {
+ defaultRm = new DefaultResourceManager();
+ }
+ if (activeRm != defaultRm) {
+ logger.debug("Disabling ZK-based query queue.");
+ activeRm = defaultRm;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ RuntimeException ex = null;
+ try {
+ if (defaultRm != null) {
+ defaultRm.close();
+ }
+ } catch (RuntimeException e) {
+ ex = e;
+ } finally {
+ defaultRm = null;
+ }
+ try {
+ if (queueingRm != null) {
+ queueingRm.close();
+ }
+ } catch (RuntimeException e) {
+ ex = ex == null ? e : ex;
+ } finally {
+ queueingRm = null;
+ }
+ activeRm = null;
+ if (ex == null) {
+ return;
+ } else if (ex instanceof UserException) {
+ throw (UserException) ex;
+ } else {
+ throw UserException.systemError(ex)
+ .addContext("Failure closing resource managers.")
+ .build(logger);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
new file mode 100644
index 000000000..a042f8bb6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Query queue to be used in an embedded Drillbit. This queue has scope of only
+ * the one Drillbit (not even multiple Drillbits in the same process.) Primarily
+ * intended for testing, but may possibly be useful for other embedded
+ * applications.
+ * <p>
+ * Configuration is via config parameters (not via system options as for the
+ * distributed queue.)
+ * <dl>
+ * <dt><tt>drill.queue.embedded.enabled</tt></dt>
+ * <dd>Set to true to enable the embedded queue. But, this setting has effect
+ * only if the Drillbit is, in fact, embedded.</dd>
+ * <dt><tt>drill.queue.embedded.size</tt></dt>
+ * <dd>The number of active queries, all others queue. There is no upper limit
+ * on the number of queued entries.</dt>
+ * <dt><tt>drill.queue.embedded.timeout_ms</tt></dt>
+ * <dd>The maximum time a query will wait in the queue before failing.</dd>
+ * </dl>
+ */
+
+public class EmbeddedQueryQueue implements QueryQueue {
+
+ public static String EMBEDDED_QUEUE = "drill.exec.queue.embedded";
+ public static String ENABLED = EMBEDDED_QUEUE + ".enable";
+ public static String QUEUE_SIZE = EMBEDDED_QUEUE + ".size";
+ public static String TIMEOUT_MS = EMBEDDED_QUEUE + ".timeout_ms";
+
+ public class EmbeddedQueueLease implements QueueLease {
+
+ private final QueryId queryId;
+ private boolean released;
+ private long queryMemory;
+
+ public EmbeddedQueueLease(QueryId queryId, long queryMemory) {
+ this.queryId = queryId;
+ this.queryMemory = queryMemory;
+ }
+
+ @Override
+ public String toString( ) {
+ return new StringBuilder()
+ .append("Embedded queue lease for ")
+ .append(QueryIdHelper.getQueryId(queryId))
+ .append(released ? " (released)" : "")
+ .toString();
+ }
+
+ @Override
+ public long queryMemoryPerNode() {
+ return queryMemory;
+ }
+
+ @Override
+ public void release() {
+ EmbeddedQueryQueue.this.release(this);
+ released = true;
+ }
+
+ @VisibleForTesting
+ boolean isReleased() { return released; }
+
+ @Override
+ public String queueName() { return "local-queue"; }
+ }
+
+ private final int queueTimeoutMs;
+ private final int queueSize;
+ private final Semaphore semaphore;
+ private long memoryPerQuery;
+ private final long minimumOperatorMemory;
+
+ public EmbeddedQueryQueue(DrillbitContext context) {
+ DrillConfig config = context.getConfig();
+ queueTimeoutMs = config.getInt(TIMEOUT_MS);
+ queueSize = config.getInt(QUEUE_SIZE);
+ semaphore = new Semaphore(queueSize, true);
+ minimumOperatorMemory = context.getOptionManager()
+ .getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
+ }
+
+ @Override
+ public boolean enabled() { return true; }
+
+ @Override
+ public void setMemoryPerNode(long memoryPerNode) {
+ memoryPerQuery = memoryPerNode / queueSize;
+ }
+
+ @Override
+ public long defaultQueryMemoryPerNode(double cost) {
+ return memoryPerQuery;
+ }
+
+ @Override
+ public QueueLease enqueue(QueryId queryId, double cost)
+ throws QueueTimeoutException, QueryQueueException {
+ try {
+ if (! semaphore.tryAcquire(queueTimeoutMs, TimeUnit.MILLISECONDS) ) {
+ throw new QueueTimeoutException(queryId, "embedded", queueTimeoutMs);
+ }
+ } catch (InterruptedException e) {
+ throw new QueryQueueException("Interrupted", e);
+ }
+ return new EmbeddedQueueLease(queryId, memoryPerQuery);
+ }
+
+ private void release(EmbeddedQueueLease lease) {
+ assert ! lease.released;
+ semaphore.release();
+ }
+
+ @Override
+ public void close() {
+ assert semaphore.availablePermits() == queueSize;
+ }
+
+ @Override
+ public long minimumOperatorMemory() {
+ return minimumOperatorMemory;
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java
new file mode 100644
index 000000000..72dc2d63a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+
+/**
+ * Interface which defines a queue implementation for query queues.
+ * Implementations can queue locally, queue distributed, or do
+ * nothing at all.
+ * <p>
+ * A queue can report itself as enabled or disabled. When enabled,
+ * all queries must obtain a lease prior to starting execution. The
+ * lease must be released at the completion of execution.
+ */
+
+public interface QueryQueue {
+
+ /**
+ * The opaque lease returned once a query is admitted
+ * for execution.
+ */
+
+ public interface QueueLease {
+ long queryMemoryPerNode();
+
+ /**
+ * Release a query lease obtained from {@link #queue(QueryId, double))}.
+ * Should be called by the per-query resource manager.
+ *
+ * @param lease the lease to be released.
+ */
+
+ void release();
+
+ String queueName();
+ };
+
+ /**
+ * Exception thrown if a query exceeds the configured wait time
+ * in the query queue.
+ */
+
+ @SuppressWarnings("serial")
+ public class QueueTimeoutException extends Exception {
+
+ private final QueryId queryId;
+ private final String queueName;
+ private final int timeoutMs;
+
+ public QueueTimeoutException(QueryId queryId, String queueName, int timeoutMs) {
+ super( String.format(
+ "Query timed out of the %s queue after %d ms.",
+ queueName, timeoutMs ));
+ this.queryId = queryId;
+ this.queueName = queueName;
+ this.timeoutMs = timeoutMs;
+ }
+
+ public QueryId queryId() { return queryId; }
+ public String queueName() { return queueName; }
+ public int timeoutMs() { return timeoutMs; }
+ }
+
+ /**
+ * Exception thrown for all non-timeout error conditions.
+ */
+
+ @SuppressWarnings("serial")
+ public class QueryQueueException extends Exception {
+ QueryQueueException(String msg, Exception e) {
+ super(msg, e);
+ }
+ }
+
+ void setMemoryPerNode(long memoryPerNode);
+
+ /**
+ * Return the amount of memory per node when creating a EXPLAIN
+ * query plan. Plans to be executed should get the query memory from
+ * the lease, as the lease may adjust the default amount on a per-query
+ * basis. This means that the memory used to execute the query may
+ * differ from the amount shown in an EXPLAIN plan.
+ *
+ * @return assumed memory per node, in bytes, to use when creating
+ * an EXPLAIN plan
+ */
+
+ long defaultQueryMemoryPerNode(double cost);
+
+ /**
+ * Optional floor on the amount of memory assigned per operator.
+ * This ensures that operators receive a certain amount, separate from
+ * any memory slicing. This can oversubscribe node memory if used
+ * incorrectly.
+ *
+ * @return minimum per-operator memory, in bytes
+ */
+
+ long minimumOperatorMemory();
+
+ /**
+ * Determine if the queue is enabled.
+ * @return true if the query is enabled, false otherwise.
+ */
+
+ boolean enabled();
+
+ /**
+ * Queue a query. The method returns only when the query is admitted for
+ * execution. As a result, the calling thread may block up to the configured
+ * wait time.
+ * @param queryId the query ID
+ * @param cost the cost of the query used for cost-based queueing
+ * @return the query lease which must be passed to {@link #release(QueueLease)}
+ * upon query completion
+ * @throws QueueTimeoutException if the query times out waiting to be
+ * admitted.
+ * @throws QueryQueueException for any other error condition.
+ */
+
+ QueueLease enqueue(QueryId queryId, double cost) throws QueueTimeoutException, QueryQueueException;
+
+ void close();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java
new file mode 100644
index 000000000..35dbe5987
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.work.QueryWorkUnit;
+
+/**
+ * Manages resources for an individual query in conjunction with the
+ * global {@link ResourceManager}. Handles memory and CPU allocation.
+ * Instances of this class handle query planning and are used when the
+ * client wants to plan the query, but not execute it. An implementation
+ * of {@link QueryResourceManager} is used to both plan the query and
+ * queue it for execution.
+ * <p>
+ * This interface allows a variety of resource management strategies to
+ * exist for different purposes.
+ * <p>
+ * The methods here assume external synchronization: a single query calls
+ * the methods at known times; there are no concurrent calls.
+ */
+
+public interface QueryResourceAllocator {
+
+ /**
+ * Make any needed adjustments to the query plan before parallelization.
+ *
+ * @param plan
+ */
+ void visitAbstractPlan(PhysicalPlan plan);
+
+ /**
+ * Provide the manager with the physical plan and node assignments
+ * for the query to be run. This class will plan memory for the query.
+ *
+ * @param plan
+ * @param work
+ */
+
+ void visitPhysicalPlan(QueryWorkUnit work);
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
new file mode 100644
index 000000000..9e2a3a10f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+
+/**
+ * Extends a {@link QueryResourceAllocator} to provide queueing support.
+ */
+
+public interface QueryResourceManager extends QueryResourceAllocator {
+
+ /**
+ * Hint that this resource manager queues. Allows the Foreman
+ * to short-circuit expensive logic if no queuing will actually
+ * be done. This is a static attribute per Drillbit run.
+ */
+
+ boolean hasQueue();
+
+ /**
+ * For some cases the foreman does not have a full plan, just a cost. In
+ * this case, this object will not plan memory, but still needs the cost
+ * to place the job into the correct queue.
+ * @param cost
+ */
+
+ void setCost(double cost);
+
+ /**
+ * Admit the query into the cluster. Blocks until the query
+ * can run. (Later revisions may use a more thread-friendly
+ * approach.)
+ * @throws QueryQueueException if something goes wrong with the
+ * queue mechanism
+ * @throws QueueTimeoutException if the query timed out waiting to
+ * be admitted.
+ */
+
+ void admit() throws QueueTimeoutException, QueryQueueException;
+
+ /**
+ * Returns the name of the queue (if any) on which the query was
+ * placed. Valid only after the query is admitted.
+ *
+ * @return queue name, or null if queuing is not enabled.
+ */
+
+ String queueName();
+
+ /**
+ * Mark the query as completing, giving up its slot in the
+ * cluster. Releases any lease that may be held for a system with queues.
+ */
+
+ void exit();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java
new file mode 100644
index 000000000..71dabafa7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.work.foreman.Foreman;
+
+/**
+ * Drillbit-wide resource manager shared by all queries. Manages memory (at
+ * present) and CPU (planned). Since queries are the primary consumer of
+ * resources, manages resources by throttling queries into the system, and
+ * allocating resources to queries in order to control total use. An "null"
+ * implementation handles the case of no queuing. Clearly, the null case cannot
+ * effectively control resource use.
+ */
+
+public interface ResourceManager {
+
+ /**
+ * Returns the memory, in bytes, assigned to each node in a Drill cluster.
+ * Drill requires that nodes are symmetrical. So, knowing the memory on any
+ * one node also gives the memory on all other nodes.
+ *
+ * @return the memory, in bytes, available in each Drillbit
+ */
+ long memoryPerNode();
+
+ int cpusPerNode();
+
+ /**
+ * Create a resource manager to prepare or describe a query. In this form, no
+ * queuing is done, but the plan is created as if queuing had been done. Used
+ * when executing EXPLAIN PLAN.
+ *
+ * @return a resource manager for the query
+ */
+
+ QueryResourceAllocator newResourceAllocator(QueryContext queryContext);
+
+ /**
+ * Create a resource manager to execute a query.
+ *
+ * @param foreman
+ * Foreman which manages the execution
+ * @return a resource manager for the query
+ */
+
+ QueryResourceManager newQueryRM(final Foreman foreman);
+
+ void close();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java
new file mode 100644
index 000000000..430589135
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.local.LocalClusterCoordinator;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Builds the proper resource manager and queue implementation for the configured
+ * system options.
+ * <p>
+ * <ul>
+ * <li>If the Drillbit is embedded<ul>
+ * <li>If queues are enabled, then the admission-controlled resource manager
+ * with the local query queue.</li>
+ * <li>Otherwise, the default resource manager and no queues.</li>
+ * </ul></li>
+ * <li>If the Drillbit is in a cluster<ul>
+ * <li>If queues are enabled, then the admission-controlled resource manager
+ * with the distributed query queue.</li>
+ * <li>Otherwise, the default resource manager and no queues.</li>
+ * </ul></li>
+ * </ul>
+ * Configuration settings:
+ * <dl>
+ * <dt>Cluster coordinator instance</dt>
+ * <dd>If an instance of <tt>LocalClusterCoordinator</tt>, the Drillbit is
+ * embedded, else it is in a cluster.</dd>
+ * <dt><tt>drill.exec.queue.embedded.enable</tt> boot config<dt>
+ * <dd>If enabled, and if embedded, then use the local queue.</dd>
+ * <dt><tt>exec.queue.enable</tt> system option</dt>
+ * <dd>If enabled, and if in a cluster, then use the distributed queue.</dd>
+ * </dl>
+ */
+public class ResourceManagerBuilder {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourceManagerBuilder.class);
+
+ private DrillbitContext context ;
+
+ public ResourceManagerBuilder(final DrillbitContext context) {
+ this.context = context;
+ }
+
+ @SuppressWarnings("resource")
+ public ResourceManager build() {
+ ClusterCoordinator coord = context.getClusterCoordinator();
+ DrillConfig config = context.getConfig();
+ if (coord instanceof LocalClusterCoordinator) {
+ if (config.getBoolean(EmbeddedQueryQueue.ENABLED)) {
+ logger.debug("Enabling embedded, local query queue.");
+ return new ThrottledResourceManager(context, new EmbeddedQueryQueue(context));
+ } else {
+ logger.debug("No query queueing enabled.");
+ return new DefaultResourceManager();
+ }
+ } else {
+ return new DynamicResourceManager(context);
+ }
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
new file mode 100644
index 000000000..b46fe0933
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Global resource manager that provides basic admission control (AC) via a
+ * configured queue: either the Zookeeper-based distributed queue or the
+ * in-process embedded Drillbit queue. The queue places an upper limit on the
+ * number of running queries. This limit then "slices" memory and CPU between
+ * queries: each gets the same share of resources.
+ * <p>
+ * This is a "basic" implementation. Clearly, a more advanced implementation
+ * could look at query cost to determine whether to give a given query more or
+ * less than the "standard" share. That is left as a future exercise; in this
+ * version we just want to get the basics working.
+ * <p>
+ * This is the resource manager level. This resource manager is paired with a
+ * queue implementation to produce a complete solution. This composition-based
+ * approach allows sharing of functionality across queue implementations.
+ */
+
+public class ThrottledResourceManager extends AbstractResourceManager {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+ .getLogger(ThrottledResourceManager.class);
+
+ public static class QueuedResourceAllocator
+ implements QueryResourceAllocator {
+
+ protected final ThrottledResourceManager rm;
+ protected QueryContext queryContext;
+ protected PhysicalPlan plan;
+ protected QueryWorkUnit work;
+ protected double queryCost;
+
+ protected QueuedResourceAllocator(final ThrottledResourceManager rm,
+ QueryContext queryContext) {
+ this.rm = rm;
+ this.queryContext = queryContext;
+ }
+
+ @Override
+ public void visitAbstractPlan(PhysicalPlan plan) {
+ this.plan = plan;
+ queryCost = plan.totalCost();
+ }
+
+ @Override
+ public void visitPhysicalPlan(final QueryWorkUnit work) {
+ this.work = work;
+ planMemory();
+ }
+
+ private void planMemory() {
+ if (plan.getProperties().hasResourcePlan) {
+ logger.debug("Memory already planned.");
+ return;
+ }
+
+ // Group fragments by node.
+
+ Map<String, Collection<PhysicalOperator>> nodeMap = buildBufferedOpMap();
+
+ // Memory must be symmetric to avoid bottlenecks in which one node has
+ // sorts (say) with less memory than another, causing skew in data arrival
+ // rates for downstream operators.
+
+ int width = countBufferingOperators(nodeMap);
+
+ // Then, share memory evenly across the
+ // all sort operators on that node. This handles asymmetric distribution
+ // such as occurs if a sort appears in the root fragment (the one with
+ // screen),
+ // which is never parallelized.
+
+ for (Entry<String, Collection<PhysicalOperator>> entry : nodeMap.entrySet()) {
+ planNodeMemory(entry.getKey(), entry.getValue(), width);
+ }
+ }
+
+ private int countBufferingOperators(
+ Map<String, Collection<PhysicalOperator>> nodeMap) {
+ int width = 0;
+ for (Collection<PhysicalOperator> fragSorts : nodeMap.values()) {
+ width = Math.max(width, fragSorts.size());
+ }
+ return width;
+ }
+
+ /**
+ * Given the set of buffered operators (from any number of fragments) on a
+ * single node, shared the per-query memory equally across all the
+ * operators.
+ *
+ * @param nodeAddr
+ * @param bufferedOps
+ * @param width
+ */
+
+ private void planNodeMemory(String nodeAddr,
+ Collection<PhysicalOperator> bufferedOps, int width) {
+
+ // If no buffering operators, nothing to plan.
+
+ if (bufferedOps.isEmpty()) {
+ return;
+ }
+
+ // Divide node memory evenly among the set of operators, in any minor
+ // fragment, on the node. This is not very sophisticated: it does not
+ // deal with, say, three stacked sorts in which, if sort A runs, then
+ // B may be using memory, but C cannot be active. That kind of analysis
+ // is left as a later exercise.
+
+ long nodeMemory = queryMemoryPerNode();
+
+ // Set a floor on the amount of memory per operator based on the
+ // configured minimum. This is likely unhelpful because we are trying
+ // to work around constrained memory by assuming more than we actually
+ // have. This may lead to an OOM at run time.
+
+ long preferredOpMemory = nodeMemory / width;
+ long perOpMemory = Math.max(preferredOpMemory, rm.minimumOperatorMemory());
+ if (preferredOpMemory < perOpMemory) {
+ logger.warn("Preferred per-operator memory: {}, actual amount: {}",
+ preferredOpMemory, perOpMemory);
+ }
+ logger.debug(
+ "Query: {}, Node: {}, allocating {} bytes each for {} buffered operator(s).",
+ QueryIdHelper.getQueryId(queryContext.getQueryId()), nodeAddr,
+ perOpMemory, width);
+
+ for (PhysicalOperator op : bufferedOps) {
+
+ // Limit the memory to the maximum in the plan. Doing so is
+ // likely unnecessary, and perhaps harmful, because the pre-planned
+ // allocation is the default maximum hard-coded to 10 GB. This means
+ // that even if 20 GB is available to the sort, it won't use more
+ // than 10GB. This is probably more of a bug than a feature.
+
+ long alloc = Math.min(perOpMemory, op.getMaxAllocation());
+
+ // Place a floor on the memory that is the initial allocation,
+ // since we don't want the operator to run out of memory when it
+ // first starts.
+
+ alloc = Math.max(alloc, op.getInitialAllocation());
+
+ if (alloc > preferredOpMemory && alloc != perOpMemory) {
+ logger.warn("Allocated memory of {} for {} exceeds available memory of {} " +
+ "due to operator minimum",
+ alloc, op.getClass().getSimpleName(), preferredOpMemory);
+ }
+ else if (alloc < preferredOpMemory) {
+ logger.warn("Allocated memory of {} for {} is less than available memory " +
+ "of {} due to operator limit",
+ alloc, op.getClass().getSimpleName(), preferredOpMemory);
+ }
+ op.setMaxAllocation(alloc);
+ }
+ }
+
+ protected long queryMemoryPerNode() {
+ return rm.defaultQueryMemoryPerNode(plan.totalCost());
+ }
+
+ /**
+ * Build a list of external sorts grouped by node. We start with a list of
+ * minor fragments, each with an endpoint (node). Multiple minor fragments
+ * may appear on each node, and each minor fragment may have 0, 1 or more
+ * sorts.
+ *
+ * @return
+ */
+
+ private Map<String, Collection<PhysicalOperator>> buildBufferedOpMap() {
+ Multimap<String, PhysicalOperator> map = ArrayListMultimap.create();
+ getBufferedOps(map, work.getRootFragmentDefn());
+ for (MinorFragmentDefn defn : work.getMinorFragmentDefns()) {
+ getBufferedOps(map, defn);
+ }
+ return map.asMap();
+ }
+
+ /**
+ * Searches a fragment operator tree to find buffered within that fragment.
+ */
+
+ protected static class BufferedOpFinder extends
+ AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
+ @Override
+ public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
+ throws RuntimeException {
+ if (op.isBufferedOperator()) {
+ value.add(op);
+ }
+ visitChildren(op, value);
+ return null;
+ }
+ }
+
+ private void getBufferedOps(Multimap<String, PhysicalOperator> map,
+ MinorFragmentDefn defn) {
+ List<PhysicalOperator> bufferedOps = getBufferedOps(defn.root());
+ if (!bufferedOps.isEmpty()) {
+ map.putAll(defn.fragment().getAssignment().getAddress(), bufferedOps);
+ }
+ }
+
+ /**
+ * Search an individual fragment tree to find any buffered operators it may
+ * contain.
+ *
+ * @param root
+ * @return
+ */
+
+ private List<PhysicalOperator> getBufferedOps(FragmentRoot root) {
+ List<PhysicalOperator> bufferedOps = new ArrayList<>();
+ BufferedOpFinder finder = new BufferedOpFinder();
+ root.accept(finder, bufferedOps);
+ return bufferedOps;
+ }
+ }
+
+ /**
+ * Per-query resource manager. Handles resources and optional queue lease for
+ * a single query. As such, this is a non-shared resource: it is associated
+ * with a Foreman: a single thread at plan time, and a single event (in some
+ * thread) at query completion time. Because of these semantics, no
+ * synchronization is needed within this class.
+ */
+
+ public static class QueuedQueryResourceManager extends QueuedResourceAllocator
+ implements QueryResourceManager {
+
+ private final Foreman foreman;
+ private QueueLease lease;
+
+ public QueuedQueryResourceManager(final ThrottledResourceManager rm,
+ final Foreman foreman) {
+ super(rm, foreman.getQueryContext());
+ this.foreman = foreman;
+ }
+
+ @Override
+ public void setCost(double cost) {
+ this.queryCost = cost;
+ }
+
+ @Override
+ public void admit() throws QueueTimeoutException, QueryQueueException {
+ lease = rm.queue().enqueue(foreman.getQueryId(), queryCost);
+ }
+
+ @Override
+ protected long queryMemoryPerNode() {
+
+ // No lease: use static estimate.
+
+ if (lease == null) {
+ return super.queryMemoryPerNode();
+ }
+
+ // Use actual memory assigned to this query.
+
+ return lease.queryMemoryPerNode();
+ }
+
+ @Override
+ public void exit() {
+ if (lease != null) {
+ lease.release();
+ }
+ lease = null;
+ }
+
+ @Override
+ public boolean hasQueue() { return true; }
+
+ @Override
+ public String queueName() {
+ return lease == null ? null : lease.queueName();
+ }
+ }
+
+ private final QueryQueue queue;
+
+ public ThrottledResourceManager(final DrillbitContext drillbitContext,
+ final QueryQueue queue) {
+ super(drillbitContext);
+ this.queue = queue;
+ queue.setMemoryPerNode(memoryPerNode());
+ }
+
+ public long minimumOperatorMemory() {
+ return queue.minimumOperatorMemory();
+ }
+
+ public long defaultQueryMemoryPerNode(double cost) {
+ return queue.defaultQueryMemoryPerNode(cost);
+ }
+
+ public QueryQueue queue() { return queue; }
+
+ @Override
+ public QueryResourceAllocator newResourceAllocator(
+ QueryContext queryContext) {
+ return new QueuedResourceAllocator(this, queryContext);
+ }
+
+ @Override
+ public QueryResourceManager newQueryRM(Foreman foreman) {
+ return new QueuedQueryResourceManager(this, foreman);
+ }
+
+ @Override
+ public void close() {
+ queue.close();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java
new file mode 100644
index 000000000..0b9e9da63
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides resource management and queuing support for the Drill foreman.
+ * The resource manager tracks total resources available to Drill. Several
+ * implementations are available: a default implementation for systems without
+ * queueing and an access-controlled (AC) version for systems with queues.
+ * <p>
+ * Each resource manager provides a per-query manager that is responsible
+ * for queuing the query (if needed) and memory allocation to the query based
+ * on query characteristics and memory assigned to the query.
+ * <p>
+ * Provides two different queue implementations. A distributed ZooKeeper queue
+ * and a local queue useful for embedded Drillbits (and for testing.)
+ */
+package org.apache.drill.exec.work.foreman.rm;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
index 2f945d85f..29b3580d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -35,9 +35,9 @@ import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.util.MemoryAllocationUtilities;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.foreman.rm.QueryResourceAllocator;
import com.google.common.collect.Lists;
@@ -60,6 +60,7 @@ public class PlanSplitter {
* @param connection
* @return
*/
+ @SuppressWarnings("resource")
public QueryPlanFragments planFragments(DrillbitContext dContext, QueryId queryId,
GetQueryPlanFragments req, UserClientConnection connection) {
QueryPlanFragments.Builder responseBuilder = QueryPlanFragments.newBuilder();
@@ -97,7 +98,8 @@ public class PlanSplitter {
throw new IllegalStateException("Planning fragments supports only SQL or PHYSICAL QueryType");
}
- MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
+ QueryResourceAllocator planner = dContext.getResourceManager().newResourceAllocator(queryContext);
+ planner.visitAbstractPlan(plan);
final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
@@ -113,6 +115,8 @@ public class PlanSplitter {
queryContext.getSession(), queryContext.getQueryContextInfo());
for (QueryWorkUnit queryWorkUnit : queryWorkUnits) {
+ planner.visitPhysicalPlan(queryWorkUnit);
+ queryWorkUnit.applyPlan(dContext.getPlanReader());
fragments.add(queryWorkUnit.getRootFragment());
List<PlanFragment> childFragments = queryWorkUnit.getFragments();
@@ -122,8 +126,10 @@ public class PlanSplitter {
}
} else {
final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
- queryId, queryContext.getActiveEndpoints(), dContext.getPlanReader(), rootFragment,
+ queryId, queryContext.getActiveEndpoints(), rootFragment,
queryContext.getSession(), queryContext.getQueryContextInfo());
+ planner.visitPhysicalPlan(queryWorkUnit);
+ queryWorkUnit.applyPlan(dContext.getPlanReader());
fragments.add(queryWorkUnit.getRootFragment());
fragments.addAll(queryWorkUnit.getFragments());
}