aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java160
1 files changed, 160 insertions, 0 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java
new file mode 100644
index 000000000..443ab79cd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.AbstractMuxExchange;
+import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
+import org.apache.drill.exec.planner.cost.NodeResource;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.HashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A visitor to compute memory requirements for each operator in a minor fragment.
+ * This visitor will be called for each major fragment. It traverses the physical operators
+ * in that major fragment and computes the memory for each operator per each minor fragment.
+ * The minor fragment memory resources are further grouped into per Drillbit resource
+ * requirements.
+ */
+public class MemoryCalculator extends AbstractOpWrapperVisitor<Void, RuntimeException> {
+
+ private final PlanningSet planningSet;
+ // List of all the buffered operators and their memory requirement per drillbit.
+ private final Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> bufferedOperators;
+ private final QueryContext queryContext;
+
+ public MemoryCalculator(PlanningSet planningSet, QueryContext context) {
+ this.planningSet = planningSet;
+ this.bufferedOperators = new HashMap<>();
+ this.queryContext = context;
+ }
+
+ // Helper method to compute the minor fragment count per drillbit. This method returns
+ // a map with key as DrillbitEndpoint and value as the width (i.e #minorFragments)
+ // per Drillbit.
+ private Map<DrillbitEndpoint, Integer> getMinorFragCountPerDrillbit(Wrapper currFragment) {
+ return currFragment.getAssignedEndpoints().stream()
+ .collect(Collectors.groupingBy(Function.identity(),
+ Collectors.summingInt(x -> 1)));
+ }
+
+ // Helper method to merge the memory computations for each operator given memory per operator
+ // and the number of minor fragments per Drillbit.
+ private void merge(Wrapper currFrag,
+ Map<DrillbitEndpoint, Integer> minorFragsPerDrillBit,
+ Function<Entry<DrillbitEndpoint, Integer>, Long> getMemory) {
+
+ NodeResource.merge(currFrag.getResourceMap(),
+ minorFragsPerDrillBit.entrySet()
+ .stream()
+ .collect(Collectors.toMap((x) -> x.getKey(),
+ (x) -> NodeResource.create(0,
+ getMemory.apply(x)))));
+ }
+
+ @Override
+ public Void visitSendingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException {
+ Wrapper receivingFragment = planningSet.get(fragment.getNode().getSendingExchangePair().getNode());
+ merge(fragment,
+ getMinorFragCountPerDrillbit(fragment),
+ // get the memory requirements for the sender operator.
+ (x) -> exchange.getSenderMemory(receivingFragment.getWidth(), x.getValue()));
+ return visitOp(exchange, fragment);
+ }
+
+ @Override
+ public Void visitReceivingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException {
+
+ final List<Fragment.ExchangeFragmentPair> receivingExchangePairs = fragment.getNode().getReceivingExchangePairs();
+ final Map<DrillbitEndpoint, Integer> sendingFragsPerDrillBit = new HashMap<>();
+
+ for(Fragment.ExchangeFragmentPair pair : receivingExchangePairs) {
+ if (pair.getExchange() == exchange) {
+ Wrapper sendingFragment = planningSet.get(pair.getNode());
+ Preconditions.checkArgument(sendingFragment.isEndpointsAssignmentDone());
+ for (DrillbitEndpoint endpoint : sendingFragment.getAssignedEndpoints()) {
+ sendingFragsPerDrillBit.putIfAbsent(endpoint, 0);
+ sendingFragsPerDrillBit.put(endpoint, sendingFragsPerDrillBit.get(endpoint)+1);
+ }
+ }
+ }
+ final int totalSendingFrags = sendingFragsPerDrillBit.entrySet().stream()
+ .mapToInt((x) -> x.getValue()).reduce(0, (x, y) -> x+y);
+ merge(fragment,
+ getMinorFragCountPerDrillbit(fragment),
+ (x) -> exchange.getReceiverMemory(fragment.getWidth(),
+ // If the exchange is a MuxExchange then the sending fragments are from that particular drillbit otherwise
+ // sending fragments are from across the cluster.
+ exchange instanceof AbstractMuxExchange ? sendingFragsPerDrillBit.get(x.getKey()) : totalSendingFrags));
+ return null;
+ }
+
+ public List<Pair<PhysicalOperator, Long>> getBufferedOperators(DrillbitEndpoint endpoint) {
+ return this.bufferedOperators.getOrDefault(endpoint, new ArrayList<>());
+ }
+
+ @Override
+ public Void visitOp(PhysicalOperator op, Wrapper fragment) {
+ long memoryCost = (int)Math.ceil(op.getCost().getMemoryCost());
+ if (op.isBufferedOperator(queryContext)) {
+ // If the operator is a buffered operator then get the memory estimates of the optimizer.
+ // The memory estimates of the optimizer are for the whole operator spread across all the
+ // minor fragments. Divide this memory estimation by fragment width to get the memory
+ // requirement per minor fragment.
+ long memoryCostPerMinorFrag = (int)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size());
+ Map<DrillbitEndpoint, Integer> drillbitEndpointMinorFragMap = getMinorFragCountPerDrillbit(fragment);
+
+ Map<DrillbitEndpoint,
+ Pair<PhysicalOperator, Long>> bufferedOperatorsPerDrillbit =
+ drillbitEndpointMinorFragMap.entrySet().stream()
+ .collect(Collectors.toMap((x) -> x.getKey(),
+ (x) -> Pair.of(op,
+ memoryCostPerMinorFrag * x.getValue())));
+ bufferedOperatorsPerDrillbit.entrySet().forEach((x) -> {
+ bufferedOperators.putIfAbsent(x.getKey(), new ArrayList<>());
+ bufferedOperators.get(x.getKey()).add(x.getValue());
+ });
+
+ merge(fragment,
+ drillbitEndpointMinorFragMap,
+ (x) -> memoryCostPerMinorFrag * x.getValue());
+
+ } else {
+ // Memory requirement for the non-buffered operators is just the batch size.
+ merge(fragment,
+ getMinorFragCountPerDrillbit(fragment),
+ (x) -> memoryCost * x.getValue());
+ }
+ for (PhysicalOperator child : op) {
+ child.accept(this, fragment);
+ }
+ return null;
+ }
+}
+