aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/work
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java4
5 files changed, 48 insertions, 12 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 37b8a4073..521e50a35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -38,7 +38,6 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
-import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -408,7 +407,7 @@ public class Foreman implements Runnable {
validatePlan(plan);
queryRM.visitAbstractPlan(plan);
- final QueryWorkUnit work = getQueryWorkUnit(plan);
+ final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM);
if (enableRuntimeFilter) {
runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
@@ -463,7 +462,7 @@ public class Foreman implements Runnable {
} catch (IOException e) {
throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
}
- queryRM.setCost(rootOperator.getCost());
+ queryRM.setCost(rootOperator.getCost().getOutputRowCount());
fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator);
@@ -561,14 +560,18 @@ public class Foreman implements Runnable {
}
}
- private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException {
+ private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan,
+ final QueryResourceManager rm) throws ExecutionSetupException {
+
final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+
final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
- final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
- return parallelizer.getFragments(
- queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
- queryId, queryContext.getOnlineEndpoints(), rootFragment,
- initiatingClient.getSession(), queryContext.getQueryContextInfo());
+
+ return rm.getParallelizer(plan.getProperties().hasResourcePlan).generateWorkUnit(queryContext.getOptions().getOptionList(),
+ queryContext.getCurrentEndpoint(),
+ queryId, queryContext.getOnlineEndpoints(),
+ rootFragment, initiatingClient.getSession(),
+ queryContext.getQueryContextInfo());
}
private void logWorkUnit(QueryWorkUnit queryWorkUnit) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
index 77957d5a3..7c3cc04d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.work.foreman.rm;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.fragment.QueryParallelizer;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.exec.util.MemoryAllocationUtilities;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.foreman.Foreman;
@@ -46,12 +48,16 @@ public class DefaultResourceManager implements ResourceManager {
if (plan == null || plan.getProperties().hasResourcePlan) {
return;
}
- MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
+ MemoryAllocationUtilities.setupBufferedMemoryAllocations(plan, queryContext);
}
@Override
public void visitPhysicalPlan(QueryWorkUnit work) {
}
+
+ public QueryContext getQueryContext() {
+ return queryContext;
+ }
}
public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager {
@@ -70,6 +76,11 @@ public class DefaultResourceManager implements ResourceManager {
}
@Override
+ public QueryParallelizer getParallelizer(boolean memoryPlanning){
+ return new DefaultQueryParallelizer(memoryPlanning, this.getQueryContext());
+ }
+
+ @Override
public void admit() {
// No queueing by default
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
index 9e2a3a10f..4b9112194 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.work.foreman.rm;
+import org.apache.drill.exec.planner.fragment.QueryParallelizer;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
@@ -44,6 +45,15 @@ public interface QueryResourceManager extends QueryResourceAllocator {
void setCost(double cost);
/**
+ * Create a parallelizer to parallelize each major fragment of the query into
+ * many minor fragments. The parallelizer encapsulates the logic of how much
+ * memory and parallelism is required for the query.
+ * @param memoryPlanning memory planning needs to be done during parallelization
+ * @return
+ */
+ QueryParallelizer getParallelizer(boolean memoryPlanning);
+
+ /**
* Admit the query into the cluster. Blocks until the query
* can run. (Later revisions may use a more thread-friendly
* approach.)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
index c6eb9d3b5..5d1a1709d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
@@ -28,6 +28,8 @@ import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.fragment.QueryParallelizer;
+import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.QueryWorkUnit;
@@ -116,6 +118,10 @@ public class ThrottledResourceManager extends AbstractResourceManager {
}
}
+ public QueryContext getQueryContext() {
+ return queryContext;
+ }
+
private int countBufferingOperators(
Map<String, Collection<PhysicalOperator>> nodeMap) {
int width = 0;
@@ -287,6 +293,12 @@ public class ThrottledResourceManager extends AbstractResourceManager {
}
@Override
+ public QueryParallelizer getParallelizer(boolean planHasMemory) {
+ // currently memory planning is disabled. Enable it once the RM functionality is fully implemented.
+ return new QueueQueryParallelizer(true || planHasMemory, this.getQueryContext());
+ }
+
+ @Override
public void admit() throws QueueTimeoutException, QueryQueueException {
lease = rm.queue().enqueue(foreman.getQueryId(), queryCost);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
index e789c1dfa..e61814ab3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -113,7 +113,7 @@ public class PlanSplitter {
final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
- final SimpleParallelizer parallelizer = new SplittingParallelizer(queryContext);
+ final SimpleParallelizer parallelizer = new SplittingParallelizer(plan.getProperties().hasResourcePlan, queryContext);
List<PlanFragment> fragments = Lists.newArrayList();
@@ -134,7 +134,7 @@ public class PlanSplitter {
}
}
} else {
- final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
+ final QueryWorkUnit queryWorkUnit = parallelizer.generateWorkUnit(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
queryId, queryContext.getActiveEndpoints(), rootFragment,
queryContext.getSession(), queryContext.getQueryContextInfo());
planner.visitPhysicalPlan(queryWorkUnit);