diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java | 21 |
1 files changed, 12 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 37b8a4073..521e50a35 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -38,7 +38,6 @@ import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor; -import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.sql.DirectPlan; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -408,7 +407,7 @@ public class Foreman implements Runnable { validatePlan(plan); queryRM.visitAbstractPlan(plan); - final QueryWorkUnit work = getQueryWorkUnit(plan); + final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM); if (enableRuntimeFilter) { runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext); runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo(); @@ -463,7 +462,7 @@ public class Foreman implements Runnable { } catch (IOException e) { throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson())); } - queryRM.setCost(rootOperator.getCost()); + queryRM.setCost(rootOperator.getCost().getOutputRowCount()); fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator); @@ -561,14 +560,18 @@ public class Foreman implements Runnable { } } - private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException { + private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan, + final QueryResourceManager rm) throws ExecutionSetupException { + final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); + final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); - final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext); - return parallelizer.getFragments( - queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), - queryId, queryContext.getOnlineEndpoints(), rootFragment, - initiatingClient.getSession(), queryContext.getQueryContextInfo()); + + return rm.getParallelizer(plan.getProperties().hasResourcePlan).generateWorkUnit(queryContext.getOptions().getOptionList(), + queryContext.getCurrentEndpoint(), + queryId, queryContext.getOnlineEndpoints(), + rootFragment, initiatingClient.getSession(), + queryContext.getQueryContextInfo()); } private void logWorkUnit(QueryWorkUnit queryWorkUnit) { |