diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java | 47 |
1 files changed, 43 insertions, 4 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java index 6da885044..ffa577e41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java @@ -18,7 +18,12 @@ package org.apache.drill.exec.planner.fragment; import java.util.List; +import java.util.Map; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.drill.exec.planner.cost.NodeResource; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; @@ -46,6 +51,11 @@ public class Wrapper { private boolean endpointsAssigned; private long initialAllocation = 0; private long maxAllocation = 0; + // Resources (i.e memory and cpu) are stored per drillbit in this map. + // A Drillbit can have n number of minor fragments then the NodeResource + // contains cumulative resources required for all the minor fragments + // for that major fragment on that Drillbit. + private Map<DrillbitEndpoint, NodeResource> nodeResourceMap; // List of fragments this particular fragment depends on for determining its parallelization and endpoint assignments. private final List<Wrapper> fragmentDependencies = Lists.newArrayList(); @@ -58,6 +68,7 @@ public class Wrapper { this.majorFragmentId = majorFragmentId; this.node = node; this.stats = new Stats(); + nodeResourceMap = null; } public Stats getStats() { @@ -94,10 +105,12 @@ public class Wrapper { return maxAllocation; } - public void addAllocation(PhysicalOperator pop) { - initialAllocation += pop.getInitialAllocation(); -// logger.debug("Incrementing initialAllocation by {} to {}. Pop: {}", pop.getInitialAllocation(), initialAllocation, pop); - maxAllocation += pop.getMaxAllocation(); + public void addInitialAllocation(long memory) { + initialAllocation += memory; + } + + public void addMaxAllocation(long memory) { + maxAllocation += memory; } private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{ @@ -197,4 +210,30 @@ public class Wrapper { public List<Wrapper> getFragmentDependencies() { return ImmutableList.copyOf(fragmentDependencies); } + + /** + * Compute the cpu resources required for all the minor fragments of this major fragment. + * This information is stored per DrillbitEndpoint. It is assumed that this function is + * called only once. + */ + public void computeCpuResources() { + Preconditions.checkArgument(nodeResourceMap == null); + BinaryOperator<NodeResource> merge = (first, second) -> { + NodeResource result = NodeResource.create(); + result.add(first); + result.add(second); + return result; + }; + + Function<DrillbitEndpoint, NodeResource> cpuPerEndpoint = (endpoint) -> new NodeResource(1, 0); + + nodeResourceMap = endpoints.stream() + .collect(Collectors.groupingBy(Function.identity(), + Collectors.reducing(NodeResource.create(), + cpuPerEndpoint, merge))); + } + + public Map<DrillbitEndpoint, NodeResource> getResourceMap() { + return nodeResourceMap; + } } |