aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java21
1 files changed, 12 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 37b8a4073..521e50a35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -38,7 +38,6 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
-import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -408,7 +407,7 @@ public class Foreman implements Runnable {
validatePlan(plan);
queryRM.visitAbstractPlan(plan);
- final QueryWorkUnit work = getQueryWorkUnit(plan);
+ final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM);
if (enableRuntimeFilter) {
runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
@@ -463,7 +462,7 @@ public class Foreman implements Runnable {
} catch (IOException e) {
throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
}
- queryRM.setCost(rootOperator.getCost());
+ queryRM.setCost(rootOperator.getCost().getOutputRowCount());
fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator);
@@ -561,14 +560,18 @@ public class Foreman implements Runnable {
}
}
- private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException {
+ private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan,
+ final QueryResourceManager rm) throws ExecutionSetupException {
+
final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+
final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
- final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
- return parallelizer.getFragments(
- queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
- queryId, queryContext.getOnlineEndpoints(), rootFragment,
- initiatingClient.getSession(), queryContext.getQueryContextInfo());
+
+ return rm.getParallelizer(plan.getProperties().hasResourcePlan).generateWorkUnit(queryContext.getOptions().getOptionList(),
+ queryContext.getCurrentEndpoint(),
+ queryId, queryContext.getOnlineEndpoints(),
+ rootFragment, initiatingClient.getSession(),
+ queryContext.getQueryContextInfo());
}
private void logWorkUnit(QueryWorkUnit queryWorkUnit) {