aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java12
1 files changed, 12 insertions, 0 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
index c6eb9d3b5..5d1a1709d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
@@ -28,6 +28,8 @@ import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.fragment.QueryParallelizer;
+import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.QueryWorkUnit;
@@ -116,6 +118,10 @@ public class ThrottledResourceManager extends AbstractResourceManager {
}
}
+ public QueryContext getQueryContext() {
+ return queryContext;
+ }
+
private int countBufferingOperators(
Map<String, Collection<PhysicalOperator>> nodeMap) {
int width = 0;
@@ -287,6 +293,12 @@ public class ThrottledResourceManager extends AbstractResourceManager {
}
@Override
+ public QueryParallelizer getParallelizer(boolean planHasMemory) {
+ // currently memory planning is disabled. Enable it once the RM functionality is fully implemented.
+ return new QueueQueryParallelizer(true || planHasMemory, this.getQueryContext());
+ }
+
+ @Override
public void admit() throws QueueTimeoutException, QueryQueueException {
lease = rm.queue().enqueue(foreman.getQueryId(), queryCost);
}