diff options
author | Paul Rogers <progers@maprtech.com> | 2017-08-30 14:32:17 -0700 |
---|---|---|
committer | Paul Rogers <progers@maprtech.com> | 2017-10-09 15:58:20 -0700 |
commit | bbc42240483a0658691149aea3c509ccd0db4c79 (patch) | |
tree | 471957d30d786e3f384ffb882a088357a6acf97e /exec/java-exec/src/main/java | |
parent | a03f5429e368cf73286eec6101871f6e61a5b7d1 (diff) |
DRILL-5716: Queue-driven memory allocation
* Creates new core resource management and query queue abstractions.
* Adds queue information to the Protobuf layer.
* Foreman and Planner changes
- Abstracts memory management out to the new resource management layer.
This means deferring generating the physical plan JSON to later in the
process after memory planning.
* Web UI changes
* Adds queue information to the main page and to the profile page of each
query.
* Also sorts the list of options displayed in the Web UI.
- Added memory reserve
A new config parameter, exec.queue.memory_reserve_ratio, sets aside a
slice of total memory for operators that do not participate in the
memory assignment process. The default is 20%; testing will tell us if
that value should be larger or smaller.
* Additional minor fixes
- Code cleanup.
- Added mechanism to abandon lease release during shutdown.
- Log queue configuration only when the config changes, rather than on
every query.
- Apply Boaz’ option to enforce a minimum memory allocation per
operator.
- Additional logging to help testers see what is happening.
closes #928
Diffstat (limited to 'exec/java-exec/src/main/java')
47 files changed, 2221 insertions, 375 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index 72a73fcfe..9890b45ca 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -350,7 +350,7 @@ public final class ExecConstants { public static final LongValidator MAX_QUERY_MEMORY_PER_NODE = new RangeLongValidator(MAX_QUERY_MEMORY_PER_NODE_KEY, 1024 * 1024, Long.MAX_VALUE); /** - * Minimum memory alocated to each buffered operator instance. + * Minimum memory allocated to each buffered operator instance. * <p/> * DEFAULT: 40 MB */ @@ -377,12 +377,31 @@ public final class ExecConstants { public static final String AVERAGE_FIELD_WIDTH_KEY = "planner.memory.average_field_width"; public static final OptionValidator AVERAGE_FIELD_WIDTH = new PositiveLongValidator(AVERAGE_FIELD_WIDTH_KEY, Long.MAX_VALUE); + // Resource management boot-time options. + + public static final String MAX_MEMORY_PER_NODE = "drill.exec.rm.memory_per_node"; + public static final String MAX_CPUS_PER_NODE = "drill.exec.rm.cpus_per_node"; + + // Resource management system run-time options. + + // Enables queues. When running embedded, enables an in-process queue. When + // running distributed, enables the Zookeeper-based distributed queue. 
+ public static final BooleanValidator ENABLE_QUEUE = new BooleanValidator("exec.queue.enable"); - public static final LongValidator LARGE_QUEUE_SIZE = new PositiveLongValidator("exec.queue.large", 1000); - public static final LongValidator SMALL_QUEUE_SIZE = new PositiveLongValidator("exec.queue.small", 100000); + public static final LongValidator LARGE_QUEUE_SIZE = new PositiveLongValidator("exec.queue.large", 10_000); + public static final LongValidator SMALL_QUEUE_SIZE = new PositiveLongValidator("exec.queue.small", 100_000); public static final LongValidator QUEUE_THRESHOLD_SIZE = new PositiveLongValidator("exec.queue.threshold", Long.MAX_VALUE); public static final LongValidator QUEUE_TIMEOUT = new PositiveLongValidator("exec.queue.timeout_millis", Long.MAX_VALUE); + // Ratio of memory for small queries vs. large queries. + // Each small query gets 1 unit, each large query gets QUEUE_MEMORY_RATIO units. + // A lower limit of 1 enforces the intuition that a large query should never get + // *less* memory than a small one. 
+ + public static final DoubleValidator QUEUE_MEMORY_RATIO = new RangeDoubleValidator("exec.queue.memory_ratio", 1.0, 1000); + + public static final DoubleValidator QUEUE_MEMORY_RESERVE = new RangeDoubleValidator("exec.queue.memory_reserve_ratio", 0, 1.0); + public static final String ENABLE_VERBOSE_ERRORS_KEY = "exec.errors.verbose"; public static final OptionValidator ENABLE_VERBOSE_ERRORS = new BooleanValidator(ENABLE_VERBOSE_ERRORS_KEY); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java index 35c4a06c3..36f53ab1b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/coord/store/TransientStoreConfig.java @@ -53,6 +53,7 @@ public class TransientStoreConfig<V> { @Override public boolean equals(Object obj) { if (obj instanceof TransientStoreConfig && obj.getClass().equals(getClass())) { + @SuppressWarnings("unchecked") final TransientStoreConfig<V> other = (TransientStoreConfig<V>)obj; return Objects.equal(name, other.name) && Objects.equal(serializer, other.serializer); } @@ -70,5 +71,4 @@ public class TransientStoreConfig<V> { public static <V> TransientStoreConfigBuilder<V> newJacksonBuilder(final ObjectMapper mapper, final Class<V> klazz) { return TransientStoreConfig.<V>newBuilder().serializer(new JacksonSerializer<>(mapper, klazz)); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java index 435f35f12..86074f0b4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/QueryContext.java @@ -60,10 +60,10 @@ import io.netty.buffer.DrillBuf; // TODO - consider re-name to PlanningContext, as the query execution context 
actually appears // in fragment contexts public class QueryContext implements AutoCloseable, OptimizerRulesContext, SchemaConfigInfoProvider { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryContext.class); private final DrillbitContext drillbitContext; private final UserSession session; + private final QueryId queryId; private final QueryOptionManager queryOptions; private final PlannerSettings plannerSettings; private final ExecutionControls executionControls; @@ -87,6 +87,7 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem public QueryContext(final UserSession session, final DrillbitContext drillbitContext, QueryId queryId) { this.drillbitContext = drillbitContext; this.session = session; + this.queryId = queryId; queryOptions = new QueryOptionManager(session.getOptions()); executionControls = new ExecutionControls(queryOptions, drillbitContext.getEndpoint()); plannerSettings = new PlannerSettings(queryOptions, getFunctionRegistry()); @@ -118,14 +119,12 @@ public class QueryContext implements AutoCloseable, OptimizerRulesContext, Schem return plannerSettings; } - public UserSession getSession() { - return session; - } + public UserSession getSession() { return session; } @Override - public BufferAllocator getAllocator() { - return allocator; - } + public BufferAllocator getAllocator() { return allocator; } + + public QueryId getQueryId( ) { return queryId; } /** * Return reference to default schema instance in a schema tree. 
Each {@link org.apache.calcite.schema.SchemaPlus} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java index e0902c84b..e5c96a79c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,6 +28,7 @@ import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Root; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; import com.fasterxml.jackson.core.JsonProcessingException; @@ -87,4 +88,17 @@ public class PhysicalPlan { throw new RuntimeException(e); } } + + public double totalCost() { + double totalCost = 0; + for (final PhysicalOperator ops : getSortedOperators()) { + totalCost += ops.getCost(); + } + return totalCost; + } + + @JsonIgnore + public Graph<PhysicalOperator, Root, Leaf> graph() { + return graph; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java index 32910c352..b972dd3b6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/FragmentRoot.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -17,9 +17,10 @@ */ package org.apache.drill.exec.physical.base; - /** - * Describes the root operation within a particular Fragment. This includes things Sender nodes. + * Describes the root operation within a particular Fragment. This includes + * things like Sender nodes. */ + public interface FragmentRoot extends FragmentLeaf { } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java index 550dcb251..9c635e9ad 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/HardAffinityFragmentParallelizer.java @@ -66,7 +66,7 @@ public class HardAffinityFragmentParallelizer implements FragmentParallelizer { } } - // Step 1: Find the width taking into various parameters + // Step 1: Find the width taking into account various parameters // 1.1. Find the parallelization based on cost. Use max cost of all operators in this fragment; this is consistent // with the calculation that ExcessiveExchangeRemover uses. int width = (int) Math.ceil(stats.getMaxCost() / parameters.getSliceTarget()); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java index edec7e485..2fc754179 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -28,7 +28,6 @@ import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Store; import org.apache.drill.exec.physical.base.SubScan; -import org.apache.drill.exec.work.foreman.ForemanException; import com.google.common.collect.Lists; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java index 3e0f35a10..1529b6bf7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/PlanningSet.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -20,8 +20,6 @@ package org.apache.drill.exec.planner.fragment; import java.util.Iterator; import java.util.Map; -import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; - import com.google.common.collect.Maps; public class PlanningSet implements Iterable<Wrapper> { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index 4584bd555..d2efcfba8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -46,9 +46,9 @@ import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.options.OptionList; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn; import org.apache.drill.exec.work.foreman.ForemanSetupException; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -70,7 +70,7 @@ public class SimpleParallelizer implements ParallelizationParameters { public SimpleParallelizer(QueryContext context) { OptionManager optionManager = context.getOptions(); - long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET).num_val; + long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET_OPTION); this.parallelizationThreshold = sliceTarget > 0 ? 
sliceTarget : 1; double cpu_load_average = optionManager.getOption(ExecConstants.CPU_LOAD_AVERAGE); final long maxWidth = optionManager.getOption(ExecConstants.MAX_WIDTH_PER_NODE); @@ -123,12 +123,12 @@ public class SimpleParallelizer implements ParallelizationParameters { * @throws ExecutionSetupException */ public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, - Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment, + Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment, UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment); return generateWorkUnit( - options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo); + options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo); } /** @@ -150,6 +150,7 @@ public class SimpleParallelizer implements ParallelizationParameters { // no op throw new UnsupportedOperationException("Use children classes"); } + /** * Helper method to reuse the code for QueryWorkUnit(s) generation * @param activeEndpoints @@ -209,6 +210,7 @@ public class SimpleParallelizer implements ParallelizationParameters { // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for // parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and // remove the fragment on which the current fragment depends on. 
+ final Set<Wrapper> roots = Sets.newHashSet(); for(Wrapper w : planningSet) { roots.add(w); @@ -257,11 +259,11 @@ public class SimpleParallelizer implements ParallelizationParameters { } protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, - PhysicalPlanReader reader, Fragment rootNode, PlanningSet planningSet, + Fragment rootNode, PlanningSet planningSet, UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { - List<PlanFragment> fragments = Lists.newArrayList(); + List<MinorFragmentDefn> fragmentDefns = new ArrayList<>( ); - PlanFragment rootFragment = null; + MinorFragmentDefn rootFragmentDefn = null; FragmentRoot rootOperator = null; // now we generate all the individual plan fragments and associated assignments. Note, we need all endpoints @@ -287,16 +289,6 @@ public class SimpleParallelizer implements ParallelizationParameters { Preconditions.checkArgument(op instanceof FragmentRoot); FragmentRoot root = (FragmentRoot) op; - // get plan as JSON - String plan; - String optionsData; - try { - plan = reader.writeJson(root); - optionsData = reader.writeJson(options); - } catch (JsonProcessingException e) { - throw new ForemanSetupException("Failure while trying to convert fragment into json.", e); - } - FragmentHandle handle = FragmentHandle // .newBuilder() // .setMajorFragmentId(wrapper.getMajorFragmentId()) // @@ -306,40 +298,36 @@ public class SimpleParallelizer implements ParallelizationParameters { PlanFragment fragment = PlanFragment.newBuilder() // .setForeman(foremanNode) // - .setFragmentJson(plan) // .setHandle(handle) // .setAssignment(wrapper.getAssignedEndpoint(minorFragmentId)) // .setLeafFragment(isLeafFragment) // .setContext(queryContextInfo) .setMemInitial(wrapper.getInitialAllocation())// .setMemMax(wrapper.getMaxAllocation()) - .setOptionsJson(optionsData) .setCredentials(session.getCredentials()) 
.addAllCollector(CountRequiredFragments.getCollectors(root)) .build(); + MinorFragmentDefn fragmentDefn = new MinorFragmentDefn(fragment, root, options); + if (isRootNode) { - if (logger.isDebugEnabled()) { - logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString())); - } - rootFragment = fragment; + logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString())); + rootFragmentDefn = fragmentDefn; rootOperator = root; } else { - if (logger.isDebugEnabled()) { - logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString())); - } - fragments.add(fragment); + logger.debug("Remote fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString())); + fragmentDefns.add(fragmentDefn); } } } - return new QueryWorkUnit(rootOperator, rootFragment, fragments); + return new QueryWorkUnit(rootOperator, rootFragmentDefn, fragmentDefns); } - /** * Designed to setup initial values for arriving fragment accounting. */ + protected static class CountRequiredFragments extends AbstractPhysicalVisitor<Void, List<Collector>, RuntimeException> { private static final CountRequiredFragments INSTANCE = new CountRequiredFragments(); @@ -357,7 +345,6 @@ public class SimpleParallelizer implements ParallelizationParameters { list.add(ep.getId()); } - collectors.add(Collector.newBuilder() .setIsSpooling(receiver.isSpooling()) .setOppositeMajorFragmentId(receiver.getOppositeMajorFragmentId()) @@ -374,6 +361,5 @@ public class SimpleParallelizer implements ParallelizationParameters { } return null; } - } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java index 644263eea..1549a6bb4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SoftAffinityFragmentParallelizer.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -53,7 +53,6 @@ public class SoftAffinityFragmentParallelizer implements FragmentParallelizer { @Override public void parallelizeFragment(final Wrapper fragmentWrapper, final ParallelizationParameters parameters, final Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException { - final Fragment fragment = fragmentWrapper.getNode(); // Find the parallelization width of fragment final Stats stats = fragmentWrapper.getStats(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java index 395a9e1f4..1eb12967d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java @@ -41,9 +41,9 @@ import org.apache.drill.exec.proto.UserBitShared.QueryId; import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.server.options.OptionList; import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn; import org.apache.drill.exec.work.foreman.ForemanSetupException; -import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -115,7 +115,6 @@ public class SplittingParallelizer extends SimpleParallelizer { int plansCount = 0; DrillbitEndpoint[] endPoints = null; long initialAllocation = 0; - long maxAllocation = 0; final Iterator<Wrapper> iter = 
planningSet.iterator(); while (iter.hasNext()) { @@ -131,7 +130,6 @@ public class SplittingParallelizer extends SimpleParallelizer { // allocation plansCount = wrapper.getWidth(); initialAllocation = (wrapper.getInitialAllocation() != 0 ) ? wrapper.getInitialAllocation()/plansCount : 0; - maxAllocation = (wrapper.getMaxAllocation() != 0 ) ? wrapper.getMaxAllocation()/plansCount : 0; endPoints = new DrillbitEndpoint[plansCount]; for (int mfId = 0; mfId < plansCount; mfId++) { endPoints[mfId] = wrapper.getAssignedEndpoint(mfId); @@ -140,7 +138,7 @@ public class SplittingParallelizer extends SimpleParallelizer { } if ( plansCount == 0 ) { // no exchange, return list of single QueryWorkUnit - workUnits.add(generateWorkUnit(options, foremanNode, queryId, reader, rootNode, planningSet, session, queryContextInfo)); + workUnits.add(generateWorkUnit(options, foremanNode, queryId, rootNode, planningSet, session, queryContextInfo)); return workUnits; } @@ -171,9 +169,9 @@ public class SplittingParallelizer extends SimpleParallelizer { // Create a minorFragment for each major fragment. 
for (int minorFragmentId = 0; minorFragmentId < plansCount; minorFragmentId++) { // those fragments should be empty - List<PlanFragment> fragments = Lists.newArrayList(); + List<MinorFragmentDefn> fragments = Lists.newArrayList(); - PlanFragment rootFragment = null; + MinorFragmentDefn rootFragment = null; FragmentRoot rootOperator = null; IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper); @@ -186,35 +184,25 @@ public class SplittingParallelizer extends SimpleParallelizer { Preconditions.checkArgument(op instanceof FragmentRoot); FragmentRoot root = (FragmentRoot) op; - // get plan as JSON - String plan; - String optionsData; - try { - plan = reader.writeJson(root); - optionsData = reader.writeJson(options); - } catch (JsonProcessingException e) { - throw new ForemanSetupException("Failure while trying to convert fragment into json.", e); - } PlanFragment fragment = PlanFragment.newBuilder() // .setForeman(endPoints[minorFragmentId]) // - .setFragmentJson(plan) // .setHandle(handle) // .setAssignment(endPoints[minorFragmentId]) // .setLeafFragment(isLeafFragment) // .setContext(queryContextInfo) .setMemInitial(initialAllocation)// .setMemMax(wrapper.getMaxAllocation()) // TODO - for some reason OOM is using leaf fragment max allocation divided by width - .setOptionsJson(optionsData) .setCredentials(session.getCredentials()) .addAllCollector(CountRequiredFragments.getCollectors(root)) .build(); + MinorFragmentDefn fragmentDefn = new MinorFragmentDefn(fragment, root, options); if (isRootNode) { if (logger.isDebugEnabled()) { logger.debug("Root fragment:\n {}", DrillStringUtils.unescapeJava(fragment.toString())); } - rootFragment = fragment; + rootFragment = fragmentDefn; rootOperator = root; } else { if (logger.isDebugEnabled()) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java index 80aaf0d7f..a333ff22f 100644 --- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/Drillbit.java @@ -103,6 +103,7 @@ public class Drillbit implements AutoCloseable { this(config, SystemOptionManager.createDefaultOptionDefinitions(), serviceSet, classpathScan); } + @SuppressWarnings("resource") @VisibleForTesting public Drillbit( final DrillConfig config, @@ -122,7 +123,7 @@ public class Drillbit implements AutoCloseable { storeProvider = new CachingPersistentStoreProvider(new LocalPersistentStoreProvider(config)); } else { coord = new ZKClusterCoordinator(config); - storeProvider = new PersistentStoreRegistry(this.coord, config).newPStoreProvider(); + storeProvider = new PersistentStoreRegistry<ClusterCoordinator>(this.coord, config).newPStoreProvider(); isDistributedMode = true; } @@ -157,6 +158,7 @@ public class Drillbit implements AutoCloseable { } final DrillbitEndpoint md = engine.start(); manager.start(md, engine.getController(), engine.getDataConnectionCreator(), coord, storeProvider, profileStoreProvider); + @SuppressWarnings("resource") final DrillbitContext drillbitContext = manager.getContext(); storageRegistry = drillbitContext.getStorage(); storageRegistry.init(); @@ -166,6 +168,10 @@ public class Drillbit implements AutoCloseable { registrationHandle = coord.register(md); webServer.start(); + // Must start the RM after the above since it needs to read system options. 
+ + drillbitContext.startRM(); + Runtime.getRuntime().addShutdownHook(new ShutdownThread(this, new StackTrace())); logger.info("Startup completed ({} ms).", w.elapsed(TimeUnit.MILLISECONDS)); } @@ -224,6 +230,7 @@ public class Drillbit implements AutoCloseable { return; } + @SuppressWarnings("resource") final SystemOptionManager optionManager = getContext().getOptionManager(); // parse out the properties, validate, and then set them @@ -324,7 +331,6 @@ public class Drillbit implements AutoCloseable { return start(config, SystemOptionManager.createDefaultOptionDefinitions(), remoteServiceSet); } - @SuppressWarnings("resource") @VisibleForTesting public static Drillbit start(final DrillConfig config, final CaseInsensitiveMap<OptionDefinition> validators, final RemoteServiceSet remoteServiceSet) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index 426b9d2ab..b8a8b1e5b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -39,6 +39,8 @@ import org.apache.drill.exec.server.options.SystemOptionManager; import org.apache.drill.exec.store.SchemaFactory; import org.apache.drill.exec.store.StoragePluginRegistry; import org.apache.drill.exec.store.sys.PersistentStoreProvider; +import org.apache.drill.exec.work.foreman.rm.ResourceManager; +import org.apache.drill.exec.work.foreman.rm.ResourceManagerBuilder; import java.util.Collection; import java.util.concurrent.ExecutorService; @@ -66,6 +68,7 @@ public class DrillbitContext implements AutoCloseable { // operator table for standard SQL operators and functions, Drill built-in UDFs private final DrillOperatorTable table; private final QueryProfileStoreContext profileStoreContext; + private ResourceManager resourceManager; public DrillbitContext( DrillbitEndpoint endpoint, @@ -88,7 
+91,7 @@ public class DrillbitContext implements AutoCloseable { WorkEventBus workBus, PersistentStoreProvider provider, PersistentStoreProvider profileStoreProvider) { - this.classpathScan = context.getClasspathScan(); + classpathScan = context.getClasspathScan(); this.workBus = workBus; this.controller = checkNotNull(controller); this.context = checkNotNull(context); @@ -96,29 +99,40 @@ public class DrillbitContext implements AutoCloseable { this.connectionsPool = checkNotNull(connectionsPool); this.endpoint = checkNotNull(endpoint); this.provider = provider; - this.lpPersistence = new LogicalPlanPersistence(context.getConfig(), classpathScan); + DrillConfig config = context.getConfig(); + lpPersistence = new LogicalPlanPersistence(config, classpathScan); - // TODO remove escaping "this". - this.storagePlugins = context.getConfig() + storagePlugins = config .getInstance(StoragePluginRegistry.STORAGE_PLUGIN_REGISTRY_IMPL, StoragePluginRegistry.class, this); - this.reader = new PhysicalPlanReader(context.getConfig(), classpathScan, lpPersistence, endpoint, storagePlugins); - this.operatorCreatorRegistry = new OperatorCreatorRegistry(classpathScan); - this.systemOptions = new SystemOptionManager(lpPersistence, provider, context.getConfig(), context.getDefinitions()); - this.functionRegistry = new FunctionImplementationRegistry(context.getConfig(), classpathScan, systemOptions); - this.compiler = new CodeCompiler(context.getConfig(), systemOptions); + reader = new PhysicalPlanReader(config, classpathScan, lpPersistence, endpoint, storagePlugins); + operatorCreatorRegistry = new OperatorCreatorRegistry(classpathScan); + systemOptions = new SystemOptionManager(lpPersistence, provider, config, context.getDefinitions()); + functionRegistry = new FunctionImplementationRegistry(config, classpathScan, systemOptions); + compiler = new CodeCompiler(config, systemOptions); // This operator table is built once and used for all queries which do not need dynamic UDF support. 
- this.table = new DrillOperatorTable(functionRegistry, systemOptions); + table = new DrillOperatorTable(functionRegistry, systemOptions); //This profile store context is built from the profileStoreProvider - this.profileStoreContext = new QueryProfileStoreContext(context.getConfig(), profileStoreProvider, coord); + profileStoreContext = new QueryProfileStoreContext(context.getConfig(), profileStoreProvider, coord); } public QueryProfileStoreContext getProfileStoreContext() { return profileStoreContext; } + /** + * Starts the resource manager. Must be called separately from the + * constructor after the system property mechanism is initialized + * since the builder will consult system options to determine the + * proper RM to use. + */ + + public void startRM() { + resourceManager = new ResourceManagerBuilder(this).build(); + } + public FunctionImplementationRegistry getFunctionImplementationRegistry() { return functionRegistry; } @@ -243,4 +257,8 @@ public class DrillbitContext implements AutoCloseable { getRemoteFunctionRegistry().close(); getCompiler().close(); } + + public ResourceManager getResourceManager() { + return resourceManager; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java index f2d352c98..7d30b5bd3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/DrillConfigIterator.java @@ -73,6 +73,9 @@ public class DrillConfigIterator implements Iterable<OptionValue> { case NULL: throw new IllegalStateException("Config value \"" + name + "\" has NULL type"); + + default: + throw new IllegalStateException("Unknown type: " + cv.valueType()); } return optionValue; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java index 8851e1a37..e99645d2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/OptionList.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.server.options; import java.util.ArrayList; +@SuppressWarnings("serial") public class OptionList extends ArrayList<OptionValue>{ public void merge(OptionList list){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java index 52bf40327..635a1ae40 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SessionOptionManager.java @@ -17,14 +17,15 @@ */ package org.apache.drill.exec.server.options; -import com.google.common.base.Predicate; -import com.google.common.collect.Collections2; +import java.util.Collection; +import java.util.Map; + import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.drill.common.map.CaseInsensitiveMap; import org.apache.drill.exec.rpc.user.UserSession; -import java.util.Collection; -import java.util.Map; +import com.google.common.base.Predicate; +import com.google.common.collect.Collections2; /** * {@link OptionManager} that holds options within {@link org.apache.drill.exec.rpc.user.UserSession} context. 
Options diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index b0863eefe..bda6033bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -160,6 +160,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea new OptionDefinition(ExecConstants.QUEUE_THRESHOLD_SIZE), new OptionDefinition(ExecConstants.QUEUE_TIMEOUT), new OptionDefinition(ExecConstants.SMALL_QUEUE_SIZE), + new OptionDefinition(ExecConstants.QUEUE_MEMORY_RESERVE, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), + new OptionDefinition(ExecConstants.QUEUE_MEMORY_RATIO, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)), new OptionDefinition(ExecConstants.MIN_HASH_TABLE_SIZE), new OptionDefinition(ExecConstants.MAX_HASH_TABLE_SIZE), new OptionDefinition(ExecConstants.EARLY_LIMIT0_OPT), @@ -418,6 +420,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea @Override public void close() throws Exception { - options.close(); + // If the server exits very early, the options may not yet have + // been created. Gracefully handle that case. 
+ + if (options != null) { + options.close(); + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java index 238f5cacf..6eb47e659 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRestServer.java @@ -86,7 +86,6 @@ public class DrillRestServer extends ResourceConfig { register(MultiPartFeature.class); property(ServerProperties.METAINF_SERVICES_LOOKUP_DISABLE, true); - final boolean isAuthEnabled = workManager.getContext().getConfig().getBoolean(ExecConstants.USER_AUTHENTICATION_ENABLED); @@ -152,7 +151,6 @@ public class DrillRestServer extends ResourceConfig { return configuration; } - public static class AuthWebUserConnectionProvider implements Factory<WebUserConnection> { @Inject @@ -161,6 +159,7 @@ public class DrillRestServer extends ResourceConfig { @Inject WorkManager workManager; + @SuppressWarnings("resource") @Override public WebUserConnection provide() { final HttpSession session = request.getSession(); @@ -228,6 +227,7 @@ public class DrillRestServer extends ResourceConfig { @Inject WorkManager workManager; + @SuppressWarnings("resource") @Override public WebUserConnection provide() { final DrillbitContext drillbitContext = workManager.getContext(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java index 84c471e12..70b82ca92 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/DrillRoot.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -32,8 +32,15 @@ import com.google.common.collect.Sets; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.rest.DrillRestServer.UserAuthEnabled; import org.apache.drill.exec.work.WorkManager; +import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue; +import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.ZKQueueInfo; +import org.apache.drill.exec.work.foreman.rm.DynamicResourceManager; +import org.apache.drill.exec.work.foreman.rm.QueryQueue; +import org.apache.drill.exec.work.foreman.rm.ResourceManager; +import org.apache.drill.exec.work.foreman.rm.ThrottledResourceManager; import org.glassfish.jersey.server.mvc.Viewable; import com.fasterxml.jackson.annotation.JsonCreator; @@ -53,6 +60,7 @@ public class DrillRoot { return ViewableWithPermissions.create(authEnabled.get(), "/rest/index.ftl", sc, getClusterInfoJSON()); } + @SuppressWarnings("resource") @GET @Path("/cluster.json") @Produces(MediaType.APPLICATION_JSON) @@ -60,10 +68,11 @@ public class DrillRoot { final Collection<DrillbitInfo> drillbits = Sets.newTreeSet(); final Collection<String> mismatchedVersions = Sets.newTreeSet(); - final DrillbitEndpoint currentDrillbit = work.getContext().getEndpoint(); + final DrillbitContext dbContext = work.getContext(); + final DrillbitEndpoint currentDrillbit = dbContext.getEndpoint(); final String currentVersion = currentDrillbit.getVersion(); - final DrillConfig config = work.getContext().getConfig(); + final DrillConfig config = dbContext.getConfig(); final boolean userEncryptionEnabled = config.getBoolean(ExecConstants.USER_ENCRYPTION_SASL_ENABLED); final boolean bitEncryptionEnabled = config.getBoolean(ExecConstants.BIT_ENCRYPTION_SASL_ENABLED); @@ -77,8 +86,90 @@ 
public class DrillRoot { drillbits.add(drillbit); } - return new ClusterInfo(drillbits, currentVersion, mismatchedVersions, - userEncryptionEnabled, bitEncryptionEnabled); + return new ClusterInfo(drillbits, currentVersion, mismatchedVersions, + userEncryptionEnabled, bitEncryptionEnabled, + QueueInfo.build(dbContext.getResourceManager())); + } + + /** + * Pretty-printing wrapper class around the ZK-based queue summary. + */ + + @XmlRootElement + public static class QueueInfo { + private final ZKQueueInfo zkQueueInfo; + + public static QueueInfo build(ResourceManager rm) { + + // Consider queues enabled only if the ZK-based queues are in use. + + ThrottledResourceManager throttledRM = null; + if (rm != null && rm instanceof DynamicResourceManager) { + DynamicResourceManager dynamicRM = (DynamicResourceManager) rm; + rm = dynamicRM.activeRM(); + } + if (rm != null && rm instanceof ThrottledResourceManager) { + throttledRM = (ThrottledResourceManager) rm; + } + if (throttledRM == null) { + return new QueueInfo(null); + } + QueryQueue queue = throttledRM.queue(); + if (queue == null || !(queue instanceof DistributedQueryQueue)) { + return new QueueInfo(null); + } + + return new QueueInfo(((DistributedQueryQueue) queue).getInfo()); + } + + @JsonCreator + public QueueInfo(ZKQueueInfo queueInfo) { + zkQueueInfo = queueInfo; + } + + public boolean isEnabled() { return zkQueueInfo != null; } + + public int smallQueueSize() { + return isEnabled() ? zkQueueInfo.smallQueueSize : 0; + } + + public int largeQueueSize() { + return isEnabled() ? zkQueueInfo.largeQueueSize : 0; + } + + public String threshold() { + return isEnabled() + ? Double.toString(zkQueueInfo.queueThreshold) + : "N/A"; + } + + public String smallQueueMemory() { + return isEnabled() + ? toBytes(zkQueueInfo.memoryPerSmallQuery) + : "N/A"; + } + + public String largeQueueMemory() { + return isEnabled() + ? 
toBytes(zkQueueInfo.memoryPerLargeQuery) + : "N/A"; + } + + public String totalMemory() { + return isEnabled() + ? toBytes(zkQueueInfo.memoryPerNode) + : "N/A"; + } + + private final long ONE_MB = 1024 * 1024; + + private String toBytes(long memory) { + if (memory < 10 * ONE_MB) { + return String.format("%,d bytes", memory); + } else { + return String.format("%,.0f MB", memory * 1.0D / ONE_MB); + } + } } @XmlRootElement @@ -88,18 +179,21 @@ public class DrillRoot { private final Collection<String> mismatchedVersions; private final boolean userEncryptionEnabled; private final boolean bitEncryptionEnabled; + private final QueueInfo queueInfo; @JsonCreator public ClusterInfo(Collection<DrillbitInfo> drillbits, String currentVersion, Collection<String> mismatchedVersions, boolean userEncryption, - boolean bitEncryption) { + boolean bitEncryption, + QueueInfo queueInfo) { this.drillbits = Sets.newTreeSet(drillbits); this.currentVersion = currentVersion; this.mismatchedVersions = Sets.newTreeSet(mismatchedVersions); this.userEncryptionEnabled = userEncryption; this.bitEncryptionEnabled = bitEncryption; + this.queueInfo = queueInfo; } public Collection<DrillbitInfo> getDrillbits() { @@ -117,6 +211,8 @@ public class DrillRoot { public boolean isUserEncryptionEnabled() { return userEncryptionEnabled; } public boolean isBitEncryptionEnabled() { return bitEncryptionEnabled; } + + public QueueInfo queueInfo() { return queueInfo; } } public static class DrillbitInfo implements Comparable<DrillbitInfo> { @@ -139,9 +235,7 @@ public class DrillRoot { this.versionMatch = versionMatch; } - public String getAddress() { - return address; - } + public String getAddress() { return address; } public String getUserPort() { return userPort; } @@ -149,26 +243,20 @@ public class DrillRoot { public String getDataPort() { return dataPort; } - public String getVersion() { - return version; - } + public String getVersion() { return version; } - public boolean isCurrent() { - return current; - } 
+ public boolean isCurrent() { return current; } - public boolean isVersionMatch() { - return versionMatch; - } + public boolean isVersionMatch() { return versionMatch; } /** - * Method used to sort drillbits. Current drillbit goes first. - * Then drillbits with matching versions, after them drillbits with mismatching versions. - * Matching drillbits are sorted according address natural order, - * mismatching drillbits are sorted according version, address natural order. + * Method used to sort Drillbits. Current Drillbit goes first. + * Then Drillbits with matching versions, after them Drillbits with mismatching versions. + * Matching Drillbits are sorted according address natural order, + * mismatching Drillbits are sorted according version, address natural order. * - * @param drillbitToCompare drillbit to compare against - * @return -1 if drillbit should be before, 1 if after in list + * @param drillbitToCompare Drillbit to compare against + * @return -1 if Drillbit should be before, 1 if after in list */ @Override public int compareTo(DrillbitInfo drillbitToCompare) { @@ -189,5 +277,4 @@ public class DrillRoot { return this.versionMatch ? 
-1 : 1; } } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java index cc00e9b90..7bf7c90e7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/GenericExceptionMapper.java @@ -20,22 +20,21 @@ package org.apache.drill.exec.server.rest; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.ext.ExceptionMapper; -import javax.ws.rs.ext.Provider; public class GenericExceptionMapper implements ExceptionMapper<Throwable> { - @Override - public Response toResponse(Throwable throwable) { - return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) - .entity(new GenericErrorMessage(throwable.getMessage())) - .type(MediaType.APPLICATION_JSON_TYPE) - .build(); - } + @Override + public Response toResponse(Throwable throwable) { + return Response + .status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) + .entity(new GenericErrorMessage(throwable.getMessage())) + .type(MediaType.APPLICATION_JSON_TYPE).build(); + } - public static class GenericErrorMessage { - public final String errorMessage; + public static class GenericErrorMessage { + public final String errorMessage; - public GenericErrorMessage(String errorMessage) { - this.errorMessage = errorMessage; - } + public GenericErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java index 16d213a8b..d24f03a86 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/LogsResources.java @@ -111,6 +111,7 @@ 
public class LogsResources { final int maxLines = work.getContext().getOptionManager().getOption(ExecConstants.WEB_LOGS_MAX_LINES).num_val.intValue(); try (BufferedReader br = new BufferedReader(new FileReader(file))) { + @SuppressWarnings("serial") Map<Integer, String> cache = new LinkedHashMap<Integer, String>(maxLines, .75f, true) { @Override protected boolean removeEldestEntry(Map.Entry<Integer, String> eldest) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java index 4042a818f..e6b116ccd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StatusResources.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.server.rest; +import java.util.Collections; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; @@ -80,6 +82,7 @@ public class StatusResources { return ViewableWithPermissions.create(authEnabled.get(), "/rest/status.ftl", sc, getStatusJSON()); } + @SuppressWarnings("resource") private List<OptionWrapper> getSystemOptionsJSONHelper(boolean internal) { List<OptionWrapper> options = new LinkedList<>(); @@ -90,6 +93,12 @@ public class StatusResources { options.add(new OptionWrapper(option.name, option.getValue(), option.accessibleScopes, option.kind, option.scope)); } + Collections.sort(options, new Comparator<OptionWrapper>() { + @Override + public int compare(OptionWrapper o1, OptionWrapper o2) { + return o1.name.compareTo(o2.name); + } + }); return options; } @@ -132,6 +141,7 @@ public class StatusResources { return getSystemOptionsHelper(true); } + @SuppressWarnings("resource") @POST 
@Path("option/{optionName}") @RolesAllowed(DrillUserPrincipal.ADMIN_ROLE) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java index ce26a316b..cf5eb57c9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/StorageResources.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -93,6 +93,7 @@ public class StorageResources { return ViewableWithPermissions.create(authEnabled.get(), "/rest/storage/list.ftl", sc, list); } + @SuppressWarnings("resource") @GET @Path("/storage/{name}.json") @Produces(MediaType.APPLICATION_JSON) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java index 7c376f785..69f2cab76 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/WebServer.java @@ -138,6 +138,7 @@ public class WebServer implements AutoCloseable { * Start the web server including setup. * @throws Exception */ + @SuppressWarnings("resource") public void start() throws Exception { if (!config.getBoolean(ExecConstants.HTTP_ENABLE)) { return; @@ -266,6 +267,7 @@ public class WebServer implements AutoCloseable { } // Clear all the resources allocated for this session + @SuppressWarnings("resource") final WebSessionResources webSessionResources = (WebSessionResources) session.getAttribute(WebSessionResources.class.getSimpleName()); @@ -322,7 +324,7 @@ public class WebServer implements AutoCloseable { * Create an HTTPS connector for given jetty server instance. 
If the admin has specified keystore/truststore settings * they will be used else a self-signed certificate is generated and used. * - * @return Initialized {@link ServerConnector} for HTTPS connectios. + * @return Initialized {@link ServerConnector} for HTTPS connections. * @throws Exception */ private ServerConnector createHttpsConnector(int port) throws Exception { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java index fa2f43fa7..a61522a68 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/Comparators.java @@ -31,78 +31,91 @@ import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; interface Comparators { final static Comparator<MajorFragmentProfile> majorId = new Comparator<MajorFragmentProfile>() { + @Override public int compare(final MajorFragmentProfile o1, final MajorFragmentProfile o2) { return Long.compare(o1.getMajorFragmentId(), o2.getMajorFragmentId()); } }; final static Comparator<MinorFragmentProfile> minorId = new Comparator<MinorFragmentProfile>() { + @Override public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) { return Long.compare(o1.getMinorFragmentId(), o2.getMinorFragmentId()); } }; final static Comparator<MinorFragmentProfile> startTime = new Comparator<MinorFragmentProfile>() { + @Override public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) { return Long.compare(o1.getStartTime(), o2.getStartTime()); } }; final static Comparator<MinorFragmentProfile> lastUpdate = new Comparator<MinorFragmentProfile>() { + @Override public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) { return Long.compare(o1.getLastUpdate(), o2.getLastUpdate()); } }; final static Comparator<MinorFragmentProfile> 
lastProgress = new Comparator<MinorFragmentProfile>() { + @Override public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) { return Long.compare(o1.getLastProgress(), o2.getLastProgress()); } }; final static Comparator<MinorFragmentProfile> endTime = new Comparator<MinorFragmentProfile>() { + @Override public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) { return Long.compare(o1.getEndTime(), o2.getEndTime()); } }; final static Comparator<MinorFragmentProfile> fragmentPeakMemory = new Comparator<MinorFragmentProfile>() { + @Override public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) { return Long.compare(o1.getMaxMemoryUsed(), o2.getMaxMemoryUsed()); } }; final static Comparator<MinorFragmentProfile> runTime = new Comparator<MinorFragmentProfile>() { + @Override public int compare(final MinorFragmentProfile o1, final MinorFragmentProfile o2) { return Long.compare(o1.getEndTime() - o1.getStartTime(), o2.getEndTime() - o2.getStartTime()); } }; final static Comparator<OperatorProfile> operatorId = new Comparator<OperatorProfile>() { + @Override public int compare(final OperatorProfile o1, final OperatorProfile o2) { return Long.compare(o1.getOperatorId(), o2.getOperatorId()); } }; final static Comparator<Pair<OperatorProfile, Integer>> setupTime = new Comparator<Pair<OperatorProfile, Integer>>() { + @Override public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) { return Long.compare(o1.getLeft().getSetupNanos(), o2.getLeft().getSetupNanos()); } }; final static Comparator<Pair<OperatorProfile, Integer>> processTime = new Comparator<Pair<OperatorProfile, Integer>>() { + @Override public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) { return Long.compare(o1.getLeft().getProcessNanos(), o2.getLeft().getProcessNanos()); } }; final static Comparator<Pair<OperatorProfile, Integer>> waitTime = new 
Comparator<Pair<OperatorProfile, Integer>>() { + @Override public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) { return Long.compare(o1.getLeft().getWaitNanos(), o2.getLeft().getWaitNanos()); } }; final static Comparator<Pair<OperatorProfile, Integer>> operatorPeakMemory = new Comparator<Pair<OperatorProfile, Integer>>() { + @Override public int compare(final Pair<OperatorProfile, Integer> o1, final Pair<OperatorProfile, Integer> o2) { return Long.compare(o1.getLeft().getPeakLocalMemoryAllocated(), o2.getLeft().getPeakLocalMemoryAllocated()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java index 16c07d2ee..875c96ed5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileResources.java @@ -73,17 +73,20 @@ public class ProfileResources { public static class ProfileInfo implements Comparable<ProfileInfo> { public static final SimpleDateFormat format = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss"); - private String queryId; - private long startTime; - private long endTime; - private Date time; - private String link; - private String foreman; - private String query; - private String state; - private String user; - - public ProfileInfo(DrillConfig drillConfig, String queryId, long startTime, long endTime, String foreman, String query, String state, String user) { + private final String queryId; + private final long startTime; + private final long endTime; + private final Date time; + private final String link; + private final String foreman; + private final String query; + private final String state; + private final String user; + private final double totalCost; + private final String queueName; + + public ProfileInfo(DrillConfig 
drillConfig, String queryId, long startTime, long endTime, String foreman, String query, + String state, String user, double totalCost, String queueName) { this.queryId = queryId; this.startTime = startTime; this.endTime = endTime; @@ -93,52 +96,36 @@ public class ProfileResources { this.query = query.substring(0, Math.min(query.length(), 150)); this.state = state; this.user = user; + this.totalCost = totalCost; + this.queueName = queueName; } - public String getUser() { - return user; - } + public String getUser() { return user; } - public String getQuery(){ - return query; - } + public String getQuery() { return query; } - public String getQueryId() { - return queryId; - } + public String getQueryId() { return queryId; } - public String getTime() { - return format.format(time); - } + public String getTime() { return format.format(time); } - public long getStartTime() { - return startTime; - } + public long getStartTime() { return startTime; } - public long getEndTime() { - return endTime; - } + public long getEndTime() { return endTime; } public String getDuration() { return (new SimpleDurationFormat(startTime, endTime)).verbose(); } - public String getState() { - return state; - } + public String getState() { return state; } - public String getLink() { - return link; - } + public String getLink() { return link; } @Override public int compareTo(ProfileInfo other) { return time.compareTo(other.time); } - public String getForeman() { - return foreman; - } + public String getForeman() { return foreman; } /** * Generates link which will return query profile in json representation. 
@@ -164,6 +151,9 @@ public class ProfileResources { return sb.toString(); } + public double getTotalCost() { return totalCost; } + + public String getQueueName() { return queueName; } } protected PersistentStoreProvider getProvider() { @@ -204,6 +194,7 @@ public class ProfileResources { //max Param to cap listing of profiles private static final String MAX_QPROFILES_PARAM = "max"; + @SuppressWarnings("resource") @GET @Path("/profiles.json") @Produces(MediaType.APPLICATION_JSON) @@ -223,8 +214,11 @@ public class ProfileResources { final Map.Entry<String, QueryInfo> runningEntry = runningEntries.next(); final QueryInfo profile = runningEntry.getValue(); if (principal.canManageProfileOf(profile.getUser())) { - runningQueries.add(new ProfileInfo(work.getContext().getConfig(), runningEntry.getKey(), profile.getStart(), System.currentTimeMillis(), profile.getForeman() - .getAddress(), profile.getQuery(), profile.getState().name(), profile.getUser())); + runningQueries.add( + new ProfileInfo(work.getContext().getConfig(), + runningEntry.getKey(), profile.getStart(), System.currentTimeMillis(), + profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name(), + profile.getUser(), profile.getTotalCost(), profile.getQueueName())); } } catch (Exception e) { errors.add(e.getMessage()); @@ -250,8 +244,11 @@ public class ProfileResources { final Map.Entry<String, QueryProfile> profileEntry = range.next(); final QueryProfile profile = profileEntry.getValue(); if (principal.canManageProfileOf(profile.getUser())) { - finishedQueries.add(new ProfileInfo(work.getContext().getConfig(), profileEntry.getKey(), profile.getStart(), profile.getEnd(), profile.getForeman().getAddress(), - profile.getQuery(), profile.getState().name(), profile.getUser())); + finishedQueries.add( + new ProfileInfo(work.getContext().getConfig(), + profileEntry.getKey(), profile.getStart(), profile.getEnd(), + profile.getForeman().getAddress(), profile.getQuery(), profile.getState().name(), + 
profile.getUser(), profile.getTotalCost(), profile.getQueueName())); } } catch (Exception e) { errors.add(e.getMessage()); @@ -277,6 +274,7 @@ public class ProfileResources { return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/list.ftl", sc, profiles); } + @SuppressWarnings("resource") private QueryProfile getQueryProfile(String queryId) { QueryId id = QueryIdHelper.getQueryIdFromString(queryId); @@ -343,7 +341,7 @@ public class ProfileResources { return ViewableWithPermissions.create(authEnabled.get(), "/rest/profile/profile.ftl", sc, wrapper); } - + @SuppressWarnings("resource") @GET @Path("/profiles/cancel/{queryid}") @Produces(MediaType.TEXT_PLAIN) @@ -372,7 +370,8 @@ public class ProfileResources { } }catch(Exception e){ logger.debug("Failure to find query as running profile.", e); - return String.format("Failure attempting to cancel query %s. Unable to find information about where query is actively running.", queryId); + return String.format + ("Failure attempting to cancel query %s. 
Unable to find information about where query is actively running.", queryId); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java index 8f3cdfe6e..ef9ccc339 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/rest/profile/ProfileWrapper.java @@ -20,7 +20,6 @@ package org.apache.drill.exec.server.rest.profile; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java index 7e32a4d97..9382f4a46 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/BatchPrinter.java @@ -34,7 +34,7 @@ import com.google.common.collect.Lists; public class BatchPrinter { public static void printHyperBatch(VectorAccessible batch, SelectionVector4 sv4) { List<String> columns = Lists.newArrayList(); - for (VectorWrapper vw : batch) { + for (VectorWrapper<?> vw : batch) { columns.add(vw.getValueVectors()[0].getField().getName()); } int width = columns.size(); @@ -47,7 +47,7 @@ public class BatchPrinter { System.out.printf("|\n"); System.out.println(StringUtils.repeat("-", width * 17 + 1)); } - for (VectorWrapper vw : batch) { + for (VectorWrapper<?> vw : batch) { Object o = vw.getValueVectors()[sv4.get(j) >>> 16].getAccessor().getObject(sv4.get(j) & 65535); String value; if (o == null) { @@ -67,7 +67,7 @@ public class BatchPrinter { public static void printBatch(VectorAccessible batch) { List<String> columns = Lists.newArrayList(); List<ValueVector> vectors = Lists.newArrayList(); - 
for (VectorWrapper vw : batch) { + for (VectorWrapper<?> vw : batch) { columns.add(vw.getValueVector().getField().getName()); vectors.add(vw.getValueVector()); } @@ -101,7 +101,7 @@ public class BatchPrinter { public static void printBatch(VectorAccessible batch, SelectionVector2 sv2) { List<String> columns = Lists.newArrayList(); List<ValueVector> vectors = Lists.newArrayList(); - for (VectorWrapper vw : batch) { + for (VectorWrapper<?> vw : batch) { columns.add(vw.getValueVector().getField().getName()); vectors.add(vw.getValueVector()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java index 48724a445..6a805c142 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java @@ -33,7 +33,22 @@ public class MemoryAllocationUtilities { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryAllocationUtilities.class); /** - * Helper method to setup SortMemoryAllocations + * Helper method to setup Memory Allocations + * <p> + * Plan the memory for buffered operators (the only ones that can spill in this release) + * based on assumptions. These assumptions are the amount of memory per node to give + * to each query and the number of sort operators per node. + * <p> + * The reason the total + * memory is an assumption is that we have no knowledge of the number of queries + * that can run, so we need the user to tell us that information by configuring the + * amount of memory to be assumed available to each query. 
+ * <p> + * The number of sorts per node could be calculated, but we instead simply take + * the worst case: the maximum per-query, per-node parallelization and assume that + * all sorts appear in all fragments — a gross oversimplification, but one + * that Drill has long made. + * <p> * since this method can be used in multiple places adding it in this class * rather than keeping it in Foreman * @param plan @@ -50,7 +65,7 @@ public class MemoryAllocationUtilities { // look for external sorts final List<PhysicalOperator> bufferedOpList = new LinkedList<>(); for (final PhysicalOperator op : plan.getSortedOperators()) { - if ( op.isBufferedOperator() ) { + if (op.isBufferedOperator()) { bufferedOpList.add(op); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java index 0200dc5a0..a9f178a66 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/TestUtilities.java @@ -112,7 +112,7 @@ public class TestUtilities { * @param jsonBatches : list of input strings, each element represent a batch. Each string could either * be in the form of "[{...}, {...}, ..., {...}]", or in the form of "{...}". * @param fragContext : fragment context - * @param columnsToRead : list of schema pathes to read from JSON reader. + * @param columnsToRead : list of schema paths to read from JSON reader. 
* @return */ public static Iterator<RecordReader> getJsonReadersFromBatchString(List<String> jsonBatches, FragmentContext fragContext, List<SchemaPath> columnsToRead) { @@ -145,5 +145,4 @@ public class TestUtilities { } return readers.iterator(); } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java index 4f99f859b..a06d46c5a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/QueryWorkUnit.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -17,39 +17,99 @@ */ package org.apache.drill.exec.work; +import java.util.ArrayList; import java.util.List; import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.proto.BitControl.PlanFragment; +import org.apache.drill.exec.server.options.OptionList; +import org.apache.drill.exec.work.foreman.ForemanSetupException; +import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.base.Preconditions; public class QueryWorkUnit { - // private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryWorkUnit.class); - private final PlanFragment rootFragment; // for local + + /** + * Definition of a minor fragment that contains the (unserialized) fragment operator + * tree and the (partially built) fragment. Allows the resource manager to apply + * memory allocations before serializing the fragments to JSON. 
+ */ + + public static class MinorFragmentDefn { + private PlanFragment fragment; + private final FragmentRoot root; + private final OptionList options; + + public MinorFragmentDefn(final PlanFragment fragment, final FragmentRoot root, OptionList options) { + this.fragment = fragment; + this.root = root; + this.options = options; + } + + public FragmentRoot root() { return root; } + public PlanFragment fragment() { return fragment; } + public OptionList options() { return options; } + + public PlanFragment applyPlan(PhysicalPlanReader reader) throws ForemanSetupException { + // get plan as JSON + try { + final String plan = reader.writeJson(root); + final String optionsData = reader.writeJson(options); + return PlanFragment.newBuilder(fragment) + .setFragmentJson(plan) + .setOptionsJson(optionsData) + .build(); + } catch (JsonProcessingException e) { + throw new ForemanSetupException("Failure while trying to convert fragment into json.", e); + } + } + } + + private PlanFragment rootFragment; // for local + private final MinorFragmentDefn rootFragmentDefn; private final FragmentRoot rootOperator; // for local - private final List<PlanFragment> fragments; + private List<PlanFragment> fragments = new ArrayList<>(); + private final List<MinorFragmentDefn> minorFragmentDefns; - public QueryWorkUnit(final FragmentRoot rootOperator, final PlanFragment rootFragment, - final List<PlanFragment> fragments) { - Preconditions.checkNotNull(rootFragment); - Preconditions.checkNotNull(fragments); + public QueryWorkUnit(final FragmentRoot rootOperator, final MinorFragmentDefn rootFragmentDefn, + final List<MinorFragmentDefn> minorFragmentDefns) { Preconditions.checkNotNull(rootOperator); + Preconditions.checkNotNull(rootFragmentDefn); + Preconditions.checkNotNull(minorFragmentDefns); - this.rootFragment = rootFragment; - this.fragments = fragments; + this.rootFragmentDefn = rootFragmentDefn; this.rootOperator = rootOperator; + this.minorFragmentDefns = minorFragmentDefns; } public 
PlanFragment getRootFragment() { return rootFragment; } + public MinorFragmentDefn getRootFragmentDefn() { + return rootFragmentDefn; + } + public List<PlanFragment> getFragments() { return fragments; } + public List<MinorFragmentDefn> getMinorFragmentDefns() { + return minorFragmentDefns; + } + public FragmentRoot getRootOperator() { return rootOperator; } + + public void applyPlan(PhysicalPlanReader reader) throws ForemanSetupException { + assert rootFragment == null; + rootFragment = rootFragmentDefn.applyPlan(reader); + assert fragments.isEmpty(); + for (MinorFragmentDefn defn : minorFragmentDefns) { + fragments.add(defn.applyPlan(reader)); + } + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index 800d3a7f0..6e560a963 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -149,7 +149,9 @@ public class WorkManager implements AutoCloseable { } } - getContext().close(); + if (getContext() != null) { + getContext().close(); + } } public DrillbitContext getContext() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index d01d8fd7d..a1f9d9ffb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -17,17 +17,15 @@ */ package org.apache.drill.exec.work.foreman; -import com.codahale.metrics.Counter; -import com.google.common.base.Preconditions; -import com.google.common.collect.ArrayListMultimap; -import com.google.common.collect.Lists; -import com.google.common.collect.Multimap; -import com.google.common.collect.Sets; -import com.google.protobuf.InvalidProtocolBufferException; -import 
io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelFuture; -import io.netty.util.concurrent.Future; -import io.netty.util.concurrent.GenericFutureListener; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + import org.apache.drill.common.CatastrophicFailure; import org.apache.drill.common.EventProcessor; import org.apache.drill.common.concurrent.ExtendedLatch; @@ -36,9 +34,6 @@ import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.logical.LogicalPlan; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.exec.ExecConstants; -import org.apache.drill.exec.coord.ClusterCoordinator; -import org.apache.drill.exec.coord.DistributedSemaphore; -import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; import org.apache.drill.exec.exception.OptimizerException; import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.metrics.DrillMetrics; @@ -73,43 +68,51 @@ import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.server.options.OptionManager; import org.apache.drill.exec.testing.ControlsInjector; import org.apache.drill.exec.testing.ControlsInjectorFactory; -import org.apache.drill.exec.util.MemoryAllocationUtilities; import org.apache.drill.exec.util.Pointer; import org.apache.drill.exec.work.EndpointListener; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.WorkManager.WorkerBee; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; +import org.apache.drill.exec.work.foreman.rm.QueryResourceManager; import org.apache.drill.exec.work.fragment.FragmentExecutor; import 
org.apache.drill.exec.work.fragment.FragmentStatusReporter; import org.apache.drill.exec.work.fragment.NonRootFragmentManager; import org.apache.drill.exec.work.fragment.RootFragmentManager; import org.codehaus.jackson.map.ObjectMapper; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import com.codahale.metrics.Counter; +import com.google.common.base.Preconditions; +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import com.google.protobuf.InvalidProtocolBufferException; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; /** * Foreman manages all the fragments (local and remote) for a single query where this * is the driving/root node. * * The flow is as follows: - * - Foreman is submitted as a runnable. - * - Runnable does query planning. - * - state changes from PENDING to RUNNING - * - Runnable sends out starting fragments - * - Status listener are activated - * - The Runnable's run() completes, but the Foreman stays around - * - Foreman listens for state change messages. 
- * - state change messages can drive the state to FAILED or CANCELED, in which case - * messages are sent to running fragments to terminate - * - when all fragments complete, state change messages drive the state to COMPLETED + * <ul> + * <li>Foreman is submitted as a runnable.</li> + * <li>Runnable does query planning.</li> + * <li>state changes from PENDING to RUNNING</li> + * <li>Runnable sends out starting fragments</li> + * <li>Status listener are activated</li> + * <li>The Runnable's run() completes, but the Foreman stays around</li> + * <li>Foreman listens for state change messages.</li> + * <li>state change messages can drive the state to FAILED or CANCELED, in which case + * messages are sent to running fragments to terminate</li> + * <li>when all fragments complete, state change messages drive the state to COMPLETED</li> + * </ul> */ + public class Foreman implements Runnable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(Foreman.class); private static final org.slf4j.Logger queryLogger = org.slf4j.LoggerFactory.getLogger("query.logger"); @@ -136,15 +139,13 @@ public class Foreman implements Runnable { private boolean resume = false; private final ProfileOption profileOption; - private volatile DistributedLease lease; // used to limit the number of concurrent queries + private final QueryResourceManager queryRM; private final ResponseSendListener responseListener = new ResponseSendListener(); private final StateSwitch stateSwitch = new StateSwitch(); private final ForemanResult foremanResult = new ForemanResult(); private final ConnectionClosedListener closeListener = new ConnectionClosedListener(); private final ChannelFuture closeFuture; - private final boolean queuingEnabled; - private String queryText; @@ -173,12 +174,9 @@ public class Foreman implements Runnable { queryManager = new QueryManager(queryId, queryRequest, drillbitContext.getStoreProvider(), drillbitContext.getClusterCoordinator(), this); - final 
OptionManager optionManager = queryContext.getOptions(); - queuingEnabled = optionManager.getOption(ExecConstants.ENABLE_QUEUE); - - final QueryState initialState = queuingEnabled ? QueryState.ENQUEUED : QueryState.STARTING; - recordNewState(initialState); + recordNewState(QueryState.ENQUEUED); enqueuedQueries.inc(); + queryRM = drillbitContext.getResourceManager().newQueryRM(this); profileOption = setProfileOption(queryContext.getOptions()); } @@ -350,20 +348,6 @@ public class Foreman implements Runnable { */ } - private void releaseLease() { - while (lease != null) { - try { - lease.close(); - lease = null; - } catch (final InterruptedException e) { - // if we end up here, the while loop will try again - } catch (final Exception e) { - logger.warn("Failure while releasing lease.", e); - break; - } - } - } - private void parseAndRunLogicalPlan(final String json) throws ExecutionSetupException { LogicalPlan logicalPlan; try { @@ -431,18 +415,17 @@ public class Foreman implements Runnable { private void runPhysicalPlan(final PhysicalPlan plan) throws ExecutionSetupException { validatePlan(plan); - MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); - //Marking endTime of Planning - queryManager.markPlanningEndTime(); - - if (queuingEnabled) { - acquireQuerySemaphore(plan); - moveToState(QueryState.STARTING, null); - //Marking endTime of Waiting in Queue - queryManager.markQueueWaitEndTime(); - } + queryRM.visitAbstractPlan(plan); final QueryWorkUnit work = getQueryWorkUnit(plan); + queryRM.visitPhysicalPlan(work); + queryRM.setCost(plan.totalCost()); + queryManager.setTotalCost(plan.totalCost()); + work.applyPlan(drillbitContext.getPlanReader()); + logWorkUnit(work); + admit(work); + queryManager.setQueueName(queryRM.queueName()); + final List<PlanFragment> planFragments = work.getFragments(); final PlanFragment rootPlanFragment = work.getRootFragment(); assert queryId == rootPlanFragment.getHandle().getQueryId(); @@ -461,6 +444,22 @@ 
public class Foreman implements Runnable { logger.debug("Fragments running."); } + private void admit(QueryWorkUnit work) throws ForemanSetupException { + queryManager.markPlanningEndTime(); + try { + queryRM.admit(); + } catch (QueueTimeoutException e) { + throw UserException + .resourceError() + .message(e.getMessage()) + .build(logger); + } catch (QueryQueueException e) { + throw new ForemanSetupException(e.getMessage(), e); + } + moveToState(QueryState.STARTING, null); + queryManager.markQueueWaitEndTime(); + } + /** * This is a helper method to run query based on the list of PlanFragment that were planned * at some point of time @@ -495,10 +494,8 @@ public class Foreman implements Runnable { } catch (IOException e) { throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson())); } - if (queuingEnabled) { - acquireQuerySemaphore(rootOperator.getCost()); - moveToState(QueryState.STARTING, null); - } + queryRM.setCost(rootOperator.getCost()); + admit(null); drillbitContext.getWorkBus().addFragmentStatusListener(queryId, queryManager.getFragmentStatusListener()); drillbitContext.getClusterCoordinator().addDrillbitStatusListener(queryManager.getDrillbitStatusListener()); @@ -548,62 +545,6 @@ public class Foreman implements Runnable { } } - /** - * This limits the number of "small" and "large" queries that a Drill cluster will run - * simultaneously, if queueing is enabled. If the query is unable to run, this will block - * until it can. Beware that this is called under run(), and so will consume a Thread - * while it waits for the required distributed semaphore. 
- * - * @param plan the query plan - * @throws ForemanSetupException - */ - private void acquireQuerySemaphore(final PhysicalPlan plan) throws ForemanSetupException { - double totalCost = 0; - for (final PhysicalOperator ops : plan.getSortedOperators()) { - totalCost += ops.getCost(); - } - - acquireQuerySemaphore(totalCost); - return; - } - - private void acquireQuerySemaphore(double totalCost) throws ForemanSetupException { - final OptionManager optionManager = queryContext.getOptions(); - final long queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE); - - final long queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT); - final String queueName; - - try { - final ClusterCoordinator clusterCoordinator = drillbitContext.getClusterCoordinator(); - final DistributedSemaphore distributedSemaphore; - - // get the appropriate semaphore - if (totalCost > queueThreshold) { - final int largeQueue = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE); - distributedSemaphore = clusterCoordinator.getSemaphore("query.large", largeQueue); - queueName = "large"; - } else { - final int smallQueue = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE); - distributedSemaphore = clusterCoordinator.getSemaphore("query.small", smallQueue); - queueName = "small"; - } - - lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS); - } catch (final Exception e) { - throw new ForemanSetupException("Unable to acquire slot for query.", e); - } - - if (lease == null) { - throw UserException - .resourceError() - .message( - "Unable to acquire queue resources for query within timeout. 
Timeout for %s queue was set at %d seconds.", - queueName, queueTimeout / 1000) - .build(logger); - } - } - Exception getCurrentException() { return foremanResult.getException(); } @@ -612,54 +553,55 @@ public class Foreman implements Runnable { final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext); - final QueryWorkUnit queryWorkUnit = parallelizer.getFragments( + return parallelizer.getFragments( queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), - queryId, queryContext.getActiveEndpoints(), drillbitContext.getPlanReader(), rootFragment, + queryId, queryContext.getActiveEndpoints(), rootFragment, initiatingClient.getSession(), queryContext.getQueryContextInfo()); + } - if (logger.isTraceEnabled()) { - final StringBuilder sb = new StringBuilder(); - sb.append("PlanFragments for query "); - sb.append(queryId); + private void logWorkUnit(QueryWorkUnit queryWorkUnit) { + if (! 
logger.isTraceEnabled()) { + return; + } + final StringBuilder sb = new StringBuilder(); + sb.append("PlanFragments for query "); + sb.append(queryId); + sb.append('\n'); + + final List<PlanFragment> planFragments = queryWorkUnit.getFragments(); + final int fragmentCount = planFragments.size(); + int fragmentIndex = 0; + for(final PlanFragment planFragment : planFragments) { + final FragmentHandle fragmentHandle = planFragment.getHandle(); + sb.append("PlanFragment("); + sb.append(++fragmentIndex); + sb.append('/'); + sb.append(fragmentCount); + sb.append(") major_fragment_id "); + sb.append(fragmentHandle.getMajorFragmentId()); + sb.append(" minor_fragment_id "); + sb.append(fragmentHandle.getMinorFragmentId()); sb.append('\n'); - final List<PlanFragment> planFragments = queryWorkUnit.getFragments(); - final int fragmentCount = planFragments.size(); - int fragmentIndex = 0; - for(final PlanFragment planFragment : planFragments) { - final FragmentHandle fragmentHandle = planFragment.getHandle(); - sb.append("PlanFragment("); - sb.append(++fragmentIndex); - sb.append('/'); - sb.append(fragmentCount); - sb.append(") major_fragment_id "); - sb.append(fragmentHandle.getMajorFragmentId()); - sb.append(" minor_fragment_id "); - sb.append(fragmentHandle.getMinorFragmentId()); - sb.append('\n'); - - final DrillbitEndpoint endpointAssignment = planFragment.getAssignment(); - sb.append(" DrillbitEndpoint address "); - sb.append(endpointAssignment.getAddress()); - sb.append('\n'); - - String jsonString = "<<malformed JSON>>"; - sb.append(" fragment_json: "); - final ObjectMapper objectMapper = new ObjectMapper(); - try - { - final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class); - jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json); - } catch(final Exception e) { - // we've already set jsonString to a fallback value - } - sb.append(jsonString); + final DrillbitEndpoint endpointAssignment = 
planFragment.getAssignment(); + sb.append(" DrillbitEndpoint address "); + sb.append(endpointAssignment.getAddress()); + sb.append('\n'); - logger.trace(sb.toString()); + String jsonString = "<<malformed JSON>>"; + sb.append(" fragment_json: "); + final ObjectMapper objectMapper = new ObjectMapper(); + try + { + final Object json = objectMapper.readValue(planFragment.getFragmentJson(), Object.class); + jsonString = objectMapper.defaultPrettyPrintingWriter().writeValueAsString(json); + } catch(final Exception e) { + // we've already set jsonString to a fallback value } - } + sb.append(jsonString); - return queryWorkUnit; + logger.trace(sb.toString()); + } } /** @@ -897,7 +839,7 @@ public class Foreman implements Runnable { runningQueries.dec(); completedQueries.inc(); try { - releaseLease(); + queryRM.exit(); } finally { isClosed = true; } @@ -953,7 +895,7 @@ public class Foreman implements Runnable { foremanResult.setCompleted(QueryState.CANCELED); /* * We don't close the foremanResult until we've gotten - * acknowledgements, which happens below in the case for current state + * acknowledgments, which happens below in the case for current state * == CANCELLATION_REQUESTED. 
*/ return; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java index ecbccf3c6..216a80df4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/QueryManager.java @@ -36,7 +36,6 @@ import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.ExecProtos.FragmentHandle; import org.apache.drill.exec.proto.GeneralRPCProtos.Ack; -import org.apache.drill.exec.proto.SchemaUserBitShared; import org.apache.drill.exec.proto.UserBitShared.FragmentState; import org.apache.drill.exec.proto.UserBitShared.MajorFragmentProfile; import org.apache.drill.exec.proto.UserBitShared.QueryId; @@ -99,6 +98,15 @@ public class QueryManager implements AutoCloseable { // Is the query saved in transient store private boolean inTransientStore; + /** + * Total query cost. This value is used to place the query into a queue + * and so has meaning to the user who wants to predict queue placement. + */ + + private double totalCost; + + private String queueName; + public QueryManager(final QueryId queryId, final RunQuery runQuery, final PersistentStoreProvider storeProvider, final ClusterCoordinator coordinator, final Foreman foreman) { this.queryId = queryId; @@ -191,6 +199,7 @@ public class QueryManager implements AutoCloseable { * (3) Leaf fragment: running, send the cancel signal through a tunnel. The cancel is done directly. */ void cancelExecutingFragments(final DrillbitContext drillbitContext) { + @SuppressWarnings("resource") final Controller controller = drillbitContext.getController(); for(final FragmentData data : fragmentDataSet) { switch(data.getState()) { @@ -219,6 +228,7 @@ public class QueryManager implements AutoCloseable { * sending any message. 
Resume all fragments through the control tunnel. */ void unpauseExecutingFragments(final DrillbitContext drillbitContext) { + @SuppressWarnings("resource") final Controller controller = drillbitContext.getController(); for(final FragmentData data : fragmentDataSet) { final DrillbitEndpoint endpoint = data.getEndpoint(); @@ -318,6 +328,8 @@ public class QueryManager implements AutoCloseable { .setUser(foreman.getQueryContext().getQueryUserName()) .setForeman(foreman.getQueryContext().getCurrentEndpoint()) .setStart(startTime) + .setTotalCost(totalCost) + .setQueueName(queueName == null ? "-" : queueName) .setOptionsJson(getQueryOptionsAsJson()); if (queryText != null) { @@ -332,7 +344,6 @@ public class QueryManager implements AutoCloseable { } private QueryProfile getQueryProfile(UserException ex) { - final String queryText = foreman.getQueryText(); final QueryProfile.Builder profileBuilder = QueryProfile.newBuilder() .setUser(foreman.getQueryContext().getQueryUserName()) .setType(runQuery.getType()) @@ -345,6 +356,8 @@ public class QueryManager implements AutoCloseable { .setQueueWaitEnd(queueWaitEndTime) .setTotalFragments(fragmentDataSet.size()) .setFinishedFragments(finishedFragments.get()) + .setTotalCost(totalCost) + .setQueueName(queueName == null ? 
"-" : queueName) .setOptionsJson(getQueryOptionsAsJson()); if (ex != null) { @@ -360,6 +373,7 @@ public class QueryManager implements AutoCloseable { profileBuilder.setPlan(planText); } + final String queryText = foreman.getQueryText(); if (queryText != null) { profileBuilder.setQuery(queryText); } @@ -392,7 +406,6 @@ public class QueryManager implements AutoCloseable { profileBuilder.addFragmentProfile(builder); return true; } - } private class InnerIter implements IntObjectPredicate<FragmentData> { @@ -407,7 +420,6 @@ public class QueryManager implements AutoCloseable { builder.addMinorFragmentProfile(data.getProfile()); return true; } - } void setPlanText(final String planText) { @@ -430,6 +442,14 @@ public class QueryManager implements AutoCloseable { queueWaitEndTime = System.currentTimeMillis(); } + public void setTotalCost(double totalCost) { + this.totalCost = totalCost; + } + + public void setQueueName(String queueName) { + this.queueName = queueName; + } + /** * Internal class used to track the number of pending completion messages required from particular node. This allows * to know for each node that is part of this query, what portion of fragments are still outstanding. 
In the case that @@ -536,7 +556,7 @@ public class QueryManager implements AutoCloseable { return drillbitStatusListener; } - private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener(){ + private final DrillbitStatusListener drillbitStatusListener = new DrillbitStatusListener() { @Override public void drillbitRegistered(final Set<DrillbitEndpoint> registeredDrillbits) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java new file mode 100644 index 000000000..9bcaddade --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/AbstractResourceManager.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.server.DrillbitContext; + +/** + * Abstract base class for a resource manager. Handles tasks common to all + * resource managers: learning the resources available on this Drillbit. 
+ * In the current version, Drillbits must be symmetrical, so that knowing
+ * the resources on one node is sufficient to know resources available on
+ * all nodes.
+ */
+
+public abstract class AbstractResourceManager implements ResourceManager {
+
+  protected final DrillbitContext context;
+  private final long memoryPerNode;
+  private final int cpusPerNode;
+
+  /**
+   * Captures the per-node memory and CPU limits once, at construction.
+   *
+   * @param context Drillbit context supplying the boot-time configuration
+   */
+  public AbstractResourceManager(final DrillbitContext context) {
+    this.context = context;
+    final DrillConfig bootConfig = context.getConfig();
+
+    // Memory: begin with the direct memory the JVM reports. A positive
+    // boot-time override may only lower that figure, never raise it.
+    // Primarily for testing.
+
+    final long jvmDirectMemory = DrillConfig.getMaxDirectMemory();
+    final long memoryOverride = bootConfig.getBytes(ExecConstants.MAX_MEMORY_PER_NODE);
+    memoryPerNode = (memoryOverride > 0)
+        ? Math.min(jvmDirectMemory, memoryOverride)
+        : jvmDirectMemory;
+
+    // CPUs: the same rule, starting from the processors the JVM reports.
+
+    final int jvmCpus = Runtime.getRuntime().availableProcessors();
+    final int cpuOverride = bootConfig.getInt(ExecConstants.MAX_CPUS_PER_NODE);
+    cpusPerNode = (cpuOverride > 0)
+        ? Math.min(jvmCpus, cpuOverride)
+        : jvmCpus;
+  }
+
+  /** @return the memory limit, in bytes, computed at construction */
+  @Override
+  public long memoryPerNode() {
+    return memoryPerNode;
+  }
+
+  /** @return the CPU limit computed at construction */
+  @Override
+  public int cpusPerNode() {
+    return cpusPerNode;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
new file mode 100644
index 000000000..c28fbbb59
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.server.BootStrapContext; +import org.apache.drill.exec.util.MemoryAllocationUtilities; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.foreman.Foreman; + +/** + * Represents a default resource manager for clusters that do not provide query + * queues. Without queues to provide a hard limit on the query admission rate, + * the number of active queries must be estimated and the resulting resource + * allocations will be rough estimates. 
+ */
+
+public class DefaultResourceManager implements ResourceManager {
+
+  /**
+   * Allocator used for plans that are not throttled by a queue. Delegates
+   * memory assignment to {@link MemoryAllocationUtilities}.
+   */
+  public static class DefaultResourceAllocator implements QueryResourceAllocator {
+
+    private final QueryContext queryContext;
+
+    protected DefaultResourceAllocator(QueryContext queryContext) {
+      this.queryContext = queryContext;
+    }
+
+    /**
+     * Sets up buffered-operator memory for the plan, unless the plan is
+     * absent or is already flagged as having a resource plan.
+     */
+    @Override
+    public void visitAbstractPlan(PhysicalPlan plan) {
+      final boolean needsMemoryPlan = plan != null && !plan.getProperties().hasResourcePlan;
+      if (needsMemoryPlan) {
+        MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
+      }
+    }
+
+    /** Intentionally empty: nothing is done per physical work unit. */
+    @Override
+    public void visitPhysicalPlan(QueryWorkUnit work) {
+    }
+  }
+
+  /**
+   * Per-query resource manager for the no-queue case: every admission
+   * hook is a no-op and no queue is ever reported.
+   */
+  public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager {
+
+    @SuppressWarnings("unused")
+    private final DefaultResourceManager rm;
+
+    public DefaultQueryResourceManager(final DefaultResourceManager rm, final Foreman foreman) {
+      super(foreman.getQueryContext());
+      this.rm = rm;
+    }
+
+    @Override
+    public void setCost(double cost) {
+      // Cost is ignored here.
+    }
+
+    @Override
+    public void admit() {
+      // No queue, so nothing to wait for.
+    }
+
+    @Override
+    public void exit() {
+      // No queue, so nothing to release.
+    }
+
+    @Override
+    public boolean hasQueue() {
+      return false;
+    }
+
+    @Override
+    public String queueName() {
+      return null;
+    }
+  }
+
+  public final long memoryPerNode;
+  public final int cpusPerNode;
+
+  /**
+   * Records the JVM's maximum direct memory and available processor count.
+   */
+  public DefaultResourceManager() {
+    memoryPerNode = DrillConfig.getMaxDirectMemory();
+
+    // Note: the CPU count is captured but not yet used; it is reserved
+    // for a future enhancement.
+
+    cpusPerNode = Runtime.getRuntime().availableProcessors();
+  }
+
+  @Override
+  public long memoryPerNode() {
+    return memoryPerNode;
+  }
+
+  @Override
+  public int cpusPerNode() {
+    return cpusPerNode;
+  }
+
+  @Override
+  public QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
+    return new DefaultResourceAllocator(queryContext);
+  }
+
+  @Override
+  public QueryResourceManager newQueryRM(final Foreman foreman) {
+    return new DefaultQueryResourceManager(this, foreman);
+  }
+
+  /** Nothing to release. */
+  @Override
+  public void close() {
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
new file mode 100644
index 000000000..9a4c78df5
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DistributedQueryQueue.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.concurrent.TimeUnit; + +import javax.xml.bind.annotation.XmlRootElement; + +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.DistributedSemaphore; +import org.apache.drill.exec.coord.DistributedSemaphore.DistributedLease; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; + +/** + * Distributed query queue which uses a Zookeeper distributed semaphore to + * control queuing across the cluster. The distributed queue is actually two + * queues: one for "small" queries, another for "large" queries. Query size is + * determined by the Planner's estimate of query cost. + * <p> + * This queue is configured using system options: + * <dl> + * <dt><tt>exec.queue.enable</tt> + * </dt> + * <dd>Set to true to enable the distributed queue.</dd> + * <dt><tt>exec.queue.large</tt> + * </dt> + * <dd>The maximum number of large queries to admit. Additional + * queries wait in the queue.</dd> + * <dt><tt>exec.queue.small</tt> + * </dt> + * <dd>The maximum number of small queries to admit. Additional + * queries wait in the queue.</dd> + * <dt><tt>exec.queue.threshold</tt> + * </dt> + * <dd>The cost threshold. Queries below this size are small, at + * or above this size are large.</dd> + * <dt><tt>exec.queue.timeout_millis</tt> + * </dt> + * <dd>The maximum time (in milliseconds) a query will wait in the + * queue before failing.</dd> + * </dl> + * <p> + * The above values are refreshed every five seconds. This aids performance + * a bit in systems with very high query arrival rates. 
+ */
+
+public class DistributedQueryQueue implements QueryQueue {
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DistributedQueryQueue.class);
+
+  /**
+   * Lease handed to an admitted query. Holds the underlying distributed
+   * semaphore lease, the name of the queue that admitted the query, and the
+   * memory assigned to the query at admission time.
+   */
+  private class DistributedQueueLease implements QueueLease {
+    private final QueryId queryId;
+    private DistributedLease lease;
+    private final String queueName;
+
+    /**
+     * Memory allocated to the query. Though all queries in the queue use
+     * the same memory allocation rules, those rules can change at any time
+     * as the user changes system options. This value captures the value
+     * calculated at the time that this lease was granted.
+     */
+    private final long queryMemory;
+
+    public DistributedQueueLease(QueryId queryId, String queueName,
+        DistributedLease lease, long queryMemory) {
+      this.queryId = queryId;
+      this.queueName = queueName;
+      this.lease = lease;
+      this.queryMemory = queryMemory;
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Lease for %s queue to query %s",
+          queueName, QueryIdHelper.getQueryId(queryId));
+    }
+
+    @Override
+    public long queryMemoryPerNode() { return queryMemory; }
+
+    @Override
+    public void release() {
+      DistributedQueryQueue.this.release(this);
+    }
+
+    @Override
+    public String queueName() { return queueName; }
+  }
+
+  /**
+   * Exposes a snapshot of internal state information for use in status
+   * reporting, such as in the UI.
+   */
+
+  @XmlRootElement
+  public static class ZKQueueInfo {
+    public final int smallQueueSize;
+    public final int largeQueueSize;
+    public final double queueThreshold;
+    public final long memoryPerNode;
+    public final long memoryPerSmallQuery;
+    public final long memoryPerLargeQuery;
+
+    public ZKQueueInfo(DistributedQueryQueue queue) {
+      smallQueueSize = queue.configSet.smallQueueSize;
+      largeQueueSize = queue.configSet.largeQueueSize;
+      queueThreshold = queue.configSet.queueThreshold;
+      memoryPerNode = queue.memoryPerNode;
+      memoryPerSmallQuery = queue.memoryPerSmallQuery;
+      memoryPerLargeQuery = queue.memoryPerLargeQuery;
+    }
+  }
+
+  /**
+   * Callback that tells the queue whether the Drillbit is shutting down, so
+   * that a lease release which keeps getting interrupted can be abandoned.
+   */
+  public interface StatusAdapter {
+    boolean inShutDown();
+  }
+
+  /**
+   * Holds runtime configuration options. Allows polling the options
+   * for changes, and easily detecting changes.
+   */
+
+  private static class ConfigSet {
+    private final long queueThreshold;
+    private final long queueTimeout;
+    private final int largeQueueSize;
+    private final int smallQueueSize;
+    private final double largeToSmallRatio;
+    private final double reserveMemoryRatio;
+    private final long minimumOperatorMemory;
+
+    public ConfigSet(SystemOptionManager optionManager) {
+      queueThreshold = optionManager.getOption(ExecConstants.QUEUE_THRESHOLD_SIZE);
+      queueTimeout = optionManager.getOption(ExecConstants.QUEUE_TIMEOUT);
+
+      // Option manager supports only long values, but we do not expect
+      // more than 2 billion active queries, so queue size is stored as
+      // an int.
+      // TODO: Once DRILL-5832 is available, add an getInt() method to
+      // the option system to get the value as an int and do a range
+      // check.
+
+      largeQueueSize = (int) optionManager.getOption(ExecConstants.LARGE_QUEUE_SIZE);
+      smallQueueSize = (int) optionManager.getOption(ExecConstants.SMALL_QUEUE_SIZE);
+      largeToSmallRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RATIO);
+      reserveMemoryRatio = optionManager.getOption(ExecConstants.QUEUE_MEMORY_RESERVE);
+      minimumOperatorMemory = optionManager.getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
+    }
+
+    /**
+     * Determine if this config set is the same as another one. Detects
+     * whether the configuration has changed between one sync point and
+     * another.
+     * <p>
+     * Note that we cannot use <tt>equals()</tt> here as, according to
+     * Drill practice, <tt>equals()</tt> is for use in collections and
+     * must be accompanied by a <tt>hashCode()</tt> method. Since this
+     * class will never be used in a collection, and does not need a
+     * hash function, we cannot use <tt>equals()</tt>.
+     *
+     * @param otherSet another snapshot taken at another time; may be
+     * null, which is the case on the very first refresh, before any
+     * snapshot has been stored
+     * @return true if this configuration is the same as another
+     * (no update between the two snapshots), false if the config has
+     * changed between the snapshots or if there is no other snapshot
+     */
+
+    public boolean isSameAs(ConfigSet otherSet) {
+      // The first refresh compares against a null snapshot; treat that as
+      // "changed" rather than dereferencing null.
+      return otherSet != null &&
+             queueThreshold == otherSet.queueThreshold &&
+             queueTimeout == otherSet.queueTimeout &&
+             largeQueueSize == otherSet.largeQueueSize &&
+             smallQueueSize == otherSet.smallQueueSize &&
+             largeToSmallRatio == otherSet.largeToSmallRatio &&
+             reserveMemoryRatio == otherSet.reserveMemoryRatio &&
+             minimumOperatorMemory == otherSet.minimumOperatorMemory;
+    }
+  }
+
+  private long memoryPerNode;
+  private SystemOptionManager optionManager;
+  private ConfigSet configSet;
+  private ClusterCoordinator clusterCoordinator;
+  private long nextRefreshTime;
+  private long memoryPerSmallQuery;
+  private long memoryPerLargeQuery;
+  private final StatusAdapter statusAdapter;
+
+  public DistributedQueryQueue(DrillbitContext context, StatusAdapter adapter) {
+    statusAdapter = adapter;
+    optionManager = context.getOptionManager();
+    clusterCoordinator = context.getClusterCoordinator();
+  }
+
+  @Override
+  public void setMemoryPerNode(long memoryPerNode) {
+    this.memoryPerNode = memoryPerNode;
+    refreshConfig();
+  }
+
+  @Override
+  public long defaultQueryMemoryPerNode(double cost) {
+    return (cost < configSet.queueThreshold)
+        ? memoryPerSmallQuery
+        : memoryPerLargeQuery;
+  }
+
+  @Override
+  public long minimumOperatorMemory() { return configSet.minimumOperatorMemory; }
+
+  /**
+   * This limits the number of "small" and "large" queries that a Drill cluster will run
+   * simultaneously, if queuing is enabled. If the query is unable to run, this will block
+   * until it can. Beware that this is called under run(), and so will consume a thread
+   * while it waits for the required distributed semaphore.
+   *
+   * @param queryId query identifier
+   * @param cost the query cost; queries at or above the configured threshold
+   * go to the large queue, all others to the small queue
+   * @return the lease for the admitted query
+   * @throws QueryQueueException if the underlying ZK queuing mechanism fails
+   * @throws QueueTimeoutException if the query waits too long in the
+   * queue
+   */
+
+  @SuppressWarnings("resource")
+  @Override
+  public QueueLease enqueue(QueryId queryId, double cost) throws QueryQueueException, QueueTimeoutException {
+    final String queueName;
+    DistributedLease lease = null;
+    final long queryMemory;
+    final long queueTimeout;
+    final DistributedSemaphore distributedSemaphore;
+    try {
+
+      // Only the refresh and queue computation is synchronized.
+
+      synchronized(this) {
+        refreshConfig();
+
+        // Capture the timeout while holding the lock so that the value
+        // used to wait and the value reported on timeout come from the
+        // same configuration snapshot.
+        queueTimeout = configSet.queueTimeout;
+
+        // get the appropriate semaphore
+        if (cost >= configSet.queueThreshold) {
+          distributedSemaphore = clusterCoordinator.getSemaphore("query.large", configSet.largeQueueSize);
+          queueName = "large";
+          queryMemory = memoryPerLargeQuery;
+        } else {
+          distributedSemaphore = clusterCoordinator.getSemaphore("query.small", configSet.smallQueueSize);
+          queueName = "small";
+          queryMemory = memoryPerSmallQuery;
+        }
+      }
+      logger.debug("Query {} with cost {} placed into the {} queue.",
+                   QueryIdHelper.getQueryId(queryId), cost, queueName);
+
+      lease = distributedSemaphore.acquire(queueTimeout, TimeUnit.MILLISECONDS);
+    } catch (final Exception e) {
+      logger.error("Unable to acquire slot for query " +
+                   QueryIdHelper.getQueryId(queryId), e);
+      throw new QueryQueueException("Unable to acquire slot for query.", e);
+    }
+
+    if (lease == null) {
+      int timeoutSecs = (int) Math.round(queueTimeout/1000.0);
+      logger.warn("Queue timeout: {} after {} seconds.", queueName, timeoutSecs);
+
+      // QueueTimeoutException takes, and reports, the timeout in
+      // milliseconds, so pass milliseconds rather than seconds.
+      int timeoutMs = (int) Math.min(queueTimeout, Integer.MAX_VALUE);
+      throw new QueueTimeoutException(queryId, queueName, timeoutMs);
+    }
+    return new DistributedQueueLease(queryId, queueName, lease, queryMemory);
+  }
+
+  /**
+   * Re-reads the system options at most once every five seconds and, when
+   * any of them changed, recomputes the memory assigned per query.
+   */
+  private synchronized void refreshConfig() {
+    long now = System.currentTimeMillis();
+    if (now < nextRefreshTime) {
+      return;
+    }
+    nextRefreshTime = now + 5000;
+
+    // Only update numbers, and log changes, if something
+    // actually changes.
+
+    ConfigSet newSet = new ConfigSet(optionManager);
+    if (newSet.isSameAs(configSet)) {
+      return;
+    }
+
+    configSet = newSet;
+
+    // Divide up memory between queues using admission rate
+    // to give more memory to larger queries and less to
+    // smaller queries. We assume that large queries are
+    // larger than small queries by a factor of
+    // largeToSmallRatio.
+
+    double totalUnits = configSet.largeToSmallRatio * configSet.largeQueueSize + configSet.smallQueueSize;
+    double availableMemory = Math.round(memoryPerNode * (1.0 - configSet.reserveMemoryRatio));
+    double memoryUnit = availableMemory / totalUnits;
+    memoryPerLargeQuery = Math.round(memoryUnit * configSet.largeToSmallRatio);
+    memoryPerSmallQuery = Math.round(memoryUnit);
+
+    logger.debug("Memory config: total memory per node = {}, available: {}, large/small memory ratio = {}",
+        memoryPerNode, availableMemory, configSet.largeToSmallRatio);
+    logger.debug("Reserve memory ratio: {}, bytes: {}",
+        configSet.reserveMemoryRatio, memoryPerNode - availableMemory);
+    logger.debug("Minimum operator memory: {}", configSet.minimumOperatorMemory);
+    logger.debug("Small queue: {} slots, {} bytes per slot",
+        configSet.smallQueueSize, memoryPerSmallQuery);
+    logger.debug("Large queue: {} slots, {} bytes per slot",
+        configSet.largeQueueSize, memoryPerLargeQuery);
+    logger.debug("Cost threshold: {}, timeout: {} ms.",
+        configSet.queueThreshold, configSet.queueTimeout);
+  }
+
+  @Override
+  public boolean enabled() { return true; }
+
+  public synchronized ZKQueueInfo getInfo() {
+    refreshConfig();
+    return new ZKQueueInfo(this);
+  }
+
+  /**
+   * Closes the distributed lease behind the given queue lease. An
+   * interrupted close is retried unless the Drillbit is shutting down, in
+   * which case the attempt is abandoned. Any other failure is logged and
+   * the attempt ends.
+   *
+   * @param lease the lease to release; must have been issued by this queue
+   */
+  private void release(QueueLease lease) {
+    DistributedQueueLease theLease = (DistributedQueueLease) lease;
+    boolean interrupted = false;
+    try {
+      for (;;) {
+        try {
+          theLease.lease.close();
+          theLease.lease = null;
+          break;
+        } catch (final InterruptedException e) {
+          interrupted = true;
+          if (inShutdown()) {
+            // Actually stop retrying; without this the loop never ends.
+            logger.warn("In shutdown mode: abandoning attempt to release lease");
+            break;
+          }
+          // Otherwise the loop will try again.
+        } catch (final Exception e) {
+          logger.warn("Failure while releasing lease.", e);
+          break;
+        }
+      }
+    } finally {
+      // Restore the interrupt status only after the retries are done, so
+      // that the retries themselves are not immediately interrupted again.
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  private boolean inShutdown() {
+    if (statusAdapter == null) {
+      return false;
+    }
+    return statusAdapter.inShutDown();
+  }
+
+  @Override
+  public void close() {
+    // Nothing to do.
+  }
+}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java new file mode 100644 index 000000000..473401fab --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DynamicResourceManager.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.server.options.SystemOptionManager; +import org.apache.drill.exec.work.foreman.Foreman; +import org.apache.drill.exec.work.foreman.rm.DistributedQueryQueue.StatusAdapter; + +/** + * Wrapper around the default and/or distributed resource managers + * to allow dynamically enabling and disabling queueing. 
+ */
+
+public class DynamicResourceManager implements ResourceManager {
+
+  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DynamicResourceManager.class);
+
+  private final DrillbitContext context;
+  private ResourceManager defaultRm;
+  private ResourceManager queueingRm;
+  private ResourceManager activeRm;
+  public long nextUpdateTime;
+  public final int recheckDelayMs = 5000;
+
+  public DynamicResourceManager(final DrillbitContext context) {
+    this.context = context;
+    refreshRM();
+  }
+
+  /**
+   * @return the resource manager currently in effect, after re-checking
+   * the queue-enable option if the recheck delay has elapsed
+   */
+  public synchronized ResourceManager activeRM() {
+    refreshRM();
+    return activeRm;
+  }
+
+  // The two accessors below hold the same monitor as every other method
+  // that reads or replaces activeRm, so they never observe a stale or
+  // half-switched manager. They do not trigger a refresh.
+
+  @Override
+  public synchronized long memoryPerNode() {
+    return activeRm.memoryPerNode();
+  }
+
+  @Override
+  public synchronized int cpusPerNode() {
+    return activeRm.cpusPerNode();
+  }
+
+  @Override
+  public synchronized QueryResourceAllocator newResourceAllocator(QueryContext queryContext) {
+    refreshRM();
+    return activeRm.newResourceAllocator(queryContext);
+  }
+
+  @Override
+  public synchronized QueryResourceManager newQueryRM(Foreman foreman) {
+    refreshRM();
+    return activeRm.newQueryRM(foreman);
+  }
+
+  /**
+   * Re-reads the queue-enable system option, at most once per
+   * {@link #recheckDelayMs}, and switches the active resource manager to
+   * match. Each manager is created on first use and then reused.
+   */
+  private void refreshRM() {
+    long now = System.currentTimeMillis();
+    if (now < nextUpdateTime) {
+      return;
+    }
+    nextUpdateTime = now + recheckDelayMs;
+    @SuppressWarnings("resource")
+    SystemOptionManager systemOptions = context.getOptionManager();
+    if (systemOptions.getOption(ExecConstants.ENABLE_QUEUE)) {
+      if (queueingRm == null) {
+        StatusAdapter statusAdapter = new StatusAdapter() {
+          @Override
+          public boolean inShutDown() {
+            // Drill provides no shutdown state at present.
+            // TODO: Once DRILL-4286 (graceful shutdown) is merged, use the
+            // new Drillbit status to determine when the Drillbit
+            // is shutting down.
+            return false;
+          }
+        };
+        queueingRm = new ThrottledResourceManager(context,
+            new DistributedQueryQueue(context, statusAdapter));
+      }
+      if (activeRm != queueingRm) {
+        logger.debug("Enabling ZK-based query queue.");
+        activeRm = queueingRm;
+      }
+    } else {
+      if (defaultRm == null) {
+        defaultRm = new DefaultResourceManager();
+      }
+      if (activeRm != defaultRm) {
+        logger.debug("Disabling ZK-based query queue.");
+        activeRm = defaultRm;
+      }
+    }
+  }
+
+  /**
+   * Closes both underlying resource managers, attempting the second even
+   * if the first fails. The first failure is rethrown (wrapped as a
+   * {@link UserException} if it is not one already); a failure from the
+   * second manager is attached to it as a suppressed exception.
+   */
+  @Override
+  public synchronized void close() {
+    RuntimeException ex = null;
+    try {
+      if (defaultRm != null) {
+        defaultRm.close();
+      }
+    } catch (RuntimeException e) {
+      ex = e;
+    } finally {
+      defaultRm = null;
+    }
+    try {
+      if (queueingRm != null) {
+        queueingRm.close();
+      }
+    } catch (RuntimeException e) {
+      if (ex == null) {
+        ex = e;
+      } else {
+        // Keep the second failure visible instead of dropping it.
+        ex.addSuppressed(e);
+      }
+    } finally {
+      queueingRm = null;
+    }
+    activeRm = null;
+    if (ex == null) {
+      return;
+    } else if (ex instanceof UserException) {
+      throw (UserException) ex;
+    } else {
+      throw UserException.systemError(ex)
+        .addContext("Failure closing resource managers.")
+        .build(logger);
+    }
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
new file mode 100644
index 000000000..a042f8bb6
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/EmbeddedQueryQueue.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Query queue to be used in an embedded Drillbit. This queue has scope of only + * the one Drillbit (not even multiple Drillbits in the same process.) Primarily + * intended for testing, but may possibly be useful for other embedded + * applications. + * <p> + * Configuration is via config parameters (not via system options as for the + * distributed queue.) + * <dl> + * <dt><tt>drill.exec.queue.embedded.enable</tt></dt> + * <dd>Set to true to enable the embedded queue. But, this setting has effect + * only if the Drillbit is, in fact, embedded.</dd> + * <dt><tt>drill.exec.queue.embedded.size</tt></dt> + * <dd>The number of active queries, all others queue. 
There is no upper limit
+ * on the number of queued entries.</dd>
+ * <dt><tt>drill.exec.queue.embedded.timeout_ms</tt></dt>
+ * <dd>The maximum time a query will wait in the queue before failing.</dd>
+ * </dl>
+ */
+
+public class EmbeddedQueryQueue implements QueryQueue {
+
+  // Config keys. Declared final so that no caller can reassign them.
+
+  public static final String EMBEDDED_QUEUE = "drill.exec.queue.embedded";
+  public static final String ENABLED = EMBEDDED_QUEUE + ".enable";
+  public static final String QUEUE_SIZE = EMBEDDED_QUEUE + ".size";
+  public static final String TIMEOUT_MS = EMBEDDED_QUEUE + ".timeout_ms";
+
+  /**
+   * Lease for one admitted query. Records the memory assigned at admission
+   * and whether the lease has been released.
+   */
+  public class EmbeddedQueueLease implements QueueLease {
+
+    private final QueryId queryId;
+    private boolean released;
+    private final long queryMemory;
+
+    public EmbeddedQueueLease(QueryId queryId, long queryMemory) {
+      this.queryId = queryId;
+      this.queryMemory = queryMemory;
+    }
+
+    @Override
+    public String toString( ) {
+      return new StringBuilder()
+          .append("Embedded queue lease for ")
+          .append(QueryIdHelper.getQueryId(queryId))
+          .append(released ? " (released)" : "")
+          .toString();
+    }
+
+    @Override
+    public long queryMemoryPerNode() {
+      return queryMemory;
+    }
+
+    @Override
+    public void release() {
+      EmbeddedQueryQueue.this.release(this);
+      released = true;
+    }
+
+    @VisibleForTesting
+    boolean isReleased() { return released; }
+
+    @Override
+    public String queueName() { return "local-queue"; }
+  }
+
+  private final int queueTimeoutMs;
+  private final int queueSize;
+  private final Semaphore semaphore;
+  private long memoryPerQuery;
+  private final long minimumOperatorMemory;
+
+  /**
+   * Reads the queue size and timeout from the boot-time config and the
+   * minimum operator memory from the option manager, then creates a fair
+   * semaphore with one permit per queue slot.
+   *
+   * @param context Drillbit context supplying the config and options
+   */
+  public EmbeddedQueryQueue(DrillbitContext context) {
+    DrillConfig config = context.getConfig();
+    queueTimeoutMs = config.getInt(TIMEOUT_MS);
+    queueSize = config.getInt(QUEUE_SIZE);
+    semaphore = new Semaphore(queueSize, true);
+    minimumOperatorMemory = context.getOptionManager()
+        .getOption(ExecConstants.MIN_MEMORY_PER_BUFFERED_OP);
+  }
+
+  @Override
+  public boolean enabled() { return true; }
+
+  /**
+   * Splits the node memory evenly across the queue slots.
+   */
+  @Override
+  public void setMemoryPerNode(long memoryPerNode) {
+    memoryPerQuery = memoryPerNode / queueSize;
+  }
+
+  @Override
+  public long defaultQueryMemoryPerNode(double cost) {
+    return memoryPerQuery;
+  }
+
+  /**
+   * Waits up to the configured timeout for a free slot.
+   *
+   * @param queryId the query ID
+   * @param cost the query cost; not used by this queue
+   * @return the lease for the admitted query
+   * @throws QueueTimeoutException if no slot became free within the timeout
+   * @throws QueryQueueException if the wait was interrupted
+   */
+  @Override
+  public QueueLease enqueue(QueryId queryId, double cost)
+      throws QueueTimeoutException, QueryQueueException {
+    try {
+      if (! semaphore.tryAcquire(queueTimeoutMs, TimeUnit.MILLISECONDS) ) {
+        throw new QueueTimeoutException(queryId, "embedded", queueTimeoutMs);
+      }
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so that code further up the stack
+      // can still see that the thread was interrupted.
+      Thread.currentThread().interrupt();
+      throw new QueryQueueException("Interrupted", e);
+    }
+    return new EmbeddedQueueLease(queryId, memoryPerQuery);
+  }
+
+  private void release(EmbeddedQueueLease lease) {
+    assert ! lease.released;
+    semaphore.release();
+  }
+
+  @Override
+  public void close() {
+    assert semaphore.availablePermits() == queueSize;
+  }
+
+  @Override
+  public long minimumOperatorMemory() {
+    return minimumOperatorMemory;
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java
new file mode 100644
index 000000000..72dc2d63a
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryQueue.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.exec.proto.UserBitShared.QueryId; + +/** + * Interface which defines a queue implementation for query queues. + * Implementations can queue locally, queue distributed, or do + * nothing at all. + * <p> + * A queue can report itself as enabled or disabled. When enabled, + * all queries must obtain a lease prior to starting execution. The + * lease must be released at the completion of execution. + */ + +public interface QueryQueue { + + /** + * The opaque lease returned once a query is admitted + * for execution. + */ + + public interface QueueLease { + long queryMemoryPerNode(); + + /** + * Release a query lease obtained from {@link QueryQueue#enqueue(QueryId, double)}. + * Should be called by the per-query resource manager. + * + * @param lease the lease to be released. + */ + + void release(); + + String queueName(); + }; + + /** + * Exception thrown if a query exceeds the configured wait time + * in the query queue. + */ + + @SuppressWarnings("serial") + public class QueueTimeoutException extends Exception { + + private final QueryId queryId; + private final String queueName; + private final int timeoutMs; + + public QueueTimeoutException(QueryId queryId, String queueName, int timeoutMs) { + super( String.format( + "Query timed out of the %s queue after %d ms.", + queueName, timeoutMs )); + this.queryId = queryId; + this.queueName = queueName; + this.timeoutMs = timeoutMs; + } + + public QueryId queryId() { return queryId; } + public String queueName() { return queueName; } + public int timeoutMs() { return timeoutMs; } + } + + /** + * Exception thrown for all non-timeout error conditions. 
+ */ + + @SuppressWarnings("serial") + public class QueryQueueException extends Exception { + QueryQueueException(String msg, Exception e) { + super(msg, e); + } + } + + void setMemoryPerNode(long memoryPerNode); + + /** + * Return the amount of memory per node when creating an EXPLAIN + * query plan. Plans to be executed should get the query memory from + * the lease, as the lease may adjust the default amount on a per-query + * basis. This means that the memory used to execute the query may + * differ from the amount shown in an EXPLAIN plan. + * + * @return assumed memory per node, in bytes, to use when creating + * an EXPLAIN plan + */ + + long defaultQueryMemoryPerNode(double cost); + + /** + * Optional floor on the amount of memory assigned per operator. + * This ensures that operators receive a certain amount, separate from + * any memory slicing. This can oversubscribe node memory if used + * incorrectly. + * + * @return minimum per-operator memory, in bytes + */ + + long minimumOperatorMemory(); + + /** + * Determine if the queue is enabled. + * @return true if the queue is enabled, false otherwise. + */ + + boolean enabled(); + + /** + * Queue a query. The method returns only when the query is admitted for + * execution. As a result, the calling thread may block up to the configured + * wait time. + * @param queryId the query ID + * @param cost the cost of the query used for cost-based queueing + * @return the query lease which must be released via {@link QueueLease#release()} + * upon query completion + * @throws QueueTimeoutException if the query times out waiting to be + * admitted. + * @throws QueryQueueException for any other error condition. 
+ */ + + QueueLease enqueue(QueryId queryId, double cost) throws QueueTimeoutException, QueryQueueException; + + void close(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java new file mode 100644 index 000000000..35dbe5987 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceAllocator.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.work.QueryWorkUnit; + +/** + * Manages resources for an individual query in conjunction with the + * global {@link ResourceManager}. Handles memory and CPU allocation. + * Instances of this class handle query planning and are used when the + * client wants to plan the query, but not execute it. An implementation + * of {@link QueryResourceManager} is used to both plan the query and + * queue it for execution. + * <p> + * This interface allows a variety of resource management strategies to + * exist for different purposes. 
+ * <p> + * The methods here assume external synchronization: a single query calls + * the methods at known times; there are no concurrent calls. + */ + +public interface QueryResourceAllocator { + + /** + * Make any needed adjustments to the query plan before parallelization. + * + * @param plan + */ + void visitAbstractPlan(PhysicalPlan plan); + + /** + * Provide the manager with the physical plan and node assignments + * for the query to be run. This class will plan memory for the query. + * + * @param plan + * @param work + */ + + void visitPhysicalPlan(QueryWorkUnit work); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java new file mode 100644 index 000000000..9e2a3a10f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; + +/** + * Extends a {@link QueryResourceAllocator} to provide queueing support. + */ + +public interface QueryResourceManager extends QueryResourceAllocator { + + /** + * Hint that this resource manager queues. Allows the Foreman + * to short-circuit expensive logic if no queuing will actually + * be done. This is a static attribute per Drillbit run. + */ + + boolean hasQueue(); + + /** + * For some cases the foreman does not have a full plan, just a cost. In + * this case, this object will not plan memory, but still needs the cost + * to place the job into the correct queue. + * @param cost + */ + + void setCost(double cost); + + /** + * Admit the query into the cluster. Blocks until the query + * can run. (Later revisions may use a more thread-friendly + * approach.) + * @throws QueryQueueException if something goes wrong with the + * queue mechanism + * @throws QueueTimeoutException if the query timed out waiting to + * be admitted. + */ + + void admit() throws QueueTimeoutException, QueryQueueException; + + /** + * Returns the name of the queue (if any) on which the query was + * placed. Valid only after the query is admitted. + * + * @return queue name, or null if queuing is not enabled. + */ + + String queueName(); + + /** + * Mark the query as completing, giving up its slot in the + * cluster. Releases any lease that may be held for a system with queues. 
+ */ + + void exit(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java new file mode 100644 index 000000000..71dabafa7 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManager.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.work.foreman.Foreman; + +/** + * Drillbit-wide resource manager shared by all queries. Manages memory (at + * present) and CPU (planned). Since queries are the primary consumer of + * resources, manages resources by throttling queries into the system, and + * allocating resources to queries in order to control total use. An "null" + * implementation handles the case of no queuing. Clearly, the null case cannot + * effectively control resource use. + */ + +public interface ResourceManager { + + /** + * Returns the memory, in bytes, assigned to each node in a Drill cluster. + * Drill requires that nodes are symmetrical. 
So, knowing the memory on any + * one node also gives the memory on all other nodes. + * + * @return the memory, in bytes, available in each Drillbit + */ + long memoryPerNode(); + + int cpusPerNode(); + + /** + * Create a resource manager to prepare or describe a query. In this form, no + * queuing is done, but the plan is created as if queuing had been done. Used + * when executing EXPLAIN PLAN. + * + * @return a resource manager for the query + */ + + QueryResourceAllocator newResourceAllocator(QueryContext queryContext); + + /** + * Create a resource manager to execute a query. + * + * @param foreman + * Foreman which manages the execution + * @return a resource manager for the query + */ + + QueryResourceManager newQueryRM(final Foreman foreman); + + void close(); +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java new file mode 100644 index 000000000..430589135 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ResourceManagerBuilder.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman.rm; + +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.exec.coord.ClusterCoordinator; +import org.apache.drill.exec.coord.local.LocalClusterCoordinator; +import org.apache.drill.exec.server.DrillbitContext; + +/** + * Builds the proper resource manager and queue implementation for the configured + * system options. + * <p> + * <ul> + * <li>If the Drillbit is embedded<ul> + * <li>If queues are enabled, then the admission-controlled resource manager + * with the local query queue.</li> + * <li>Otherwise, the default resource manager and no queues.</li> + * </ul></li> + * <li>If the Drillbit is in a cluster<ul> + * <li>If queues are enabled, then the admission-controlled resource manager + * with the distributed query queue.</li> + * <li>Otherwise, the default resource manager and no queues.</li> + * </ul></li> + * </ul> + * Configuration settings: + * <dl> + * <dt>Cluster coordinator instance</dt> + * <dd>If an instance of <tt>LocalClusterCoordinator</tt>, the Drillbit is + * embedded, else it is in a cluster.</dd> + * <dt><tt>drill.exec.queue.embedded.enable</tt> boot config</dt> + * <dd>If enabled, and if embedded, then use the local queue.</dd> + * <dt><tt>exec.queue.enable</tt> system option</dt> + * <dd>If enabled, and if in a cluster, then use the distributed queue.</dd> + * </dl> + */ +public class ResourceManagerBuilder { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ResourceManagerBuilder.class); + + private DrillbitContext context ; + + public ResourceManagerBuilder(final DrillbitContext context) { + this.context = context; + } + + @SuppressWarnings("resource") + public ResourceManager build() { + ClusterCoordinator coord = context.getClusterCoordinator(); + DrillConfig config = context.getConfig(); + if (coord instanceof LocalClusterCoordinator) { + if (config.getBoolean(EmbeddedQueryQueue.ENABLED)) { + logger.debug("Enabling embedded, local 
query queue."); + return new ThrottledResourceManager(context, new EmbeddedQueryQueue(context)); + } else { + logger.debug("No query queueing enabled."); + return new DefaultResourceManager(); + } + } else { + return new DynamicResourceManager(context); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java new file mode 100644 index 000000000..b46fe0933 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.work.foreman.rm; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; +import org.apache.drill.exec.physical.base.FragmentRoot; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.proto.helper.QueryIdHelper; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.QueryWorkUnit.MinorFragmentDefn; +import org.apache.drill.exec.work.foreman.Foreman; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueLease; +import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; + +/** + * Global resource manager that provides basic admission control (AC) via a + * configured queue: either the Zookeeper-based distributed queue or the + * in-process embedded Drillbit queue. The queue places an upper limit on the + * number of running queries. This limit then "slices" memory and CPU between + * queries: each gets the same share of resources. + * <p> + * This is a "basic" implementation. Clearly, a more advanced implementation + * could look at query cost to determine whether to give a given query more or + * less than the "standard" share. That is left as a future exercise; in this + * version we just want to get the basics working. + * <p> + * This is the resource manager level. This resource manager is paired with a + * queue implementation to produce a complete solution. 
This composition-based + * approach allows sharing of functionality across queue implementations. + */ + +public class ThrottledResourceManager extends AbstractResourceManager { + + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory + .getLogger(ThrottledResourceManager.class); + + public static class QueuedResourceAllocator + implements QueryResourceAllocator { + + protected final ThrottledResourceManager rm; + protected QueryContext queryContext; + protected PhysicalPlan plan; + protected QueryWorkUnit work; + protected double queryCost; + + protected QueuedResourceAllocator(final ThrottledResourceManager rm, + QueryContext queryContext) { + this.rm = rm; + this.queryContext = queryContext; + } + + @Override + public void visitAbstractPlan(PhysicalPlan plan) { + this.plan = plan; + queryCost = plan.totalCost(); + } + + @Override + public void visitPhysicalPlan(final QueryWorkUnit work) { + this.work = work; + planMemory(); + } + + private void planMemory() { + if (plan.getProperties().hasResourcePlan) { + logger.debug("Memory already planned."); + return; + } + + // Group fragments by node. + + Map<String, Collection<PhysicalOperator>> nodeMap = buildBufferedOpMap(); + + // Memory must be symmetric to avoid bottlenecks in which one node has + // sorts (say) with less memory than another, causing skew in data arrival + // rates for downstream operators. + + int width = countBufferingOperators(nodeMap); + + // Then, share memory evenly across the + // all sort operators on that node. This handles asymmetric distribution + // such as occurs if a sort appears in the root fragment (the one with + // screen), + // which is never parallelized. 
+ + for (Entry<String, Collection<PhysicalOperator>> entry : nodeMap.entrySet()) { + planNodeMemory(entry.getKey(), entry.getValue(), width); + } + } + + private int countBufferingOperators( + Map<String, Collection<PhysicalOperator>> nodeMap) { + int width = 0; + for (Collection<PhysicalOperator> fragSorts : nodeMap.values()) { + width = Math.max(width, fragSorts.size()); + } + return width; + } + + /** + * Given the set of buffered operators (from any number of fragments) on a + * single node, share the per-query memory equally across all the + * operators. + * + * @param nodeAddr + * @param bufferedOps + * @param width + */ + + private void planNodeMemory(String nodeAddr, + Collection<PhysicalOperator> bufferedOps, int width) { + + // If no buffering operators, nothing to plan. + + if (bufferedOps.isEmpty()) { + return; + } + + // Divide node memory evenly among the set of operators, in any minor + // fragment, on the node. This is not very sophisticated: it does not + // deal with, say, three stacked sorts in which, if sort A runs, then + // B may be using memory, but C cannot be active. That kind of analysis + // is left as a later exercise. + + long nodeMemory = queryMemoryPerNode(); + + // Set a floor on the amount of memory per operator based on the + // configured minimum. This is likely unhelpful because we are trying + // to work around constrained memory by assuming more than we actually + // have. This may lead to an OOM at run time. 
+ + long preferredOpMemory = nodeMemory / width; + long perOpMemory = Math.max(preferredOpMemory, rm.minimumOperatorMemory()); + if (preferredOpMemory < perOpMemory) { + logger.warn("Preferred per-operator memory: {}, actual amount: {}", + preferredOpMemory, perOpMemory); + } + logger.debug( + "Query: {}, Node: {}, allocating {} bytes each for {} buffered operator(s).", + QueryIdHelper.getQueryId(queryContext.getQueryId()), nodeAddr, + perOpMemory, width); + + for (PhysicalOperator op : bufferedOps) { + + // Limit the memory to the maximum in the plan. Doing so is + // likely unnecessary, and perhaps harmful, because the pre-planned + // allocation is the default maximum hard-coded to 10 GB. This means + // that even if 20 GB is available to the sort, it won't use more + // than 10GB. This is probably more of a bug than a feature. + + long alloc = Math.min(perOpMemory, op.getMaxAllocation()); + + // Place a floor on the memory that is the initial allocation, + // since we don't want the operator to run out of memory when it + // first starts. + + alloc = Math.max(alloc, op.getInitialAllocation()); + + if (alloc > preferredOpMemory && alloc != perOpMemory) { + logger.warn("Allocated memory of {} for {} exceeds available memory of {} " + + "due to operator minimum", + alloc, op.getClass().getSimpleName(), preferredOpMemory); + } + else if (alloc < preferredOpMemory) { + logger.warn("Allocated memory of {} for {} is less than available memory " + + "of {} due to operator limit", + alloc, op.getClass().getSimpleName(), preferredOpMemory); + } + op.setMaxAllocation(alloc); + } + } + + protected long queryMemoryPerNode() { + return rm.defaultQueryMemoryPerNode(plan.totalCost()); + } + + /** + * Build a list of external sorts grouped by node. We start with a list of + * minor fragments, each with an endpoint (node). Multiple minor fragments + * may appear on each node, and each minor fragment may have 0, 1 or more + * sorts. 
+ * + * @return + */ + + private Map<String, Collection<PhysicalOperator>> buildBufferedOpMap() { + Multimap<String, PhysicalOperator> map = ArrayListMultimap.create(); + getBufferedOps(map, work.getRootFragmentDefn()); + for (MinorFragmentDefn defn : work.getMinorFragmentDefns()) { + getBufferedOps(map, defn); + } + return map.asMap(); + } + + /** + * Searches a fragment operator tree to find buffered operators within that fragment. + */ + + protected static class BufferedOpFinder extends + AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> { + @Override + public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value) + throws RuntimeException { + if (op.isBufferedOperator()) { + value.add(op); + } + visitChildren(op, value); + return null; + } + } + + private void getBufferedOps(Multimap<String, PhysicalOperator> map, + MinorFragmentDefn defn) { + List<PhysicalOperator> bufferedOps = getBufferedOps(defn.root()); + if (!bufferedOps.isEmpty()) { + map.putAll(defn.fragment().getAssignment().getAddress(), bufferedOps); + } + } + + /** + * Search an individual fragment tree to find any buffered operators it may + * contain. + * + * @param root + * @return + */ + + private List<PhysicalOperator> getBufferedOps(FragmentRoot root) { + List<PhysicalOperator> bufferedOps = new ArrayList<>(); + BufferedOpFinder finder = new BufferedOpFinder(); + root.accept(finder, bufferedOps); + return bufferedOps; + } + } + + /** + * Per-query resource manager. Handles resources and optional queue lease for + * a single query. As such, this is a non-shared resource: it is associated + * with a Foreman: a single thread at plan time, and a single event (in some + * thread) at query completion time. Because of these semantics, no + * synchronization is needed within this class. 
+ */ + + public static class QueuedQueryResourceManager extends QueuedResourceAllocator + implements QueryResourceManager { + + private final Foreman foreman; + private QueueLease lease; + + public QueuedQueryResourceManager(final ThrottledResourceManager rm, + final Foreman foreman) { + super(rm, foreman.getQueryContext()); + this.foreman = foreman; + } + + @Override + public void setCost(double cost) { + this.queryCost = cost; + } + + @Override + public void admit() throws QueueTimeoutException, QueryQueueException { + lease = rm.queue().enqueue(foreman.getQueryId(), queryCost); + } + + @Override + protected long queryMemoryPerNode() { + + // No lease: use static estimate. + + if (lease == null) { + return super.queryMemoryPerNode(); + } + + // Use actual memory assigned to this query. + + return lease.queryMemoryPerNode(); + } + + @Override + public void exit() { + if (lease != null) { + lease.release(); + } + lease = null; + } + + @Override + public boolean hasQueue() { return true; } + + @Override + public String queueName() { + return lease == null ? 
null : lease.queueName(); + } + } + + private final QueryQueue queue; + + public ThrottledResourceManager(final DrillbitContext drillbitContext, + final QueryQueue queue) { + super(drillbitContext); + this.queue = queue; + queue.setMemoryPerNode(memoryPerNode()); + } + + public long minimumOperatorMemory() { + return queue.minimumOperatorMemory(); + } + + public long defaultQueryMemoryPerNode(double cost) { + return queue.defaultQueryMemoryPerNode(cost); + } + + public QueryQueue queue() { return queue; } + + @Override + public QueryResourceAllocator newResourceAllocator( + QueryContext queryContext) { + return new QueuedResourceAllocator(this, queryContext); + } + + @Override + public QueryResourceManager newQueryRM(Foreman foreman) { + return new QueuedQueryResourceManager(this, foreman); + } + + @Override + public void close() { + queue.close(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java new file mode 100644 index 000000000..0b9e9da63 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/package-info.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Provides resource management and queuing support for the Drill foreman. + * The resource manager tracks total resources available to Drill. Several + * implementations are available: a default implementation for systems without + * queueing and an admission-controlled (AC) version for systems with queues. + * <p> + * Each resource manager provides a per-query manager that is responsible + * for queuing the query (if needed) and memory allocation to the query based + * on query characteristics and memory assigned to the query. + * <p> + * Provides two different queue implementations. A distributed ZooKeeper queue + * and a local queue useful for embedded Drillbits (and for testing.) + */ +package org.apache.drill.exec.work.foreman.rm; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java index 2f945d85f..29b3580d3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -35,9 +35,9 @@ import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments; import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments; import org.apache.drill.exec.rpc.UserClientConnection; import org.apache.drill.exec.server.DrillbitContext; -import org.apache.drill.exec.util.MemoryAllocationUtilities; import org.apache.drill.exec.util.Pointer; import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.exec.work.foreman.rm.QueryResourceAllocator; import com.google.common.collect.Lists; @@ -60,6 +60,7 @@ public class PlanSplitter { * @param connection * @return */ + @SuppressWarnings("resource") public QueryPlanFragments planFragments(DrillbitContext dContext, QueryId queryId, GetQueryPlanFragments req, UserClientConnection connection) { QueryPlanFragments.Builder responseBuilder = QueryPlanFragments.newBuilder(); @@ -97,7 +98,8 @@ public class PlanSplitter { throw new IllegalStateException("Planning fragments supports only SQL or PHYSICAL QueryType"); } - MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); + QueryResourceAllocator planner = dContext.getResourceManager().newResourceAllocator(queryContext); + planner.visitAbstractPlan(plan); final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); @@ -113,6 +115,8 @@ public class PlanSplitter { queryContext.getSession(), queryContext.getQueryContextInfo()); for (QueryWorkUnit queryWorkUnit : queryWorkUnits) { + planner.visitPhysicalPlan(queryWorkUnit); + queryWorkUnit.applyPlan(dContext.getPlanReader()); fragments.add(queryWorkUnit.getRootFragment()); List<PlanFragment> childFragments = queryWorkUnit.getFragments(); @@ -122,8 +126,10 @@ public class PlanSplitter { } } else { final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), - queryId, 
queryContext.getActiveEndpoints(), dContext.getPlanReader(), rootFragment, + queryId, queryContext.getActiveEndpoints(), rootFragment, queryContext.getSession(), queryContext.getQueryContextInfo()); + planner.visitPhysicalPlan(queryWorkUnit); + queryWorkUnit.applyPlan(dContext.getPlanReader()); fragments.add(queryWorkUnit.getRootFragment()); fragments.addAll(queryWorkUnit.getFragments()); } |