about | summary | refs | log | tree | commit | diff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment')
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java | 66
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java | 35
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java | 30
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java | 14
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java | 160
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java | 63
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java | 173
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java | 218
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java | 3
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java | 47
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java | 28
11 files changed, 702 insertions, 135 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java
new file mode 100644
index 000000000..20875a470
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.util.MemoryAllocationUtilities;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * Parallelizer used when resource management (RM) is not in play. All parallelization logic is
+ * inherited from {@link SimpleParallelizer}; this class only contributes the memory assignment,
+ * which follows the earlier logic of handing memory to the buffered operators through
+ * {@link MemoryAllocationUtilities}.
+ */
+public class DefaultQueryParallelizer extends SimpleParallelizer {
+  // When true, adjustMemory() leaves the plan untouched.
+  private final boolean planHasMemory;
+  // Null when the instance is created through the explicit-parameter constructor.
+  private final QueryContext queryContext;
+
+  /**
+   * Creates a parallelizer whose parallelization settings are taken from the query context.
+   *
+   * @param memoryAvailableInPlan whether the plan already carries memory assignments
+   * @param queryContext context of the query being planned
+   */
+  public DefaultQueryParallelizer(boolean memoryAvailableInPlan, QueryContext queryContext) {
+    super(queryContext);
+    this.queryContext = queryContext;
+    this.planHasMemory = memoryAvailableInPlan;
+  }
+
+  /**
+   * Creates a parallelizer from explicit parallelization settings; no query context is attached.
+   *
+   * @param memoryPlanning whether the plan already carries memory assignments
+   * @param parallelizationThreshold forwarded to {@link SimpleParallelizer}
+   * @param maxWidthPerNode forwarded to {@link SimpleParallelizer}
+   * @param maxGlobalWidth forwarded to {@link SimpleParallelizer}
+   * @param affinityFactor forwarded to {@link SimpleParallelizer}
+   */
+  public DefaultQueryParallelizer(boolean memoryPlanning, long parallelizationThreshold, int maxWidthPerNode,
+                                  int maxGlobalWidth, double affinityFactor) {
+    super(parallelizationThreshold, maxWidthPerNode, maxGlobalWidth, affinityFactor);
+    this.queryContext = null;
+    this.planHasMemory = memoryPlanning;
+  }
+
+  /**
+   * Sets up memory allocations for the buffered operators reachable from the root fragment,
+   * unless the plan already has memory assigned.
+   *
+   * @param planningSet planning state; only its root wrapper is consulted here
+   * @param roots unused by this implementation
+   * @param activeEndpoints unused by this implementation
+   */
+  @Override
+  public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
+                           Collection<DrillbitEndpoint> activeEndpoints) {
+    if (!planHasMemory) {
+      final List<PhysicalOperator> rootBufferedOps =
+          planningSet.getRootWrapper().getNode().getBufferedOperators();
+      MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(planHasMemory, rootBufferedOps, queryContext);
+    }
+  }
+
+  /**
+   * @return a function that ignores the endpoint and reports the operator's own max allocation
+   */
+  @Override
+  protected BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
+    return (drillbit, physicalOp) -> {
+      return physicalOp.getMaxAllocation();
+    };
+  }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
index a662adc6b..824060194 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
@@ -17,9 +17,11 @@
*/
package org.apache.drill.exec.planner.fragment;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
@@ -82,18 +84,14 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
return sendingExchange;
}
-// public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra) {
-// return visitor.visit(this, extra);
-// }
-
public class ExchangeFragmentPair {
private Exchange exchange;
- private Fragment node;
+ private Fragment fragmentXchgTo;
- public ExchangeFragmentPair(Exchange exchange, Fragment node) {
+ public ExchangeFragmentPair(Exchange exchange, Fragment fragXchgTo) {
super();
this.exchange = exchange;
- this.node = node;
+ this.fragmentXchgTo = fragXchgTo;
}
public Exchange getExchange() {
@@ -101,7 +99,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
}
public Fragment getNode() {
- return node;
+ return fragmentXchgTo;
}
@Override
@@ -109,7 +107,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
final int prime = 31;
int result = 1;
result = prime * result + ((exchange == null) ? 0 : exchange.hashCode());
- result = prime * result + ((node == null) ? 0 : node.hashCode());
+ result = prime * result + ((fragmentXchgTo == null) ? 0 : fragmentXchgTo.hashCode());
return result;
}
@@ -167,10 +165,27 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
return true;
}
+  /**
+   * Walks the operator tree starting at this fragment's root with a {@link BufferedOpFinder}
+   * and gathers the operators it selects.
+   *
+   * @return a freshly created, mutable list of the selected operators
+   */
+  public List<PhysicalOperator> getBufferedOperators() {
+    final List<PhysicalOperator> collected = new ArrayList<>();
+    final BufferedOpFinder finder = new BufferedOpFinder();
+    root.accept(finder, collected);
+    return collected;
+  }
+
+  /**
+   * Visitor that appends each visited operator for which {@code isBufferedOperator(null)} holds
+   * to the supplied list, then continues into that operator's children.
+   */
+  protected static class BufferedOpFinder extends AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
+    @Override
+    public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
+      throws RuntimeException {
+      // The check is made without a query context (null is passed).
+      final boolean buffered = op.isBufferedOperator(null);
+      if (buffered) {
+        value.add(op);
+      }
+      visitChildren(op, value);
+      return null;
+    }
+  }
+
@Override
public String toString() {
return "FragmentNode [root=" + root + ", sendingExchange=" + sendingExchange + ", receivingExchangePairs="
+ receivingExchangePairs + "]";
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
index a9be6f398..7a59f4f76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
@@ -26,7 +26,6 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException;
* Responsible for breaking a plan into its constituent Fragments.
*/
public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Fragment, ForemanSetupException> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MakeFragmentsVisitor.class);
public final static MakeFragmentsVisitor INSTANCE = new MakeFragmentsVisitor();
@@ -34,21 +33,20 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag
}
@Override
- public Fragment visitExchange(Exchange exchange, Fragment value) throws ForemanSetupException {
-// logger.debug("Visiting Exchange {}", exchange);
- if (value == null) {
- throw new ForemanSetupException("The simple fragmenter was called without a FragmentBuilder value. This will only happen if the initial call to SimpleFragmenter is by a Exchange node. This should never happen since an Exchange node should never be the root node of a plan.");
+ public Fragment visitExchange(Exchange exchange, Fragment receivingFragment) throws ForemanSetupException {
+ if (receivingFragment == null) {
+ throw new ForemanSetupException("The simple fragmenter was called without a FragmentBuilder value. This will only happen if the initial call to SimpleFragmenter is by a" +
+ " Exchange node. This should never happen since an Exchange node should never be the root node of a plan.");
}
- Fragment next = getNextBuilder();
- value.addReceiveExchange(exchange, next);
- next.addSendExchange(exchange, value);
- exchange.getChild().accept(this, next);
- return value;
+ Fragment sendingFragment= getNextFragment();
+ receivingFragment.addReceiveExchange(exchange, sendingFragment);
+ sendingFragment.addSendExchange(exchange, receivingFragment);
+ exchange.getChild().accept(this, sendingFragment);
+ return sendingFragment;
}
@Override
public Fragment visitOp(PhysicalOperator op, Fragment value) throws ForemanSetupException{
-// logger.debug("Visiting Other {}", op);
value = ensureBuilder(value);
value.addOperator(op);
for (PhysicalOperator child : op) {
@@ -57,15 +55,15 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag
return value;
}
- private Fragment ensureBuilder(Fragment value) throws ForemanSetupException{
- if (value != null) {
- return value;
+ private Fragment ensureBuilder(Fragment currentFragment) throws ForemanSetupException{
+ if (currentFragment != null) {
+ return currentFragment;
} else {
- return getNextBuilder();
+ return getNextFragment();
}
}
- public Fragment getNextBuilder() {
+ public Fragment getNextFragment() {
return new Fragment();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index e33a38afc..7659f90f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.fragment;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
+import java.util.function.BiFunction;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.FragmentSetupException;
@@ -30,7 +31,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Store;
import org.apache.drill.exec.physical.base.SubScan;
-
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -164,15 +165,21 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
public static class IndexedFragmentNode{
private final Wrapper info;
+ private final BiFunction<Wrapper, Integer, DrillbitEndpoint> endpoint;
private final int minorFragmentId;
+ private final BiFunction<DrillbitEndpoint, PhysicalOperator, Long> memoryPerOperPerDrillbit;
SubScan subScan = null;
private final Deque<UnnestPOP> unnest = new ArrayDeque<>();
- public IndexedFragmentNode(int minorFragmentId, Wrapper info) {
+ public IndexedFragmentNode(int minorFragmentId, Wrapper info,
+ BiFunction<Wrapper, Integer, DrillbitEndpoint> wrapperToEndpoint,
+ BiFunction<DrillbitEndpoint, PhysicalOperator, Long> memoryReqs) {
super();
this.info = info;
+ this.endpoint = wrapperToEndpoint;
this.minorFragmentId = minorFragmentId;
+ this.memoryPerOperPerDrillbit = memoryReqs;
}
public Fragment getNode() {
@@ -188,7 +195,8 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
}
public void addAllocation(PhysicalOperator pop) {
- info.addAllocation(pop);
+ info.addInitialAllocation(pop.getInitialAllocation());
+ info.addMaxAllocation(memoryPerOperPerDrillbit.apply(this.endpoint.apply(info, minorFragmentId), pop));
}
public void addUnnest(UnnestPOP unnest) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java
new file mode 100644
index 000000000..443ab79cd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.AbstractMuxExchange;
+import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
+import org.apache.drill.exec.planner.cost.NodeResource;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.HashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A visitor to compute memory requirements for each operator in a minor fragment.
+ * This visitor will be called for each major fragment. It traverses the physical operators
+ * in that major fragment and computes the memory for each operator per each minor fragment.
+ * The minor fragment memory resources are further grouped into per Drillbit resource
+ * requirements, which are merged into the resource map of the visited {@link Wrapper}.
+ */
+public class MemoryCalculator extends AbstractOpWrapperVisitor<Void, RuntimeException> {
+
+  private final PlanningSet planningSet;
+  // Buffered operators seen so far, grouped by drillbit; each is paired with its memory
+  // requirement on that drillbit (per-minor-fragment memory times minor fragments there).
+  private final Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> bufferedOperators;
+  private final QueryContext queryContext;
+
+  /**
+   * @param planningSet used to look up the wrappers of the fragments on the other side of exchanges
+   * @param context passed to {@code PhysicalOperator.isBufferedOperator}
+   */
+  public MemoryCalculator(PlanningSet planningSet, QueryContext context) {
+    this.planningSet = planningSet;
+    this.bufferedOperators = new HashMap<>();
+    this.queryContext = context;
+  }
+
+  // Helper method to compute the minor fragment count per drillbit. This method returns
+  // a map with key as DrillbitEndpoint and value as the width (i.e #minorFragments)
+  // per Drillbit.
+  private Map<DrillbitEndpoint, Integer> getMinorFragCountPerDrillbit(Wrapper currFragment) {
+    return currFragment.getAssignedEndpoints().stream()
+        .collect(Collectors.groupingBy(Function.identity(),
+                                       Collectors.summingInt(x -> 1)));
+  }
+
+  // Helper method to merge the memory computations for each operator given memory per operator
+  // and the number of minor fragments per Drillbit. getMemory maps a (drillbit, #minor fragments)
+  // entry to the memory needed on that drillbit; the CPU component is always recorded as 0.
+  private void merge(Wrapper currFrag,
+                     Map<DrillbitEndpoint, Integer> minorFragsPerDrillBit,
+                     Function<Entry<DrillbitEndpoint, Integer>, Long> getMemory) {
+
+    NodeResource.merge(currFrag.getResourceMap(),
+                       minorFragsPerDrillBit.entrySet()
+                                            .stream()
+                                            .collect(Collectors.toMap((x) -> x.getKey(),
+                                                                      (x) -> NodeResource.create(0,
+                                                                              getMemory.apply(x)))));
+  }
+
+  /**
+   * Accounts for the sender side of an exchange, then treats the exchange as a regular operator.
+   *
+   * @param exchange the exchange whose sender lives in {@code fragment}
+   * @param fragment wrapper of the sending major fragment
+   * @return always null
+   */
+  @Override
+  public Void visitSendingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException {
+    Wrapper receivingFragment = planningSet.get(fragment.getNode().getSendingExchangePair().getNode());
+    merge(fragment,
+          getMinorFragCountPerDrillbit(fragment),
+          // get the memory requirements for the sender operator.
+          (x) -> exchange.getSenderMemory(receivingFragment.getWidth(), x.getValue()));
+    return visitOp(exchange, fragment);
+  }
+
+  /**
+   * Accounts for the receiver side of an exchange. The traversal stops here: the exchange's
+   * child belongs to the sending fragment and is not visited.
+   *
+   * @param exchange the exchange whose receiver lives in {@code fragment}
+   * @param fragment wrapper of the receiving major fragment
+   * @return always null
+   */
+  @Override
+  public Void visitReceivingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException {
+
+    final List<Fragment.ExchangeFragmentPair> receivingExchangePairs = fragment.getNode().getReceivingExchangePairs();
+    final Map<DrillbitEndpoint, Integer> sendingFragsPerDrillBit = new HashMap<>();
+
+    for (Fragment.ExchangeFragmentPair pair : receivingExchangePairs) {
+      if (pair.getExchange() == exchange) {
+        Wrapper sendingFragment = planningSet.get(pair.getNode());
+        Preconditions.checkArgument(sendingFragment.isEndpointsAssignmentDone());
+        for (DrillbitEndpoint endpoint : sendingFragment.getAssignedEndpoints()) {
+          sendingFragsPerDrillBit.merge(endpoint, 1, Integer::sum);
+        }
+      }
+    }
+    final int totalSendingFrags = sendingFragsPerDrillBit.values().stream()
+        .mapToInt(Integer::intValue).sum();
+    final boolean isMuxExchange = exchange instanceof AbstractMuxExchange;
+    merge(fragment,
+          getMinorFragCountPerDrillbit(fragment),
+          (x) -> exchange.getReceiverMemory(fragment.getWidth(),
+            // If the exchange is a MuxExchange then the sending fragments are from that particular drillbit otherwise
+            // sending fragments are from across the cluster. getOrDefault avoids a NullPointerException from
+            // unboxing when a receiving drillbit hosts no sender (treated as 0 senders; NOTE(review): confirm
+            // that 0 is an acceptable input for getReceiverMemory).
+            isMuxExchange ? sendingFragsPerDrillBit.getOrDefault(x.getKey(), 0) : totalSendingFrags));
+    return null;
+  }
+
+  /**
+   * @param endpoint drillbit of interest
+   * @return the buffered operators recorded for {@code endpoint} with their memory requirement
+   *         on that drillbit, or a new empty list when none were recorded
+   */
+  public List<Pair<PhysicalOperator, Long>> getBufferedOperators(DrillbitEndpoint endpoint) {
+    return this.bufferedOperators.getOrDefault(endpoint, new ArrayList<>());
+  }
+
+  /**
+   * Merges the memory requirement of {@code op} into the fragment's per-drillbit resources and
+   * recurses into the operator's children. Buffered operators are additionally recorded so that
+   * they can be retrieved through {@link #getBufferedOperators(DrillbitEndpoint)}.
+   *
+   * @param op operator being visited
+   * @param fragment wrapper of the major fragment that contains {@code op}
+   * @return always null
+   */
+  @Override
+  public Void visitOp(PhysicalOperator op, Wrapper fragment) {
+    // Keep the estimate as a long: a cast to int would clamp anything above Integer.MAX_VALUE.
+    final long memoryCost = (long) Math.ceil(op.getCost().getMemoryCost());
+    final Map<DrillbitEndpoint, Integer> minorFragsPerDrillbit = getMinorFragCountPerDrillbit(fragment);
+    if (op.isBufferedOperator(queryContext)) {
+      // If the operator is a buffered operator then get the memory estimates of the optimizer.
+      // The memory estimates of the optimizer are for the whole operator spread across all the
+      // minor fragments. Divide this memory estimation by fragment width to get the memory
+      // requirement per minor fragment. The division is done in floating point so that ceil()
+      // really rounds up; a width of 0 (no endpoints assigned) is treated as 1 to avoid a
+      // division by zero, in which case there is nothing to record anyway.
+      final int width = Math.max(1, fragment.getAssignedEndpoints().size());
+      final long memoryCostPerMinorFrag = (long) Math.ceil((double) memoryCost / width);
+
+      for (Entry<DrillbitEndpoint, Integer> entry : minorFragsPerDrillbit.entrySet()) {
+        bufferedOperators.computeIfAbsent(entry.getKey(), k -> new ArrayList<>())
+                         .add(Pair.of(op, memoryCostPerMinorFrag * entry.getValue()));
+      }
+
+      merge(fragment,
+            minorFragsPerDrillbit,
+            (x) -> memoryCostPerMinorFrag * x.getValue());
+
+    } else {
+      // Memory requirement for the non-buffered operators is just the batch size.
+      merge(fragment,
+            minorFragsPerDrillbit,
+            (x) -> memoryCost * x.getValue());
+    }
+    for (PhysicalOperator child : op) {
+      child.accept(this, fragment);
+    }
+    return null;
+  }
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java
new file mode 100644
index 000000000..097f4b249
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.work.QueryWorkUnit;
+
+import java.util.Collection;
+
+/**
+ * This class parallelizes the query plan. Once the optimizer finishes its job by producing an
+ * optimized plan, it is the job of this parallelizer to generate a parallel plan out of the
+ * optimized physical plan. It does so by using the optimizer's estimates for row count etc.
+ * There are two kinds of parallelizers, as explained below. Currently the only difference
+ * between them is the memory assignment for the physical operators.
+ *
+ * a) Default Parallelizer: It optimistically assumes that the whole cluster is running only the
+ * current query and, based on heuristics, assigns the optimal memory to the buffered operators.
+ *
+ * b) Queue Parallelizer: This parallelizer computes the memory that can be allocated at best based
+ * on the current cluster state (i.e. how much memory is available) and also the configuration
+ * of the queue that the query can run on.
+ */
+public interface QueryParallelizer extends ParallelizationParameters {
+
+ /**
+ * This is the only function exposed to the consumer of this parallelizer (currently Foreman) to parallelize
+ * the plan. The caller transforms the plan to a fragment tree and supplies the required information for
+ * the parallelizer to do its job.
+ * @param options List of all options that are set for the current session.
+ * @param foremanNode Endpoint information of the foreman node.
+ * @param queryId Unique ID of the query.
+ * @param activeEndpoints Currently active endpoints on which the plan can run.
+ * @param rootFragment Root of the fragment tree of the transformed physical plan.
+ * @param session User session object.
+ * @param queryContextInfo Query context information.
+ * @return Executable query plan which contains all the information like minor frags and major frags.
+ * @throws ExecutionSetupException if the work unit cannot be generated.
+ */
+ QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+ Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
+ UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java
new file mode 100644
index 000000000..c986fca29
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.util.function.CheckedConsumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.cost.NodeResource;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Parallelizer specialized for managing resources for a query based on queues. This parallelizer
+ * does not deal with increase/decrease of the parallelization of a query plan based on the current
+ * cluster state. However, the memory assignment for each operator, minor fragment and major
+ * fragment is based on the cluster state and provided queue configuration.
+ */
+public class QueueQueryParallelizer extends SimpleParallelizer {
+  // Placeholder per-node memory limit. TODO: replace with the limit of the queue selected for
+  // the query once queue selection is wired in.
+  private static final int DEFAULT_NODE_MEMORY_LIMIT = 10;
+
+  private final boolean planHasMemory;
+  private final QueryContext queryContext;
+  // Memory computed by adjustMemory() for each buffered operator, per drillbit. Stays empty
+  // when the plan already carries memory assignments.
+  private final Map<DrillbitEndpoint, Map<PhysicalOperator, Long>> operators;
+
+  /**
+   * @param memoryPlanning whether the plan already carries memory assignments
+   * @param queryContext context of the query being planned
+   */
+  public QueueQueryParallelizer(boolean memoryPlanning, QueryContext queryContext) {
+    super(queryContext);
+    this.planHasMemory = memoryPlanning;
+    this.queryContext = queryContext;
+    this.operators = new HashMap<>();
+  }
+
+  /**
+   * Returns the memory computed for a physical operator on a drillbit endpoint.
+   * When the plan already has memory, or when {@link #adjustMemory} recorded nothing for the
+   * given operator on the given endpoint (only buffered operators are recorded), the operator's
+   * own max allocation is returned.
+   *
+   * @return function mapping (endpoint, operator) to the memory to allocate
+   */
+  @Override
+  public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
+    return (endpoint, operator) -> {
+      if (planHasMemory) {
+        // adjustMemory() is a no-op in this case, so the operators map is empty.
+        return operator.getMaxAllocation();
+      }
+      Map<PhysicalOperator, Long> memoryPerOperator = operators.get(endpoint);
+      Long adjustedMemory = memoryPerOperator == null ? null : memoryPerOperator.get(operator);
+      if (adjustedMemory != null) {
+        return adjustedMemory;
+      }
+      return operator.getMaxAllocation();
+    };
+  }
+
+  /**
+   * Function called by the SimpleParallelizer to adjust the memory post parallelization.
+   * The overall logic is to traverse the fragment tree and call the MemoryCalculator on
+   * each major fragment. Once the memory is computed, resource requirements are accumulated
+   * per drillbit.
+   *
+   * The total resource requirements are meant to be used to select a queue. If the selected queue's
+   * resource limit is lower than the query's requirement then the memory is re-adjusted. Queue
+   * selection is not implemented here yet; a fixed placeholder limit is used instead.
+   *
+   * @param planningSet context of the fragments.
+   * @param roots root fragments.
+   * @param activeEndpoints currently active endpoints.
+   * @throws PhysicalOperatorSetupException declared for failures during the fragment traversal
+   *         (NOTE(review): confirm against traverse()).
+   */
+  @Override
+  public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
+                           Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+
+    if (planHasMemory) {
+      return;
+    }
+    // total node resources for the query plan maintained per drillbit.
+    final Map<DrillbitEndpoint, NodeResource> totalNodeResources =
+      activeEndpoints.stream().collect(Collectors.toMap(x -> x,
+                                                        x -> NodeResource.create()));
+
+    // list of the buffered physical operators and their memory requirements per drillbit.
+    // Named differently from the field so that the field is not shadowed.
+    final Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> bufferedOperatorsPerNode =
+      activeEndpoints.stream().collect(Collectors.toMap(x -> x,
+                                                        x -> new ArrayList<>()));
+
+    for (Wrapper wrapper : roots) {
+      traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragment) -> {
+        MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext);
+        fragment.getNode().getRoot().accept(calculator, fragment);
+        NodeResource.merge(totalNodeResources, fragment.getResourceMap());
+        bufferedOperatorsPerNode.entrySet()
+          .stream()
+          .forEach((entry) -> entry.getValue()
+            .addAll(calculator.getBufferedOperators(entry.getKey())));
+      }));
+    }
+    //queryrm.selectQueue( pass the max node Resource) returns queue configuration.
+    Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators =
+      ensureOperatorMemoryWithinLimits(bufferedOperatorsPerNode, totalNodeResources, DEFAULT_NODE_MEMORY_LIMIT);
+    memoryAdjustedOperators.entrySet().stream().forEach((x) -> {
+      // The same operator may appear several times on a drillbit; sum its entries.
+      Map<PhysicalOperator, Long> memoryPerOperator = x.getValue().stream()
+        .collect(Collectors.toMap(operatorLongPair -> operatorLongPair.getLeft(),
+                                  operatorLongPair -> operatorLongPair.getRight(),
+                                  (mem_1, mem_2) -> (mem_1 + mem_2)));
+      this.operators.put(x.getKey(), memoryPerOperator);
+    });
+  }
+
+
+  /**
+   * Helper method to adjust the memory for the buffered operators. On drillbits whose total
+   * resource requirement exceeds {@code nodeLimit}, every operator gets a share of the limit
+   * proportional to its requirement; on all other drillbits the operators are returned unchanged.
+   *
+   * @param memoryPerOperator list of physical operators per drillbit
+   * @param nodeResourceMap resources per drillbit.
+   * @param nodeLimit permissible node limit.
+   * @return operators of all drillbits in {@code memoryPerOperator}, with adjusted memory where needed.
+   */
+  private Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>>
+          ensureOperatorMemoryWithinLimits(Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryPerOperator,
+                                           Map<DrillbitEndpoint, NodeResource> nodeResourceMap, int nodeLimit) {
+    Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> allDrillbits = new HashMap<>();
+
+    for (Map.Entry<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> entry : memoryPerOperator.entrySet()) {
+      final DrillbitEndpoint drillbit = entry.getKey();
+      final List<Pair<PhysicalOperator, Long>> operatorsOnDrillbit = entry.getValue();
+      final long totalMemory = operatorsOnDrillbit.stream().mapToLong(Pair::getValue).sum();
+
+      // Drillbits within the limit keep their operators as they are. A non-positive total means
+      // there is nothing to scale (and would otherwise lead to a division by zero).
+      if (nodeResourceMap.get(drillbit).getMemory() <= nodeLimit || totalMemory <= 0) {
+        allDrillbits.put(drillbit, operatorsOnDrillbit);
+        continue;
+      }
+
+      // formula to adjust the memory is (optimalMemory / totalMemory(this is for all the operators)) * permissible_node_limit.
+      // The ratio is computed in floating point; integer division would truncate it to 0.
+      List<Pair<PhysicalOperator, Long>> adjustedMemory = operatorsOnDrillbit.stream()
+        .map(operatorMemory -> Pair.of(operatorMemory.getKey(),
+                                       (long) Math.ceil((double) operatorMemory.getValue() / totalMemory * nodeLimit)))
+        .collect(Collectors.toList());
+      allDrillbits.put(drillbit, adjustedMemory);
+    }
+
+    // At this point allDrillbits contains the operators on all drillbits. The memory also is adjusted based on the nodeLimit and
+    // the ratio of their requirements.
+    return allDrillbits;
+  }
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 39b699fe4..a434bf80b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -21,9 +21,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.common.util.function.CheckedConsumer;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
@@ -61,7 +64,7 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException;
* parallelization for each major fragment will be determined. Once the amount of parallelization is done, assignment
* is done based on round robin assignment ordered by operator affinity (locality) to available execution Drillbits.
*/
-public class SimpleParallelizer implements ParallelizationParameters {
+public abstract class SimpleParallelizer implements QueryParallelizer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
private final long parallelizationThreshold;
@@ -69,7 +72,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
private final int maxGlobalWidth;
private final double affinityFactor;
- public SimpleParallelizer(QueryContext context) {
+ protected SimpleParallelizer(QueryContext context) {
OptionManager optionManager = context.getOptions();
long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET_OPTION);
this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1;
@@ -81,7 +84,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
this.affinityFactor = optionManager.getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue();
}
- public SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) {
+ protected SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) {
this.parallelizationThreshold = parallelizationThreshold;
this.maxWidthPerNode = maxWidthPerNode;
this.maxGlobalWidth = maxGlobalWidth;
@@ -108,27 +111,106 @@ public class SimpleParallelizer implements ParallelizationParameters {
return affinityFactor;
}
+ /**
+  * Returns the wrappers of the planning set that do not appear in the fragment-dependency
+  * list of any wrapper. Later parallelizer code starts from these wrappers and walks down
+  * to the fragments they depend on.
+  *
+  * @param planningSet Set of all major fragments and their context.
+  * @return the wrappers which no other wrapper depends upon.
+  */
+ public Set<Wrapper> getRootFragments(PlanningSet planningSet) {
+   // Begin with every wrapper as a candidate root.
+   final Set<Wrapper> candidates = Sets.newHashSet();
+   planningSet.forEach(candidates::add);
+
+   // Discard each wrapper that shows up as a dependency of some wrapper; whatever is
+   // left over is not depended upon by any other fragment.
+   for (Wrapper wrapper : planningSet) {
+     final List<Wrapper> dependencies = wrapper.getFragmentDependencies();
+     if (dependencies == null || dependencies.isEmpty()) {
+       continue;
+     }
+     dependencies.forEach(candidates::remove);
+   }
+
+   return candidates;
+ }
+
+ /**
+  * Builds the planning set for the fragment tree rooted at the given fragment: the fragment
+  * wrappers are initialised first and the fragment dependency graph is constructed afterwards.
+  *
+  * @param rootFragment Root major fragment.
+  * @return the populated planning set.
+  */
+ public PlanningSet prepareFragmentTree(Fragment rootFragment) {
+   final PlanningSet fragmentWrappers = new PlanningSet();
+   initFragmentWrappers(rootFragment, fragmentWrappers);
+   constructFragmentDependencyGraph(rootFragment, fragmentWrappers);
+   return fragmentWrappers;
+ }
+
/**
- * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages
- * to go beyond the global max width.
+ * Traverse all the major fragments and parallelize each major fragment based on
+ * collected stats. The children fragments are parallelized before a parent
+ * fragment.
+ * @param planningSet Set of all major fragments and their context.
+ * @param roots Root nodes of the plan.
+ * @param activeEndpoints currently active drillbit endpoints.
+ * @throws PhysicalOperatorSetupException
+ */
+ public void collectStatsAndParallelizeFragments(PlanningSet planningSet, Set<Wrapper> roots,
+     Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+   for (Wrapper root : roots) {
+     traverse(root, CheckedConsumer.throwingConsumerWrapper((Wrapper current) -> {
+       // A fragment whose endpoints are already assigned is left untouched; this
+       // happens in the case of fragments which have MUX operators.
+       if (!current.isEndpointsAssignmentDone()) {
+         // Collect the stats of this fragment, then hand it to the parallelizer
+         // selected by its distribution affinity.
+         current.getNode().getRoot().accept(new StatsCollector(planningSet), current);
+         current.getStats()
+             .getDistributionAffinity()
+             .getFragmentParallelizer()
+             .parallelizeFragment(current, this, activeEndpoints);
+         // Consolidate the cpu resources required by this major fragment per drillbit.
+         current.computeCpuResources();
+       }
+     }));
+   }
+ }
+
+ /**
+  * Memory-adjustment step of the planning sequence. {@code generateWorkUnit} invokes it
+  * after {@code collectStatsAndParallelizeFragments} and before the minor fragments are
+  * generated; what exactly is adjusted is left to the concrete subclass.
+  *
+  * @param planningSet Set of all major fragments and their context.
+  * @param roots Root nodes of the plan.
+  * @param activeEndpoints currently active drillbit endpoints.
+  * @throws PhysicalOperatorSetupException declared for implementations; the failure
+  *         conditions are not visible from this declaration.
+  */
+ public abstract void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
+ Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException;
+
+ /**
+ * The starting function for the whole parallelization and memory computation logic.
+ * 1) Initially a fragment tree is prepared which contains a wrapper for each fragment.
+ * The topology of this tree is same as that of the major fragment tree.
+ * 2) Traverse this fragment tree to collect stats for each major fragment and then
+ * parallelize each fragment. At this stage minor fragments are not created but all
+ * the required information to create minor fragment are computed.
+ * 3) Memory is computed for each operator and for the minor fragment.
+ * 4) Lastly all the above computed information is used to create the minor fragments
+ * for each major fragment.
*
- * @param options Option list
- * @param foremanNode The driving/foreman node for this query. (this node)
- * @param queryId The queryId for this query.
- * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query.
- * @param rootFragment The root node of the PhysicalPlan that we will be parallelizing.
- * @param session UserSession of user who launched this query.
- * @param queryContextInfo Info related to the context when query has started.
- * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes.
+ * @param options List of options set by the user.
+ * @param foremanNode foreman node for this query plan.
+ * @param queryId Query ID.
+ * @param activeEndpoints currently active endpoints on which this plan will run.
+ * @param rootFragment Root major fragment.
+ * @param session session context.
+ * @param queryContextInfo query context.
+ * @return the {@link QueryWorkUnit} generated for the parallelized plan.
* @throws ExecutionSetupException
*/
- public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
- Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
- UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+ @Override
+ public final QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+ Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
+ UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+ PlanningSet planningSet = prepareFragmentTree(rootFragment);
+
+ Set<Wrapper> rootFragments = getRootFragments(planningSet);
- final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
- return generateWorkUnit(
- options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
+ collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints);
+
+ adjustMemory(planningSet, rootFragments, activeEndpoints);
+
+ return generateWorkUnit(options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
}
/**
@@ -151,28 +233,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
throw new UnsupportedOperationException("Use children classes");
}
- /**
- * Helper method to reuse the code for QueryWorkUnit(s) generation
- * @param activeEndpoints
- * @param rootFragment
- * @return A {@link PlanningSet}.
- * @throws ExecutionSetupException
- */
- protected PlanningSet getFragmentsHelper(Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment) throws ExecutionSetupException {
-
- PlanningSet planningSet = new PlanningSet();
- initFragmentWrappers(rootFragment, planningSet);
-
- final Set<Wrapper> leafFragments = constructFragmentDependencyGraph(planningSet);
-
- // Start parallelizing from leaf fragments
- for (Wrapper wrapper : leafFragments) {
- parallelizeFragment(wrapper, planningSet, activeEndpoints);
- }
- planningSet.findRootWrapper(rootFragment);
- return planningSet;
- }
// For every fragment, create a Wrapper in PlanningSet.
@VisibleForTesting
@@ -190,77 +251,49 @@ public class SimpleParallelizer implements ParallelizationParameters {
* @param planningSet
* @return Returns a list of leaf fragments in fragment dependency graph.
*/
- private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
+ private void constructFragmentDependencyGraph(Fragment rootFragment, PlanningSet planningSet) {
// Set up dependency of fragments based on the affinity of exchange that separates the fragments.
- for (Wrapper currentFragmentWrapper : planningSet) {
- ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair();
- if (sendingExchange != null) {
- ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency();
- Wrapper receivingFragmentWrapper = planningSet.get(sendingExchange.getNode());
-
+ for(Wrapper currentFragment : planningSet) {
+ ExchangeFragmentPair sendingXchgForCurrFrag = currentFragment.getNode().getSendingExchangePair();
+ if (sendingXchgForCurrFrag != null) {
+ ParallelizationDependency dependency = sendingXchgForCurrFrag.getExchange().getParallelizationDependency();
+ Wrapper receivingFragmentWrapper = planningSet.get(sendingXchgForCurrFrag.getNode());
+
+ //Mostly Receivers of the current fragment depend on the sender of the child fragments. However there is a special case
+ //for DeMux Exchanges where the Sender of the current fragment depends on the receiver of the parent fragment.
if (dependency == ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) {
- receivingFragmentWrapper.addFragmentDependency(currentFragmentWrapper);
+ receivingFragmentWrapper.addFragmentDependency(currentFragment);
} else if (dependency == ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) {
- currentFragmentWrapper.addFragmentDependency(receivingFragmentWrapper);
+ currentFragment.addFragmentDependency(receivingFragmentWrapper);
}
}
}
-
- // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for
- // parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and
- // remove the fragment on which the current fragment depends.
-
- final Set<Wrapper> roots = Sets.newHashSet();
- for (Wrapper w : planningSet) {
- roots.add(w);
- }
-
- for (Wrapper wrapper : planningSet) {
- final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies();
- if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
- for (Wrapper dependency : fragmentDependencies) {
- if (roots.contains(dependency)) {
- roots.remove(dependency);
- }
- }
- }
- }
-
- return roots;
+ planningSet.findRootWrapper(rootFragment);
}
+
/**
- * Helper method for parallelizing a given fragment. Dependent fragments are parallelized first before
- * parallelizing the given fragment.
+ * Helper method to call operation on each fragment. Traversal calls operation on child fragments before
+ * calling it on the parent fragment.
*/
- private void parallelizeFragment(Wrapper fragmentWrapper, PlanningSet planningSet,
- Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
- // If the fragment is already parallelized, return.
- if (fragmentWrapper.isEndpointsAssignmentDone()) {
- return;
- }
+ protected void traverse(Wrapper fragmentWrapper, Consumer<Wrapper> operation) throws PhysicalOperatorSetupException {
- // First parallelize fragments on which this fragment depends.
final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies();
if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
for(Wrapper dependency : fragmentDependencies) {
- parallelizeFragment(dependency, planningSet, activeEndpoints);
+ traverse(dependency, operation);
}
}
-
- // Find stats. Stats include various factors including cost of physical operators, parallelizability of
- // work in physical operator and affinity of physical operator to certain nodes.
- fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
-
- fragmentWrapper.getStats().getDistributionAffinity()
- .getFragmentParallelizer()
- .parallelizeFragment(fragmentWrapper, this, activeEndpoints);
+ operation.accept(fragmentWrapper);
}
+ // Supplies the function which returns the memory assigned for a particular physical operator.
+ // The function takes a (DrillbitEndpoint, PhysicalOperator) pair and yields a Long; it is handed
+ // to every IndexedFragmentNode created while the work unit is generated.
+ protected abstract BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory();
+
protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
- Fragment rootNode, PlanningSet planningSet,
- UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+ Fragment rootNode, PlanningSet planningSet, UserSession session,
+ QueryContextInformation queryContextInfo) throws ExecutionSetupException {
List<MinorFragmentDefn> fragmentDefns = new ArrayList<>( );
MinorFragmentDefn rootFragmentDefn = null;
@@ -282,7 +315,8 @@ public class SimpleParallelizer implements ParallelizationParameters {
// Create a minorFragment for each major fragment.
for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) {
- IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
+ IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper,
+ (fragmentWrapper, minorFragment) -> fragmentWrapper.getAssignedEndpoint(minorFragment), getMemory());
wrapper.resetAllocation();
PhysicalOperator op = physicalOperatorRoot.accept(Materializer.INSTANCE, iNode);
Preconditions.checkArgument(op instanceof FragmentRoot);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index 680e6a566..49af366ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -71,6 +71,7 @@ public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeExcept
for(ExchangeFragmentPair pair : receivingExchangePairs) {
if (pair.getExchange() == exchange) {
+ //This is the child fragment which is sending data to this fragment.
Wrapper sendingFragment = planningSet.get(pair.getNode());
if (sendingFragment.isEndpointsAssignmentDone()) {
sendingEndpoints.addAll(sendingFragment.getAssignedEndpoints());
@@ -105,7 +106,7 @@ public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeExcept
stats.addEndpointAffinities(hasAffinity.getOperatorAffinity());
stats.setDistributionAffinity(hasAffinity.getDistributionAffinity());
}
- stats.addCost(op.getCost());
+ stats.addCost(op.getCost().getOutputRowCount());
for (PhysicalOperator child : op) {
child.accept(this, wrapper);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 6da885044..ffa577e41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -18,7 +18,12 @@
package org.apache.drill.exec.planner.fragment;
import java.util.List;
+import java.util.Map;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.drill.exec.planner.cost.NodeResource;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
@@ -46,6 +51,11 @@ public class Wrapper {
private boolean endpointsAssigned;
private long initialAllocation = 0;
private long maxAllocation = 0;
+ // Resources (i.e. memory and cpu) are stored per drillbit in this map.
+ // A drillbit can run any number of minor fragments of this major fragment;
+ // the NodeResource then contains the cumulative resources required by all
+ // of those minor fragments on that drillbit.
+ private Map<DrillbitEndpoint, NodeResource> nodeResourceMap;
// List of fragments this particular fragment depends on for determining its parallelization and endpoint assignments.
private final List<Wrapper> fragmentDependencies = Lists.newArrayList();
@@ -58,6 +68,7 @@ public class Wrapper {
this.majorFragmentId = majorFragmentId;
this.node = node;
this.stats = new Stats();
+ nodeResourceMap = null;
}
public Stats getStats() {
@@ -94,10 +105,12 @@ public class Wrapper {
return maxAllocation;
}
- public void addAllocation(PhysicalOperator pop) {
- initialAllocation += pop.getInitialAllocation();
-// logger.debug("Incrementing initialAllocation by {} to {}. Pop: {}", pop.getInitialAllocation(), initialAllocation, pop);
- maxAllocation += pop.getMaxAllocation();
+ /**
+  * Increases the initial allocation tracked by this wrapper.
+  *
+  * @param memory amount added to the running initial-allocation total.
+  */
+ public void addInitialAllocation(long memory) {
+   this.initialAllocation = this.initialAllocation + memory;
+ }
+
+ /**
+  * Increases the maximum allocation tracked by this wrapper.
+  *
+  * @param memory amount added to the running max-allocation total.
+  */
+ public void addMaxAllocation(long memory) {
+   this.maxAllocation = this.maxAllocation + memory;
+ }
private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{
@@ -197,4 +210,30 @@ public class Wrapper {
public List<Wrapper> getFragmentDependencies() {
return ImmutableList.copyOf(fragmentDependencies);
}
+
+ /**
+  * Computes the cpu resources required by all the minor fragments of this major fragment
+  * and records them per DrillbitEndpoint in the resource map. The method expects to be
+  * called only once: the resource map must still be unset when it runs.
+  */
+ public void computeCpuResources() {
+   Preconditions.checkArgument(nodeResourceMap == null);
+
+   // Each element of the endpoints collection contributes one NodeResource(1, 0)
+   // (presumably one cpu unit and no memory -- confirm against NodeResource).
+   final Function<DrillbitEndpoint, NodeResource> perEndpoint = endpoint -> new NodeResource(1, 0);
+
+   // Two resources are combined into a fresh instance so that neither input is modified.
+   final BinaryOperator<NodeResource> combine = (left, right) -> {
+     final NodeResource sum = NodeResource.create();
+     sum.add(left);
+     sum.add(right);
+     return sum;
+   };
+
+   // Group the assigned endpoints by drillbit and fold the per-endpoint resources together.
+   nodeResourceMap = endpoints.stream()
+       .collect(Collectors.groupingBy(
+           Function.identity(),
+           Collectors.reducing(NodeResource.create(), perEndpoint, combine)));
+ }
+
+ /**
+  * Returns the per-drillbit resource map of this major fragment. The constructor leaves the
+  * map {@code null}; {@code computeCpuResources()} is what populates it.
+  */
+ public Map<DrillbitEndpoint, NodeResource> getResourceMap() {
+   return this.nodeResourceMap;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
index 70d4e268a..c1250e3ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.fragment.contrib;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
@@ -28,9 +29,9 @@ import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
-import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.fragment.Wrapper;
import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -56,12 +57,12 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
* allows not to pollute parent class with non-authentic functionality
*
*/
-public class SplittingParallelizer extends SimpleParallelizer {
+public class SplittingParallelizer extends DefaultQueryParallelizer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SplittingParallelizer.class);
- public SplittingParallelizer(QueryContext context) {
- super(context);
+ public SplittingParallelizer(boolean doMemoryPlanning, QueryContext context) {
+ super(doMemoryPlanning, context);
}
/**
@@ -81,7 +82,13 @@ public class SplittingParallelizer extends SimpleParallelizer {
Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment,
UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
- final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
+ final PlanningSet planningSet = this.prepareFragmentTree(rootFragment);
+
+ Set<Wrapper> rootFragments = getRootFragments(planningSet);
+
+ collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints);
+
+ adjustMemory(planningSet, rootFragments, activeEndpoints);
return generateWorkUnits(
options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo);
@@ -113,7 +120,7 @@ public class SplittingParallelizer extends SimpleParallelizer {
List<QueryWorkUnit> workUnits = Lists.newArrayList();
int plansCount = 0;
- DrillbitEndpoint[] endPoints = null;
+ DrillbitEndpoint[] leafFragEndpoints = null;
long initialAllocation = 0;
final Iterator<Wrapper> iter = planningSet.iterator();
@@ -130,12 +137,14 @@ public class SplittingParallelizer extends SimpleParallelizer {
// allocation
plansCount = wrapper.getWidth();
initialAllocation = (wrapper.getInitialAllocation() != 0 ) ? wrapper.getInitialAllocation()/plansCount : 0;
- endPoints = new DrillbitEndpoint[plansCount];
+ leafFragEndpoints = new DrillbitEndpoint[plansCount];
for (int mfId = 0; mfId < plansCount; mfId++) {
- endPoints[mfId] = wrapper.getAssignedEndpoint(mfId);
+ leafFragEndpoints[mfId] = wrapper.getAssignedEndpoint(mfId);
}
}
}
+
+ DrillbitEndpoint[] endPoints = leafFragEndpoints;
if ( plansCount == 0 ) {
// no exchange, return list of single QueryWorkUnit
workUnits.add(generateWorkUnit(options, foremanNode, queryId, rootNode, planningSet, session, queryContextInfo));
@@ -174,7 +183,8 @@ public class SplittingParallelizer extends SimpleParallelizer {
MinorFragmentDefn rootFragment = null;
FragmentRoot rootOperator = null;
- IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
+ IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper,
+ (fragmentWrapper, minorFragment) -> endPoints[minorFragment],getMemory());
wrapper.resetAllocation();
// two visitors here
// 1. To remove exchange