aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java28
1 files changed, 19 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
index 70d4e268a..c1250e3ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.fragment.contrib;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
@@ -28,9 +29,9 @@ import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
-import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.fragment.Wrapper;
import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -56,12 +57,12 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
* allows not to pollute parent class with non-authentic functionality
*
*/
-public class SplittingParallelizer extends SimpleParallelizer {
+public class SplittingParallelizer extends DefaultQueryParallelizer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SplittingParallelizer.class);
- public SplittingParallelizer(QueryContext context) {
- super(context);
+ public SplittingParallelizer(boolean doMemoryPlanning, QueryContext context) {
+ super(doMemoryPlanning, context);
}
/**
@@ -81,7 +82,13 @@ public class SplittingParallelizer extends SimpleParallelizer {
Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment,
UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
- final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
+ final PlanningSet planningSet = this.prepareFragmentTree(rootFragment);
+
+ Set<Wrapper> rootFragments = getRootFragments(planningSet);
+
+ collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints);
+
+ adjustMemory(planningSet, rootFragments, activeEndpoints);
return generateWorkUnits(
options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo);
@@ -113,7 +120,7 @@ public class SplittingParallelizer extends SimpleParallelizer {
List<QueryWorkUnit> workUnits = Lists.newArrayList();
int plansCount = 0;
- DrillbitEndpoint[] endPoints = null;
+ DrillbitEndpoint[] leafFragEndpoints = null;
long initialAllocation = 0;
final Iterator<Wrapper> iter = planningSet.iterator();
@@ -130,12 +137,14 @@ public class SplittingParallelizer extends SimpleParallelizer {
// allocation
plansCount = wrapper.getWidth();
initialAllocation = (wrapper.getInitialAllocation() != 0 ) ? wrapper.getInitialAllocation()/plansCount : 0;
- endPoints = new DrillbitEndpoint[plansCount];
+ leafFragEndpoints = new DrillbitEndpoint[plansCount];
for (int mfId = 0; mfId < plansCount; mfId++) {
- endPoints[mfId] = wrapper.getAssignedEndpoint(mfId);
+ leafFragEndpoints[mfId] = wrapper.getAssignedEndpoint(mfId);
}
}
}
+
+ DrillbitEndpoint[] endPoints = leafFragEndpoints;
if ( plansCount == 0 ) {
// no exchange, return list of single QueryWorkUnit
workUnits.add(generateWorkUnit(options, foremanNode, queryId, rootNode, planningSet, session, queryContextInfo));
@@ -174,7 +183,8 @@ public class SplittingParallelizer extends SimpleParallelizer {
MinorFragmentDefn rootFragment = null;
FragmentRoot rootOperator = null;
- IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
+ IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper,
+ (fragmentWrapper, minorFragment) -> endPoints[minorFragment],getMemory());
wrapper.resetAllocation();
// two visitors here
// 1. To remove exchange