aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java47
1 files changed, 43 insertions, 4 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 6da885044..ffa577e41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -18,7 +18,12 @@
package org.apache.drill.exec.planner.fragment;
import java.util.List;
+import java.util.Map;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.drill.exec.planner.cost.NodeResource;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
@@ -46,6 +51,11 @@ public class Wrapper {
private boolean endpointsAssigned;
private long initialAllocation = 0;
private long maxAllocation = 0;
+ // Resources (i.e memory and cpu) are stored per drillbit in this map.
+ // A Drillbit can have n number of minor fragments then the NodeResource
+ // contains cumulative resources required for all the minor fragments
+ // for that major fragment on that Drillbit.
+ private Map<DrillbitEndpoint, NodeResource> nodeResourceMap;
// List of fragments this particular fragment depends on for determining its parallelization and endpoint assignments.
private final List<Wrapper> fragmentDependencies = Lists.newArrayList();
@@ -58,6 +68,7 @@ public class Wrapper {
this.majorFragmentId = majorFragmentId;
this.node = node;
this.stats = new Stats();
+ nodeResourceMap = null;
}
public Stats getStats() {
@@ -94,10 +105,12 @@ public class Wrapper {
return maxAllocation;
}
- public void addAllocation(PhysicalOperator pop) {
- initialAllocation += pop.getInitialAllocation();
-// logger.debug("Incrementing initialAllocation by {} to {}. Pop: {}", pop.getInitialAllocation(), initialAllocation, pop);
- maxAllocation += pop.getMaxAllocation();
+ public void addInitialAllocation(long memory) {
+ initialAllocation += memory;
+ }
+
+ public void addMaxAllocation(long memory) {
+ maxAllocation += memory;
}
private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{
@@ -197,4 +210,30 @@ public class Wrapper {
public List<Wrapper> getFragmentDependencies() {
return ImmutableList.copyOf(fragmentDependencies);
}
+
+ /**
+ * Compute the cpu resources required for all the minor fragments of this major fragment.
+ * This information is stored per DrillbitEndpoint. It is assumed that this function is
+ * called only once.
+ */
+ public void computeCpuResources() {
+ Preconditions.checkArgument(nodeResourceMap == null);
+ BinaryOperator<NodeResource> merge = (first, second) -> {
+ NodeResource result = NodeResource.create();
+ result.add(first);
+ result.add(second);
+ return result;
+ };
+
+ Function<DrillbitEndpoint, NodeResource> cpuPerEndpoint = (endpoint) -> new NodeResource(1, 0);
+
+ nodeResourceMap = endpoints.stream()
+ .collect(Collectors.groupingBy(Function.identity(),
+ Collectors.reducing(NodeResource.create(),
+ cpuPerEndpoint, merge)));
+ }
+
+ public Map<DrillbitEndpoint, NodeResource> getResourceMap() {
+ return nodeResourceMap;
+ }
}