diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java | 28 |
1 files changed, 19 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java index 70d4e268a..c1250e3ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.fragment.contrib; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.DrillStringUtils; @@ -28,9 +29,9 @@ import org.apache.drill.exec.physical.base.Exchange; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.PlanningSet; -import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.fragment.Wrapper; import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -56,12 +57,12 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; * allows not to pollute parent class with non-authentic functionality * */ -public class SplittingParallelizer extends SimpleParallelizer { +public class SplittingParallelizer extends DefaultQueryParallelizer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SplittingParallelizer.class); - public SplittingParallelizer(QueryContext context) { - super(context); + public SplittingParallelizer(boolean doMemoryPlanning, QueryContext context) { + super(doMemoryPlanning, context); } /** @@ -81,7 +82,13 @@ public class SplittingParallelizer extends SimpleParallelizer { Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment, UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { - final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment); + final PlanningSet planningSet = this.prepareFragmentTree(rootFragment); + + Set<Wrapper> rootFragments = getRootFragments(planningSet); + + collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints); + + adjustMemory(planningSet, rootFragments, activeEndpoints); return generateWorkUnits( options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo); @@ -113,7 +120,7 @@ public class SplittingParallelizer extends SimpleParallelizer { List<QueryWorkUnit> workUnits = Lists.newArrayList(); int plansCount = 0; - DrillbitEndpoint[] endPoints = null; + DrillbitEndpoint[] leafFragEndpoints = null; long initialAllocation = 0; final Iterator<Wrapper> iter = planningSet.iterator(); @@ -130,12 +137,14 @@ public class SplittingParallelizer extends SimpleParallelizer { // allocation plansCount = wrapper.getWidth(); initialAllocation = (wrapper.getInitialAllocation() != 0 ) ? wrapper.getInitialAllocation()/plansCount : 0; - endPoints = new DrillbitEndpoint[plansCount]; + leafFragEndpoints = new DrillbitEndpoint[plansCount]; for (int mfId = 0; mfId < plansCount; mfId++) { - endPoints[mfId] = wrapper.getAssignedEndpoint(mfId); + leafFragEndpoints[mfId] = wrapper.getAssignedEndpoint(mfId); } } } + + DrillbitEndpoint[] endPoints = leafFragEndpoints; if ( plansCount == 0 ) { // no exchange, return list of single QueryWorkUnit workUnits.add(generateWorkUnit(options, foremanNode, queryId, rootNode, planningSet, session, queryContextInfo)); @@ -174,7 +183,8 @@ public class SplittingParallelizer extends SimpleParallelizer { MinorFragmentDefn rootFragment = null; FragmentRoot rootOperator = null; - IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper); + IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper, + (fragmentWrapper, minorFragment) -> endPoints[minorFragment],getMemory()); wrapper.resetAllocation(); // two visitors here // 1. To remove exchange |