Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java  218
1 file changed, 126 insertions, 92 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 39b699fe4..a434bf80b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -21,9 +21,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.common.util.function.CheckedConsumer;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
@@ -61,7 +64,7 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException;
* parallelization for each major fragment will be determined. Once the amount of parallelization is done, assignment
* is done based on round robin assignment ordered by operator affinity (locality) to available execution Drillbits.
*/
-public class SimpleParallelizer implements ParallelizationParameters {
+public abstract class SimpleParallelizer implements QueryParallelizer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
private final long parallelizationThreshold;
@@ -69,7 +72,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
private final int maxGlobalWidth;
private final double affinityFactor;
- public SimpleParallelizer(QueryContext context) {
+ protected SimpleParallelizer(QueryContext context) {
OptionManager optionManager = context.getOptions();
long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET_OPTION);
this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1;
@@ -81,7 +84,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
this.affinityFactor = optionManager.getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue();
}
- public SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) {
+ protected SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) {
this.parallelizationThreshold = parallelizationThreshold;
this.maxWidthPerNode = maxWidthPerNode;
this.maxGlobalWidth = maxGlobalWidth;
@@ -108,27 +111,106 @@ public class SimpleParallelizer implements ParallelizationParameters {
return affinityFactor;
}
+ public Set<Wrapper> getRootFragments(PlanningSet planningSet) {
+ // The following code finds the root fragments by removing every fragment that some other fragment depends on.
+ // This is fine because the parallelizer code later traverses from these root fragments down to their respective
+ // dependent fragments.
+ final Set<Wrapper> roots = Sets.newHashSet();
+ for(Wrapper w : planningSet) {
+ roots.add(w);
+ }
+
+ // After this loop, roots is left with only the fragments that no other fragment depends on.
+ for(Wrapper wrapper : planningSet) {
+ final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies();
+ if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
+ for(Wrapper dependency : fragmentDependencies) {
+ if (roots.contains(dependency)) {
+ roots.remove(dependency);
+ }
+ }
+ }
+ }
+
+ return roots;
+ }
+
+ public PlanningSet prepareFragmentTree(Fragment rootFragment) {
+ PlanningSet planningSet = new PlanningSet();
+
+ initFragmentWrappers(rootFragment, planningSet);
+
+ constructFragmentDependencyGraph(rootFragment, planningSet);
+
+ return planningSet;
+ }
+
/**
- * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages
- * to go beyond the global max width.
+ * Traverse all the major fragments and parallelize each major fragment based on
+ * collected stats. Child fragments are parallelized before their parent
+ * fragment.
+ * @param planningSet Set of all major fragments and their context.
+ * @param roots Root nodes of the plan.
+ * @param activeEndpoints Currently active Drillbit endpoints.
+ * @throws PhysicalOperatorSetupException
+ */
+ public void collectStatsAndParallelizeFragments(PlanningSet planningSet, Set<Wrapper> roots,
+ Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+ for (Wrapper wrapper : roots) {
+ traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragmentWrapper) -> {
+ // If this fragment is already parallelized then there is no need to do it again.
+ // This happens in the case of fragments which have MUX operators.
+ if (fragmentWrapper.isEndpointsAssignmentDone()) {
+ return;
+ }
+ fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
+ fragmentWrapper.getStats()
+ .getDistributionAffinity()
+ .getFragmentParallelizer()
+ .parallelizeFragment(fragmentWrapper, this, activeEndpoints);
+ // Consolidate the CPU resources required by this major fragment per Drillbit.
+ fragmentWrapper.computeCpuResources();
+ }));
+ }
+ }
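
The CheckedConsumer.throwingConsumerWrapper call above adapts a lambda that throws the checked PhysicalOperatorSetupException into a plain java.util.function.Consumer, which traverse() expects. As a rough illustration of that pattern (a minimal sketch only; the interface name ThrowingConsumer is hypothetical and Drill's actual org.apache.drill.common.util.function.CheckedConsumer may be shaped differently), such an adapter can look like this:

import java.util.function.Consumer;

// Sketch only: adapts a checked-exception lambda to a Consumer.
// Here checked exceptions are rethrown unchecked; the real CheckedConsumer may differ.
@FunctionalInterface
interface ThrowingConsumer<T, E extends Exception> {
  void accept(T t) throws E;

  static <T, E extends Exception> Consumer<T> throwingConsumerWrapper(ThrowingConsumer<T, E> body) {
    return t -> {
      try {
        body.accept(t);
      } catch (Exception e) {
        // Wrap so the lambda satisfies Consumer's exception-free signature.
        throw new RuntimeException(e);
      }
    };
  }
}
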
+
+ public abstract void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
+ Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException;
+
+ /**
+ * The entry point for the whole parallelization and memory computation logic:
+ * 1) Initially a fragment tree is prepared which contains a wrapper for each fragment.
+ * The topology of this tree is the same as that of the major fragment tree.
+ * 2) Traverse this fragment tree to collect stats for each major fragment and then
+ * parallelize each fragment. At this stage minor fragments are not created, but all
+ * the information required to create minor fragments is computed.
+ * 3) Memory is computed for each operator and for each minor fragment.
+ * 4) Lastly, all the information computed above is used to create the minor fragments
+ * for each major fragment.
*
- * @param options Option list
- * @param foremanNode The driving/foreman node for this query. (this node)
- * @param queryId The queryId for this query.
- * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query.
- * @param rootFragment The root node of the PhysicalPlan that we will be parallelizing.
- * @param session UserSession of user who launched this query.
- * @param queryContextInfo Info related to the context when query has started.
- * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes.
+ * @param options List of options set by the user.
+ * @param foremanNode The foreman node for this query plan.
+ * @param queryId Query ID.
+ * @param activeEndpoints Currently active endpoints on which this plan will run.
+ * @param rootFragment Root major fragment.
+ * @param session Session context.
+ * @param queryContextInfo Query context information.
+ * @return The generated {@link QueryWorkUnit} with the minor fragments to be assigned to the individual nodes.
* @throws ExecutionSetupException
*/
- public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
- Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
- UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+ @Override
+ public final QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+ Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
+ UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+ PlanningSet planningSet = prepareFragmentTree(rootFragment);
+
+ Set<Wrapper> rootFragments = getRootFragments(planningSet);
- final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
- return generateWorkUnit(
- options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
+ collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints);
+
+ adjustMemory(planningSet, rootFragments, activeEndpoints);
+
+ return generateWorkUnit(options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
}
/**
@@ -151,28 +233,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
throw new UnsupportedOperationException("Use children classes");
}
- /**
- * Helper method to reuse the code for QueryWorkUnit(s) generation
- * @param activeEndpoints
- * @param rootFragment
- * @return A {@link PlanningSet}.
- * @throws ExecutionSetupException
- */
- protected PlanningSet getFragmentsHelper(Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment) throws ExecutionSetupException {
-
- PlanningSet planningSet = new PlanningSet();
- initFragmentWrappers(rootFragment, planningSet);
-
- final Set<Wrapper> leafFragments = constructFragmentDependencyGraph(planningSet);
-
- // Start parallelizing from leaf fragments
- for (Wrapper wrapper : leafFragments) {
- parallelizeFragment(wrapper, planningSet, activeEndpoints);
- }
- planningSet.findRootWrapper(rootFragment);
- return planningSet;
- }
// For every fragment, create a Wrapper in PlanningSet.
@VisibleForTesting
@@ -190,77 +251,49 @@ public class SimpleParallelizer implements ParallelizationParameters {
* @param planningSet
* @return Returns a list of leaf fragments in fragment dependency graph.
*/
- private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
+ private void constructFragmentDependencyGraph(Fragment rootFragment, PlanningSet planningSet) {
// Set up dependency of fragments based on the affinity of exchange that separates the fragments.
- for (Wrapper currentFragmentWrapper : planningSet) {
- ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair();
- if (sendingExchange != null) {
- ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency();
- Wrapper receivingFragmentWrapper = planningSet.get(sendingExchange.getNode());
-
+ for(Wrapper currentFragment : planningSet) {
+ ExchangeFragmentPair sendingXchgForCurrFrag = currentFragment.getNode().getSendingExchangePair();
+ if (sendingXchgForCurrFrag != null) {
+ ParallelizationDependency dependency = sendingXchgForCurrFrag.getExchange().getParallelizationDependency();
+ Wrapper receivingFragmentWrapper = planningSet.get(sendingXchgForCurrFrag.getNode());
+
+ // Usually the receiving (parent) fragment depends on the sending (child) fragment. However, there is a special case
+ // for DeMux exchanges, where the sender of the current fragment depends on the receiver of the parent fragment.
if (dependency == ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) {
- receivingFragmentWrapper.addFragmentDependency(currentFragmentWrapper);
+ receivingFragmentWrapper.addFragmentDependency(currentFragment);
} else if (dependency == ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) {
- currentFragmentWrapper.addFragmentDependency(receivingFragmentWrapper);
+ currentFragment.addFragmentDependency(receivingFragmentWrapper);
}
}
}
-
- // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for
- // parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and
- // remove the fragment on which the current fragment depends.
-
- final Set<Wrapper> roots = Sets.newHashSet();
- for (Wrapper w : planningSet) {
- roots.add(w);
- }
-
- for (Wrapper wrapper : planningSet) {
- final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies();
- if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
- for (Wrapper dependency : fragmentDependencies) {
- if (roots.contains(dependency)) {
- roots.remove(dependency);
- }
- }
- }
- }
-
- return roots;
+ planningSet.findRootWrapper(rootFragment);
}
+
/**
- * Helper method for parallelizing a given fragment. Dependent fragments are parallelized first before
- * parallelizing the given fragment.
+ * Helper method to call an operation on each fragment. The traversal calls the operation on child fragments before
+ * calling it on the parent fragment.
*/
- private void parallelizeFragment(Wrapper fragmentWrapper, PlanningSet planningSet,
- Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
- // If the fragment is already parallelized, return.
- if (fragmentWrapper.isEndpointsAssignmentDone()) {
- return;
- }
+ protected void traverse(Wrapper fragmentWrapper, Consumer<Wrapper> operation) throws PhysicalOperatorSetupException {
- // First parallelize fragments on which this fragment depends.
final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies();
if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
for(Wrapper dependency : fragmentDependencies) {
- parallelizeFragment(dependency, planningSet, activeEndpoints);
+ traverse(dependency, operation);
}
}
-
- // Find stats. Stats include various factors including cost of physical operators, parallelizability of
- // work in physical operator and affinity of physical operator to certain nodes.
- fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
-
- fragmentWrapper.getStats().getDistributionAffinity()
- .getFragmentParallelizer()
- .parallelizeFragment(fragmentWrapper, this, activeEndpoints);
+ operation.accept(fragmentWrapper);
}
+ // A function that returns the memory assigned to a particular physical operator on a given endpoint.
+ protected abstract BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory();
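
adjustMemory() and getMemory() are the two abstract hooks a concrete parallelizer supplies: the former rebalances memory across the parallelized fragments, the latter tells the materializer how much memory a given operator gets on a given endpoint. Assuming those are the only abstract members a subclass must implement, a hypothetical subclass (the class name, constant, and behaviour below are illustrative only, not one of Drill's real concrete parallelizers) could wire them up like this:

package org.apache.drill.exec.planner.fragment;

import java.util.Collection;
import java.util.Set;
import java.util.function.BiFunction;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;

// Illustrative subclass only; Drill's real parallelizers may compute memory very differently.
public class FixedMemoryParallelizer extends SimpleParallelizer {
  private final long memoryPerOperator;

  public FixedMemoryParallelizer(QueryContext context, long memoryPerOperator) {
    super(context);
    this.memoryPerOperator = memoryPerOperator;
  }

  @Override
  public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
                           Collection<DrillbitEndpoint> activeEndpoints) {
    // No per-query adjustment in this sketch; a real implementation would rebalance
    // the memory requirements collected during parallelization here.
  }

  @Override
  protected BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
    // Hand every operator the same fixed budget, regardless of endpoint.
    return (endpoint, operator) -> memoryPerOperator;
  }
}
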
+
protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
- Fragment rootNode, PlanningSet planningSet,
- UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+ Fragment rootNode, PlanningSet planningSet, UserSession session,
+ QueryContextInformation queryContextInfo) throws ExecutionSetupException {
List<MinorFragmentDefn> fragmentDefns = new ArrayList<>( );
MinorFragmentDefn rootFragmentDefn = null;
@@ -282,7 +315,8 @@ public class SimpleParallelizer implements ParallelizationParameters {
// Create a minorFragment for each major fragment.
for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) {
- IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
+ IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper,
+ (fragmentWrapper, minorFragment) -> fragmentWrapper.getAssignedEndpoint(minorFragment), getMemory());
wrapper.resetAllocation();
PhysicalOperator op = physicalOperatorRoot.accept(Materializer.INSTANCE, iNode);
Preconditions.checkArgument(op instanceof FragmentRoot);