diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java | 14 |
1 files changed, 11 insertions, 3 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java index e33a38afc..7659f90f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.fragment; import java.util.ArrayDeque; import java.util.Deque; import java.util.List; +import java.util.function.BiFunction; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.FragmentSetupException; @@ -30,7 +31,7 @@ import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Store; import org.apache.drill.exec.physical.base.SubScan; - +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.physical.config.UnnestPOP; @@ -164,15 +165,21 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate public static class IndexedFragmentNode{ private final Wrapper info; + private final BiFunction<Wrapper, Integer, DrillbitEndpoint> endpoint; private final int minorFragmentId; + private final BiFunction<DrillbitEndpoint, PhysicalOperator, Long> memoryPerOperPerDrillbit; SubScan subScan = null; private final Deque<UnnestPOP> unnest = new ArrayDeque<>(); - public IndexedFragmentNode(int minorFragmentId, Wrapper info) { + public IndexedFragmentNode(int minorFragmentId, Wrapper info, + BiFunction<Wrapper, Integer, DrillbitEndpoint> wrapperToEndpoint, + BiFunction<DrillbitEndpoint, PhysicalOperator, Long> memoryReqs) { super(); this.info = info; + this.endpoint = wrapperToEndpoint; this.minorFragmentId = minorFragmentId; + this.memoryPerOperPerDrillbit = memoryReqs; } public Fragment getNode() { @@ -188,7 +195,8 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate } public void addAllocation(PhysicalOperator pop) { - info.addAllocation(pop); + info.addInitialAllocation(pop.getInitialAllocation()); + info.addMaxAllocation(memoryPerOperPerDrillbit.apply(this.endpoint.apply(info, minorFragmentId), pop)); } public void addUnnest(UnnestPOP unnest) { |