diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work')
5 files changed, 48 insertions, 12 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 37b8a4073..521e50a35 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -38,7 +38,6 @@ import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor; -import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.sql.DirectPlan; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -408,7 +407,7 @@ public class Foreman implements Runnable { validatePlan(plan); queryRM.visitAbstractPlan(plan); - final QueryWorkUnit work = getQueryWorkUnit(plan); + final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM); if (enableRuntimeFilter) { runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext); runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo(); @@ -463,7 +462,7 @@ public class Foreman implements Runnable { } catch (IOException e) { throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson())); } - queryRM.setCost(rootOperator.getCost()); + queryRM.setCost(rootOperator.getCost().getOutputRowCount()); fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator); @@ -561,14 +560,18 @@ public class Foreman implements Runnable { } } - private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException { + private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan, + final QueryResourceManager rm) throws ExecutionSetupException { + final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); + final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); - final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext); - return parallelizer.getFragments( - queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), - queryId, queryContext.getOnlineEndpoints(), rootFragment, - initiatingClient.getSession(), queryContext.getQueryContextInfo()); + + return rm.getParallelizer(plan.getProperties().hasResourcePlan).generateWorkUnit(queryContext.getOptions().getOptionList(), + queryContext.getCurrentEndpoint(), + queryId, queryContext.getOnlineEndpoints(), + rootFragment, initiatingClient.getSession(), + queryContext.getQueryContextInfo()); } private void logWorkUnit(QueryWorkUnit queryWorkUnit) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java index 77957d5a3..7c3cc04d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java @@ -20,6 +20,8 @@ package org.apache.drill.exec.work.foreman.rm; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.planner.fragment.QueryParallelizer; +import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer; import org.apache.drill.exec.util.MemoryAllocationUtilities; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.foreman.Foreman; @@ -46,12 +48,16 @@ public class DefaultResourceManager implements ResourceManager { if (plan == null || plan.getProperties().hasResourcePlan) { return; } - MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); + MemoryAllocationUtilities.setupBufferedMemoryAllocations(plan, queryContext); } @Override public void visitPhysicalPlan(QueryWorkUnit work) { } + + public QueryContext getQueryContext() { + return queryContext; + } } public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager { @@ -70,6 +76,11 @@ public class DefaultResourceManager implements ResourceManager { } @Override + public QueryParallelizer getParallelizer(boolean memoryPlanning){ + return new DefaultQueryParallelizer(memoryPlanning, this.getQueryContext()); + } + + @Override public void admit() { // No queueing by default } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java index 9e2a3a10f..4b9112194 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.work.foreman.rm; +import org.apache.drill.exec.planner.fragment.QueryParallelizer; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; @@ -44,6 +45,15 @@ public interface QueryResourceManager extends QueryResourceAllocator { void setCost(double cost); /** + * Create a parallelizer to parallelize each major fragment of the query into + * many minor fragments. The parallelizer encapsulates the logic of how much + * memory and parallelism is required for the query. + * @param memoryPlanning memory planning needs to be done during parallelization + * @return + */ + QueryParallelizer getParallelizer(boolean memoryPlanning); + + /** * Admit the query into the cluster. Blocks until the query * can run. (Later revisions may use a more thread-friendly * approach.) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java index c6eb9d3b5..5d1a1709d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java @@ -28,6 +28,8 @@ import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.fragment.QueryParallelizer; +import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.work.QueryWorkUnit; @@ -116,6 +118,10 @@ public class ThrottledResourceManager extends AbstractResourceManager { } } + public QueryContext getQueryContext() { + return queryContext; + } + private int countBufferingOperators( Map<String, Collection<PhysicalOperator>> nodeMap) { int width = 0; @@ -287,6 +293,12 @@ public class ThrottledResourceManager extends AbstractResourceManager { } @Override + public QueryParallelizer getParallelizer(boolean planHasMemory) { + // currently memory planning is disabled. Enable it once the RM functionality is fully implemented. + return new QueueQueryParallelizer(true || planHasMemory, this.getQueryContext()); + } + + @Override public void admit() throws QueueTimeoutException, QueryQueueException { lease = rm.queue().enqueue(foreman.getQueryId(), queryCost); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java index e789c1dfa..e61814ab3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java @@ -113,7 +113,7 @@ public class PlanSplitter { final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); - final SimpleParallelizer parallelizer = new SplittingParallelizer(queryContext); + final SimpleParallelizer parallelizer = new SplittingParallelizer(plan.getProperties().hasResourcePlan, queryContext); List<PlanFragment> fragments = Lists.newArrayList(); @@ -134,7 +134,7 @@ public class PlanSplitter { } } } else { - final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), + final QueryWorkUnit queryWorkUnit = parallelizer.generateWorkUnit(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), queryId, queryContext.getActiveEndpoints(), rootFragment, queryContext.getSession(), queryContext.getQueryContextInfo()); planner.visitPhysicalPlan(queryWorkUnit); |