aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java14
1 files changed, 11 insertions, 3 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index e33a38afc..7659f90f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.fragment;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
+import java.util.function.BiFunction;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.FragmentSetupException;
@@ -30,7 +31,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Store;
import org.apache.drill.exec.physical.base.SubScan;
-
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -164,15 +165,21 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
public static class IndexedFragmentNode{
private final Wrapper info;
+ private final BiFunction<Wrapper, Integer, DrillbitEndpoint> endpoint;
private final int minorFragmentId;
+ private final BiFunction<DrillbitEndpoint, PhysicalOperator, Long> memoryPerOperPerDrillbit;
SubScan subScan = null;
private final Deque<UnnestPOP> unnest = new ArrayDeque<>();
- public IndexedFragmentNode(int minorFragmentId, Wrapper info) {
+ public IndexedFragmentNode(int minorFragmentId, Wrapper info,
+ BiFunction<Wrapper, Integer, DrillbitEndpoint> wrapperToEndpoint,
+ BiFunction<DrillbitEndpoint, PhysicalOperator, Long> memoryReqs) {
super();
this.info = info;
+ this.endpoint = wrapperToEndpoint;
this.minorFragmentId = minorFragmentId;
+ this.memoryPerOperPerDrillbit = memoryReqs;
}
public Fragment getNode() {
@@ -188,7 +195,8 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
}
public void addAllocation(PhysicalOperator pop) {
- info.addAllocation(pop);
+ info.addInitialAllocation(pop.getInitialAllocation());
+ info.addMaxAllocation(memoryPerOperPerDrillbit.apply(this.endpoint.apply(info, minorFragmentId), pop));
}
public void addUnnest(UnnestPOP unnest) {