author     Paul Rogers <progers@maprtech.com>    2017-08-30 14:32:17 -0700
committer  Paul Rogers <progers@maprtech.com>    2017-10-09 15:58:20 -0700
commit     bbc42240483a0658691149aea3c509ccd0db4c79 (patch)
tree       471957d30d786e3f384ffb882a088357a6acf97e /exec/java-exec/src/main/java
parent     a03f5429e368cf73286eec6101871f6e61a5b7d1 (diff)
DRILL-5716: Queue-driven memory allocation
* Creates new core resource management and query queue abstractions.
* Adds queue information to the Protobuf layer.
* Foreman and Planner changes
  - Abstracts memory management out to the new resource management layer.
    This means deferring generation of the physical plan JSON until later in
    the process, after memory planning.
* Web UI changes
  - Adds queue information to the main page and to each query's profile page.
  - Also sorts the list of options displayed in the Web UI.
* Added memory reserve
  - A new config parameter, exec.queue.memory_reserve_ratio, sets aside a
    slice of total memory for operators that do not participate in the memory
    assignment process. The default is 20%; testing will tell us whether that
    value should be larger or smaller.
* Additional minor fixes
  - Code cleanup.
  - Added a mechanism to abandon lease release during shutdown.
  - Log the queue configuration only when the config changes, rather than on
    every query.
  - Apply Boaz's option to enforce a minimum memory allocation per operator.
  - Additional logging to help testers see what is happening.

closes #928
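For readers skimming the diff: the memory split described in the commit message can be sketched with a few lines of arithmetic. This is a standalone illustration, not code from this patch; the formula is one plausible reading of the memory_reserve_ratio and memory_ratio descriptions, and every class, method, and variable name below is hypothetical.

// Illustrative sketch only: one plausible reading of the queue-based memory
// split. Option semantics are taken from the commit message and the
// ExecConstants comments below; nothing here is the committed implementation.
public class QueueMemorySketch {

  /**
   * @param totalNodeMemory total direct memory available on the node
   * @param reserveRatio    exec.queue.memory_reserve_ratio (default 0.2)
   * @param memoryRatio     exec.queue.memory_ratio: units per large query, vs. 1 per small query
   * @param smallSlots      concurrently admitted small queries
   * @param largeSlots      concurrently admitted large queries
   * @param isLargeQuery    whether this query was classified as large
   */
  public static long queryMemory(long totalNodeMemory, double reserveRatio,
      double memoryRatio, int smallSlots, int largeSlots, boolean isLargeQuery) {
    // Set aside the reserve for operators that do not participate in the
    // memory assignment process (20% by default).
    double assignable = totalNodeMemory * (1.0 - reserveRatio);
    // Small queries get 1 unit each; large queries get memoryRatio units each.
    double unit = assignable / (smallSlots + memoryRatio * largeSlots);
    return (long) (isLargeQuery ? memoryRatio * unit : unit);
  }

  public static void main(String[] args) {
    long tenGb = 10L * 1024 * 1024 * 1024;
    System.out.println("small query: " + queryMemory(tenGb, 0.2, 10.0, 10, 5, false));
    System.out.println("large query: " + queryMemory(tenGb, 0.2, 10.0, 10, 5, true));
  }
}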
Diffstat (limited to 'exec/java-exec/src/main/java')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java | 25
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java | 7
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java | 2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java | 50
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java | 24
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java | 10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java | 40
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java | 1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java | 9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java | 137
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java | 25
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java | 1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java | 12
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java | 13
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java | 87
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java | 1
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java | 19
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java | 3
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java | 80
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java | 278
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java | 30
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java | 68
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java | 120
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java | 363
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java | 146
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java | 151
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java | 140
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java | 56
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java | 73
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java | 66
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java | 78
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java | 360
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java | 31
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java | 14
47 files changed, 2221 insertions(+), 375 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 72a73fcfe..9890b45ca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -350,7 +350,7 @@ public final class ExecConstants {
public static final LongValidator MAX_QUERY_MEMORY_PER_NODE = new RangeLongValidator(MAX_QUERY_MEMORY_PER_NODE_KEY, 1024 * 1024, Long.MAX_VALUE);
/**
- * Minimum memory alocated to each buffered operator instance.
+ * Minimum memory allocated to each buffered operator instance.
* <p/>
* DEFAULT: 40 MB
*/
@@ -377,12 +377,31 @@ public final class ExecConstants {
public static final String AVERAGE_FIELD_WIDTH_KEY = "planner.memory.average_field_width";
public static final OptionValidator AVERAGE_FIELD_WIDTH = new PositiveLongValidator(AVERAGE_FIELD_WIDTH_KEY, Long.MAX_VALUE);
+ // Resource management boot-time options.
+
+ public static final String MAX_MEMORY_PER_NODE = "drill.exec.rm.memory_per_node";
+ public static final String MAX_CPUS_PER_NODE = "drill.exec.rm.cpus_per_node";
+
+ // Resource management system run-time options.
+
+ // Enables queues. When running embedded, enables an in-process queue. When
+ // running distributed, enables the Zookeeper-based distributed queue.
+
public static final BooleanValidator ENABLE_QUEUE = new BooleanValidator("exec.queue.enable");
- public static final LongValidator LARGE_QUEUE_SIZE = new PositiveLongValidator("exec.queue.large", 1000);
- public static final LongValidator SMALL_QUEUE_SIZE = new PositiveLongValidator("exec.queue.small", 100000);
+ public static final LongValidator LARGE_QUEUE_SIZE = new PositiveLongValidator("exec.queue.large", 10_000);
+ public static final LongValidator SMALL_QUEUE_SIZE = new PositiveLongValidator("exec.queue.small", 100_000);
public static final LongValidator QUEUE_THRESHOLD_SIZE = new PositiveLongValidator("exec.queue.threshold", Long.MAX_VALUE);
public static final LongValidator QUEUE_TIMEOUT = new PositiveLongValidator("exec.queue.timeout_millis", Long.MAX_VALUE);
+ // Ratio of memory for small queries vs. large queries.
+ // Each small query gets 1 unit, each large query gets QUEUE_MEMORY_RATIO units.
+ // A lower limit of 1 enforces the intuition that a large query should never get
+ // *less* memory than a small one.
+
+ public static final DoubleValidator QUEUE_MEMORY_RATIO = new RangeDoubleValidator("exec.queue.memory_ratio", 1.0, 1000);
+
+ public static final DoubleValidator QUEUE_MEMORY_RESERVE = new RangeDoubleValidator("exec.queue.memory_reserve_ratio", 0, 1.0);
+
public static final String ENABLE_VERBOSE_ERRORS_KEY = "exec.errors.verbose";
public static final OptionValidator ENABLE_VERBOSE_ERRORS = new BooleanValidator(ENABLE_VERBOSE_ERRORS_KEY);
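The two validators added above are registered as system options later in this patch (see SystemOptionManager below); at run time they are read through the typed OptionManager accessors, the same pattern SimpleParallelizer uses for CPU_LOAD_AVERAGE. A minimal sketch, assuming the standard getOption overloads; the QueueConfigReader wrapper itself is hypothetical and not part of this patch.

import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.server.options.OptionManager;

// Hypothetical convenience wrapper; only the ExecConstants names come from
// the diff above.
public class QueueConfigReader {
  private final OptionManager options;

  public QueueConfigReader(OptionManager options) {
    this.options = options;
  }

  public boolean queuesEnabled() {
    return options.getOption(ExecConstants.ENABLE_QUEUE);
  }

  public double memoryRatio() {
    // Memory units granted to a large query for every unit granted to a small one.
    return options.getOption(ExecConstants.QUEUE_MEMORY_RATIO);
  }

  public double memoryReserveRatio() {
    // Fraction of node memory withheld from the per-operator assignment process.
    return options.getOption(ExecConstants.QUEUE_MEMORY_RESERVE);
  }
}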
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java
index 35c4a06c3..36f53ab1b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java
@@ -53,6 +53,7 @@ public class TransientStoreConfig<V> {
@Override
public boolean equals(Object obj) {
if (obj instanceof TransientStoreConfig && obj.getClass().equals(getClass())) {
+ @SuppressWarnings("unchecked")
final TransientStoreConfig<V> other = (TransientStoreConfig<V>)obj;
return Objects.equal(name, other.name) && Objects.equal(serializer, other.serializer);
}
@@ -70,5 +71,4 @@ public class TransientStoreConfig<V> {
public static <V> TransientStoreConfigBuilder<V> newJacksonBuilder(final ObjectMapper mapper, final Class<V> klazz) {
return TransientStoreConfig.<V>newBuilder().serializer(new JacksonSerializer<>(mapper, klazz));
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
index 435f35f12..86074f0b4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java
@@ -60,10 +60,10 @@ import io.netty.buffer.DrillBuf;
// TODO - consider re-name to PlanningContext, as the query execution context actually appears
// in fragment contexts
public class QueryContext implements AutoCloseable, OptimizerRulesContext, SchemaConfigInfoProvider {
- private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class);
private final DrillbitContext drillbitContext;
private final UserSession session;
+ private final QueryId queryId;
private final QueryOptionManager queryOptions;
private final PlannerSettings plannerSettings;
private final ExecutionControls executionControls;
@@ -87,6 +87,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
public QueryContext(final UserSession session, final DrillbitContext drillbitContext, QueryId queryId) {
this.drillbitContext = drillbitContext;
this.session = session;
+ this.queryId = queryId;
queryOptions = new QueryOptionManager(session.getOptions());
executionControls = new ExecutionControls(queryOptions, drillbitContext.getEndpoint());
plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry());
@@ -118,14 +119,12 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem
return plannerSettings;
}
- public UserSession getSession() {
- return session;
- }
+ public UserSession getSession() { return session; }
@Override
- public BufferAllocator getAllocator() {
- return allocator;
- }
+ public BufferAllocator getAllocator() { return allocator; }
+
+ public QueryId getQueryId() { return queryId; }
/**
* Return reference to default schema instance in a schema tree. Each {@link org.apache.calcite.schema.SchemaPlus}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
index e0902c84b..e5c96a79c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Root;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -87,4 +88,17 @@ public class PhysicalPlan {
throw new RuntimeException(e);
}
}
+
+ public double totalCost() {
+ double totalCost = 0;
+ for (final PhysicalOperator ops : getSortedOperators()) {
+ totalCost += ops.getCost();
+ }
+ return totalCost;
+ }
+
+ @JsonIgnore
+ public Graph<PhysicalOperator, Root, Leaf> graph() {
+ return graph;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
index 32910c352..b972dd3b6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,9 +17,10 @@
*/
package org.apache.drill.exec.physical.base;
-
/**
- * Describes the root operation within a particular Fragment. This includes things Sender nodes.
+ * Describes the root operation within a particular Fragment. This includes
+ * things like Sender nodes.
*/
+
public interface FragmentRoot extends FragmentLeaf {
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
index 550dcb251..9c635e9ad 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java
@@ -66,7 +66,7 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer {
}
}
- // Step 1: Find the width taking into various parameters
+ // Step 1: Find the width taking into account various parameters
// 1.1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is consistent
// with the calculation that ExcessiveExchangeRemover uses.
int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget());
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index edec7e485..2fc754179 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,7 +28,6 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Store;
import org.apache.drill.exec.physical.base.SubScan;
-import org.apache.drill.exec.work.foreman.ForemanException;
import com.google.common.collect.Lists;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
index 3e0f35a10..1529b6bf7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,8 +20,6 @@ package org.apache.drill.exec.planner.fragment;
import java.util.Iterator;
import java.util.Map;
-import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-
import com.google.common.collect.Maps;
public class PlanningSet implements Iterable<Wrapper> {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 4584bd555..d2efcfba8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -46,9 +46,9 @@ import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -70,7 +70,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
public SimpleParallelizer(QueryContext context) {
OptionManager optionManager = context.getOptions();
- long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET).num_val;
+ long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET_OPTION);
this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1;
double cpu_load_average = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE);
final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE);
@@ -123,12 +123,12 @@ public class SimpleParallelizer implements ParallelizationParameters {
* @throws ExecutionSetupException
*/
public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
- Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment,
+ Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
return generateWorkUnit(
- options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo);
+ options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
}
/**
@@ -150,6 +150,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
// no op
throw new UnsupportedOperationException("Use children classes");
}
+
/**
* Helper method to reuse the code for QueryWorkUnit(s) generation
* @param activeEndpoints
@@ -209,6 +210,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
// Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for
// parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and
// remove the fragment on which the current fragment depends on.
+
final Set<Wrapper> roots = Sets.newHashSet();
for(Wrapper w : planningSet) {
roots.add(w);
@@ -257,11 +259,11 @@ public class SimpleParallelizer implements ParallelizationParameters {
}
protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
- PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet,
+ Fragment rootNode, PlanningSet planningSet,
UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
- List<PlanFragment> fragments = Lists.newArrayList();
+ List<MinorFragmentDefn> fragmentDefns = new ArrayList<>();
- PlanFragment rootFragment = null;
+ MinorFragmentDefn rootFragmentDefn = null;
FragmentRoot rootOperator = null;
// now we generate all the individual plan fragments and associated assignments. Note, we need all endpoints
@@ -287,16 +289,6 @@ public class SimpleParallelizer implements ParallelizationParameters {
Preconditions.checkArgument(op instanceof FragmentRoot);
FragmentRoot root = (FragmentRoot) op;
- // get plan as JSON
- String plan;
- String optionsData;
- try {
- plan = reader.writeJson(root);
- optionsData = reader.writeJson(options);
- } catch (JsonProcessingException e) {
- throw new ForemanSetupException("Failure while trying to convert fragment into json.", e);
- }
-
FragmentHandle handle = FragmentHandle //
.newBuilder() //
.setMajorFragmentId(wrapper.getMajorFragmentId()) //
@@ -306,40 +298,36 @@ public class SimpleParallelizer implements ParallelizationParameters {
PlanFragment fragment = PlanFragment.newBuilder() //
.setForeman(foremanNode) //
- .setFragmentJson(plan) //
.setHandle(handle) //
.setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) //
.setLeafFragment(isLeafFragment) //
.setContext(queryContextInfo)
.setMemInitial(wrapper.getInitialAllocation())//
.setMemMax(wrapper.getMaxAllocation())
- .setOptionsJson(optionsData)
.setCredentials(session.getCredentials())
.addAllCollector(CountRequiredFragments.getCollectors(root))
.build();
+ MinorFragmentDefn fragmentDefn = new MinorFragmentDefn(fragment, root, options);
+
if (isRootNode) {
- if (logger.isDebugEnabled()) {
- logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
- }
- rootFragment = fragment;
+ logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
+ rootFragmentDefn = fragmentDefn;
rootOperator = root;
} else {
- if (logger.isDebugEnabled()) {
- logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
- }
- fragments.add(fragment);
+ logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
+ fragmentDefns.add(fragmentDefn);
}
}
}
- return new QueryWorkUnit(rootOperator, rootFragment, fragments);
+ return new QueryWorkUnit(rootOperator, rootFragmentDefn, fragmentDefns);
}
-
/**
* Designed to setup initial values for arriving fragment accounting.
*/
+
protected static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<Collector>, RuntimeException> {
private static final CountRequiredFragments INSTANCE = new CountRequiredFragments();
@@ -357,7 +345,6 @@ public class SimpleParallelizer implements ParallelizationParameters {
list.add(ep.getId());
}
-
collectors.add(Collector.newBuilder()
.setIsSpooling(receiver.isSpooling())
.setOppositeMajorFragmentId(receiver.getOppositeMajorFragmentId())
@@ -374,6 +361,5 @@ public class SimpleParallelizer implements ParallelizationParameters {
}
return null;
}
-
}
}
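The change above stops serializing each fragment's plan and options to JSON during parallelization; instead a MinorFragmentDefn carries the pieces needed to generate that JSON later, after memory planning has adjusted the plan (see the commit message). A hedged sketch of the shape implied by the constructor calls above; the accessor names are assumptions, not the actual QueryWorkUnit code.

import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.server.options.OptionList;

// Sketch of what QueryWorkUnit.MinorFragmentDefn plausibly holds, inferred
// from new MinorFragmentDefn(fragment, root, options) above.
public class MinorFragmentDefnSketch {
  private final PlanFragment fragment;  // protobuf fragment, fragment/options JSON not yet set
  private final FragmentRoot root;      // root operator, serialized to JSON later
  private final OptionList options;     // options, serialized to JSON later

  public MinorFragmentDefnSketch(PlanFragment fragment, FragmentRoot root, OptionList options) {
    this.fragment = fragment;
    this.root = root;
    this.options = options;
  }

  public PlanFragment fragment() { return fragment; }
  public FragmentRoot root() { return root; }
  public OptionList options() { return options; }
}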
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
index 644263eea..1549a6bb4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -53,7 +53,6 @@ public class SoftAffinityFragmentParallelizer implements FragmentParallelizer {
@Override
public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters,
final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
- final Fragment fragment = fragmentWrapper.getNode();
// Find the parallelization width of fragment
final Stats stats = fragmentWrapper.getStats();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
index 395a9e1f4..1eb12967d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
@@ -41,9 +41,9 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.rpc.user.UserSession;
import org.apache.drill.exec.server.options.OptionList;
import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -115,7 +115,6 @@ public class SplittingParallelizer extends SimpleParallelizer {
int plansCount = 0;
DrillbitEndpoint[] endPoints = null;
long initialAllocation = 0;
- long maxAllocation = 0;
final Iterator<Wrapper> iter = planningSet.iterator();
while (iter.hasNext()) {
@@ -131,7 +130,6 @@ public class SplittingParallelizer extends SimpleParallelizer {
// allocation
plansCount = wrapper.getWidth();
initialAllocation = (wrapper.getInitialAllocation() != 0 ) ? wrapper.getInitialAllocation()/plansCount : 0;
- maxAllocation = (wrapper.getMaxAllocation() != 0 ) ? wrapper.getMaxAllocation()/plansCount : 0;
endPoints = new DrillbitEndpoint[plansCount];
for (int mfId = 0; mfId < plansCount; mfId++) {
endPoints[mfId] = wrapper.getAssignedEndpoint(mfId);
@@ -140,7 +138,7 @@ public class SplittingParallelizer extends SimpleParallelizer {
}
if ( plansCount == 0 ) {
// no exchange, return list of single QueryWorkUnit
- workUnits.add(generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet, session, queryContextInfo));
+ workUnits.add(generateWorkUnit(options, foremanNode, queryId, rootNode, planningSet, session, queryContextInfo));
return workUnits;
}
@@ -171,9 +169,9 @@ public class SplittingParallelizer extends SimpleParallelizer {
// Create a minorFragment for each major fragment.
for (int minorFragmentId = 0; minorFragmentId < plansCount; minorFragmentId++) {
// those fragments should be empty
- List<PlanFragment> fragments = Lists.newArrayList();
+ List<MinorFragmentDefn> fragments = Lists.newArrayList();
- PlanFragment rootFragment = null;
+ MinorFragmentDefn rootFragment = null;
FragmentRoot rootOperator = null;
IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
@@ -186,35 +184,25 @@ public class SplittingParallelizer extends SimpleParallelizer {
Preconditions.checkArgument(op instanceof FragmentRoot);
FragmentRoot root = (FragmentRoot) op;
- // get plan as JSON
- String plan;
- String optionsData;
- try {
- plan = reader.writeJson(root);
- optionsData = reader.writeJson(options);
- } catch (JsonProcessingException e) {
- throw new ForemanSetupException("Failure while trying to convert fragment into json.", e);
- }
PlanFragment fragment = PlanFragment.newBuilder() //
.setForeman(endPoints[minorFragmentId]) //
- .setFragmentJson(plan) //
.setHandle(handle) //
.setAssignment(endPoints[minorFragmentId]) //
.setLeafFragment(isLeafFragment) //
.setContext(queryContextInfo)
.setMemInitial(initialAllocation)//
.setMemMax(wrapper.getMaxAllocation()) // TODO - for some reason OOM is using leaf fragment max allocation divided by width
- .setOptionsJson(optionsData)
.setCredentials(session.getCredentials())
.addAllCollector(CountRequiredFragments.getCollectors(root))
.build();
+ MinorFragmentDefn fragmentDefn = new MinorFragmentDefn(fragment, root, options);
if (isRootNode) {
if (logger.isDebugEnabled()) {
logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString()));
}
- rootFragment = fragment;
+ rootFragment = fragmentDefn;
rootOperator = root;
} else {
if (logger.isDebugEnabled()) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
index 80aaf0d7f..a333ff22f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java
@@ -103,6 +103,7 @@ public class Drillbit implements AutoCloseable {
this(config, SystemOptionManager.createDefaultOptionDefinitions(), serviceSet, classpathScan);
}
+ @SuppressWarnings("resource")
@VisibleForTesting
public Drillbit(
final DrillConfig config,
@@ -122,7 +123,7 @@ public class Drillbit implements AutoCloseable {
storeProvider = new CachingPersistentStoreProvider(new LocalPersistentStoreProvider(config));
} else {
coord = new ZKClusterCoordinator(config);
- storeProvider = new PersistentStoreRegistry(this.coord, config).newPStoreProvider();
+ storeProvider = new PersistentStoreRegistry<ClusterCoordinator>(this.coord, config).newPStoreProvider();
isDistributedMode = true;
}
@@ -157,6 +158,7 @@ public class Drillbit implements AutoCloseable {
}
final DrillbitEndpoint md = engine.start();
manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider, profileStoreProvider);
+ @SuppressWarnings("resource")
final DrillbitContext drillbitContext = manager.getContext();
storageRegistry = drillbitContext.getStorage();
storageRegistry.init();
@@ -166,6 +168,10 @@ public class Drillbit implements AutoCloseable {
registrationHandle = coord.register(md);
webServer.start();
+ // Must start the RM after the above since it needs to read system options.
+
+ drillbitContext.startRM();
+
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this, new StackTrace()));
logger.info("Startup completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS));
}
@@ -224,6 +230,7 @@ public class Drillbit implements AutoCloseable {
return;
}
+ @SuppressWarnings("resource")
final SystemOptionManager optionManager = getContext().getOptionManager();
// parse out the properties, validate, and then set them
@@ -324,7 +331,6 @@ public class Drillbit implements AutoCloseable {
return start(config, SystemOptionManager.createDefaultOptionDefinitions(), remoteServiceSet);
}
- @SuppressWarnings("resource")
@VisibleForTesting
public static Drillbit start(final DrillConfig config, final CaseInsensitiveMap<OptionDefinition> validators,
final RemoteServiceSet remoteServiceSet)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index 426b9d2ab..b8a8b1e5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -39,6 +39,8 @@ import org.apache.drill.exec.server.options.SystemOptionManager;
import org.apache.drill.exec.store.SchemaFactory;
import org.apache.drill.exec.store.StoragePluginRegistry;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
+import org.apache.drill.exec.work.foreman.rm.ResourceManager;
+import org.apache.drill.exec.work.foreman.rm.ResourceManagerBuilder;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
@@ -66,6 +68,7 @@ public class DrillbitContext implements AutoCloseable {
// operator table for standard SQL operators and functions, Drill built-in UDFs
private final DrillOperatorTable table;
private final QueryProfileStoreContext profileStoreContext;
+ private ResourceManager resourceManager;
public DrillbitContext(
DrillbitEndpoint endpoint,
@@ -88,7 +91,7 @@ public class DrillbitContext implements AutoCloseable {
WorkEventBus workBus,
PersistentStoreProvider provider,
PersistentStoreProvider profileStoreProvider) {
- this.classpathScan = context.getClasspathScan();
+ classpathScan = context.getClasspathScan();
this.workBus = workBus;
this.controller = checkNotNull(controller);
this.context = checkNotNull(context);
@@ -96,29 +99,40 @@ public class DrillbitContext implements AutoCloseable {
this.connectionsPool = checkNotNull(connectionsPool);
this.endpoint = checkNotNull(endpoint);
this.provider = provider;
- this.lpPersistence = new LogicalPlanPersistence(context.getConfig(), classpathScan);
+ DrillConfig config = context.getConfig();
+ lpPersistence = new LogicalPlanPersistence(config, classpathScan);
- // TODO remove escaping "this".
- this.storagePlugins = context.getConfig()
+ storagePlugins = config
.getInstance(StoragePluginRegistry.STORAGE_PLUGIN_REGISTRY_IMPL, StoragePluginRegistry.class, this);
- this.reader = new PhysicalPlanReader(context.getConfig(), classpathScan, lpPersistence, endpoint, storagePlugins);
- this.operatorCreatorRegistry = new OperatorCreatorRegistry(classpathScan);
- this.systemOptions = new SystemOptionManager(lpPersistence, provider, context.getConfig(), context.getDefinitions());
- this.functionRegistry = new FunctionImplementationRegistry(context.getConfig(), classpathScan, systemOptions);
- this.compiler = new CodeCompiler(context.getConfig(), systemOptions);
+ reader = new PhysicalPlanReader(config, classpathScan, lpPersistence, endpoint, storagePlugins);
+ operatorCreatorRegistry = new OperatorCreatorRegistry(classpathScan);
+ systemOptions = new SystemOptionManager(lpPersistence, provider, config, context.getDefinitions());
+ functionRegistry = new FunctionImplementationRegistry(config, classpathScan, systemOptions);
+ compiler = new CodeCompiler(config, systemOptions);
// This operator table is built once and used for all queries which do not need dynamic UDF support.
- this.table = new DrillOperatorTable(functionRegistry, systemOptions);
+ table = new DrillOperatorTable(functionRegistry, systemOptions);
//This profile store context is built from the profileStoreProvider
- this.profileStoreContext = new QueryProfileStoreContext(context.getConfig(), profileStoreProvider, coord);
+ profileStoreContext = new QueryProfileStoreContext(context.getConfig(), profileStoreProvider, coord);
}
public QueryProfileStoreContext getProfileStoreContext() {
return profileStoreContext;
}
+ /**
+ * Starts the resource manager. Must be called separately from the
+ * constructor after the system property mechanism is initialized
+ * since the builder will consult system options to determine the
+ * proper RM to use.
+ */
+
+ public void startRM() {
+ resourceManager = new ResourceManagerBuilder(this).build();
+ }
+
public FunctionImplementationRegistry getFunctionImplementationRegistry() {
return functionRegistry;
}
@@ -243,4 +257,8 @@ public class DrillbitContext implements AutoCloseable {
getRemoteFunctionRegistry().close();
getCompiler().close();
}
+
+ public ResourceManager getResourceManager() {
+ return resourceManager;
+ }
}
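startRM() defers resource-manager construction until the system option store is readable, because the builder's choice depends on those options. Below is a self-contained sketch of that decision, based only on the commit message and the new work/foreman/rm classes listed in the diffstat; the enum, method, and flag names are hypothetical, and the real selection logic lives in ResourceManagerBuilder.

// Illustrative decision table only; not the committed ResourceManagerBuilder.
public class RmChoiceSketch {

  enum RmKind {
    DEFAULT,               // DefaultResourceManager: no queueing or throttling
    THROTTLED_EMBEDDED,    // ThrottledResourceManager + EmbeddedQueryQueue (in-process)
    THROTTLED_DISTRIBUTED  // ThrottledResourceManager + DistributedQueryQueue (ZooKeeper-based)
  }

  static RmKind choose(boolean queuesEnabled, boolean embeddedDrillbit) {
    if (!queuesEnabled) {
      return RmKind.DEFAULT;
    }
    // Per the commit message: embedded mode uses an in-process queue, a
    // distributed cluster uses the ZooKeeper-based queue. The new
    // DynamicResourceManager re-evaluates this choice as options change.
    return embeddedDrillbit ? RmKind.THROTTLED_EMBEDDED : RmKind.THROTTLED_DISTRIBUTED;
  }

  public static void main(String[] args) {
    System.out.println(choose(true, false));  // THROTTLED_DISTRIBUTED
  }
}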
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
index f2d352c98..7d30b5bd3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java
@@ -73,6 +73,9 @@ public class DrillConfigIterator implements Iterable<OptionValue> {
case NULL:
throw new IllegalStateException("Config value \"" + name + "\" has NULL type");
+
+ default:
+ throw new IllegalStateException("Unknown type: " + cv.valueType());
}
return optionValue;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
index 8851e1a37..e99645d2c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java
@@ -19,6 +19,7 @@ package org.apache.drill.exec.server.options;
import java.util.ArrayList;
+@SuppressWarnings("serial")
public class OptionList extends ArrayList<OptionValue>{
public void merge(OptionList list){
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
index 52bf40327..635a1ae40 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java
@@ -17,14 +17,15 @@
*/
package org.apache.drill.exec.server.options;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
+import java.util.Collection;
+import java.util.Map;
+
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.exec.rpc.user.UserSession;
-import java.util.Collection;
-import java.util.Map;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
/**
* {@link OptionManager} that holds options within {@link org.apache.drill.exec.rpc.user.UserSession} context. Options
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index b0863eefe..bda6033bd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -160,6 +160,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
new OptionDefinition(ExecConstants.QUEUE_THRESHOLD_SIZE),
new OptionDefinition(ExecConstants.QUEUE_TIMEOUT),
new OptionDefinition(ExecConstants.SMALL_QUEUE_SIZE),
+ new OptionDefinition(ExecConstants.QUEUE_MEMORY_RESERVE, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
+ new OptionDefinition(ExecConstants.QUEUE_MEMORY_RATIO, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
new OptionDefinition(ExecConstants.MIN_HASH_TABLE_SIZE),
new OptionDefinition(ExecConstants.MAX_HASH_TABLE_SIZE),
new OptionDefinition(ExecConstants.EARLY_LIMIT0_OPT),
@@ -418,6 +420,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
@Override
public void close() throws Exception {
- options.close();
+ // If the server exits very early, the options may not yet have
+ // been created. Gracefully handle that case.
+
+ if (options != null) {
+ options.close();
+ }
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
index 238f5cacf..6eb47e659 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java
@@ -86,7 +86,6 @@ public class DrillRestServer extends ResourceConfig {
register(MultiPartFeature.class);
property(ServerProperties.METAINF_SERVICES_LOOKUP_DISABLE, true);
-
final boolean isAuthEnabled =
workManager.getContext().getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED);
@@ -152,7 +151,6 @@ public class DrillRestServer extends ResourceConfig {
return configuration;
}
-
public static class AuthWebUserConnectionProvider implements Factory<WebUserConnection> {
@Inject
@@ -161,6 +159,7 @@ public class DrillRestServer extends ResourceConfig {
@Inject
WorkManager workManager;
+ @SuppressWarnings("resource")
@Override
public WebUserConnection provide() {
final HttpSession session = request.getSession();
@@ -228,6 +227,7 @@ public class DrillRestServer extends ResourceConfig {
@Inject
WorkManager workManager;
+ @SuppressWarnings("resource")
@Override
public WebUserConnection provide() {
final DrillbitContext drillbitContext = workManager.getContext();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
index 84c471e12..70b82ca92 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -32,8 +32,15 @@ import com.google.common.collect.Sets;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled;
import org.apache.drill.exec.work.WorkManager;
+import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue;
+import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.ZKQueueInfo;
+import org.apache.drill.exec.work.foreman.rm.DynamicResourceManager;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue;
+import org.apache.drill.exec.work.foreman.rm.ResourceManager;
+import org.apache.drill.exec.work.foreman.rm.ThrottledResourceManager;
import org.glassfish.jersey.server.mvc.Viewable;
import com.fasterxml.jackson.annotation.JsonCreator;
@@ -53,6 +60,7 @@ public class DrillRoot {
return ViewableWithPermissions.create(authEnabled.get(), "/rest/index.ftl", sc, getClusterInfoJSON());
}
+ @SuppressWarnings("resource")
@GET
@Path("/cluster.json")
@Produces(MediaType.APPLICATION_JSON)
@@ -60,10 +68,11 @@ public class DrillRoot {
final Collection<DrillbitInfo> drillbits = Sets.newTreeSet();
final Collection<String> mismatchedVersions = Sets.newTreeSet();
- final DrillbitEndpoint currentDrillbit = work.getContext().getEndpoint();
+ final DrillbitContext dbContext = work.getContext();
+ final DrillbitEndpoint currentDrillbit = dbContext.getEndpoint();
final String currentVersion = currentDrillbit.getVersion();
- final DrillConfig config = work.getContext().getConfig();
+ final DrillConfig config = dbContext.getConfig();
final boolean userEncryptionEnabled = config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED);
final boolean bitEncryptionEnabled = config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED);
@@ -77,8 +86,90 @@ public class DrillRoot {
drillbits.add(drillbit);
}
- return new ClusterInfo(drillbits, currentVersion, mismatchedVersions,
- userEncryptionEnabled, bitEncryptionEnabled);
+ return new ClusterInfo(drillbits, currentVersion, mismatchedVersions,
+ userEncryptionEnabled, bitEncryptionEnabled,
+ QueueInfo.build(dbContext.getResourceManager()));
+ }
+
+ /**
+ * Pretty-printing wrapper class around the ZK-based queue summary.
+ */
+
+ @XmlRootElement
+ public static class QueueInfo {
+ private final ZKQueueInfo zkQueueInfo;
+
+ public static QueueInfo build(ResourceManager rm) {
+
+ // Consider queues enabled only if the ZK-based queues are in use.
+
+ ThrottledResourceManager throttledRM = null;
+ if (rm != null && rm instanceof DynamicResourceManager) {
+ DynamicResourceManager dynamicRM = (DynamicResourceManager) rm;
+ rm = dynamicRM.activeRM();
+ }
+ if (rm != null && rm instanceof ThrottledResourceManager) {
+ throttledRM = (ThrottledResourceManager) rm;
+ }
+ if (throttledRM == null) {
+ return new QueueInfo(null);
+ }
+ QueryQueue queue = throttledRM.queue();
+ if (queue == null || !(queue instanceof DistributedQueryQueue)) {
+ return new QueueInfo(null);
+ }
+
+ return new QueueInfo(((DistributedQueryQueue) queue).getInfo());
+ }
+
+ @JsonCreator
+ public QueueInfo(ZKQueueInfo queueInfo) {
+ zkQueueInfo = queueInfo;
+ }
+
+ public boolean isEnabled() { return zkQueueInfo != null; }
+
+ public int smallQueueSize() {
+ return isEnabled() ? zkQueueInfo.smallQueueSize : 0;
+ }
+
+ public int largeQueueSize() {
+ return isEnabled() ? zkQueueInfo.largeQueueSize : 0;
+ }
+
+ public String threshold() {
+ return isEnabled()
+ ? Double.toString(zkQueueInfo.queueThreshold)
+ : "N/A";
+ }
+
+ public String smallQueueMemory() {
+ return isEnabled()
+ ? toBytes(zkQueueInfo.memoryPerSmallQuery)
+ : "N/A";
+ }
+
+ public String largeQueueMemory() {
+ return isEnabled()
+ ? toBytes(zkQueueInfo.memoryPerLargeQuery)
+ : "N/A";
+ }
+
+ public String totalMemory() {
+ return isEnabled()
+ ? toBytes(zkQueueInfo.memoryPerNode)
+ : "N/A";
+ }
+
+ private final long ONE_MB = 1024 * 1024;
+
+ private String toBytes(long memory) {
+ if (memory < 10 * ONE_MB) {
+ return String.format("%,d bytes", memory);
+ } else {
+ return String.format("%,.0f MB", memory * 1.0D / ONE_MB);
+ }
+ }
}
@XmlRootElement
@@ -88,18 +179,21 @@ public class DrillRoot {
private final Collection<String> mismatchedVersions;
private final boolean userEncryptionEnabled;
private final boolean bitEncryptionEnabled;
+ private final QueueInfo queueInfo;
@JsonCreator
public ClusterInfo(Collection<DrillbitInfo> drillbits,
String currentVersion,
Collection<String> mismatchedVersions,
boolean userEncryption,
- boolean bitEncryption) {
+ boolean bitEncryption,
+ QueueInfo queueInfo) {
this.drillbits = Sets.newTreeSet(drillbits);
this.currentVersion = currentVersion;
this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions);
this.userEncryptionEnabled = userEncryption;
this.bitEncryptionEnabled = bitEncryption;
+ this.queueInfo = queueInfo;
}
public Collection<DrillbitInfo> getDrillbits() {
@@ -117,6 +211,8 @@ public class DrillRoot {
public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; }
public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; }
+
+ public QueueInfo queueInfo() { return queueInfo; }
}
public static class DrillbitInfo implements Comparable<DrillbitInfo> {
@@ -139,9 +235,7 @@ public class DrillRoot {
this.versionMatch = versionMatch;
}
- public String getAddress() {
- return address;
- }
+ public String getAddress() { return address; }
public String getUserPort() { return userPort; }
@@ -149,26 +243,20 @@ public class DrillRoot {
public String getDataPort() { return dataPort; }
- public String getVersion() {
- return version;
- }
+ public String getVersion() { return version; }
- public boolean isCurrent() {
- return current;
- }
+ public boolean isCurrent() { return current; }
- public boolean isVersionMatch() {
- return versionMatch;
- }
+ public boolean isVersionMatch() { return versionMatch; }
/**
- * Method used to sort drillbits. Current drillbit goes first.
- * Then drillbits with matching versions, after them drillbits with mismatching versions.
- * Matching drillbits are sorted according address natural order,
- * mismatching drillbits are sorted according version, address natural order.
+ * Method used to sort Drillbits. Current Drillbit goes first.
+ * Then Drillbits with matching versions, after them Drillbits with mismatching versions.
+ * Matching Drillbits are sorted according to address natural order,
+ * mismatching Drillbits are sorted according to version, then address natural order.
*
- * @param drillbitToCompare drillbit to compare against
- * @return -1 if drillbit should be before, 1 if after in list
+ * @param drillbitToCompare Drillbit to compare against
+ * @return -1 if Drillbit should be before, 1 if after in list
*/
@Override
public int compareTo(DrillbitInfo drillbitToCompare) {
@@ -189,5 +277,4 @@ public class DrillRoot {
return this.versionMatch ? -1 : 1;
}
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java
index cc00e9b90..7bf7c90e7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java
@@ -20,22 +20,21 @@ package org.apache.drill.exec.server.rest;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
-import javax.ws.rs.ext.Provider;
public class GenericExceptionMapper implements ExceptionMapper<Throwable> {
- @Override
- public Response toResponse(Throwable throwable) {
- return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
- .entity(new GenericErrorMessage(throwable.getMessage()))
- .type(MediaType.APPLICATION_JSON_TYPE)
- .build();
- }
+ @Override
+ public Response toResponse(Throwable throwable) {
+ return Response
+ .status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode())
+ .entity(new GenericErrorMessage(throwable.getMessage()))
+ .type(MediaType.APPLICATION_JSON_TYPE).build();
+ }
- public static class GenericErrorMessage {
- public final String errorMessage;
+ public static class GenericErrorMessage {
+ public final String errorMessage;
- public GenericErrorMessage(String errorMessage) {
- this.errorMessage = errorMessage;
- }
+ public GenericErrorMessage(String errorMessage) {
+ this.errorMessage = errorMessage;
}
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
index 16d213a8b..d24f03a86 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java
@@ -111,6 +111,7 @@ public class LogsResources {
final int maxLines = work.getContext().getOptionManager().getOption(ExecConstants.WEB_LOGS_MAX_LINES).num_val.intValue();
try (BufferedReader br = new BufferedReader(new FileReader(file))) {
+ @SuppressWarnings("serial")
Map<Integer, String> cache = new LinkedHashMap<Integer, String>(maxLines, .75f, true) {
@Override
protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java
index 4042a818f..e6b116ccd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,6 +17,8 @@
*/
package org.apache.drill.exec.server.rest;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
@@ -80,6 +82,7 @@ public class StatusResources {
return ViewableWithPermissions.create(authEnabled.get(), "/rest/status.ftl", sc, getStatusJSON());
}
+ @SuppressWarnings("resource")
private List<OptionWrapper> getSystemOptionsJSONHelper(boolean internal)
{
List<OptionWrapper> options = new LinkedList<>();
@@ -90,6 +93,12 @@ public class StatusResources {
options.add(new OptionWrapper(option.name, option.getValue(), option.accessibleScopes, option.kind, option.scope));
}
+ Collections.sort(options, new Comparator<OptionWrapper>() {
+ @Override
+ public int compare(OptionWrapper o1, OptionWrapper o2) {
+ return o1.name.compareTo(o2.name);
+ }
+ });
return options;
}
@@ -132,6 +141,7 @@ public class StatusResources {
return getSystemOptionsHelper(true);
}
+ @SuppressWarnings("resource")
@POST
@Path("option/{optionName}")
@RolesAllowed(DrillUserPrincipal.ADMIN_ROLE)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
index ce26a316b..cf5eb57c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -93,6 +93,7 @@ public class StorageResources {
return ViewableWithPermissions.create(authEnabled.get(), "/rest/storage/list.ftl", sc, list);
}
+ @SuppressWarnings("resource")
@GET
@Path("/storage/{name}.json")
@Produces(MediaType.APPLICATION_JSON)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
index 7c376f785..69f2cab76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java
@@ -138,6 +138,7 @@ public class WebServer implements AutoCloseable {
* Start the web server including setup.
* @throws Exception
*/
+ @SuppressWarnings("resource")
public void start() throws Exception {
if (!config.getBoolean(ExecConstants.HTTP_ENABLE)) {
return;
@@ -266,6 +267,7 @@ public class WebServer implements AutoCloseable {
}
// Clear all the resources allocated for this session
+ @SuppressWarnings("resource")
final WebSessionResources webSessionResources =
(WebSessionResources) session.getAttribute(WebSessionResources.class.getSimpleName());
@@ -322,7 +324,7 @@ public class WebServer implements AutoCloseable {
* Create an HTTPS connector for given jetty server instance. If the admin has specified keystore/truststore settings
* they will be used else a self-signed certificate is generated and used.
*
- * @return Initialized {@link ServerConnector} for HTTPS connectios.
+ * @return Initialized {@link ServerConnector} for HTTPS connections.
* @throws Exception
*/
private ServerConnector createHttpsConnector(int port) throws Exception {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
index fa2f43fa7..a61522a68 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java
@@ -31,78 +31,91 @@ import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
interface Comparators {
final static Comparator<MajorFragmentProfile> majorId = new Comparator<MajorFragmentProfile>() {
+ @Override
public int compare(final MajorFragmentProfile o1, final MajorFragmentProfile o2) {
return Long.compare(o1.getMajorFragmentId(), o2.getMajorFragmentId());
}
};
final static Comparator<MinorFragmentProfile> minorId = new Comparator<MinorFragmentProfile>() {
+ @Override
public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getMinorFragmentId(), o2.getMinorFragmentId());
}
};
final static Comparator<MinorFragmentProfile> startTime = new Comparator<MinorFragmentProfile>() {
+ @Override
public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getStartTime(), o2.getStartTime());
}
};
final static Comparator<MinorFragmentProfile> lastUpdate = new Comparator<MinorFragmentProfile>() {
+ @Override
public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getLastUpdate(), o2.getLastUpdate());
}
};
final static Comparator<MinorFragmentProfile> lastProgress = new Comparator<MinorFragmentProfile>() {
+ @Override
public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getLastProgress(), o2.getLastProgress());
}
};
final static Comparator<MinorFragmentProfile> endTime = new Comparator<MinorFragmentProfile>() {
+ @Override
public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getEndTime(), o2.getEndTime());
}
};
final static Comparator<MinorFragmentProfile> fragmentPeakMemory = new Comparator<MinorFragmentProfile>() {
+ @Override
public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getMaxMemoryUsed(), o2.getMaxMemoryUsed());
}
};
final static Comparator<MinorFragmentProfile> runTime = new Comparator<MinorFragmentProfile>() {
+ @Override
public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) {
return Long.compare(o1.getEndTime() - o1.getStartTime(), o2.getEndTime() - o2.getStartTime());
}
};
final static Comparator<OperatorProfile> operatorId = new Comparator<OperatorProfile>() {
+ @Override
public int compare(final OperatorProfile o1, final OperatorProfile o2) {
return Long.compare(o1.getOperatorId(), o2.getOperatorId());
}
};
final static Comparator<Pair<OperatorProfile, Integer>> setupTime = new Comparator<Pair<OperatorProfile, Integer>>() {
+ @Override
public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
return Long.compare(o1.getLeft().getSetupNanos(), o2.getLeft().getSetupNanos());
}
};
final static Comparator<Pair<OperatorProfile, Integer>> processTime = new Comparator<Pair<OperatorProfile, Integer>>() {
+ @Override
public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
return Long.compare(o1.getLeft().getProcessNanos(), o2.getLeft().getProcessNanos());
}
};
final static Comparator<Pair<OperatorProfile, Integer>> waitTime = new Comparator<Pair<OperatorProfile, Integer>>() {
+ @Override
public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
return Long.compare(o1.getLeft().getWaitNanos(), o2.getLeft().getWaitNanos());
}
};
final static Comparator<Pair<OperatorProfile, Integer>> operatorPeakMemory = new Comparator<Pair<OperatorProfile, Integer>>() {
+ @Override
public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) {
return Long.compare(o1.getLeft().getPeakLocalMemoryAllocated(), o2.getLeft().getPeakLocalMemoryAllocated());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
index 16c07d2ee..875c96ed5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java
@@ -73,17 +73,20 @@ public class ProfileResources {
public static class ProfileInfo implements Comparable<ProfileInfo> {
public static final SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
- private String queryId;
- private long startTime;
- private long endTime;
- private Date time;
- private String link;
- private String foreman;
- private String query;
- private String state;
- private String user;
-
- public ProfileInfo(DrillConfig drillConfig, String queryId, long startTime, long endTime, String foreman, String query, String state, String user) {
+ private final String queryId;
+ private final long startTime;
+ private final long endTime;
+ private final Date time;
+ private final String link;
+ private final String foreman;
+ private final String query;
+ private final String state;
+ private final String user;
+ private final double totalCost;
+ private final String queueName;
+
+ public ProfileInfo(DrillConfig drillConfig, String queryId, long startTime, long endTime, String foreman, String query,
+ String state, String user, double totalCost, String queueName) {
this.queryId = queryId;
this.startTime = startTime;
this.endTime = endTime;
@@ -93,52 +96,36 @@ public class ProfileResources {
this.query = query.substring(0, Math.min(query.length(), 150));
this.state = state;
this.user = user;
+ this.totalCost = totalCost;
+ this.queueName = queueName;
}
- public String getUser() {
- return user;
- }
+ public String getUser() { return user; }
- public String getQuery(){
- return query;
- }
+ public String getQuery() { return query; }
- public String getQueryId() {
- return queryId;
- }
+ public String getQueryId() { return queryId; }
- public String getTime() {
- return format.format(time);
- }
+ public String getTime() { return format.format(time); }
- public long getStartTime() {
- return startTime;
- }
+ public long getStartTime() { return startTime; }
- public long getEndTime() {
- return endTime;
- }
+ public long getEndTime() { return endTime; }
public String getDuration() {
return (new SimpleDurationFormat(startTime, endTime)).verbose();
}
- public String getState() {
- return state;
- }
+ public String getState() { return state; }
- public String getLink() {
- return link;
- }
+ public String getLink() { return link; }
@Override
public int compareTo(ProfileInfo other) {
return time.compareTo(other.time);
}
- public String getForeman() {
- return foreman;
- }
+ public String getForeman() { return foreman; }
/**
* Generates link which will return query profile in json representation.
@@ -164,6 +151,9 @@ public class ProfileResources {
return sb.toString();
}
+ public double getTotalCost() { return totalCost; }
+
+ public String getQueueName() { return queueName; }
}
protected PersistentStoreProvider getProvider() {
@@ -204,6 +194,7 @@ public class ProfileResources {
//max Param to cap listing of profiles
private static final String MAX_QPROFILES_PARAM = "max";
+ @SuppressWarnings("resource")
@GET
@Path("/profiles.json")
@Produces(MediaType.APPLICATION_JSON)
@@ -223,8 +214,11 @@ public class ProfileResources {
final Map.Entry<String, QueryInfo> runningEntry = runningEntries.next();
final QueryInfo profile = runningEntry.getValue();
if (principal.canManageProfileOf(profile.getUser())) {
- runningQueries.add(new ProfileInfo(work.getContext().getConfig(), runningEntry.getKey(), profile.getStart(), System.currentTimeMillis(), profile.getForeman()
- .getAddress(), profile.getQuery(), profile.getState().name(), profile.getUser()));
+ runningQueries.add(
+ new ProfileInfo(work.getContext().getConfig(),
+ runningEntry.getKey(), profile.getStart(), System.currentTimeMillis(),
+ profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name(),
+ profile.getUser(), profile.getTotalCost(), profile.getQueueName()));
}
} catch (Exception e) {
errors.add(e.getMessage());
@@ -250,8 +244,11 @@ public class ProfileResources {
final Map.Entry<String, QueryProfile> profileEntry = range.next();
final QueryProfile profile = profileEntry.getValue();
if (principal.canManageProfileOf(profile.getUser())) {
- finishedQueries.add(new ProfileInfo(work.getContext().getConfig(), profileEntry.getKey(), profile.getStart(), profile.getEnd(), profile.getForeman().getAddress(),
- profile.getQuery(), profile.getState().name(), profile.getUser()));
+ finishedQueries.add(
+ new ProfileInfo(work.getContext().getConfig(),
+ profileEntry.getKey(), profile.getStart(), profile.getEnd(),
+ profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name(),
+ profile.getUser(), profile.getTotalCost(), profile.getQueueName()));
}
} catch (Exception e) {
errors.add(e.getMessage());
@@ -277,6 +274,7 @@ public class ProfileResources {
return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/list.ftl", sc, profiles);
}
+ @SuppressWarnings("resource")
private QueryProfile getQueryProfile(String queryId) {
QueryId id = QueryIdHelper.getQueryIdFromString(queryId);
@@ -343,7 +341,7 @@ public class ProfileResources {
return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/profile.ftl", sc, wrapper);
}
-
+ @SuppressWarnings("resource")
@GET
@Path("/profiles/cancel/{queryid}")
@Produces(MediaType.TEXT_PLAIN)
@@ -372,7 +370,8 @@ public class ProfileResources {
}
}catch(Exception e){
logger.debug("Failure to find query as running profile.", e);
- return String.format("Failure attempting to cancel query %s. Unable to find information about where query is actively running.", queryId);
+ return String.format(
+ "Failure attempting to cancel query %s. Unable to find information about where query is actively running.", queryId);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
index 8f3cdfe6e..ef9ccc339 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java
@@ -20,7 +20,6 @@ package org.apache.drill.exec.server.rest.profile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
index 7e32a4d97..9382f4a46 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java
@@ -34,7 +34,7 @@ import com.google.common.collect.Lists;
public class BatchPrinter {
public static void printHyperBatch(VectorAccessible batch, SelectionVector4 sv4) {
List<String> columns = Lists.newArrayList();
- for (VectorWrapper vw : batch) {
+ for (VectorWrapper<?> vw : batch) {
columns.add(vw.getValueVectors()[0].getField().getName());
}
int width = columns.size();
@@ -47,7 +47,7 @@ public class BatchPrinter {
System.out.printf("|\n");
System.out.println(StringUtils.repeat("-", width * 17 + 1));
}
- for (VectorWrapper vw : batch) {
+ for (VectorWrapper<?> vw : batch) {
Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535);
String value;
if (o == null) {
@@ -67,7 +67,7 @@ public class BatchPrinter {
public static void printBatch(VectorAccessible batch) {
List<String> columns = Lists.newArrayList();
List<ValueVector> vectors = Lists.newArrayList();
- for (VectorWrapper vw : batch) {
+ for (VectorWrapper<?> vw : batch) {
columns.add(vw.getValueVector().getField().getName());
vectors.add(vw.getValueVector());
}
@@ -101,7 +101,7 @@ public class BatchPrinter {
public static void printBatch(VectorAccessible batch, SelectionVector2 sv2) {
List<String> columns = Lists.newArrayList();
List<ValueVector> vectors = Lists.newArrayList();
- for (VectorWrapper vw : batch) {
+ for (VectorWrapper<?> vw : batch) {
columns.add(vw.getValueVector().getField().getName());
vectors.add(vw.getValueVector());
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index 48724a445..6a805c142 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -33,7 +33,22 @@ public class MemoryAllocationUtilities {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryAllocationUtilities.class);
/**
- * Helper method to setup SortMemoryAllocations
+ * Helper method to set up memory allocations.
+ * <p>
+ * Plan the memory for buffered operators (the only ones that can spill in this release)
+ * based on assumptions. These assumptions are the amount of memory per node to give
+ * to each query and the number of sort operators per node.
+ * <p>
+ * The reason the total
+ * memory is an assumption is that we have no knowledge of the number of queries
+ * that can run, so we need the user to tell us that information by configuring the
+ * amount of memory to be assumed available to each query.
+ * <p>
+ * The number of sorts per node could be calculated, but we instead simply take
+ * the worst case: the maximum per-query, per-node parallelization and assume that
+ * all sorts appear in all fragments &mdash; a gross oversimplification, but one
+ * that Drill has long made.
+ * <p>
* since this method can be used in multiple places adding it in this class
* rather than keeping it in Foreman
* @param plan
@@ -50,7 +65,7 @@ public class MemoryAllocationUtilities {
// look for external sorts
final List<PhysicalOperator> bufferedOpList = new LinkedList<>();
for (final PhysicalOperator op : plan.getSortedOperators()) {
- if ( op.isBufferedOperator() ) {
+ if (op.isBufferedOperator()) {
bufferedOpList.add(op);
}
}
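The comment block above describes an assumption-driven memory plan: a configured per-query, per-node budget is divided across the buffered operators found in the plan, at worst-case parallelization, with a configured floor per operator. The following is a self-contained sketch of that division; the method and parameter names are illustrative only and are not Drill APIs.

    // Illustrative sketch only (not Drill code): divide an assumed per-query,
    // per-node budget evenly across buffered operators, assuming every buffered
    // operator runs in every minor fragment (the worst case described above),
    // then apply the configured per-operator minimum.
    static long memoryPerBufferedOp(long memoryPerQueryPerNode,
                                    int bufferedOpCount,
                                    int maxWidthPerNode,
                                    long minimumOperatorMemory) {
      long slots = (long) bufferedOpCount * maxWidthPerNode;
      long perOp = (slots == 0) ? memoryPerQueryPerNode
                                : memoryPerQueryPerNode / slots;
      return Math.max(perOp, minimumOperatorMemory);
    }

    // Example: a 10 GiB budget, 2 sorts, width 8 per node, 40 MiB minimum:
    //   slots = 16, perOp ≈ 640 MiB, which is above the 40 MiB floor.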
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
index 0200dc5a0..a9f178a66 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java
@@ -112,7 +112,7 @@ public class TestUtilities {
* @param jsonBatches : list of input strings, each element represent a batch. Each string could either
* be in the form of "[{...}, {...}, ..., {...}]", or in the form of "{...}".
* @param fragContext : fragment context
- * @param columnsToRead : list of schema pathes to read from JSON reader.
+ * @param columnsToRead : list of schema paths to read from JSON reader.
* @return
*/
public static Iterator<RecordReader> getJsonReadersFromBatchString(List<String> jsonBatches, FragmentContext fragContext, List<SchemaPath> columnsToRead) {
@@ -145,5 +145,4 @@ public class TestUtilities {
}
return readers.iterator();
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
index 4f99f859b..a06d46c5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,39 +17,99 @@
*/
package org.apache.drill.exec.work;
+import java.util.ArrayList;
import java.util.List;
import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.work.foreman.ForemanSetupException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
public class QueryWorkUnit {
- // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class);
- private final PlanFragment rootFragment; // for local
+
+ /**
+ * Definition of a minor fragment that contains the (unserialized) fragment operator
+ * tree and the (partially built) fragment. Allows the resource manager to apply
+ * memory allocations before serializing the fragments to JSON.
+ */
+
+ public static class MinorFragmentDefn {
+ private PlanFragment fragment;
+ private final FragmentRoot root;
+ private final OptionList options;
+
+ public MinorFragmentDefn(final PlanFragment fragment, final FragmentRoot root, OptionList options) {
+ this.fragment = fragment;
+ this.root = root;
+ this.options = options;
+ }
+
+ public FragmentRoot root() { return root; }
+ public PlanFragment fragment() { return fragment; }
+ public OptionList options() { return options; }
+
+ public PlanFragment applyPlan(PhysicalPlanReader reader) throws ForemanSetupException {
+ // get plan as JSON
+ try {
+ final String plan = reader.writeJson(root);
+ final String optionsData = reader.writeJson(options);
+ return PlanFragment.newBuilder(fragment)
+ .setFragmentJson(plan)
+ .setOptionsJson(optionsData)
+ .build();
+ } catch (JsonProcessingException e) {
+ throw new ForemanSetupException("Failure while trying to convert fragment into json.", e);
+ }
+ }
+ }
+
+ private PlanFragment rootFragment; // for local
+ private final MinorFragmentDefn rootFragmentDefn;
private final FragmentRoot rootOperator; // for local
- private final List<PlanFragment> fragments;
+ private List<PlanFragment> fragments = new ArrayList<>();
+ private final List<MinorFragmentDefn> minorFragmentDefns;
- public QueryWorkUnit(final FragmentRoot rootOperator, final PlanFragment rootFragment,
- final List<PlanFragment> fragments) {
- Preconditions.checkNotNull(rootFragment);
- Preconditions.checkNotNull(fragments);
+ public QueryWorkUnit(final FragmentRoot rootOperator, final MinorFragmentDefn rootFragmentDefn,
+ final List<MinorFragmentDefn> minorFragmentDefns) {
Preconditions.checkNotNull(rootOperator);
+ Preconditions.checkNotNull(rootFragmentDefn);
+ Preconditions.checkNotNull(minorFragmentDefns);
- this.rootFragment = rootFragment;
- this.fragments = fragments;
+ this.rootFragmentDefn = rootFragmentDefn;
this.rootOperator = rootOperator;
+ this.minorFragmentDefns = minorFragmentDefns;
}
public PlanFragment getRootFragment() {
return rootFragment;
}
+ public MinorFragmentDefn getRootFragmentDefn() {
+ return rootFragmentDefn;
+ }
+
public List<PlanFragment> getFragments() {
return fragments;
}
+ public List<MinorFragmentDefn> getMinorFragmentDefns() {
+ return minorFragmentDefns;
+ }
+
public FragmentRoot getRootOperator() {
return rootOperator;
}
+
+ public void applyPlan(PhysicalPlanReader reader) throws ForemanSetupException {
+ assert rootFragment == null;
+ rootFragment = rootFragmentDefn.applyPlan(reader);
+ assert fragments.isEmpty();
+ for (MinorFragmentDefn defn : minorFragmentDefns) {
+ fragments.add(defn.applyPlan(reader));
+ }
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 800d3a7f0..6e560a963 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -149,7 +149,9 @@ public class WorkManager implements AutoCloseable {
}
}
- getContext().close();
+ if (getContext() != null) {
+ getContext().close();
+ }
}
public DrillbitContext getContext() {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index d01d8fd7d..a1f9d9ffb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,17 +17,15 @@
*/
package org.apache.drill.exec.work.foreman;
-import com.codahale.metrics.Counter;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.drill.common.CatastrophicFailure;
import org.apache.drill.common.EventProcessor;
import org.apache.drill.common.concurrent.ExtendedLatch;
@@ -36,9 +34,6 @@ import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.LogicalPlan;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.coord.ClusterCoordinator;
-import org.apache.drill.exec.coord.DistributedSemaphore;
-import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
import org.apache.drill.exec.exception.OptimizerException;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.metrics.DrillMetrics;
@@ -73,43 +68,51 @@ import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
-import org.apache.drill.exec.util.MemoryAllocationUtilities;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.EndpointListener;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
import org.apache.drill.exec.work.fragment.FragmentExecutor;
import org.apache.drill.exec.work.fragment.FragmentStatusReporter;
import org.apache.drill.exec.work.fragment.NonRootFragmentManager;
import org.apache.drill.exec.work.fragment.RootFragmentManager;
import org.codehaus.jackson.map.ObjectMapper;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+import com.codahale.metrics.Counter;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
/**
* Foreman manages all the fragments (local and remote) for a single query where this
* is the driving/root node.
*
* The flow is as follows:
- * - Foreman is submitted as a runnable.
- * - Runnable does query planning.
- * - state changes from PENDING to RUNNING
- * - Runnable sends out starting fragments
- * - Status listener are activated
- * - The Runnable's run() completes, but the Foreman stays around
- * - Foreman listens for state change messages.
- * - state change messages can drive the state to FAILED or CANCELED, in which case
- * messages are sent to running fragments to terminate
- * - when all fragments complete, state change messages drive the state to COMPLETED
+ * <ul>
+ * <li>Foreman is submitted as a runnable.</li>
+ * <li>Runnable does query planning.</li>
+ * <li>state changes from PENDING to RUNNING</li>
+ * <li>Runnable sends out starting fragments</li>
+ * <li>Status listeners are activated</li>
+ * <li>The Runnable's run() completes, but the Foreman stays around</li>
+ * <li>Foreman listens for state change messages.</li>
+ * <li>state change messages can drive the state to FAILED or CANCELED, in which case
+ * messages are sent to running fragments to terminate</li>
+ * <li>when all fragments complete, state change messages drive the state to COMPLETED</li>
+ * </ul>
*/
+
public class Foreman implements Runnable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class);
private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger");
@@ -136,15 +139,13 @@ public class Foreman implements Runnable {
private boolean resume = false;
private final ProfileOption profileOption;
- private volatile DistributedLease lease; // used to limit the number of concurrent queries
+ private final QueryResourceManager queryRM;
private final ResponseSendListener responseListener = new ResponseSendListener();
private final StateSwitch stateSwitch = new StateSwitch();
private final ForemanResult foremanResult = new ForemanResult();
private final ConnectionClosedListener closeListener = new ConnectionClosedListener();
private final ChannelFuture closeFuture;
- private final boolean queuingEnabled;
-
private String queryText;
@@ -173,12 +174,9 @@ public class Foreman implements Runnable {
queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(),
drillbitContext.getClusterCoordinator(), this);
- final OptionManager optionManager = queryContext.getOptions();
- queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE);
-
- final QueryState initialState = queuingEnabled ? QueryState.ENQUEUED : QueryState.STARTING;
- recordNewState(initialState);
+ recordNewState(QueryState.ENQUEUED);
enqueuedQueries.inc();
+ queryRM = drillbitContext.getResourceManager().newQueryRM(this);
profileOption = setProfileOption(queryContext.getOptions());
}
@@ -350,20 +348,6 @@ public class Foreman implements Runnable {
*/
}
- private void releaseLease() {
- while (lease != null) {
- try {
- lease.close();
- lease = null;
- } catch (final InterruptedException e) {
- // if we end up here, the while loop will try again
- } catch (final Exception e) {
- logger.warn("Failure while releasing lease.", e);
- break;
- }
- }
- }
-
private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException {
LogicalPlan logicalPlan;
try {
@@ -431,18 +415,17 @@ public class Foreman implements Runnable {
private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException {
validatePlan(plan);
- MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
- //Marking endTime of Planning
- queryManager.markPlanningEndTime();
-
- if (queuingEnabled) {
- acquireQuerySemaphore(plan);
- moveToState(QueryState.STARTING, null);
- //Marking endTime of Waiting in Queue
- queryManager.markQueueWaitEndTime();
- }
+ queryRM.visitAbstractPlan(plan);
final QueryWorkUnit work = getQueryWorkUnit(plan);
+ queryRM.visitPhysicalPlan(work);
+ queryRM.setCost(plan.totalCost());
+ queryManager.setTotalCost(plan.totalCost());
+ work.applyPlan(drillbitContext.getPlanReader());
+ logWorkUnit(work);
+ admit(work);
+ queryManager.setQueueName(queryRM.queueName());
+
final List<PlanFragment> planFragments = work.getFragments();
final PlanFragment rootPlanFragment = work.getRootFragment();
assert queryId == rootPlanFragment.getHandle().getQueryId();
@@ -461,6 +444,22 @@ public class Foreman implements Runnable {
logger.debug("Fragments running.");
}
+ private void admit(QueryWorkUnit work) throws ForemanSetupException {
+ queryManager.markPlanningEndTime();
+ try {
+ queryRM.admit();
+ } catch (QueueTimeoutException e) {
+ throw UserException
+ .resourceError()
+ .message(e.getMessage())
+ .build(logger);
+ } catch (QueryQueueException e) {
+ throw new ForemanSetupException(e.getMessage(), e);
+ }
+ moveToState(QueryState.STARTING, null);
+ queryManager.markQueueWaitEndTime();
+ }
+
/**
* This is a helper method to run query based on the list of PlanFragment that were planned
* at some point of time
@@ -495,10 +494,8 @@ public class Foreman implements Runnable {
} catch (IOException e) {
throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
}
- if (queuingEnabled) {
- acquireQuerySemaphore(rootOperator.getCost());
- moveToState(QueryState.STARTING, null);
- }
+ queryRM.setCost(rootOperator.getCost());
+ admit(null);
drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener());
drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener());
@@ -548,62 +545,6 @@ public class Foreman implements Runnable {
}
}
- /**
- * This limits the number of "small" and "large" queries that a Drill cluster will run
- * simultaneously, if queueing is enabled. If the query is unable to run, this will block
- * until it can. Beware that this is called under run(), and so will consume a Thread
- * while it waits for the required distributed semaphore.
- *
- * @param plan the query plan
- * @throws ForemanSetupException
- */
- private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException {
- double totalCost = 0;
- for (final PhysicalOperator ops : plan.getSortedOperators()) {
- totalCost += ops.getCost();
- }
-
- acquireQuerySemaphore(totalCost);
- return;
- }
-
- private void acquireQuerySemaphore(double totalCost) throws ForemanSetupException {
- final OptionManager optionManager = queryContext.getOptions();
- final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
-
- final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
- final String queueName;
-
- try {
- final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator();
- final DistributedSemaphore distributedSemaphore;
-
- // get the appropriate semaphore
- if (totalCost > queueThreshold) {
- final int largeQueue = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
- distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue);
- queueName = "large";
- } else {
- final int smallQueue = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
- distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue);
- queueName = "small";
- }
-
- lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
- } catch (final Exception e) {
- throw new ForemanSetupException("Unable to acquire slot for query.", e);
- }
-
- if (lease == null) {
- throw UserException
- .resourceError()
- .message(
- "Unable to acquire queue resources for query within timeout. Timeout for %s queue was set at %d seconds.",
- queueName, queueTimeout / 1000)
- .build(logger);
- }
- }
-
Exception getCurrentException() {
return foremanResult.getException();
}
@@ -612,54 +553,55 @@ public class Foreman implements Runnable {
final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
- final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(
+ return parallelizer.getFragments(
queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
- queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment,
+ queryId, queryContext.getActiveEndpoints(), rootFragment,
initiatingClient.getSession(), queryContext.getQueryContextInfo());
+ }
- if (logger.isTraceEnabled()) {
- final StringBuilder sb = new StringBuilder();
- sb.append("PlanFragments for query ");
- sb.append(queryId);
+ private void logWorkUnit(QueryWorkUnit queryWorkUnit) {
+ if (! logger.isTraceEnabled()) {
+ return;
+ }
+ final StringBuilder sb = new StringBuilder();
+ sb.append("PlanFragments for query ");
+ sb.append(queryId);
+ sb.append('\n');
+
+ final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
+ final int fragmentCount = planFragments.size();
+ int fragmentIndex = 0;
+ for(final PlanFragment planFragment : planFragments) {
+ final FragmentHandle fragmentHandle = planFragment.getHandle();
+ sb.append("PlanFragment(");
+ sb.append(++fragmentIndex);
+ sb.append('/');
+ sb.append(fragmentCount);
+ sb.append(") major_fragment_id ");
+ sb.append(fragmentHandle.getMajorFragmentId());
+ sb.append(" minor_fragment_id ");
+ sb.append(fragmentHandle.getMinorFragmentId());
sb.append('\n');
- final List<PlanFragment> planFragments = queryWorkUnit.getFragments();
- final int fragmentCount = planFragments.size();
- int fragmentIndex = 0;
- for(final PlanFragment planFragment : planFragments) {
- final FragmentHandle fragmentHandle = planFragment.getHandle();
- sb.append("PlanFragment(");
- sb.append(++fragmentIndex);
- sb.append('/');
- sb.append(fragmentCount);
- sb.append(") major_fragment_id ");
- sb.append(fragmentHandle.getMajorFragmentId());
- sb.append(" minor_fragment_id ");
- sb.append(fragmentHandle.getMinorFragmentId());
- sb.append('\n');
-
- final DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
- sb.append(" DrillbitEndpoint address ");
- sb.append(endpointAssignment.getAddress());
- sb.append('\n');
-
- String jsonString = "<<malformed JSON>>";
- sb.append(" fragment_json: ");
- final ObjectMapper objectMapper = new ObjectMapper();
- try
- {
- final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
- jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
- } catch(final Exception e) {
- // we've already set jsonString to a fallback value
- }
- sb.append(jsonString);
+ final DrillbitEndpoint endpointAssignment = planFragment.getAssignment();
+ sb.append(" DrillbitEndpoint address ");
+ sb.append(endpointAssignment.getAddress());
+ sb.append('\n');
- logger.trace(sb.toString());
+ String jsonString = "<<malformed JSON>>";
+ sb.append(" fragment_json: ");
+ final ObjectMapper objectMapper = new ObjectMapper();
+ try
+ {
+ final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class);
+ jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json);
+ } catch(final Exception e) {
+ // we've already set jsonString to a fallback value
}
- }
+ sb.append(jsonString);
- return queryWorkUnit;
+ logger.trace(sb.toString());
+ }
}
/**
@@ -897,7 +839,7 @@ public class Foreman implements Runnable {
runningQueries.dec();
completedQueries.inc();
try {
- releaseLease();
+ queryRM.exit();
} finally {
isClosed = true;
}
@@ -953,7 +895,7 @@ public class Foreman implements Runnable {
foremanResult.setCompleted(QueryState.CANCELED);
/*
* We don't close the foremanResult until we've gotten
- * acknowledgements, which happens below in the case for current state
+ * acknowledgments, which happens below in the case for current state
* == CANCELLATION_REQUESTED.
*/
return;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
index ecbccf3c6..216a80df4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java
@@ -36,7 +36,6 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.ExecProtos.FragmentHandle;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
-import org.apache.drill.exec.proto.SchemaUserBitShared;
import org.apache.drill.exec.proto.UserBitShared.FragmentState;
import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile;
import org.apache.drill.exec.proto.UserBitShared.QueryId;
@@ -99,6 +98,15 @@ public class QueryManager implements AutoCloseable {
// Is the query saved in transient store
private boolean inTransientStore;
+ /**
+ * Total query cost. This value is used to place the query into a queue
+ * and so has meaning to the user who wants to predict queue placement.
+ */
+
+ private double totalCost;
+
+ private String queueName;
+
public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider,
final ClusterCoordinator coordinator, final Foreman foreman) {
this.queryId = queryId;
@@ -191,6 +199,7 @@ public class QueryManager implements AutoCloseable {
* (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly.
*/
void cancelExecutingFragments(final DrillbitContext drillbitContext) {
+ @SuppressWarnings("resource")
final Controller controller = drillbitContext.getController();
for(final FragmentData data : fragmentDataSet) {
switch(data.getState()) {
@@ -219,6 +228,7 @@ public class QueryManager implements AutoCloseable {
* sending any message. Resume all fragments through the control tunnel.
*/
void unpauseExecutingFragments(final DrillbitContext drillbitContext) {
+ @SuppressWarnings("resource")
final Controller controller = drillbitContext.getController();
for(final FragmentData data : fragmentDataSet) {
final DrillbitEndpoint endpoint = data.getEndpoint();
@@ -318,6 +328,8 @@ public class QueryManager implements AutoCloseable {
.setUser(foreman.getQueryContext().getQueryUserName())
.setForeman(foreman.getQueryContext().getCurrentEndpoint())
.setStart(startTime)
+ .setTotalCost(totalCost)
+ .setQueueName(queueName == null ? "-" : queueName)
.setOptionsJson(getQueryOptionsAsJson());
if (queryText != null) {
@@ -332,7 +344,6 @@ public class QueryManager implements AutoCloseable {
}
private QueryProfile getQueryProfile(UserException ex) {
- final String queryText = foreman.getQueryText();
final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder()
.setUser(foreman.getQueryContext().getQueryUserName())
.setType(runQuery.getType())
@@ -345,6 +356,8 @@ public class QueryManager implements AutoCloseable {
.setQueueWaitEnd(queueWaitEndTime)
.setTotalFragments(fragmentDataSet.size())
.setFinishedFragments(finishedFragments.get())
+ .setTotalCost(totalCost)
+ .setQueueName(queueName == null ? "-" : queueName)
.setOptionsJson(getQueryOptionsAsJson());
if (ex != null) {
@@ -360,6 +373,7 @@ public class QueryManager implements AutoCloseable {
profileBuilder.setPlan(planText);
}
+ final String queryText = foreman.getQueryText();
if (queryText != null) {
profileBuilder.setQuery(queryText);
}
@@ -392,7 +406,6 @@ public class QueryManager implements AutoCloseable {
profileBuilder.addFragmentProfile(builder);
return true;
}
-
}
private class InnerIter implements IntObjectPredicate<FragmentData> {
@@ -407,7 +420,6 @@ public class QueryManager implements AutoCloseable {
builder.addMinorFragmentProfile(data.getProfile());
return true;
}
-
}
void setPlanText(final String planText) {
@@ -430,6 +442,14 @@ public class QueryManager implements AutoCloseable {
queueWaitEndTime = System.currentTimeMillis();
}
+ public void setTotalCost(double totalCost) {
+ this.totalCost = totalCost;
+ }
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
/**
* Internal class used to track the number of pending completion messages required from particular node. This allows
* to know for each node that is part of this query, what portion of fragments are still outstanding. In the case that
@@ -536,7 +556,7 @@ public class QueryManager implements AutoCloseable {
return drillbitStatusListener;
}
- private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener(){
+ private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener() {
@Override
public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java
new file mode 100644
index 000000000..9bcaddade
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Abstract base class for a resource manager. Handles tasks common to all
+ * resource managers: learning the resources available on this Drillbit.
+ * In the current version, Drillbits must be symmetrical, so that knowing
+ * the resources on one node is sufficient to know resources available on
+ * all nodes.
+ */
+
+public abstract class AbstractResourceManager implements ResourceManager {
+
+ protected final DrillbitContext context;
+ private final long memoryPerNode;
+ private final int cpusPerNode;
+
+ public AbstractResourceManager(final DrillbitContext context) {
+ this.context = context;
+ DrillConfig config = context.getConfig();
+
+ // Normally we use the actual direct memory configured on the JVM command
+ // line. However, if the config param is set, we use that instead (if it is
+ // lower than actual memory). Primarily for testing.
+
+ long memLimit = DrillConfig.getMaxDirectMemory();
+ long configMemoryPerNode = config.getBytes(ExecConstants.MAX_MEMORY_PER_NODE);
+ if (configMemoryPerNode > 0) {
+ memLimit = Math.min(memLimit, configMemoryPerNode);
+ }
+ memoryPerNode = memLimit;
+
+ // Do the same for CPUs.
+
+ int cpuLimit = Runtime.getRuntime().availableProcessors();
+ int configCpusPerNode = config.getInt(ExecConstants.MAX_CPUS_PER_NODE);
+ if (configCpusPerNode > 0) {
+ cpuLimit = Math.min(cpuLimit, configCpusPerNode);
+ }
+ cpusPerNode = cpuLimit;
+ }
+
+ @Override
+ public long memoryPerNode() { return memoryPerNode; }
+
+ @Override
+ public int cpusPerNode() { return cpusPerNode; }
+}
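AbstractResourceManager clamps both figures to what the node actually provides: a configured limit applies only when it is set (greater than zero) and lower than the real JVM or hardware value. A tiny self-contained illustration of that rule; the numbers are made up:

    // Illustrative only: the configured limit wins only when it is set (> 0)
    // and lower than the actual limit reported by the JVM or the OS.
    static long effectiveLimit(long actualLimit, long configuredLimit) {
      return (configuredLimit > 0) ? Math.min(actualLimit, configuredLimit)
                                   : actualLimit;
    }

    // effectiveLimit(10 GiB of direct memory, 8 GiB configured)  -> 8 GiB
    // effectiveLimit(16 CPUs, 0 /* unset */)                     -> 16 CPUs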
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
new file mode 100644
index 000000000..c28fbbb59
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.server.BootStrapContext;
+import org.apache.drill.exec.util.MemoryAllocationUtilities;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.foreman.Foreman;
+
+/**
+ * Represents a default resource manager for clusters that do not provide query
+ * queues. Without queues to provide a hard limit on the query admission rate,
+ * the number of active queries must be estimated and the resulting resource
+ * allocations will be rough estimates.
+ */
+
+public class DefaultResourceManager implements ResourceManager {
+
+ public static class DefaultResourceAllocator implements QueryResourceAllocator {
+
+ private QueryContext queryContext;
+
+ protected DefaultResourceAllocator(QueryContext queryContext) {
+ this.queryContext = queryContext;
+ }
+
+ @Override
+ public void visitAbstractPlan(PhysicalPlan plan) {
+ if (plan == null || plan.getProperties().hasResourcePlan) {
+ return;
+ }
+ MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
+ }
+
+ @Override
+ public void visitPhysicalPlan(QueryWorkUnit work) {
+ }
+ }
+
+ public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager {
+
+ @SuppressWarnings("unused")
+ private final DefaultResourceManager rm;
+
+ public DefaultQueryResourceManager(final DefaultResourceManager rm, final Foreman foreman) {
+ super(foreman.getQueryContext());
+ this.rm = rm;
+ }
+
+ @Override
+ public void setCost(double cost) {
+ // Nothing to do by default.
+ }
+
+ @Override
+ public void admit() {
+ // No queueing by default
+ }
+
+ @Override
+ public void exit() {
+ // No queueing by default
+ }
+
+ @Override
+ public boolean hasQueue() { return false; }
+
+ @Override
+ public String queueName() { return null; }
+ }
+
+ public final long memoryPerNode;
+ public final int cpusPerNode;
+
+ public DefaultResourceManager() {
+ memoryPerNode = DrillConfig.getMaxDirectMemory();
+
+ // Note: CPUs are not yet used, they will be used in a future
+ // enhancement.
+
+ cpusPerNode = Runtime.getRuntime().availableProcessors();
+ }
+
+ @Override
+ public long memoryPerNode() { return memoryPerNode; }
+
+ @Override
+ public int cpusPerNode() { return cpusPerNode; }
+
+ @Override
+ public QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
+ return new DefaultResourceAllocator(queryContext);
+ }
+
+ @Override
+ public QueryResourceManager newQueryRM(final Foreman foreman) {
+ return new DefaultQueryResourceManager(this, foreman);
+ }
+
+ @Override
+ public void close() { }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
new file mode 100644
index 000000000..9a4c78df5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.DistributedSemaphore;
+import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+
+/**
+ * Distributed query queue which uses a Zookeeper distributed semaphore to
+ * control queuing across the cluster. The distributed queue is actually two
+ * queues: one for "small" queries, another for "large" queries. Query size is
+ * determined by the Planner's estimate of query cost.
+ * <p>
+ * This queue is configured using system options:
+ * <dl>
+ * <dt><tt>exec.queue.enable</tt></dt>
+ * <dd>Set to true to enable the distributed queue.</dd>
+ * <dt><tt>exec.queue.large</tt></dt>
+ * <dd>The maximum number of large queries to admit. Additional
+ * queries wait in the queue.</dd>
+ * <dt><tt>exec.queue.small</tt></dt>
+ * <dd>The maximum number of small queries to admit. Additional
+ * queries wait in the queue.</dd>
+ * <dt><tt>exec.queue.threshold</tt></dt>
+ * <dd>The cost threshold. Queries below this size are small; queries at
+ * or above this size are large.</dd>
+ * <dt><tt>exec.queue.timeout_millis</tt></dt>
+ * <dd>The maximum time (in milliseconds) a query will wait in the
+ * queue before failing.</dd>
+ * </dl>
+ * <p>
+ * The above values are refreshed at most once every five seconds. Caching them
+ * aids performance a bit in systems with very high query arrival rates.
+ */
+
+public class DistributedQueryQueue implements QueryQueue {
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class);
+
+ private class DistributedQueueLease implements QueueLease {
+ private final QueryId queryId;
+ private DistributedLease lease;
+ private final String queueName;
+
+ /**
+ * Memory allocated to the query. Though all queries in the queue use
+ * the same memory allocation rules, those rules can change at any time
+ * as the user changes system options. This field captures the value
+ * calculated at the time this lease was granted.
+ */
+ private long queryMemory;
+
+ public DistributedQueueLease(QueryId queryId, String queueName,
+ DistributedLease lease, long queryMemory) {
+ this.queryId = queryId;
+ this.queueName = queueName;
+ this.lease = lease;
+ this.queryMemory = queryMemory;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Lease for %s queue to query %s",
+ queueName, QueryIdHelper.getQueryId(queryId));
+ }
+
+ @Override
+ public long queryMemoryPerNode() { return queryMemory; }
+
+ @Override
+ public void release() {
+ DistributedQueryQueue.this.release(this);
+ }
+
+ @Override
+ public String queueName() { return queueName; }
+ }
+
+ /**
+ * Exposes a snapshot of internal state information for use in status
+ * reporting, such as in the UI.
+ */
+
+ @XmlRootElement
+ public static class ZKQueueInfo {
+ public final int smallQueueSize;
+ public final int largeQueueSize;
+ public final double queueThreshold;
+ public final long memoryPerNode;
+ public final long memoryPerSmallQuery;
+ public final long memoryPerLargeQuery;
+
+ public ZKQueueInfo(DistributedQueryQueue queue) {
+ smallQueueSize = queue.configSet.smallQueueSize;
+ largeQueueSize = queue.configSet.largeQueueSize;
+ queueThreshold = queue.configSet.queueThreshold;
+ memoryPerNode = queue.memoryPerNode;
+ memoryPerSmallQuery = queue.memoryPerSmallQuery;
+ memoryPerLargeQuery = queue.memoryPerLargeQuery;
+ }
+ }
+
+ public interface StatusAdapter {
+ boolean inShutDown();
+ }
+
+ /**
+ * Holds runtime configuration options. Allows polling the options
+ * for changes, and easily detecting changes.
+ */
+
+ private static class ConfigSet {
+ private final long queueThreshold;
+ private final long queueTimeout;
+ private final int largeQueueSize;
+ private final int smallQueueSize;
+ private final double largeToSmallRatio;
+ private final double reserveMemoryRatio;
+ private final long minimumOperatorMemory;
+
+ public ConfigSet(SystemOptionManager optionManager) {
+ queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
+ queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
+
+ // Option manager supports only long values, but we do not expect
+ // more than 2 billion active queries, so queue size is stored as
+ // an int.
+ // TODO: Once DRILL-5832 is available, add a getInt() method to
+ // the option system to get the value as an int and do a range
+ // check.
+
+ largeQueueSize = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
+ smallQueueSize = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
+ largeToSmallRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RATIO);
+ reserveMemoryRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RESERVE);
+ minimumOperatorMemory = optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
+ }
+
+ /**
+ * Determine if this config set is the same as another one. Detects
+ * whether the configuration has changed between one sync point and
+ * another.
+ * <p>
+ * Note that we do not override <tt>equals()</tt> here: by Drill
+ * convention, <tt>equals()</tt> is reserved for use in collections and
+ * must be accompanied by a <tt>hashCode()</tt> method. Since this
+ * class is never stored in a collection and needs no hash function,
+ * a plainly named comparison method is used instead.
+ *
+ * @param otherSet another snapshot taken at another time
+ * @return true if this configuration is the same as another
+ * (no update between the two snapshots), false if the config has
+ * changed between the snapshots
+ */
+
+ public boolean isSameAs(ConfigSet otherSet) {
+ return queueThreshold == otherSet.queueThreshold &&
+ queueTimeout == otherSet.queueTimeout &&
+ largeQueueSize == otherSet.largeQueueSize &&
+ smallQueueSize == otherSet.smallQueueSize &&
+ largeToSmallRatio == otherSet.largeToSmallRatio &&
+ reserveMemoryRatio == otherSet.reserveMemoryRatio &&
+ minimumOperatorMemory == otherSet.minimumOperatorMemory;
+ }
+ }
+
+ private long memoryPerNode;
+ private SystemOptionManager optionManager;
+ private ConfigSet configSet;
+ private ClusterCoordinator clusterCoordinator;
+ private long nextRefreshTime;
+ private long memoryPerSmallQuery;
+ private long memoryPerLargeQuery;
+ private final StatusAdapter statusAdapter;
+
+ public DistributedQueryQueue(DrillbitContext context, StatusAdapter adapter) {
+ statusAdapter = adapter;
+ optionManager = context.getOptionManager();
+ clusterCoordinator = context.getClusterCoordinator();
+ }
+
+ @Override
+ public void setMemoryPerNode(long memoryPerNode) {
+ this.memoryPerNode = memoryPerNode;
+ refreshConfig();
+ }
+
+ @Override
+ public long defaultQueryMemoryPerNode(double cost) {
+ return (cost < configSet.queueThreshold)
+ ? memoryPerSmallQuery
+ : memoryPerLargeQuery;
+ }
+
+ @Override
+ public long minimumOperatorMemory() { return configSet.minimumOperatorMemory; }
+
+ /**
+ * This limits the number of "small" and "large" queries that a Drill cluster will run
+ * simultaneously, if queuing is enabled. If the query is unable to run, this will block
+ * until it can. Beware that this is called under run(), and so will consume a thread
+ * while it waits for the required distributed semaphore.
+ *
+ * @param queryId query identifier
+ * @param cost the estimated cost of the query, used to select the small or large queue
+ * @throws QueryQueueException if the underlying ZK queuing mechanism fails
+ * @throws QueueTimeoutException if the query waits too long in the
+ * queue
+ */
+
+ @SuppressWarnings("resource")
+ @Override
+ public QueueLease enqueue(QueryId queryId, double cost) throws QueryQueueException, QueueTimeoutException {
+ final String queueName;
+ DistributedLease lease = null;
+ long queryMemory;
+ final DistributedSemaphore distributedSemaphore;
+ try {
+
+ // Only the refresh and queue computation is synchronized.
+
+ synchronized(this) {
+ refreshConfig();
+
+ // get the appropriate semaphore
+ if (cost >= configSet.queueThreshold) {
+ distributedSemaphore = clusterCoordinator.getSemaphore("query.large", configSet.largeQueueSize);
+ queueName = "large";
+ queryMemory = memoryPerLargeQuery;
+ } else {
+ distributedSemaphore = clusterCoordinator.getSemaphore("query.small", configSet.smallQueueSize);
+ queueName = "small";
+ queryMemory = memoryPerSmallQuery;
+ }
+ }
+ logger.debug("Query {} with cost {} placed into the {} queue.",
+ QueryIdHelper.getQueryId(queryId), cost, queueName);
+
+ lease = distributedSemaphore.acquire(configSet.queueTimeout, TimeUnit.MILLISECONDS);
+ } catch (final Exception e) {
+ logger.error("Unable to acquire slot for query " +
+ QueryIdHelper.getQueryId(queryId), e);
+ throw new QueryQueueException("Unable to acquire slot for query.", e);
+ }
+
+ if (lease == null) {
+ int timeoutSecs = (int) Math.round(configSet.queueTimeout/1000.0);
+ logger.warn("Queue timeout: {} after {} seconds.", queueName, timeoutSecs);
+ throw new QueueTimeoutException(queryId, queueName, timeoutSecs);
+ }
+ return new DistributedQueueLease(queryId, queueName, lease, queryMemory);
+ }
+
+ private synchronized void refreshConfig() {
+ long now = System.currentTimeMillis();
+ if (now < nextRefreshTime) {
+ return;
+ }
+ nextRefreshTime = now + 5000;
+
+ // Only update numbers, and log changes, if something
+ // actually changes.
+
+ ConfigSet newSet = new ConfigSet(optionManager);
+ if (newSet.isSameAs(configSet)) {
+ return;
+ }
+
+ configSet = newSet;
+
+ // Divide up memory between queues using admission rate
+ // to give more memory to larger queries and less to
+ // smaller queries. We assume that large queries are
+ // larger than small queries by a factor of
+ // largeToSmallRatio.
+
+ double totalUnits = configSet.largeToSmallRatio * configSet.largeQueueSize + configSet.smallQueueSize;
+ double availableMemory = Math.round(memoryPerNode * (1.0 - configSet.reserveMemoryRatio));
+ double memoryUnit = availableMemory / totalUnits;
+ memoryPerLargeQuery = Math.round(memoryUnit * configSet.largeToSmallRatio);
+ memoryPerSmallQuery = Math.round(memoryUnit);
+
+ logger.debug("Memory config: total memory per node = {}, available: {}, large/small memory ratio = {}",
+ memoryPerNode, availableMemory, configSet.largeToSmallRatio);
+ logger.debug("Reserve memory ratio: {}, bytes: {}",
+ configSet.reserveMemoryRatio, memoryPerNode - availableMemory);
+ logger.debug("Minimum operator memory: {}", configSet.minimumOperatorMemory);
+ logger.debug("Small queue: {} slots, {} bytes per slot",
+ configSet.smallQueueSize, memoryPerSmallQuery);
+ logger.debug("Large queue: {} slots, {} bytes per slot",
+ configSet.largeQueueSize, memoryPerLargeQuery);
+ logger.debug("Cost threshold: {}, timeout: {} ms.",
+ configSet.queueThreshold, configSet.queueTimeout);
+ }
+
+ @Override
+ public boolean enabled() { return true; }
+
+ public synchronized ZKQueueInfo getInfo() {
+ refreshConfig();
+ return new ZKQueueInfo(this);
+ }
+
+ private void release(QueueLease lease) {
+ DistributedQueueLease theLease = (DistributedQueueLease) lease;
+ for (;;) {
+ try {
+ theLease.lease.close();
+ theLease.lease = null;
+ break;
+ } catch (final InterruptedException e) {
+ // if we end up here, the loop will try again
+ } catch (final Exception e) {
+ logger.warn("Failure while releasing lease.", e);
+ break;
+ }
+      if (inShutdown()) {
+        logger.warn("In shutdown mode: abandoning attempt to release lease");
+        break;
+      }
+ }
+ }
+
+ private boolean inShutdown() {
+ if (statusAdapter == null) {
+ return false;
+ }
+ return statusAdapter.inShutDown();
+ }
+
+ @Override
+ public void close() {
+ // Nothing to do.
+ }
+}
\ No newline at end of file
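To make the arithmetic in refreshConfig() concrete, here is a minimal standalone sketch of the same memory split. Every number in it (node memory, reserve ratio, slot counts, ratio) is hypothetical, chosen only to show how the formula behaves, not a statement of Drill defaults.

// Standalone sketch of the queue memory split performed in refreshConfig().
// All inputs are hypothetical.
public class QueueMemorySplitSketch {
  public static void main(String[] args) {
    long memoryPerNode = 10L * 1024 * 1024 * 1024;   // assume 10 GB of direct memory per node
    double reserveMemoryRatio = 0.2;                 // assume 20% set aside for other operators
    int largeQueueSize = 1;                          // hypothetical large-queue slots
    int smallQueueSize = 5;                          // hypothetical small-queue slots
    double largeToSmallRatio = 3.0;                  // hypothetical large:small memory ratio

    double totalUnits = largeToSmallRatio * largeQueueSize + smallQueueSize;
    double availableMemory = Math.round(memoryPerNode * (1.0 - reserveMemoryRatio));
    double memoryUnit = availableMemory / totalUnits;
    long memoryPerLargeQuery = Math.round(memoryUnit * largeToSmallRatio);
    long memoryPerSmallQuery = Math.round(memoryUnit);

    // With these inputs: 8 GB available, 8 memory units, so each small query
    // gets 1 GB per node and each large query gets 3 GB per node.
    System.out.println("large = " + memoryPerLargeQuery + ", small = " + memoryPerSmallQuery);
  }
}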
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java
new file mode 100644
index 000000000..473401fab
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.server.options.SystemOptionManager;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.StatusAdapter;
+
+/**
+ * Wrapper that switches between the default and the queue-based (throttled)
+ * resource managers, allowing queueing to be enabled and disabled dynamically.
+ */
+
+public class DynamicResourceManager implements ResourceManager {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicResourceManager.class);
+
+ private final DrillbitContext context;
+ private ResourceManager defaultRm;
+ private ResourceManager queueingRm;
+ private ResourceManager activeRm;
+ public long nextUpdateTime;
+ public final int recheckDelayMs = 5000;
+
+ public DynamicResourceManager(final DrillbitContext context) {
+ this.context = context;
+ refreshRM();
+ }
+
+ public synchronized ResourceManager activeRM() {
+ refreshRM();
+ return activeRm;
+ }
+
+ @Override
+ public long memoryPerNode() {
+ return activeRm.memoryPerNode();
+ }
+
+ @Override
+ public int cpusPerNode() {
+ return activeRm.cpusPerNode();
+ }
+
+ @Override
+ public synchronized QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
+ refreshRM();
+ return activeRm.newResourceAllocator(queryContext);
+ }
+
+ @Override
+ public synchronized QueryResourceManager newQueryRM(Foreman foreman) {
+ refreshRM();
+ return activeRm.newQueryRM(foreman);
+ }
+
+ private void refreshRM() {
+ long now = System.currentTimeMillis();
+ if (now < nextUpdateTime) {
+ return;
+ }
+ nextUpdateTime = now + recheckDelayMs;
+ @SuppressWarnings("resource")
+ SystemOptionManager systemOptions = context.getOptionManager();
+ if (systemOptions.getOption(ExecConstants.ENABLE_QUEUE)) {
+ if (queueingRm == null) {
+ StatusAdapter statusAdapter = new StatusAdapter() {
+ @Override
+ public boolean inShutDown() {
+ // Drill provides no shutdown state at present.
+ // TODO: Once DRILL-4286 (graceful shutdown) is merged, use the
+ // new Drillbit status to determine when the Drillbit
+ // is shutting down.
+ return false;
+ }
+ };
+ queueingRm = new ThrottledResourceManager(context,
+ new DistributedQueryQueue(context, statusAdapter));
+ }
+ if (activeRm != queueingRm) {
+ logger.debug("Enabling ZK-based query queue.");
+ activeRm = queueingRm;
+ }
+ } else {
+ if (defaultRm == null) {
+ defaultRm = new DefaultResourceManager();
+ }
+ if (activeRm != defaultRm) {
+ logger.debug("Disabling ZK-based query queue.");
+ activeRm = defaultRm;
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ RuntimeException ex = null;
+ try {
+ if (defaultRm != null) {
+ defaultRm.close();
+ }
+ } catch (RuntimeException e) {
+ ex = e;
+ } finally {
+ defaultRm = null;
+ }
+ try {
+ if (queueingRm != null) {
+ queueingRm.close();
+ }
+ } catch (RuntimeException e) {
+ ex = ex == null ? e : ex;
+ } finally {
+ queueingRm = null;
+ }
+ activeRm = null;
+ if (ex == null) {
+ return;
+ } else if (ex instanceof UserException) {
+ throw (UserException) ex;
+ } else {
+ throw UserException.systemError(ex)
+ .addContext("Failure closing resource managers.")
+ .build(logger);
+ }
+ }
+}
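Callers never switch resource managers themselves; they go through the wrapper, which re-checks exec.queue.enable at most once per recheckDelayMs and delegates to whichever manager is active. A minimal caller-side sketch, assuming a DrillbitContext and Foreman are already available (in practice a Drillbit would hold a single ResourceManager rather than building one per call):

import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.foreman.Foreman;
import org.apache.drill.exec.work.foreman.rm.DynamicResourceManager;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;

// Illustration only: obtain a per-query resource manager through the wrapper.
public class DynamicRmUsageSketch {
  public static QueryResourceManager perQueryRm(DrillbitContext context, Foreman foreman) {
    DynamicResourceManager rm = new DynamicResourceManager(context);
    // Each call re-checks exec.queue.enable at most every recheckDelayMs (5 s)
    // and returns either a queued or a default per-query manager.
    return rm.newQueryRM(foreman);
  }
}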
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
new file mode 100644
index 000000000..a042f8bb6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Query queue to be used in an embedded Drillbit. This queue is scoped to a
+ * single Drillbit; it does not span multiple Drillbits, even within the same
+ * process. Primarily intended for testing, but may also be useful for other
+ * embedded applications.
+ * <p>
+ * Configuration is via boot-time config parameters (not via system options as
+ * with the distributed queue.)
+ * <dl>
+ * <dt><tt>drill.exec.queue.embedded.enable</tt></dt>
+ * <dd>Set to true to enable the embedded queue. This setting takes effect
+ * only if the Drillbit is, in fact, embedded.</dd>
+ * <dt><tt>drill.exec.queue.embedded.size</tt></dt>
+ * <dd>The number of queries allowed to run concurrently; all others wait in
+ * the queue. There is no upper limit on the number of queued entries.</dd>
+ * <dt><tt>drill.exec.queue.embedded.timeout_ms</tt></dt>
+ * <dd>The maximum time a query will wait in the queue before failing.</dd>
+ * </dl>
+ */
+
+public class EmbeddedQueryQueue implements QueryQueue {
+
+  public static final String EMBEDDED_QUEUE = "drill.exec.queue.embedded";
+  public static final String ENABLED = EMBEDDED_QUEUE + ".enable";
+  public static final String QUEUE_SIZE = EMBEDDED_QUEUE + ".size";
+  public static final String TIMEOUT_MS = EMBEDDED_QUEUE + ".timeout_ms";
+
+ public class EmbeddedQueueLease implements QueueLease {
+
+ private final QueryId queryId;
+ private boolean released;
+ private long queryMemory;
+
+ public EmbeddedQueueLease(QueryId queryId, long queryMemory) {
+ this.queryId = queryId;
+ this.queryMemory = queryMemory;
+ }
+
+ @Override
+    public String toString() {
+ return new StringBuilder()
+ .append("Embedded queue lease for ")
+ .append(QueryIdHelper.getQueryId(queryId))
+ .append(released ? " (released)" : "")
+ .toString();
+ }
+
+ @Override
+ public long queryMemoryPerNode() {
+ return queryMemory;
+ }
+
+ @Override
+ public void release() {
+ EmbeddedQueryQueue.this.release(this);
+ released = true;
+ }
+
+ @VisibleForTesting
+ boolean isReleased() { return released; }
+
+ @Override
+ public String queueName() { return "local-queue"; }
+ }
+
+ private final int queueTimeoutMs;
+ private final int queueSize;
+ private final Semaphore semaphore;
+ private long memoryPerQuery;
+ private final long minimumOperatorMemory;
+
+ public EmbeddedQueryQueue(DrillbitContext context) {
+ DrillConfig config = context.getConfig();
+ queueTimeoutMs = config.getInt(TIMEOUT_MS);
+ queueSize = config.getInt(QUEUE_SIZE);
+ semaphore = new Semaphore(queueSize, true);
+ minimumOperatorMemory = context.getOptionManager()
+ .getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
+ }
+
+ @Override
+ public boolean enabled() { return true; }
+
+ @Override
+ public void setMemoryPerNode(long memoryPerNode) {
+ memoryPerQuery = memoryPerNode / queueSize;
+ }
+
+ @Override
+ public long defaultQueryMemoryPerNode(double cost) {
+ return memoryPerQuery;
+ }
+
+ @Override
+ public QueueLease enqueue(QueryId queryId, double cost)
+ throws QueueTimeoutException, QueryQueueException {
+ try {
+      if (!semaphore.tryAcquire(queueTimeoutMs, TimeUnit.MILLISECONDS)) {
+ throw new QueueTimeoutException(queryId, "embedded", queueTimeoutMs);
+ }
+ } catch (InterruptedException e) {
+ throw new QueryQueueException("Interrupted", e);
+ }
+ return new EmbeddedQueueLease(queryId, memoryPerQuery);
+ }
+
+ private void release(EmbeddedQueueLease lease) {
+ assert ! lease.released;
+ semaphore.release();
+ }
+
+ @Override
+ public void close() {
+ assert semaphore.availablePermits() == queueSize;
+ }
+
+ @Override
+ public long minimumOperatorMemory() {
+ return minimumOperatorMemory;
+ }
+}
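At its core, the embedded queue is a fair java.util.concurrent.Semaphore with one permit per allowed concurrent query: enqueue() is a timed tryAcquire() and releasing the lease is a release(). The following self-contained sketch shows just that pattern; the size and timeout values are hypothetical stand-ins for the boot-config settings documented above.

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Minimal sketch of the admission pattern that EmbeddedQueryQueue wraps.
public class EmbeddedAdmissionSketch {
  public static void main(String[] args) throws InterruptedException {
    int queueSize = 2;       // stand-in for drill.exec.queue.embedded.size
    int timeoutMs = 5000;    // stand-in for drill.exec.queue.embedded.timeout_ms
    Semaphore slots = new Semaphore(queueSize, true);

    if (slots.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
      try {
        // the admitted query would run here
        System.out.println("query admitted");
      } finally {
        slots.release();     // equivalent to EmbeddedQueueLease.release()
      }
    } else {
      // equivalent to throwing QueueTimeoutException
      System.out.println("query timed out waiting for a slot");
    }
  }
}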
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java
new file mode 100644
index 000000000..72dc2d63a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+
+/**
+ * Interface that defines a query queue implementation.
+ * Implementations can queue locally, queue in a distributed fashion, or do
+ * no queueing at all.
+ * <p>
+ * A queue can report itself as enabled or disabled. When enabled,
+ * all queries must obtain a lease prior to starting execution. The
+ * lease must be released at the completion of execution.
+ */
+
+public interface QueryQueue {
+
+ /**
+ * The opaque lease returned once a query is admitted
+ * for execution.
+ */
+
+ public interface QueueLease {
+ long queryMemoryPerNode();
+
+ /**
+     * Release a query lease obtained from {@link QueryQueue#enqueue(QueryId, double)}.
+     * Should be called by the per-query resource manager once the query
+     * completes.
+ */
+
+ void release();
+
+ String queueName();
+ };
+
+ /**
+ * Exception thrown if a query exceeds the configured wait time
+ * in the query queue.
+ */
+
+ @SuppressWarnings("serial")
+ public class QueueTimeoutException extends Exception {
+
+ private final QueryId queryId;
+ private final String queueName;
+ private final int timeoutMs;
+
+ public QueueTimeoutException(QueryId queryId, String queueName, int timeoutMs) {
+ super( String.format(
+ "Query timed out of the %s queue after %d ms.",
+ queueName, timeoutMs ));
+ this.queryId = queryId;
+ this.queueName = queueName;
+ this.timeoutMs = timeoutMs;
+ }
+
+ public QueryId queryId() { return queryId; }
+ public String queueName() { return queueName; }
+ public int timeoutMs() { return timeoutMs; }
+ }
+
+ /**
+ * Exception thrown for all non-timeout error conditions.
+ */
+
+ @SuppressWarnings("serial")
+ public class QueryQueueException extends Exception {
+ QueryQueueException(String msg, Exception e) {
+ super(msg, e);
+ }
+ }
+
+ void setMemoryPerNode(long memoryPerNode);
+
+ /**
+   * Return the amount of memory per node when creating an EXPLAIN
+ * query plan. Plans to be executed should get the query memory from
+ * the lease, as the lease may adjust the default amount on a per-query
+ * basis. This means that the memory used to execute the query may
+ * differ from the amount shown in an EXPLAIN plan.
+ *
+ * @return assumed memory per node, in bytes, to use when creating
+ * an EXPLAIN plan
+ */
+
+ long defaultQueryMemoryPerNode(double cost);
+
+ /**
+ * Optional floor on the amount of memory assigned per operator.
+   * This ensures that each operator receives at least that amount, separate
+   * from any memory slicing. This can oversubscribe node memory if used
+ * incorrectly.
+ *
+ * @return minimum per-operator memory, in bytes
+ */
+
+ long minimumOperatorMemory();
+
+ /**
+ * Determine if the queue is enabled.
+   * @return true if the queue is enabled, false otherwise.
+ */
+
+ boolean enabled();
+
+ /**
+ * Queue a query. The method returns only when the query is admitted for
+ * execution. As a result, the calling thread may block up to the configured
+ * wait time.
+ * @param queryId the query ID
+ * @param cost the cost of the query used for cost-based queueing
+   * @return the query lease, which must be released via
+   * {@link QueueLease#release()} upon query completion
+ * @throws QueueTimeoutException if the query times out waiting to be
+ * admitted.
+ * @throws QueryQueueException for any other error condition.
+ */
+
+ QueueLease enqueue(QueryId queryId, double cost) throws QueueTimeoutException, QueryQueueException;
+
+ void close();
+}
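Putting the interface together, a caller handed a QueryQueue follows an enqueue / use-lease / release lifecycle along these lines. This is only a sketch: the queue, query ID, and cost are assumed to be supplied by the caller.

import org.apache.drill.exec.proto.UserBitShared.QueryId;
import org.apache.drill.exec.work.foreman.rm.QueryQueue;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;

// Sketch of the lease lifecycle, written against the QueryQueue interface only.
public class QueueLifecycleSketch {
  public static void runWithLease(QueryQueue queue, QueryId queryId, double cost)
      throws QueueTimeoutException, QueryQueueException {
    QueueLease lease = queue.enqueue(queryId, cost);   // blocks until admitted
    try {
      // Plan memory from the lease, not from defaultQueryMemoryPerNode().
      System.out.println("memory per node: " + lease.queryMemoryPerNode());
      // ... execute the query ...
    } finally {
      lease.release();                                 // give the slot back
    }
  }
}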
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java
new file mode 100644
index 000000000..35dbe5987
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.work.QueryWorkUnit;
+
+/**
+ * Manages resources for an individual query in conjunction with the
+ * global {@link ResourceManager}. Handles memory and CPU allocation.
+ * Instances of this interface handle query planning only and are used when
+ * the client wants to plan the query but not execute it. An implementation
+ * of {@link QueryResourceManager} is used to both plan the query and
+ * queue it for execution.
+ * <p>
+ * This interface allows a variety of resource management strategies to
+ * exist for different purposes.
+ * <p>
+ * The methods here assume external synchronization: a single query calls
+ * the methods at known times; there are no concurrent calls.
+ */
+
+public interface QueryResourceAllocator {
+
+ /**
+ * Make any needed adjustments to the query plan before parallelization.
+ *
+   * @param plan the physical plan to be adjusted
+ */
+ void visitAbstractPlan(PhysicalPlan plan);
+
+ /**
+   * Provide the allocator with the parallelized plan and node assignments
+   * for the query to be run. Memory for the query is planned at this point.
+   *
+   * @param work the parallelized query work unit, including node assignments
+ */
+
+ void visitPhysicalPlan(QueryWorkUnit work);
+}
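The intended call order is the one the modified PlanSplitter (at the end of this patch) uses: visitAbstractPlan() before parallelization to capture the plan and its cost, then visitPhysicalPlan() once node assignments exist so that memory can be planned. A sketch, with all inputs assumed to be supplied by the caller:

import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.foreman.rm.QueryResourceAllocator;

// Sketch of the two-phase memory-planning calls used when planning without executing.
public class ExplainMemoryPlanningSketch {
  public static void planMemory(DrillbitContext dContext, QueryContext queryContext,
      PhysicalPlan plan, QueryWorkUnit work) {
    QueryResourceAllocator planner =
        dContext.getResourceManager().newResourceAllocator(queryContext);
    planner.visitAbstractPlan(plan);   // before parallelization: captures cost
    planner.visitPhysicalPlan(work);   // after parallelization: plans memory
  }
}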
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
new file mode 100644
index 000000000..9e2a3a10f
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+
+/**
+ * Extends a {@link QueryResourceAllocator} to provide queueing support.
+ */
+
+public interface QueryResourceManager extends QueryResourceAllocator {
+
+ /**
+   * Hint that this resource manager queues queries. Allows the Foreman
+   * to short-circuit expensive logic if no queueing will actually
+   * be done. This is a static attribute for the life of a Drillbit.
+   *
+   * @return true if this resource manager queues queries
+ */
+
+ boolean hasQueue();
+
+ /**
+   * In some cases the Foreman has only a query cost, not a full plan. In
+   * that case, this object does not plan memory, but still needs the cost
+   * to place the query into the correct queue.
+   *
+   * @param cost the estimated query cost
+ */
+
+ void setCost(double cost);
+
+ /**
+ * Admit the query into the cluster. Blocks until the query
+ * can run. (Later revisions may use a more thread-friendly
+ * approach.)
+ * @throws QueryQueueException if something goes wrong with the
+ * queue mechanism
+ * @throws QueueTimeoutException if the query timed out waiting to
+ * be admitted.
+ */
+
+ void admit() throws QueueTimeoutException, QueryQueueException;
+
+ /**
+ * Returns the name of the queue (if any) on which the query was
+ * placed. Valid only after the query is admitted.
+ *
+ * @return queue name, or null if queuing is not enabled.
+ */
+
+ String queueName();
+
+ /**
+ * Mark the query as completing, giving up its slot in the
+ * cluster. Releases any lease that may be held for a system with queues.
+ */
+
+ void exit();
+}
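A Foreman-like caller drives this interface roughly as follows; setCost() is needed only on the path where no full plan is available. The sketch assumes the per-query manager and the cost are supplied by the caller.

import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;

// Sketch of the admit/exit protocol for a single query.
public class QueryRmLifecycleSketch {
  public static void runQuery(QueryResourceManager queryRM, double cost)
      throws QueueTimeoutException, QueryQueueException {
    queryRM.setCost(cost);     // only needed when no full plan was visited
    queryRM.admit();           // blocks until the query may run
    try {
      System.out.println("queue: " + queryRM.queueName()); // null if queueing is disabled
      // ... run the query ...
    } finally {
      queryRM.exit();          // give up the slot and release any lease
    }
  }
}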
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java
new file mode 100644
index 000000000..71dabafa7
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.work.foreman.Foreman;
+
+/**
+ * Drillbit-wide resource manager shared by all queries. Manages memory (at
+ * present) and CPU (planned). Since queries are the primary consumers of
+ * resources, the manager controls total use by throttling queries into the
+ * system and allocating resources to each query. A "null"
+ * implementation handles the case of no queueing; clearly, the null case
+ * cannot effectively control resource use.
+ */
+
+public interface ResourceManager {
+
+ /**
+ * Returns the memory, in bytes, assigned to each node in a Drill cluster.
+ * Drill requires that nodes are symmetrical. So, knowing the memory on any
+ * one node also gives the memory on all other nodes.
+ *
+ * @return the memory, in bytes, available in each Drillbit
+ */
+ long memoryPerNode();
+
+ int cpusPerNode();
+
+ /**
+   * Create a resource allocator to prepare or describe a query. In this form, no
+   * queueing is done, but the plan is created as if queueing had been done. Used
+   * when executing EXPLAIN PLAN.
+   *
+   * @return a resource allocator for the query
+ */
+
+ QueryResourceAllocator newResourceAllocator(QueryContext queryContext);
+
+ /**
+ * Create a resource manager to execute a query.
+ *
+ * @param foreman
+ * Foreman which manages the execution
+ * @return a resource manager for the query
+ */
+
+ QueryResourceManager newQueryRM(final Foreman foreman);
+
+ void close();
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java
new file mode 100644
index 000000000..430589135
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.exec.coord.ClusterCoordinator;
+import org.apache.drill.exec.coord.local.LocalClusterCoordinator;
+import org.apache.drill.exec.server.DrillbitContext;
+
+/**
+ * Builds the proper resource manager and queue implementation for the configured
+ * system options.
+ * <p>
+ * <ul>
+ * <li>If the Drillbit is embedded:<ul>
+ * <li>If queues are enabled, use the admission-controlled resource manager
+ * with the local query queue.</li>
+ * <li>Otherwise, use the default resource manager with no queue.</li>
+ * </ul></li>
+ * <li>If the Drillbit is in a cluster:<ul>
+ * <li>If queues are enabled, use the admission-controlled resource manager
+ * with the distributed query queue.</li>
+ * <li>Otherwise, use the default resource manager with no queue.</li>
+ * </ul></li>
+ * </ul>
+ * Configuration settings:
+ * <dl>
+ * <dt>Cluster coordinator instance</dt>
+ * <dd>If an instance of <tt>LocalClusterCoordinator</tt>, the Drillbit is
+ * embedded, else it is in a cluster.</dd>
+ * <dt><tt>drill.exec.queue.embedded.enable</tt> boot config</dt>
+ * <dd>If enabled, and if embedded, then use the local queue.</dd>
+ * <dt><tt>exec.queue.enable</tt> system option</dt>
+ * <dd>If enabled, and if in a cluster, then use the distributed queue.</dd>
+ * </dl>
+ */
+public class ResourceManagerBuilder {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourceManagerBuilder.class);
+
+  private final DrillbitContext context;
+
+ public ResourceManagerBuilder(final DrillbitContext context) {
+ this.context = context;
+ }
+
+ @SuppressWarnings("resource")
+ public ResourceManager build() {
+ ClusterCoordinator coord = context.getClusterCoordinator();
+ DrillConfig config = context.getConfig();
+ if (coord instanceof LocalClusterCoordinator) {
+ if (config.getBoolean(EmbeddedQueryQueue.ENABLED)) {
+ logger.debug("Enabling embedded, local query queue.");
+ return new ThrottledResourceManager(context, new EmbeddedQueryQueue(context));
+ } else {
+ logger.debug("No query queueing enabled.");
+ return new DefaultResourceManager();
+ }
+ } else {
+ return new DynamicResourceManager(context);
+ }
+ }
+}
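Usage is a single call at Drillbit startup; the builder inspects the cluster-coordinator type and the config/option settings listed above to decide which manager to return. A minimal sketch, assuming a DrillbitContext is available:

import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.foreman.rm.ResourceManager;
import org.apache.drill.exec.work.foreman.rm.ResourceManagerBuilder;

// Sketch: build the Drillbit-wide resource manager once and share it.
public class RmBootstrapSketch {
  public static ResourceManager buildRm(DrillbitContext context) {
    return new ResourceManagerBuilder(context).build();
  }
}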
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
new file mode 100644
index 000000000..b46fe0933
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.foreman.rm;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
+import org.apache.drill.exec.physical.base.FragmentRoot;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn;
+import org.apache.drill.exec.work.foreman.Foreman;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease;
+import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+
+/**
+ * Global resource manager that provides basic admission control (AC) via a
+ * configured queue: either the Zookeeper-based distributed queue or the
+ * in-process embedded Drillbit queue. The queue places an upper limit on the
+ * number of running queries. This limit then "slices" memory and CPU between
+ * queries: each gets the same share of resources.
+ * <p>
+ * This is a "basic" implementation. Clearly, a more advanced implementation
+ * could look at query cost to determine whether to give a given query more or
+ * less than the "standard" share. That is left as a future exercise; in this
+ * version we just want to get the basics working.
+ * <p>
+ * This class provides the resource manager level; it is paired with a
+ * queue implementation to produce a complete solution. This composition-based
+ * approach allows functionality to be shared across queue implementations.
+ */
+
+public class ThrottledResourceManager extends AbstractResourceManager {
+
+ private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory
+ .getLogger(ThrottledResourceManager.class);
+
+ public static class QueuedResourceAllocator
+ implements QueryResourceAllocator {
+
+ protected final ThrottledResourceManager rm;
+ protected QueryContext queryContext;
+ protected PhysicalPlan plan;
+ protected QueryWorkUnit work;
+ protected double queryCost;
+
+ protected QueuedResourceAllocator(final ThrottledResourceManager rm,
+ QueryContext queryContext) {
+ this.rm = rm;
+ this.queryContext = queryContext;
+ }
+
+ @Override
+ public void visitAbstractPlan(PhysicalPlan plan) {
+ this.plan = plan;
+ queryCost = plan.totalCost();
+ }
+
+ @Override
+ public void visitPhysicalPlan(final QueryWorkUnit work) {
+ this.work = work;
+ planMemory();
+ }
+
+ private void planMemory() {
+ if (plan.getProperties().hasResourcePlan) {
+ logger.debug("Memory already planned.");
+ return;
+ }
+
+ // Group fragments by node.
+
+ Map<String, Collection<PhysicalOperator>> nodeMap = buildBufferedOpMap();
+
+ // Memory must be symmetric to avoid bottlenecks in which one node has
+ // sorts (say) with less memory than another, causing skew in data arrival
+ // rates for downstream operators.
+
+ int width = countBufferingOperators(nodeMap);
+
+      // Then, share the per-query memory evenly across all the buffered
+      // operators on each node. This handles asymmetric distributions,
+      // such as when a sort appears in the root fragment (the one with
+      // the screen), which is never parallelized.
+
+ for (Entry<String, Collection<PhysicalOperator>> entry : nodeMap.entrySet()) {
+ planNodeMemory(entry.getKey(), entry.getValue(), width);
+ }
+ }
+
+ private int countBufferingOperators(
+ Map<String, Collection<PhysicalOperator>> nodeMap) {
+ int width = 0;
+ for (Collection<PhysicalOperator> fragSorts : nodeMap.values()) {
+ width = Math.max(width, fragSorts.size());
+ }
+ return width;
+ }
+
+ /**
+ * Given the set of buffered operators (from any number of fragments) on a
+     * single node, share the per-query memory equally across all the
+     * operators.
+     *
+     * @param nodeAddr address of the node being planned
+     * @param bufferedOps the buffered operators assigned to that node
+     * @param width the maximum number of buffered operators on any node
+ */
+
+ private void planNodeMemory(String nodeAddr,
+ Collection<PhysicalOperator> bufferedOps, int width) {
+
+ // If no buffering operators, nothing to plan.
+
+ if (bufferedOps.isEmpty()) {
+ return;
+ }
+
+ // Divide node memory evenly among the set of operators, in any minor
+ // fragment, on the node. This is not very sophisticated: it does not
+ // deal with, say, three stacked sorts in which, if sort A runs, then
+ // B may be using memory, but C cannot be active. That kind of analysis
+ // is left as a later exercise.
+
+ long nodeMemory = queryMemoryPerNode();
+
+ // Set a floor on the amount of memory per operator based on the
+ // configured minimum. This is likely unhelpful because we are trying
+ // to work around constrained memory by assuming more than we actually
+ // have. This may lead to an OOM at run time.
+
+ long preferredOpMemory = nodeMemory / width;
+ long perOpMemory = Math.max(preferredOpMemory, rm.minimumOperatorMemory());
+ if (preferredOpMemory < perOpMemory) {
+ logger.warn("Preferred per-operator memory: {}, actual amount: {}",
+ preferredOpMemory, perOpMemory);
+ }
+ logger.debug(
+ "Query: {}, Node: {}, allocating {} bytes each for {} buffered operator(s).",
+ QueryIdHelper.getQueryId(queryContext.getQueryId()), nodeAddr,
+ perOpMemory, width);
+
+ for (PhysicalOperator op : bufferedOps) {
+
+ // Limit the memory to the maximum in the plan. Doing so is
+ // likely unnecessary, and perhaps harmful, because the pre-planned
+ // allocation is the default maximum hard-coded to 10 GB. This means
+ // that even if 20 GB is available to the sort, it won't use more
+ // than 10GB. This is probably more of a bug than a feature.
+
+ long alloc = Math.min(perOpMemory, op.getMaxAllocation());
+
+ // Place a floor on the memory that is the initial allocation,
+ // since we don't want the operator to run out of memory when it
+ // first starts.
+
+ alloc = Math.max(alloc, op.getInitialAllocation());
+
+ if (alloc > preferredOpMemory && alloc != perOpMemory) {
+ logger.warn("Allocated memory of {} for {} exceeds available memory of {} " +
+ "due to operator minimum",
+ alloc, op.getClass().getSimpleName(), preferredOpMemory);
+ }
+ else if (alloc < preferredOpMemory) {
+ logger.warn("Allocated memory of {} for {} is less than available memory " +
+ "of {} due to operator limit",
+ alloc, op.getClass().getSimpleName(), preferredOpMemory);
+ }
+ op.setMaxAllocation(alloc);
+ }
+ }
+
+ protected long queryMemoryPerNode() {
+ return rm.defaultQueryMemoryPerNode(plan.totalCost());
+ }
+
+ /**
+     * Build a map of buffered operators (such as external sorts) grouped by
+     * node. We start with a list of minor fragments, each with an endpoint
+     * (node). Multiple minor fragments may appear on each node, and each
+     * minor fragment may have zero or more buffered operators.
+     *
+     * @return buffered operators grouped by node address
+ */
+
+ private Map<String, Collection<PhysicalOperator>> buildBufferedOpMap() {
+ Multimap<String, PhysicalOperator> map = ArrayListMultimap.create();
+ getBufferedOps(map, work.getRootFragmentDefn());
+ for (MinorFragmentDefn defn : work.getMinorFragmentDefns()) {
+ getBufferedOps(map, defn);
+ }
+ return map.asMap();
+ }
+
+ /**
+     * Searches a fragment's operator tree to find the buffered operators
+     * within that fragment.
+ */
+
+ protected static class BufferedOpFinder extends
+ AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
+ @Override
+ public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
+ throws RuntimeException {
+ if (op.isBufferedOperator()) {
+ value.add(op);
+ }
+ visitChildren(op, value);
+ return null;
+ }
+ }
+
+ private void getBufferedOps(Multimap<String, PhysicalOperator> map,
+ MinorFragmentDefn defn) {
+ List<PhysicalOperator> bufferedOps = getBufferedOps(defn.root());
+ if (!bufferedOps.isEmpty()) {
+ map.putAll(defn.fragment().getAssignment().getAddress(), bufferedOps);
+ }
+ }
+
+ /**
+ * Search an individual fragment tree to find any buffered operators it may
+ * contain.
+ *
+ * @param root
+ * @return
+ */
+
+ private List<PhysicalOperator> getBufferedOps(FragmentRoot root) {
+ List<PhysicalOperator> bufferedOps = new ArrayList<>();
+ BufferedOpFinder finder = new BufferedOpFinder();
+ root.accept(finder, bufferedOps);
+ return bufferedOps;
+ }
+ }
+
+ /**
+ * Per-query resource manager. Handles resources and optional queue lease for
+ * a single query. As such, this is a non-shared resource: it is associated
+ * with a Foreman: a single thread at plan time, and a single event (in some
+ * thread) at query completion time. Because of these semantics, no
+ * synchronization is needed within this class.
+ */
+
+ public static class QueuedQueryResourceManager extends QueuedResourceAllocator
+ implements QueryResourceManager {
+
+ private final Foreman foreman;
+ private QueueLease lease;
+
+ public QueuedQueryResourceManager(final ThrottledResourceManager rm,
+ final Foreman foreman) {
+ super(rm, foreman.getQueryContext());
+ this.foreman = foreman;
+ }
+
+ @Override
+ public void setCost(double cost) {
+ this.queryCost = cost;
+ }
+
+ @Override
+ public void admit() throws QueueTimeoutException, QueryQueueException {
+ lease = rm.queue().enqueue(foreman.getQueryId(), queryCost);
+ }
+
+ @Override
+ protected long queryMemoryPerNode() {
+
+ // No lease: use static estimate.
+
+ if (lease == null) {
+ return super.queryMemoryPerNode();
+ }
+
+ // Use actual memory assigned to this query.
+
+ return lease.queryMemoryPerNode();
+ }
+
+ @Override
+ public void exit() {
+ if (lease != null) {
+ lease.release();
+ }
+ lease = null;
+ }
+
+ @Override
+ public boolean hasQueue() { return true; }
+
+ @Override
+ public String queueName() {
+ return lease == null ? null : lease.queueName();
+ }
+ }
+
+ private final QueryQueue queue;
+
+ public ThrottledResourceManager(final DrillbitContext drillbitContext,
+ final QueryQueue queue) {
+ super(drillbitContext);
+ this.queue = queue;
+ queue.setMemoryPerNode(memoryPerNode());
+ }
+
+ public long minimumOperatorMemory() {
+ return queue.minimumOperatorMemory();
+ }
+
+ public long defaultQueryMemoryPerNode(double cost) {
+ return queue.defaultQueryMemoryPerNode(cost);
+ }
+
+ public QueryQueue queue() { return queue; }
+
+ @Override
+ public QueryResourceAllocator newResourceAllocator(
+ QueryContext queryContext) {
+ return new QueuedResourceAllocator(this, queryContext);
+ }
+
+ @Override
+ public QueryResourceManager newQueryRM(Foreman foreman) {
+ return new QueuedQueryResourceManager(this, foreman);
+ }
+
+ @Override
+ public void close() {
+ queue.close();
+ }
+}
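The per-operator rules in planNodeMemory() reduce to: divide the query's node memory by the buffered-operator width, raise the result to the configured minimum, cap it at the operator's planned maximum, then raise it again to the operator's initial allocation. A standalone sketch with hypothetical byte values:

// Standalone sketch of the per-operator memory rules; all values are hypothetical.
public class OperatorMemorySketch {
  public static void main(String[] args) {
    long queryMemoryPerNode = 4L * 1024 * 1024 * 1024;  // assume 4 GB for this query on this node
    int width = 3;                                      // buffered operators on the busiest node
    long minimumOperatorMemory = 40L * 1024 * 1024;     // hypothetical configured floor
    long opMaxAllocation = 10L * 1024 * 1024 * 1024;    // hypothetical plan-time ceiling
    long opInitialAllocation = 1024 * 1024;             // hypothetical initial reservation

    long preferredOpMemory = queryMemoryPerNode / width;                // ~1.33 GB
    long perOpMemory = Math.max(preferredOpMemory, minimumOperatorMemory);
    long alloc = Math.min(perOpMemory, opMaxAllocation);                // cap at the plan's maximum
    alloc = Math.max(alloc, opInitialAllocation);                       // never below the initial allocation

    System.out.println("per-operator allocation = " + alloc + " bytes");
  }
}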
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java
new file mode 100644
index 000000000..0b9e9da63
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Provides resource management and queuing support for the Drill foreman.
+ * The resource manager tracks total resources available to Drill. Several
+ * implementations are available: a default implementation for systems without
+ * queueing and an admission-controlled (AC) version for systems with queues.
+ * <p>
+ * Each resource manager provides a per-query manager that is responsible
+ * for queuing the query (if needed) and memory allocation to the query based
+ * on query characteristics and memory assigned to the query.
+ * <p>
+ * Two queue implementations are provided: a distributed ZooKeeper-based queue
+ * and a local queue useful for embedded Drillbits (and for testing).
+ */
+package org.apache.drill.exec.work.foreman.rm;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
index 2f945d85f..29b3580d3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -35,9 +35,9 @@ import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.util.MemoryAllocationUtilities;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.QueryWorkUnit;
+import org.apache.drill.exec.work.foreman.rm.QueryResourceAllocator;
import com.google.common.collect.Lists;
@@ -60,6 +60,7 @@ public class PlanSplitter {
* @param connection
* @return
*/
+ @SuppressWarnings("resource")
public QueryPlanFragments planFragments(DrillbitContext dContext, QueryId queryId,
GetQueryPlanFragments req, UserClientConnection connection) {
QueryPlanFragments.Builder responseBuilder = QueryPlanFragments.newBuilder();
@@ -97,7 +98,8 @@ public class PlanSplitter {
throw new IllegalStateException("Planning fragments supports only SQL or PHYSICAL QueryType");
}
- MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
+ QueryResourceAllocator planner = dContext.getResourceManager().newResourceAllocator(queryContext);
+ planner.visitAbstractPlan(plan);
final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
@@ -113,6 +115,8 @@ public class PlanSplitter {
queryContext.getSession(), queryContext.getQueryContextInfo());
for (QueryWorkUnit queryWorkUnit : queryWorkUnits) {
+ planner.visitPhysicalPlan(queryWorkUnit);
+ queryWorkUnit.applyPlan(dContext.getPlanReader());
fragments.add(queryWorkUnit.getRootFragment());
List<PlanFragment> childFragments = queryWorkUnit.getFragments();
@@ -122,8 +126,10 @@ public class PlanSplitter {
}
} else {
final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
- queryId, queryContext.getActiveEndpoints(), dContext.getPlanReader(), rootFragment,
+ queryId, queryContext.getActiveEndpoints(), rootFragment,
queryContext.getSession(), queryContext.getQueryContextInfo());
+ planner.visitPhysicalPlan(queryWorkUnit);
+ queryWorkUnit.applyPlan(dContext.getPlanReader());
fragments.add(queryWorkUnit.getRootFragment());
fragments.addAll(queryWorkUnit.getFragments());
}