diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java | 218 |
1 files changed, 126 insertions, 92 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index 39b699fe4..a434bf80b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -21,9 +21,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Consumer; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.DrillStringUtils; +import org.apache.drill.common.util.function.CheckedConsumer; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.MinorFragmentEndpoint; @@ -61,7 +64,7 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException; * parallelization for each major fragment will be determined. Once the amount of parallelization is done, assignment * is done based on round robin assignment ordered by operator affinity (locality) to available execution Drillbits. 
*/ -public class SimpleParallelizer implements ParallelizationParameters { +public abstract class SimpleParallelizer implements QueryParallelizer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class); private final long parallelizationThreshold; @@ -69,7 +72,7 @@ public class SimpleParallelizer implements ParallelizationParameters { private final int maxGlobalWidth; private final double affinityFactor; - public SimpleParallelizer(QueryContext context) { + protected SimpleParallelizer(QueryContext context) { OptionManager optionManager = context.getOptions(); long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET_OPTION); this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1; @@ -81,7 +84,7 @@ public class SimpleParallelizer implements ParallelizationParameters { this.affinityFactor = optionManager.getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue(); } - public SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) { + protected SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) { this.parallelizationThreshold = parallelizationThreshold; this.maxWidthPerNode = maxWidthPerNode; this.maxGlobalWidth = maxGlobalWidth; @@ -108,27 +111,106 @@ public class SimpleParallelizer implements ParallelizationParameters { return affinityFactor; } + public Set<Wrapper> getRootFragments(PlanningSet planningSet) { + //The following code gets the root fragment by removing all the dependent fragments on which root fragments depend upon. + //This is fine because the later parallelizer code traverses from these root fragments to their respective dependent + //fragments. + final Set<Wrapper> roots = Sets.newHashSet(); + for(Wrapper w : planningSet) { + roots.add(w); + } + + //roots will be left over with the fragments which are not depended upon by any other fragments. 
+ for(Wrapper wrapper : planningSet) { + final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies(); + if (fragmentDependencies != null && fragmentDependencies.size() > 0) { + for(Wrapper dependency : fragmentDependencies) { + if (roots.contains(dependency)) { + roots.remove(dependency); + } + } + } + } + + return roots; + } + + public PlanningSet prepareFragmentTree(Fragment rootFragment) { + PlanningSet planningSet = new PlanningSet(); + + initFragmentWrappers(rootFragment, planningSet); + + constructFragmentDependencyGraph(rootFragment, planningSet); + + return planningSet; + } + /** - * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages - * to go beyond the global max width. + * Traverse all the major fragments and parallelize each major fragment based on + * collected stats. The children fragments are parallelized before a parent + * fragment. + * @param planningSet Set of all major fragments and their context. + * @param roots Root nodes of the plan. + * @param activeEndpoints currently active drillbit endpoints. + * @throws PhysicalOperatorSetupException + */ + public void collectStatsAndParallelizeFragments(PlanningSet planningSet, Set<Wrapper> roots, + Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException { + for (Wrapper wrapper : roots) { + traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragmentWrapper) -> { + // If this fragment is already parallelized then no need to do it again. + // This happens in the case of fragments which have MUX operators. 
+ if (fragmentWrapper.isEndpointsAssignmentDone()) { + return; + } + fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper); + fragmentWrapper.getStats() + .getDistributionAffinity() + .getFragmentParallelizer() + .parallelizeFragment(fragmentWrapper, this, activeEndpoints); + //consolidate the cpu resources required by this major fragment per drillbit. + fragmentWrapper.computeCpuResources(); + })); + } + } + + public abstract void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots, + Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException; + + /** + * The starting function for the whole parallelization and memory computation logic. + * 1) Initially a fragment tree is prepared which contains a wrapper for each fragment. + * The topology of this tree is same as that of the major fragment tree. + * 2) Traverse this fragment tree to collect stats for each major fragment and then + * parallelize each fragment. At this stage minor fragments are not created but all + * the required information to create minor fragment are computed. + * 3) Memory is computed for each operator and for the minor fragment. + * 4) Lastly all the above computed information is used to create the minor fragments + * for each major fragment. * - * @param options Option list - * @param foremanNode The driving/foreman node for this query. (this node) - * @param queryId The queryId for this query. - * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query. - * @param rootFragment The root node of the PhysicalPlan that we will be parallelizing. - * @param session UserSession of user who launched this query. - * @param queryContextInfo Info related to the context when query has started. - * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes. + * @param options List of options set by the user. 
+ * @param foremanNode foreman node for this query plan. + * @param queryId Query ID. + * @param activeEndpoints currently active endpoints on which this plan will run. + * @param rootFragment Root major fragment. + * @param session session context. + * @param queryContextInfo query context. + * @return Query work unit with minor fragments assigned to endpoints. + * @throws ExecutionSetupException */ - public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, - Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment, - UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { + @Override + public final QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, + Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment, + UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { + PlanningSet planningSet = prepareFragmentTree(rootFragment); + + Set<Wrapper> rootFragments = getRootFragments(planningSet); - final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment); - return generateWorkUnit( - options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo); + collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints); + + adjustMemory(planningSet, rootFragments, activeEndpoints); + + return generateWorkUnit(options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo); }
- * @throws ExecutionSetupException - */ - protected PlanningSet getFragmentsHelper(Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment) throws ExecutionSetupException { - - PlanningSet planningSet = new PlanningSet(); - initFragmentWrappers(rootFragment, planningSet); - - final Set<Wrapper> leafFragments = constructFragmentDependencyGraph(planningSet); - - // Start parallelizing from leaf fragments - for (Wrapper wrapper : leafFragments) { - parallelizeFragment(wrapper, planningSet, activeEndpoints); - } - planningSet.findRootWrapper(rootFragment); - return planningSet; - } // For every fragment, create a Wrapper in PlanningSet. @VisibleForTesting @@ -190,77 +251,49 @@ public class SimpleParallelizer implements ParallelizationParameters { * @param planningSet * @return Returns a list of leaf fragments in fragment dependency graph. */ - private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) { + private void constructFragmentDependencyGraph(Fragment rootFragment, PlanningSet planningSet) { // Set up dependency of fragments based on the affinity of exchange that separates the fragments. - for (Wrapper currentFragmentWrapper : planningSet) { - ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair(); - if (sendingExchange != null) { - ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency(); - Wrapper receivingFragmentWrapper = planningSet.get(sendingExchange.getNode()); - + for(Wrapper currentFragment : planningSet) { + ExchangeFragmentPair sendingXchgForCurrFrag = currentFragment.getNode().getSendingExchangePair(); + if (sendingXchgForCurrFrag != null) { + ParallelizationDependency dependency = sendingXchgForCurrFrag.getExchange().getParallelizationDependency(); + Wrapper receivingFragmentWrapper = planningSet.get(sendingXchgForCurrFrag.getNode()); + + //Mostly Receivers of the current fragment depend on the sender of the child fragments. 
However there is a special case + //for DeMux Exchanges where the Sender of the current fragment depends on the receiver of the parent fragment. if (dependency == ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) { - receivingFragmentWrapper.addFragmentDependency(currentFragmentWrapper); + receivingFragmentWrapper.addFragmentDependency(currentFragment); } else if (dependency == ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) { - currentFragmentWrapper.addFragmentDependency(receivingFragmentWrapper); + currentFragment.addFragmentDependency(receivingFragmentWrapper); } } } - - // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for - // parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and - // remove the fragment on which the current fragment depends. - - final Set<Wrapper> roots = Sets.newHashSet(); - for (Wrapper w : planningSet) { - roots.add(w); - } - - for (Wrapper wrapper : planningSet) { - final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies(); - if (fragmentDependencies != null && fragmentDependencies.size() > 0) { - for (Wrapper dependency : fragmentDependencies) { - if (roots.contains(dependency)) { - roots.remove(dependency); - } - } - } - } - - return roots; + planningSet.findRootWrapper(rootFragment); } + /** - * Helper method for parallelizing a given fragment. Dependent fragments are parallelized first before - * parallelizing the given fragment. + * Helper method to call operation on each fragment. Traversal calls operation on child fragments before + * calling it on the parent fragment. */ - private void parallelizeFragment(Wrapper fragmentWrapper, PlanningSet planningSet, - Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException { - // If the fragment is already parallelized, return. 
- if (fragmentWrapper.isEndpointsAssignmentDone()) { - return; - } + protected void traverse(Wrapper fragmentWrapper, Consumer<Wrapper> operation) throws PhysicalOperatorSetupException { - // First parallelize fragments on which this fragment depends. final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies(); if (fragmentDependencies != null && fragmentDependencies.size() > 0) { for(Wrapper dependency : fragmentDependencies) { - parallelizeFragment(dependency, planningSet, activeEndpoints); + traverse(dependency, operation); } } - - // Find stats. Stats include various factors including cost of physical operators, parallelizability of - // work in physical operator and affinity of physical operator to certain nodes. - fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper); - - fragmentWrapper.getStats().getDistributionAffinity() - .getFragmentParallelizer() - .parallelizeFragment(fragmentWrapper, this, activeEndpoints); + operation.accept(fragmentWrapper); } + // A function which returns the memory assigned for a particular physical operator. + protected abstract BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory(); + protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, - Fragment rootNode, PlanningSet planningSet, - UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { + Fragment rootNode, PlanningSet planningSet, UserSession session, + QueryContextInformation queryContextInfo) throws ExecutionSetupException { List<MinorFragmentDefn> fragmentDefns = new ArrayList<>( ); MinorFragmentDefn rootFragmentDefn = null; @@ -282,7 +315,8 @@ public class SimpleParallelizer implements ParallelizationParameters { // Create a minorFragment for each major fragment. 
for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) { - IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper); + IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper, + (fragmentWrapper, minorFragment) -> fragmentWrapper.getAssignedEndpoint(minorFragment), getMemory()); wrapper.resetAllocation(); PhysicalOperator op = physicalOperatorRoot.accept(Materializer.INSTANCE, iNode); Preconditions.checkArgument(op instanceof FragmentRoot); |