diff options
author | HanumathRao <hanu.ncr@gmail.com> | 2019-02-28 00:12:05 -0800 |
---|---|---|
committer | Sorabh Hamirwasia <sorabh@apache.org> | 2019-03-14 21:34:06 -0700 |
commit | d22e68b83d1d0cc0539d79ae0cb3aa70ae3242ad (patch) | |
tree | 76f816aead22b0d7668d06f1b13ccad5c142960c | |
parent | 5aa38a51d90998234b4ca46434ce225df72addc5 (diff) |
DRILL-7068: Support memory adjustment framework for resource management with Queues.
closes #1677
38 files changed, 1375 insertions, 224 deletions
diff --git a/common/src/main/java/org/apache/drill/common/util/function/CheckedConsumer.java b/common/src/main/java/org/apache/drill/common/util/function/CheckedConsumer.java new file mode 100644 index 000000000..bd9b69b3e --- /dev/null +++ b/common/src/main/java/org/apache/drill/common/util/function/CheckedConsumer.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.common.util.function; + +import java.util.function.Consumer; +import org.apache.drill.common.exceptions.ErrorHelper; + +@FunctionalInterface +public interface CheckedConsumer<T, E extends Throwable> { + void accept(T t) throws E; + + static <T> Consumer<T> throwingConsumerWrapper( + CheckedConsumer<T, Exception> throwingConsumer) { + + return i -> { + try { + throwingConsumer.accept(i); + } catch (Exception ex) { + ErrorHelper.sneakyThrow(ex); + } + }; + } +} + diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java index d87473305..2a5cdddf8 100644 --- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java +++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java @@ -35,7 +35,7 @@ public class KafkaFilterPushdownTest extends KafkaTestBase { private static final String expectedSubStr = " \"kafkaScanSpec\" : {\n" + " \"topicName\" : \"drill-pushdown-topic\"\n" + " },\n" + - " \"cost\" : %s.0"; + " \"cost\""; @BeforeClass public static void setup() throws Exception { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java index 6eba65833..13c3c2ac8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java @@ -93,7 +93,7 @@ public class PhysicalPlan { public double totalCost() { double totalCost = 0; for (final PhysicalOperator ops : getSortedOperators()) { - totalCost += ops.getCost(); + totalCost += ops.getCost().getOutputRowCount(); } return totalCost; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index 71289b699..aad4e8184 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.base; import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.common.graph.GraphVisitor; import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.planner.cost.PrelCostEstimates; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; @@ -35,7 +36,7 @@ public abstract class AbstractBase implements PhysicalOperator { private final String userName; private int id; - private double cost; + private PrelCostEstimates cost = PrelCostEstimates.ZERO_COST; public AbstractBase() { userName = null; @@ -89,12 +90,12 @@ public abstract class AbstractBase implements PhysicalOperator { } @Override - public double getCost() { + public PrelCostEstimates getCost() { return cost; } @Override - public void setCost(double cost) { + public void setCost(PrelCostEstimates cost) { this.cost = cost; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java index ebd7288a4..4106205f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java @@ -130,4 +130,20 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang public ParallelizationDependency getParallelizationDependency() { return ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER; } + + // Memory requirement of the sender for the given receivers and senders in a major 
fragment. + // Currently set to zero but later once batch sizing for Exchanges is completed it will call + // the appropriate function. + @Override + public long getSenderMemory(int receivers, int senders) { + return 0; + } + + // Memory requirement of the receiver for the given receivers and senders in a major fragment. + // Currently set to zero but later once batch sizing for Exchanges is completed it will call + // the appropriate function. + @Override + public long getReceiverMemory(int receivers, int senders) { + return 0; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java index 56e9b9c90..7de915c4c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java @@ -79,6 +79,23 @@ public interface Exchange extends PhysicalOperator { Receiver getReceiver(int minorFragmentId); /** + * Returns the memory requirement for the sender side of the exchange operator. + * @param receiverCount number of receivers at the receiving end of this exchange operator. + * @param senderCount number of senders sending the rows for this exchange operator. + * @return Total memory required by this operator. + */ + long getSenderMemory(int receiverCount, int senderCount); + + /** + * Returns the memory requirement for the receiver side of the exchange operator. + * @param receiverCount number of receivers receiving the rows sent by the sender side of this + * exchange operator. + * @param senderCount number of senders sending the rows. + * @return Total memory required by this operator. + */ + long getReceiverMemory(int receiverCount, int senderCount); + + /** * Provide parallelization parameters for sender side of the exchange. Output includes min width, * max width and affinity to Drillbits. 
* diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index 82fb53bf4..3bca02ea0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.graph.GraphValue; import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.planner.cost.PrelCostEstimates; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import com.fasterxml.jackson.annotation.JsonIdentityInfo; @@ -101,10 +102,10 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { void setOperatorId(int id); @JsonProperty("cost") - void setCost(double cost); + void setCost(PrelCostEstimates cost); @JsonProperty("cost") - double getCost(); + PrelCostEstimates getCost(); /** * Name of the user whom to impersonate while setting up the implementation (RecordBatch) of this diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index c185ac7ac..5aed88ef8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -121,7 +121,7 @@ public class PartitionSenderRootExec extends BaseRootExec { stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount); // Algorithm to figure out number of threads to parallelize output // numberOfRows/sliceTarget/numReceivers/threadfactor - this.cost = operator.getChild().getCost(); + 
this.cost = operator.getChild().getCost().getOutputRowCount(); final OptionManager optMgr = context.getOptions(); long sliceTarget = optMgr.getOption(ExecConstants.SLICE_TARGET).num_val; int threadFactor = optMgr.getOption(PlannerSettings.PARTITION_SENDER_THREADS_FACTOR.getOptionName()).num_val.intValue(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java index faf09afe6..8c268e22f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java @@ -24,4 +24,6 @@ public interface DrillRelOptCost extends RelOptCost { double getNetwork(); + double getMemory(); + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/NodeResource.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/NodeResource.java new file mode 100644 index 000000000..ad7bc2e75 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/NodeResource.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.cost; + +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import java.util.Map; + +/** + * This class abstracts the resources like cpu and memory used up by the operators. + * In future network resources can also be incorporated if required. + */ +public class NodeResource { + private long cpu; + private long memory; + + public NodeResource(long cpu, long memory) { + this.cpu = cpu; + this.memory = memory; + } + + public void add(NodeResource other) { + if (other == null) { + return; + } + this.cpu += other.cpu; + this.memory += other.memory; + } + + public long getMemory() { + return memory; + } + + // A utility function to merge the node resources from one drillbit map to other drillbit map. + public static Map<DrillbitEndpoint, NodeResource> merge(Map<DrillbitEndpoint, NodeResource> to, + Map<DrillbitEndpoint, NodeResource> from) { + to.entrySet().stream().forEach((toEntry) -> toEntry.getValue().add(from.get(toEntry.getKey()))); + return to; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("CPU: ").append(cpu).append("Memory: ").append(memory); + return sb.toString(); + } + + public static NodeResource create() { + return create(0,0); + } + + public static NodeResource create(long cpu) { + return create(cpu,0); + } + + public static NodeResource create(long cpu, long memory) { + return new NodeResource(cpu, memory); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PrelCostEstimates.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PrelCostEstimates.java new file mode 100644 index 000000000..7cf1b9e11 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PrelCostEstimates.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.cost; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +/** + * Cost estimates per physical relation. These cost estimations are computed + * during physical planning by the optimizer. These are also used post physical + * planning to compute memory requirements in minor fragment generation phase. + * + */ +@JsonTypeName("cost-estimates") +public class PrelCostEstimates { + // memory requirement for an operator. + private final double memoryCost; + // number of rows that are output by this operator. + private final double outputRowCount; + + public static PrelCostEstimates ZERO_COST = new PrelCostEstimates(0,0); + + public PrelCostEstimates(@JsonProperty("memoryCost") double memory, + @JsonProperty("outputRowCount") double rowCount) { + this.memoryCost = memory; + this.outputRowCount = rowCount; + } + + public double getOutputRowCount() { + return outputRowCount; + } + + public double getMemoryCost() { + return memoryCost; + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java new file mode 100644 index 000000000..20875a470 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.fragment; + +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.util.MemoryAllocationUtilities; + +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.function.BiFunction; + +/** + * Non RM version of the parallelizer. The parallelization logic is fully inherited from SimpleParallelizer. + * The memory computation of the operators is based on the earlier logic to assign memory for the buffered + * operators. 
+ */ +public class DefaultQueryParallelizer extends SimpleParallelizer { + private final boolean planHasMemory; + private final QueryContext queryContext; + + public DefaultQueryParallelizer(boolean memoryAvailableInPlan, QueryContext queryContext) { + super(queryContext); + this.planHasMemory = memoryAvailableInPlan; + this.queryContext = queryContext; + } + + public DefaultQueryParallelizer(boolean memoryPlanning, long parallelizationThreshold, int maxWidthPerNode, + int maxGlobalWidth, double affinityFactor) { + super(parallelizationThreshold, maxWidthPerNode, maxGlobalWidth, affinityFactor); + this.planHasMemory = memoryPlanning; + this.queryContext = null; + } + + @Override + public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots, + Collection<DrillbitEndpoint> activeEndpoints) { + if (planHasMemory) { + return; + } + List<PhysicalOperator> bufferedOpers = planningSet.getRootWrapper().getNode().getBufferedOperators(); + MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(planHasMemory, bufferedOpers, queryContext); + } + + @Override + protected BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() { + return (endpoint, operator) -> operator.getMaxAllocation(); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java index a662adc6b..824060194 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java @@ -17,9 +17,11 @@ */ package org.apache.drill.exec.planner.fragment; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.Exchange; import org.apache.drill.exec.physical.base.PhysicalOperator; import 
org.apache.drill.exec.work.foreman.ForemanSetupException; @@ -82,18 +84,14 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { return sendingExchange; } -// public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra) { -// return visitor.visit(this, extra); -// } - public class ExchangeFragmentPair { private Exchange exchange; - private Fragment node; + private Fragment fragmentXchgTo; - public ExchangeFragmentPair(Exchange exchange, Fragment node) { + public ExchangeFragmentPair(Exchange exchange, Fragment fragXchgTo) { super(); this.exchange = exchange; - this.node = node; + this.fragmentXchgTo = fragXchgTo; } public Exchange getExchange() { @@ -101,7 +99,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { } public Fragment getNode() { - return node; + return fragmentXchgTo; } @Override @@ -109,7 +107,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { final int prime = 31; int result = 1; result = prime * result + ((exchange == null) ? 0 : exchange.hashCode()); - result = prime * result + ((node == null) ? 0 : node.hashCode()); + result = prime * result + ((fragmentXchgTo == null) ? 
0 : fragmentXchgTo.hashCode()); return result; } @@ -167,10 +165,27 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> { return true; } + public List<PhysicalOperator> getBufferedOperators() { + List<PhysicalOperator> bufferedOps = new ArrayList<>(); + root.accept(new BufferedOpFinder(), bufferedOps); + return bufferedOps; + } + + protected static class BufferedOpFinder extends AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> { + @Override + public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value) + throws RuntimeException { + if (op.isBufferedOperator(null)) { + value.add(op); + } + visitChildren(op, value); + return null; + } + } + @Override public String toString() { return "FragmentNode [root=" + root + ", sendingExchange=" + sendingExchange + ", receivingExchangePairs=" + receivingExchangePairs + "]"; } - } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java index a9be6f398..7a59f4f76 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java @@ -26,7 +26,6 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException; * Responsible for breaking a plan into its constituent Fragments. 
*/ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Fragment, ForemanSetupException> { - static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MakeFragmentsVisitor.class); public final static MakeFragmentsVisitor INSTANCE = new MakeFragmentsVisitor(); @@ -34,21 +33,20 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag } @Override - public Fragment visitExchange(Exchange exchange, Fragment value) throws ForemanSetupException { -// logger.debug("Visiting Exchange {}", exchange); - if (value == null) { - throw new ForemanSetupException("The simple fragmenter was called without a FragmentBuilder value. This will only happen if the initial call to SimpleFragmenter is by a Exchange node. This should never happen since an Exchange node should never be the root node of a plan."); + public Fragment visitExchange(Exchange exchange, Fragment receivingFragment) throws ForemanSetupException { + if (receivingFragment == null) { + throw new ForemanSetupException("The simple fragmenter was called without a FragmentBuilder value. This will only happen if the initial call to SimpleFragmenter is by a" + + " Exchange node. 
This should never happen since an Exchange node should never be the root node of a plan."); } - Fragment next = getNextBuilder(); - value.addReceiveExchange(exchange, next); - next.addSendExchange(exchange, value); - exchange.getChild().accept(this, next); - return value; + Fragment sendingFragment= getNextFragment(); + receivingFragment.addReceiveExchange(exchange, sendingFragment); + sendingFragment.addSendExchange(exchange, receivingFragment); + exchange.getChild().accept(this, sendingFragment); + return sendingFragment; } @Override public Fragment visitOp(PhysicalOperator op, Fragment value) throws ForemanSetupException{ -// logger.debug("Visiting Other {}", op); value = ensureBuilder(value); value.addOperator(op); for (PhysicalOperator child : op) { @@ -57,15 +55,15 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag return value; } - private Fragment ensureBuilder(Fragment value) throws ForemanSetupException{ - if (value != null) { - return value; + private Fragment ensureBuilder(Fragment currentFragment) throws ForemanSetupException{ + if (currentFragment != null) { + return currentFragment; } else { - return getNextBuilder(); + return getNextFragment(); } } - public Fragment getNextBuilder() { + public Fragment getNextFragment() { return new Fragment(); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java index e33a38afc..7659f90f1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.fragment; import java.util.ArrayDeque; import java.util.Deque; import java.util.List; +import java.util.function.BiFunction; import org.apache.drill.common.exceptions.ExecutionSetupException; import 
org.apache.drill.exec.exception.FragmentSetupException; @@ -30,7 +31,7 @@ import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.base.Store; import org.apache.drill.exec.physical.base.SubScan; - +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.exec.physical.config.LateralJoinPOP; import org.apache.drill.exec.physical.config.UnnestPOP; @@ -164,15 +165,21 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate public static class IndexedFragmentNode{ private final Wrapper info; + private final BiFunction<Wrapper, Integer, DrillbitEndpoint> endpoint; private final int minorFragmentId; + private final BiFunction<DrillbitEndpoint, PhysicalOperator, Long> memoryPerOperPerDrillbit; SubScan subScan = null; private final Deque<UnnestPOP> unnest = new ArrayDeque<>(); - public IndexedFragmentNode(int minorFragmentId, Wrapper info) { + public IndexedFragmentNode(int minorFragmentId, Wrapper info, + BiFunction<Wrapper, Integer, DrillbitEndpoint> wrapperToEndpoint, + BiFunction<DrillbitEndpoint, PhysicalOperator, Long> memoryReqs) { super(); this.info = info; + this.endpoint = wrapperToEndpoint; this.minorFragmentId = minorFragmentId; + this.memoryPerOperPerDrillbit = memoryReqs; } public Fragment getNode() { @@ -188,7 +195,8 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate } public void addAllocation(PhysicalOperator pop) { - info.addAllocation(pop); + info.addInitialAllocation(pop.getInitialAllocation()); + info.addMaxAllocation(memoryPerOperPerDrillbit.apply(this.endpoint.apply(info, minorFragmentId), pop)); } public void addUnnest(UnnestPOP unnest) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java new file mode 100644 index 000000000..443ab79cd --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.fragment; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.base.Exchange; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.physical.config.AbstractMuxExchange; +import org.apache.drill.exec.planner.AbstractOpWrapperVisitor; +import org.apache.drill.exec.planner.cost.NodeResource; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.HashMap; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * A visitor to compute memory requirements for each operator in a minor fragment. 
+ * This visitor will be called for each major fragment. It traverses the physical operators + * in that major fragment and computes the memory for each operator per each minor fragment. + * The minor fragment memory resources are further grouped into per Drillbit resource + * requirements. + */ +public class MemoryCalculator extends AbstractOpWrapperVisitor<Void, RuntimeException> { + + private final PlanningSet planningSet; + // List of all the buffered operators and their memory requirement per drillbit. + private final Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> bufferedOperators; + private final QueryContext queryContext; + + public MemoryCalculator(PlanningSet planningSet, QueryContext context) { + this.planningSet = planningSet; + this.bufferedOperators = new HashMap<>(); + this.queryContext = context; + } + + // Helper method to compute the minor fragment count per drillbit. This method returns + // a map with key as DrillbitEndpoint and value as the width (i.e #minorFragments) + // per Drillbit. + private Map<DrillbitEndpoint, Integer> getMinorFragCountPerDrillbit(Wrapper currFragment) { + return currFragment.getAssignedEndpoints().stream() + .collect(Collectors.groupingBy(Function.identity(), + Collectors.summingInt(x -> 1))); + } + + // Helper method to merge the memory computations for each operator given memory per operator + // and the number of minor fragments per Drillbit. 
+ private void merge(Wrapper currFrag, + Map<DrillbitEndpoint, Integer> minorFragsPerDrillBit, + Function<Entry<DrillbitEndpoint, Integer>, Long> getMemory) { + + NodeResource.merge(currFrag.getResourceMap(), + minorFragsPerDrillBit.entrySet() + .stream() + .collect(Collectors.toMap((x) -> x.getKey(), + (x) -> NodeResource.create(0, + getMemory.apply(x))))); + } + + @Override + public Void visitSendingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException { + Wrapper receivingFragment = planningSet.get(fragment.getNode().getSendingExchangePair().getNode()); + merge(fragment, + getMinorFragCountPerDrillbit(fragment), + // get the memory requirements for the sender operator. + (x) -> exchange.getSenderMemory(receivingFragment.getWidth(), x.getValue())); + return visitOp(exchange, fragment); + } + + @Override + public Void visitReceivingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException { + + final List<Fragment.ExchangeFragmentPair> receivingExchangePairs = fragment.getNode().getReceivingExchangePairs(); + final Map<DrillbitEndpoint, Integer> sendingFragsPerDrillBit = new HashMap<>(); + + for(Fragment.ExchangeFragmentPair pair : receivingExchangePairs) { + if (pair.getExchange() == exchange) { + Wrapper sendingFragment = planningSet.get(pair.getNode()); + Preconditions.checkArgument(sendingFragment.isEndpointsAssignmentDone()); + for (DrillbitEndpoint endpoint : sendingFragment.getAssignedEndpoints()) { + sendingFragsPerDrillBit.putIfAbsent(endpoint, 0); + sendingFragsPerDrillBit.put(endpoint, sendingFragsPerDrillBit.get(endpoint)+1); + } + } + } + final int totalSendingFrags = sendingFragsPerDrillBit.entrySet().stream() + .mapToInt((x) -> x.getValue()).reduce(0, (x, y) -> x+y); + merge(fragment, + getMinorFragCountPerDrillbit(fragment), + (x) -> exchange.getReceiverMemory(fragment.getWidth(), + // If the exchange is a MuxExchange then the sending fragments are from that particular drillbit otherwise + // sending fragments are from 
across the cluster. + exchange instanceof AbstractMuxExchange ? sendingFragsPerDrillBit.get(x.getKey()) : totalSendingFrags)); + return null; + } + + public List<Pair<PhysicalOperator, Long>> getBufferedOperators(DrillbitEndpoint endpoint) { + return this.bufferedOperators.getOrDefault(endpoint, new ArrayList<>()); + } + + @Override + public Void visitOp(PhysicalOperator op, Wrapper fragment) { + long memoryCost = (int)Math.ceil(op.getCost().getMemoryCost()); + if (op.isBufferedOperator(queryContext)) { + // If the operator is a buffered operator then get the memory estimates of the optimizer. + // The memory estimates of the optimizer are for the whole operator spread across all the + // minor fragments. Divide this memory estimation by fragment width to get the memory + // requirement per minor fragment. + long memoryCostPerMinorFrag = (int)Math.ceil(memoryCost/fragment.getAssignedEndpoints().size()); + Map<DrillbitEndpoint, Integer> drillbitEndpointMinorFragMap = getMinorFragCountPerDrillbit(fragment); + + Map<DrillbitEndpoint, + Pair<PhysicalOperator, Long>> bufferedOperatorsPerDrillbit = + drillbitEndpointMinorFragMap.entrySet().stream() + .collect(Collectors.toMap((x) -> x.getKey(), + (x) -> Pair.of(op, + memoryCostPerMinorFrag * x.getValue()))); + bufferedOperatorsPerDrillbit.entrySet().forEach((x) -> { + bufferedOperators.putIfAbsent(x.getKey(), new ArrayList<>()); + bufferedOperators.get(x.getKey()).add(x.getValue()); + }); + + merge(fragment, + drillbitEndpointMinorFragMap, + (x) -> memoryCostPerMinorFrag * x.getValue()); + + } else { + // Memory requirement for the non-buffered operators is just the batch size. 
+ merge(fragment, + getMinorFragCountPerDrillbit(fragment), + (x) -> memoryCost * x.getValue()); + } + for (PhysicalOperator child : op) { + child.accept(this, fragment); + } + return null; + } +} + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java new file mode 100644 index 000000000..097f4b249 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.fragment; + +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.proto.BitControl.QueryContextInformation; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.UserBitShared.QueryId; +import org.apache.drill.exec.rpc.user.UserSession; +import org.apache.drill.exec.server.options.OptionList; +import org.apache.drill.exec.work.QueryWorkUnit; + +import java.util.Collection; + +/** + * This class parallelizes the query plan. 
/**
 * This class parallelizes the query plan. Once the optimizer finishes its job by producing an
 * optimized plan, it is the job of this parallelizer to generate a parallel plan out of the
 * optimized physical plan. It does so by using the optimizer's estimates for row count etc.
 * There are two kinds of parallelizers as explained below. Currently the difference between
 * these two parallelizers is only in the memory assignment for the physical operators.
 *
 * a) Default Parallelizer: It optimistically assumes that the whole cluster is running only the
 *    current query and, based on heuristics, assigns the optimal memory to the buffered operators.
 *
 * b) Queue Parallelizer: This parallelizer computes the memory that can be allocated at best based
 *    on the current cluster state (i.e. how much memory is available) and also the configuration
 *    of the queue that it can run on.
 */
public interface QueryParallelizer extends ParallelizationParameters {

  /**
   * This is the only function exposed to the consumer of this parallelizer (currently Foreman) to parallelize
   * the plan. The caller transforms the plan to a fragment tree and supplies the required information for
   * the parallelizer to do its job.
   * @param options List of all options that are set for the current session.
   * @param foremanNode Endpoint information of the foreman node.
   * @param queryId Unique ID of the query.
   * @param activeEndpoints Currently active endpoints on which the plan can run.
   * @param rootFragment Root of the fragment tree of the transformed physical plan.
   * @param session User session object.
   * @param queryContextInfo Query context.
   * @return Executable query plan which contains all the information like minor frags and major frags.
   * @throws ExecutionSetupException if the work unit cannot be generated; the exact conditions
   *         are defined by the implementing parallelizer.
   */
  QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
                                 Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
                                 UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException;
}
+ */ +package org.apache.drill.exec.planner.fragment; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.drill.common.util.function.CheckedConsumer; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.physical.PhysicalOperatorSetupException; +import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.cost.NodeResource; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; + +import java.util.Map; +import java.util.HashMap; +import java.util.Collection; +import java.util.Set; +import java.util.List; +import java.util.ArrayList; +import java.util.function.BiFunction; +import java.util.stream.Collectors; + +/** + * Paralellizer specialized for managing resources for a query based on Queues. This parallelizer + * does not deal with increase/decrease of the parallelization of a query plan based on the current + * cluster state. However, the memory assignment for each operator, minor fragment and major + * fragment is based on the cluster state and provided queue configuration. + */ +public class QueueQueryParallelizer extends SimpleParallelizer { + private final boolean planHasMemory; + private final QueryContext queryContext; + private final Map<DrillbitEndpoint, Map<PhysicalOperator, Long>> operators; + + public QueueQueryParallelizer(boolean memoryPlanning, QueryContext queryContext) { + super(queryContext); + this.planHasMemory = memoryPlanning; + this.queryContext = queryContext; + this.operators = new HashMap<>(); + } + + // return the memory computed for a physical operator on a drillbitendpoint. + public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() { + return (endpoint, operator) -> { + if (planHasMemory) { + return operators.get(endpoint).get(operator); + } + else { + return operator.getMaxAllocation(); + } + }; + } + + /** + * Function called by the SimpleParallelizer to adjust the memory post parallelization. 
+ * The overall logic is to traverse the fragment tree and call the MemoryCalculator on + * each major fragment. Once the memory is computed, resource requirement are accumulated + * per drillbit. + * + * The total resource requirements are used to select a queue. If the selected queue's + * resource limit is more/less than the query's requirement than the memory will be re-adjusted. + * + * @param planningSet context of the fragments. + * @param roots root fragments. + * @param activeEndpoints currently active endpoints. + * @throws PhysicalOperatorSetupException + */ + public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots, + Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException { + + if (planHasMemory) { + return; + } + // total node resources for the query plan maintained per drillbit. + final Map<DrillbitEndpoint, NodeResource> totalNodeResources = + activeEndpoints.stream().collect(Collectors.toMap(x ->x, + x -> NodeResource.create())); + + // list of the physical operators and their memory requirements per drillbit. + final Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> operators = + activeEndpoints.stream().collect(Collectors.toMap(x -> x, + x -> new ArrayList<>())); + + for (Wrapper wrapper : roots) { + traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragment) -> { + MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext); + fragment.getNode().getRoot().accept(calculator, fragment); + NodeResource.merge(totalNodeResources, fragment.getResourceMap()); + operators.entrySet() + .stream() + .forEach((entry) -> entry.getValue() + .addAll(calculator.getBufferedOperators(entry.getKey()))); + })); + } + //queryrm.selectQueue( pass the max node Resource) returns queue configuration. 
+ Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators = ensureOperatorMemoryWithinLimits(operators, totalNodeResources, 10); + memoryAdjustedOperators.entrySet().stream().forEach((x) -> { + Map<PhysicalOperator, Long> memoryPerOperator = x.getValue().stream() + .collect(Collectors.toMap(operatorLongPair -> operatorLongPair.getLeft(), + operatorLongPair -> operatorLongPair.getRight(), + (mem_1, mem_2) -> (mem_1 + mem_2))); + this.operators.put(x.getKey(), memoryPerOperator); + }); + } + + + /** + * Helper method to adjust the memory for the buffered operators. + * @param memoryPerOperator list of physical operators per drillbit + * @param nodeResourceMap resources per drillbit. + * @param nodeLimit permissible node limit. + * @return list of operators which contain adjusted memory limits. + */ + private Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> + ensureOperatorMemoryWithinLimits(Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryPerOperator, + Map<DrillbitEndpoint, NodeResource> nodeResourceMap, int nodeLimit) { + // Get the physical operators which are above the node memory limit. + Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> onlyMemoryAboveLimitOperators = new HashMap<>(); + memoryPerOperator.entrySet().stream().forEach((entry) -> { + onlyMemoryAboveLimitOperators.putIfAbsent(entry.getKey(), new ArrayList<>()); + if (nodeResourceMap.get(entry.getKey()).getMemory() > nodeLimit) { + onlyMemoryAboveLimitOperators.get(entry.getKey()).addAll(entry.getValue()); + } + }); + + + // Compute the total memory required by the physical operators on the drillbits which are above node limit. + // Then use the total memory to adjust the memory requirement based on the permissible node limit. 
+ Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryAdjustedDrillbits = new HashMap<>(); + onlyMemoryAboveLimitOperators.entrySet().stream().forEach( + entry -> { + Long totalMemory = entry.getValue().stream().mapToLong(Pair::getValue).sum(); + List<Pair<PhysicalOperator, Long>> adjustedMemory = entry.getValue().stream().map(operatorMemory -> { + // formula to adjust the memory is (optimalMemory / totalMemory(this is for all the operators)) * permissible_node_limit. + return Pair.of(operatorMemory.getKey(), (long) Math.ceil(operatorMemory.getValue()/totalMemory * nodeLimit)); + }).collect(Collectors.toList()); + memoryAdjustedDrillbits.put(entry.getKey(), adjustedMemory); + } + ); + + // Get all the operations on drillbits which were adjusted for memory and merge them with operators which are not + // adjusted for memory. + Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> allDrillbits = new HashMap<>(); + memoryPerOperator.entrySet().stream().filter((entry) -> !memoryAdjustedDrillbits.containsKey(entry.getKey())).forEach( + operatorMemory -> { + allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue()); + } + ); + + memoryAdjustedDrillbits.entrySet().stream().forEach( + operatorMemory -> { + allDrillbits.put(operatorMemory.getKey(), operatorMemory.getValue()); + } + ); + + // At this point allDrillbits contains the operators on all drillbits. The memory also is adjusted based on the nodeLimit and + // the ratio of their requirements. + return allDrillbits; + } +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java index 39b699fe4..a434bf80b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java @@ -21,9 +21,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Set; +import java.util.function.BiFunction; +import java.util.function.Consumer; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.DrillStringUtils; +import org.apache.drill.common.util.function.CheckedConsumer; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.MinorFragmentEndpoint; @@ -61,7 +64,7 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException; * parallelization for each major fragment will be determined. Once the amount of parallelization is done, assignment * is done based on round robin assignment ordered by operator affinity (locality) to available execution Drillbits. 
*/ -public class SimpleParallelizer implements ParallelizationParameters { +public abstract class SimpleParallelizer implements QueryParallelizer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class); private final long parallelizationThreshold; @@ -69,7 +72,7 @@ public class SimpleParallelizer implements ParallelizationParameters { private final int maxGlobalWidth; private final double affinityFactor; - public SimpleParallelizer(QueryContext context) { + protected SimpleParallelizer(QueryContext context) { OptionManager optionManager = context.getOptions(); long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET_OPTION); this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1; @@ -81,7 +84,7 @@ public class SimpleParallelizer implements ParallelizationParameters { this.affinityFactor = optionManager.getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue(); } - public SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) { + protected SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) { this.parallelizationThreshold = parallelizationThreshold; this.maxWidthPerNode = maxWidthPerNode; this.maxGlobalWidth = maxGlobalWidth; @@ -108,27 +111,106 @@ public class SimpleParallelizer implements ParallelizationParameters { return affinityFactor; } + public Set<Wrapper> getRootFragments(PlanningSet planningSet) { + //The following code gets the root fragment by removing all the dependent fragments on which root fragments depend upon. + //This is fine because the later parallelizer code traverses from these root fragments to their respective dependent + //fragments. + final Set<Wrapper> roots = Sets.newHashSet(); + for(Wrapper w : planningSet) { + roots.add(w); + } + + //roots will be left over with the fragments which are not depended upon by any other fragments. 
+ for(Wrapper wrapper : planningSet) { + final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies(); + if (fragmentDependencies != null && fragmentDependencies.size() > 0) { + for(Wrapper dependency : fragmentDependencies) { + if (roots.contains(dependency)) { + roots.remove(dependency); + } + } + } + } + + return roots; + } + + public PlanningSet prepareFragmentTree(Fragment rootFragment) { + PlanningSet planningSet = new PlanningSet(); + + initFragmentWrappers(rootFragment, planningSet); + + constructFragmentDependencyGraph(rootFragment, planningSet); + + return planningSet; + } + /** - * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages - * to go beyond the global max width. + * Traverse all the major fragments and parallelize each major fragment based on + * collected stats. The children fragments are parallelized before a parent + * fragment. + * @param planningSet Set of all major fragments and their context. + * @param roots Root nodes of the plan. + * @param activeEndpoints currently active drillbit endpoints. + * @throws PhysicalOperatorSetupException + */ + public void collectStatsAndParallelizeFragments(PlanningSet planningSet, Set<Wrapper> roots, + Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException { + for (Wrapper wrapper : roots) { + traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragmentWrapper) -> { + // If this fragment is already parallelized then no need do it again. + // This happens in the case of fragments which have MUX operators. 
+ if (fragmentWrapper.isEndpointsAssignmentDone()) { + return; + } + fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper); + fragmentWrapper.getStats() + .getDistributionAffinity() + .getFragmentParallelizer() + .parallelizeFragment(fragmentWrapper, this, activeEndpoints); + //consolidate the cpu resources required by this major fragment per drillbit. + fragmentWrapper.computeCpuResources(); + })); + } + } + + public abstract void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots, + Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException; + + /** + * The starting function for the whole parallelization and memory computation logic. + * 1) Initially a fragment tree is prepared which contains a wrapper for each fragment. + * The topology of this tree is same as that of the major fragment tree. + * 2) Traverse this fragment tree to collect stats for each major fragment and then + * parallelize each fragment. At this stage minor fragments are not created but all + * the required information to create minor fragment are computed. + * 3) Memory is computed for each operator and for the minor fragment. + * 4) Lastly all the above computed information is used to create the minor fragments + * for each major fragment. * - * @param options Option list - * @param foremanNode The driving/foreman node for this query. (this node) - * @param queryId The queryId for this query. - * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query. - * @param rootFragment The root node of the PhysicalPlan that we will be parallelizing. - * @param session UserSession of user who launched this query. - * @param queryContextInfo Info related to the context when query has started. - * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes. + * @param options List of options set by the user. 
+ * @param foremanNode foreman node for this query plan. + * @param queryId Query ID. + * @param activeEndpoints currently active endpoins on which this plan will run. + * @param rootFragment Root major fragment. + * @param session session context. + * @param queryContextInfo query context. + * @return * @throws ExecutionSetupException */ - public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, - Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment, - UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { + @Override + public final QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, + Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment, + UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { + PlanningSet planningSet = prepareFragmentTree(rootFragment); + + Set<Wrapper> rootFragments = getRootFragments(planningSet); - final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment); - return generateWorkUnit( - options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo); + collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints); + + adjustMemory(planningSet, rootFragments, activeEndpoints); + + return generateWorkUnit(options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo); } /** @@ -151,28 +233,7 @@ public class SimpleParallelizer implements ParallelizationParameters { throw new UnsupportedOperationException("Use children classes"); } - /** - * Helper method to reuse the code for QueryWorkUnit(s) generation - * @param activeEndpoints - * @param rootFragment - * @return A {@link PlanningSet}. 
- * @throws ExecutionSetupException - */ - protected PlanningSet getFragmentsHelper(Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment) throws ExecutionSetupException { - - PlanningSet planningSet = new PlanningSet(); - initFragmentWrappers(rootFragment, planningSet); - - final Set<Wrapper> leafFragments = constructFragmentDependencyGraph(planningSet); - - // Start parallelizing from leaf fragments - for (Wrapper wrapper : leafFragments) { - parallelizeFragment(wrapper, planningSet, activeEndpoints); - } - planningSet.findRootWrapper(rootFragment); - return planningSet; - } // For every fragment, create a Wrapper in PlanningSet. @VisibleForTesting @@ -190,77 +251,49 @@ public class SimpleParallelizer implements ParallelizationParameters { * @param planningSet * @return Returns a list of leaf fragments in fragment dependency graph. */ - private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) { + private void constructFragmentDependencyGraph(Fragment rootFragment, PlanningSet planningSet) { // Set up dependency of fragments based on the affinity of exchange that separates the fragments. - for (Wrapper currentFragmentWrapper : planningSet) { - ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair(); - if (sendingExchange != null) { - ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency(); - Wrapper receivingFragmentWrapper = planningSet.get(sendingExchange.getNode()); - + for(Wrapper currentFragment : planningSet) { + ExchangeFragmentPair sendingXchgForCurrFrag = currentFragment.getNode().getSendingExchangePair(); + if (sendingXchgForCurrFrag != null) { + ParallelizationDependency dependency = sendingXchgForCurrFrag.getExchange().getParallelizationDependency(); + Wrapper receivingFragmentWrapper = planningSet.get(sendingXchgForCurrFrag.getNode()); + + //Mostly Receivers of the current fragment depend on the sender of the child fragments. 
However there is a special case + //for DeMux Exchanges where the Sender of the current fragment depends on the receiver of the parent fragment. if (dependency == ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) { - receivingFragmentWrapper.addFragmentDependency(currentFragmentWrapper); + receivingFragmentWrapper.addFragmentDependency(currentFragment); } else if (dependency == ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) { - currentFragmentWrapper.addFragmentDependency(receivingFragmentWrapper); + currentFragment.addFragmentDependency(receivingFragmentWrapper); } } } - - // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for - // parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and - // remove the fragment on which the current fragment depends. - - final Set<Wrapper> roots = Sets.newHashSet(); - for (Wrapper w : planningSet) { - roots.add(w); - } - - for (Wrapper wrapper : planningSet) { - final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies(); - if (fragmentDependencies != null && fragmentDependencies.size() > 0) { - for (Wrapper dependency : fragmentDependencies) { - if (roots.contains(dependency)) { - roots.remove(dependency); - } - } - } - } - - return roots; + planningSet.findRootWrapper(rootFragment); } + /** - * Helper method for parallelizing a given fragment. Dependent fragments are parallelized first before - * parallelizing the given fragment. + * Helper method to call operation on each fragment. Traversal calls operation on child fragments before + * calling it on the parent fragment. */ - private void parallelizeFragment(Wrapper fragmentWrapper, PlanningSet planningSet, - Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException { - // If the fragment is already parallelized, return. 
- if (fragmentWrapper.isEndpointsAssignmentDone()) { - return; - } + protected void traverse(Wrapper fragmentWrapper, Consumer<Wrapper> operation) throws PhysicalOperatorSetupException { - // First parallelize fragments on which this fragment depends. final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies(); if (fragmentDependencies != null && fragmentDependencies.size() > 0) { for(Wrapper dependency : fragmentDependencies) { - parallelizeFragment(dependency, planningSet, activeEndpoints); + traverse(dependency, operation); } } - - // Find stats. Stats include various factors including cost of physical operators, parallelizability of - // work in physical operator and affinity of physical operator to certain nodes. - fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper); - - fragmentWrapper.getStats().getDistributionAffinity() - .getFragmentParallelizer() - .parallelizeFragment(fragmentWrapper, this, activeEndpoints); + operation.accept(fragmentWrapper); } + // A function which returns the memory assigned for a particular physical operator. + protected abstract BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory(); + protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId, - Fragment rootNode, PlanningSet planningSet, - UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { + Fragment rootNode, PlanningSet planningSet, UserSession session, + QueryContextInformation queryContextInfo) throws ExecutionSetupException { List<MinorFragmentDefn> fragmentDefns = new ArrayList<>( ); MinorFragmentDefn rootFragmentDefn = null; @@ -282,7 +315,8 @@ public class SimpleParallelizer implements ParallelizationParameters { // Create a minorFragment for each major fragment. 
for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) { - IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper); + IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper, + (fragmentWrapper, minorFragment) -> fragmentWrapper.getAssignedEndpoint(minorFragment), getMemory()); wrapper.resetAllocation(); PhysicalOperator op = physicalOperatorRoot.accept(Materializer.INSTANCE, iNode); Preconditions.checkArgument(op instanceof FragmentRoot); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java index 680e6a566..49af366ce 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java @@ -71,6 +71,7 @@ public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeExcept for(ExchangeFragmentPair pair : receivingExchangePairs) { if (pair.getExchange() == exchange) { + //This is the child fragment which is sending data to this fragment. 
Wrapper sendingFragment = planningSet.get(pair.getNode()); if (sendingFragment.isEndpointsAssignmentDone()) { sendingEndpoints.addAll(sendingFragment.getAssignedEndpoints()); @@ -105,7 +106,7 @@ public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeExcept stats.addEndpointAffinities(hasAffinity.getOperatorAffinity()); stats.setDistributionAffinity(hasAffinity.getDistributionAffinity()); } - stats.addCost(op.getCost()); + stats.addCost(op.getCost().getOutputRowCount()); for (PhysicalOperator child : op) { child.accept(this, wrapper); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java index 6da885044..ffa577e41 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java @@ -18,7 +18,12 @@ package org.apache.drill.exec.planner.fragment; import java.util.List; +import java.util.Map; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.drill.exec.planner.cost.NodeResource; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.drill.exec.physical.PhysicalOperatorSetupException; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; @@ -46,6 +51,11 @@ public class Wrapper { private boolean endpointsAssigned; private long initialAllocation = 0; private long maxAllocation = 0; + // Resources (i.e memory and cpu) are stored per drillbit in this map. + // A Drillbit can have n number of minor fragments then the NodeResource + // contains cumulative resources required for all the minor fragments + // for that major fragment on that Drillbit. 
+ private Map<DrillbitEndpoint, NodeResource> nodeResourceMap; // List of fragments this particular fragment depends on for determining its parallelization and endpoint assignments. private final List<Wrapper> fragmentDependencies = Lists.newArrayList(); @@ -58,6 +68,7 @@ public class Wrapper { this.majorFragmentId = majorFragmentId; this.node = node; this.stats = new Stats(); + nodeResourceMap = null; } public Stats getStats() { @@ -94,10 +105,12 @@ public class Wrapper { return maxAllocation; } - public void addAllocation(PhysicalOperator pop) { - initialAllocation += pop.getInitialAllocation(); -// logger.debug("Incrementing initialAllocation by {} to {}. Pop: {}", pop.getInitialAllocation(), initialAllocation, pop); - maxAllocation += pop.getMaxAllocation(); + public void addInitialAllocation(long memory) { + initialAllocation += memory; + } + + public void addMaxAllocation(long memory) { + maxAllocation += memory; } private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{ @@ -197,4 +210,30 @@ public class Wrapper { public List<Wrapper> getFragmentDependencies() { return ImmutableList.copyOf(fragmentDependencies); } + + /** + * Compute the cpu resources required for all the minor fragments of this major fragment. + * This information is stored per DrillbitEndpoint. It is assumed that this function is + * called only once. 
+ */ + public void computeCpuResources() { + Preconditions.checkArgument(nodeResourceMap == null); + BinaryOperator<NodeResource> merge = (first, second) -> { + NodeResource result = NodeResource.create(); + result.add(first); + result.add(second); + return result; + }; + + Function<DrillbitEndpoint, NodeResource> cpuPerEndpoint = (endpoint) -> new NodeResource(1, 0); + + nodeResourceMap = endpoints.stream() + .collect(Collectors.groupingBy(Function.identity(), + Collectors.reducing(NodeResource.create(), + cpuPerEndpoint, merge))); + } + + public Map<DrillbitEndpoint, NodeResource> getResourceMap() { + return nodeResourceMap; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java index 70d4e268a..c1250e3ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.fragment.contrib; import java.util.Collection; import java.util.Iterator; import java.util.List; +import java.util.Set; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.DrillStringUtils; @@ -28,9 +29,9 @@ import org.apache.drill.exec.physical.base.Exchange; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.PlanningSet; -import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.fragment.Wrapper; import 
org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -56,12 +57,12 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; * allows not to pollute parent class with non-authentic functionality * */ -public class SplittingParallelizer extends SimpleParallelizer { +public class SplittingParallelizer extends DefaultQueryParallelizer { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SplittingParallelizer.class); - public SplittingParallelizer(QueryContext context) { - super(context); + public SplittingParallelizer(boolean doMemoryPlanning, QueryContext context) { + super(doMemoryPlanning, context); } /** @@ -81,7 +82,13 @@ public class SplittingParallelizer extends SimpleParallelizer { Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment, UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException { - final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment); + final PlanningSet planningSet = this.prepareFragmentTree(rootFragment); + + Set<Wrapper> rootFragments = getRootFragments(planningSet); + + collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints); + + adjustMemory(planningSet, rootFragments, activeEndpoints); return generateWorkUnits( options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo); @@ -113,7 +120,7 @@ public class SplittingParallelizer extends SimpleParallelizer { List<QueryWorkUnit> workUnits = Lists.newArrayList(); int plansCount = 0; - DrillbitEndpoint[] endPoints = null; + DrillbitEndpoint[] leafFragEndpoints = null; long initialAllocation = 0; final Iterator<Wrapper> iter = planningSet.iterator(); @@ -130,12 +137,14 @@ public class SplittingParallelizer extends SimpleParallelizer { // allocation plansCount = wrapper.getWidth(); initialAllocation = 
(wrapper.getInitialAllocation() != 0 ) ? wrapper.getInitialAllocation()/plansCount : 0; - endPoints = new DrillbitEndpoint[plansCount]; + leafFragEndpoints = new DrillbitEndpoint[plansCount]; for (int mfId = 0; mfId < plansCount; mfId++) { - endPoints[mfId] = wrapper.getAssignedEndpoint(mfId); + leafFragEndpoints[mfId] = wrapper.getAssignedEndpoint(mfId); } } } + + DrillbitEndpoint[] endPoints = leafFragEndpoints; if ( plansCount == 0 ) { // no exchange, return list of single QueryWorkUnit workUnits.add(generateWorkUnit(options, foremanNode, queryId, rootNode, planningSet, session, queryContextInfo)); @@ -174,7 +183,8 @@ public class SplittingParallelizer extends SimpleParallelizer { MinorFragmentDefn rootFragment = null; FragmentRoot rootOperator = null; - IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper); + IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper, + (fragmentWrapper, minorFragment) -> endPoints[minorFragment],getMemory()); wrapper.resetAllocation(); // two visitors here // 1. 
To remove exchange diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java index 095a3238b..7d950c187 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java @@ -21,13 +21,17 @@ import java.io.IOException; import java.util.List; import java.util.Map; +import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.drill.common.logical.PlanProperties; import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode; import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder; import org.apache.drill.common.logical.PlanProperties.PlanType; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.cost.DrillCostBase; +import org.apache.drill.exec.planner.cost.PrelCostEstimates; import org.apache.drill.exec.planner.physical.explain.PrelSequencer.OpId; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -54,10 +58,24 @@ public class PhysicalPlanCreator { public PhysicalOperator addMetadata(Prel originalPrel, PhysicalOperator op){ op.setOperatorId(opIdMap.get(originalPrel).getAsSingleInt()); - op.setCost(originalPrel.estimateRowCount(originalPrel.getCluster().getMetadataQuery())); + op.setCost(getPrelCostEstimates(originalPrel, op)); return op; } + private PrelCostEstimates getPrelCostEstimates(Prel originalPrel, PhysicalOperator op) { + final RelMetadataQuery mq = originalPrel.getCluster().getMetadataQuery(); + final double estimatedRowCount = originalPrel.estimateRowCount(mq); + final DrillCostBase costBase = (DrillCostBase) 
originalPrel.computeSelfCost(originalPrel.getCluster().getPlanner(), + mq); + final PrelCostEstimates costEstimates; + if (!op.isBufferedOperator(context)) { + costEstimates = new PrelCostEstimates(context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE), estimatedRowCount); + } else { + costEstimates = new PrelCostEstimates(costBase.getMemory(), estimatedRowCount); + } + return costEstimates; + } + public PhysicalPlan build(Prel rootPrel, boolean forceRebuild) { if (plan != null && !forceRebuild) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java index bec2bcb87..008c9dfc0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.util; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import org.apache.drill.common.config.DrillConfig; @@ -35,6 +35,22 @@ public class MemoryAllocationUtilities { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryAllocationUtilities.class); + + public static void setupBufferedMemoryAllocations(PhysicalPlan plan, final QueryContext queryContext) { + setupBufferedOpsMemoryAllocations(plan.getProperties().hasResourcePlan, + getBufferedOperators(plan.getSortedOperators(), queryContext), queryContext); + } + + public static List<PhysicalOperator> getBufferedOperators(List<PhysicalOperator> operators, QueryContext queryContext) { + final List<PhysicalOperator> bufferedOpList = new ArrayList<>(); + for (final PhysicalOperator op : operators) { + if (op.isBufferedOperator(queryContext)) { + bufferedOpList.add(op); + } + } + return bufferedOpList; + } + /** * Helper method to setup Memory Allocations * <p> @@ -54,28 +70,18 @@ public class 
MemoryAllocationUtilities { * <p> * since this method can be used in multiple places adding it in this class * rather than keeping it in Foreman - * @param plan - * @param queryContext + * @param planHasMemory defines the memory planning needs to be done or not. + * generally skipped when the plan contains memory allocation. + * @param bufferedOperators list of buffered operators in the plan. + * @param queryContext context of the query. */ - public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) { + public static void setupBufferedOpsMemoryAllocations(boolean planHasMemory, + List<PhysicalOperator> bufferedOperators, final QueryContext queryContext) { // Test plans may already have a pre-defined memory plan. // Otherwise, determine memory allocation. - if (plan.getProperties().hasResourcePlan) { - return; - } - // look for external sorts - final List<PhysicalOperator> bufferedOpList = new LinkedList<>(); - for (final PhysicalOperator op : plan.getSortedOperators()) { - if (op.isBufferedOperator(queryContext)) { - bufferedOpList.add(op); - } - } - - // if there are any sorts, compute the maximum allocation, and set it on them - plan.getProperties().hasResourcePlan = true; - if (bufferedOpList.isEmpty()) { + if (planHasMemory || bufferedOperators.isEmpty()) { return; } @@ -91,9 +97,9 @@ public class MemoryAllocationUtilities { // Now divide up the memory by slices and operators. 
- final long opMinMem = computeOperatorMemory(optionManager, maxAllocPerNode, bufferedOpList.size()); + final long opMinMem = computeOperatorMemory(optionManager, maxAllocPerNode, bufferedOperators.size()); - for(final PhysicalOperator op : bufferedOpList) { + for(final PhysicalOperator op : bufferedOperators) { final long alloc = Math.max(opMinMem, op.getInitialAllocation()); op.setMaxAllocation(alloc); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 37b8a4073..521e50a35 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -38,7 +38,6 @@ import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor; -import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.planner.sql.DirectPlan; import org.apache.drill.exec.planner.sql.DrillSqlWorker; import org.apache.drill.exec.proto.BitControl.PlanFragment; @@ -408,7 +407,7 @@ public class Foreman implements Runnable { validatePlan(plan); queryRM.visitAbstractPlan(plan); - final QueryWorkUnit work = getQueryWorkUnit(plan); + final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM); if (enableRuntimeFilter) { runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext); runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo(); @@ -463,7 +462,7 @@ public class Foreman implements Runnable { } catch (IOException e) { throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson())); } - queryRM.setCost(rootOperator.getCost()); + queryRM.setCost(rootOperator.getCost().getOutputRowCount()); 
fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator); @@ -561,14 +560,18 @@ public class Foreman implements Runnable { } } - private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException { + private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan, + final QueryResourceManager rm) throws ExecutionSetupException { + final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); + final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); - final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext); - return parallelizer.getFragments( - queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), - queryId, queryContext.getOnlineEndpoints(), rootFragment, - initiatingClient.getSession(), queryContext.getQueryContextInfo()); + + return rm.getParallelizer(plan.getProperties().hasResourcePlan).generateWorkUnit(queryContext.getOptions().getOptionList(), + queryContext.getCurrentEndpoint(), + queryId, queryContext.getOnlineEndpoints(), + rootFragment, initiatingClient.getSession(), + queryContext.getQueryContextInfo()); } private void logWorkUnit(QueryWorkUnit queryWorkUnit) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java index 77957d5a3..7c3cc04d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java @@ -20,6 +20,8 @@ package org.apache.drill.exec.work.foreman.rm; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.ops.QueryContext; import org.apache.drill.exec.physical.PhysicalPlan; +import org.apache.drill.exec.planner.fragment.QueryParallelizer; +import 
org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer; import org.apache.drill.exec.util.MemoryAllocationUtilities; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.foreman.Foreman; @@ -46,12 +48,16 @@ public class DefaultResourceManager implements ResourceManager { if (plan == null || plan.getProperties().hasResourcePlan) { return; } - MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext); + MemoryAllocationUtilities.setupBufferedMemoryAllocations(plan, queryContext); } @Override public void visitPhysicalPlan(QueryWorkUnit work) { } + + public QueryContext getQueryContext() { + return queryContext; + } } public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager { @@ -70,6 +76,11 @@ public class DefaultResourceManager implements ResourceManager { } @Override + public QueryParallelizer getParallelizer(boolean memoryPlanning){ + return new DefaultQueryParallelizer(memoryPlanning, this.getQueryContext()); + } + + @Override public void admit() { // No queueing by default } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java index 9e2a3a10f..4b9112194 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.work.foreman.rm; +import org.apache.drill.exec.planner.fragment.QueryParallelizer; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; @@ -44,6 +45,15 @@ public interface QueryResourceManager extends QueryResourceAllocator { void setCost(double cost); /** + * Create a parallelizer to parallelize each 
major fragment of the query into + * many minor fragments. The parallelizer encapsulates the logic of how much + * memory and parallelism is required for the query. + * @param memoryPlanning memory planning needs to be done during parallelization + * @return + */ + QueryParallelizer getParallelizer(boolean memoryPlanning); + + /** * Admit the query into the cluster. Blocks until the query * can run. (Later revisions may use a more thread-friendly * approach.) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java index c6eb9d3b5..5d1a1709d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java @@ -28,6 +28,8 @@ import org.apache.drill.exec.physical.PhysicalPlan; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.base.PhysicalOperator; +import org.apache.drill.exec.planner.fragment.QueryParallelizer; +import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer; import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.work.QueryWorkUnit; @@ -116,6 +118,10 @@ public class ThrottledResourceManager extends AbstractResourceManager { } } + public QueryContext getQueryContext() { + return queryContext; + } + private int countBufferingOperators( Map<String, Collection<PhysicalOperator>> nodeMap) { int width = 0; @@ -287,6 +293,12 @@ public class ThrottledResourceManager extends AbstractResourceManager { } @Override + public QueryParallelizer getParallelizer(boolean planHasMemory) { + // currently memory planning is disabled. 
Enable it once the RM functionality is fully implemented. + return new QueueQueryParallelizer(true || planHasMemory, this.getQueryContext()); + } + + @Override public void admit() throws QueueTimeoutException, QueryQueueException { lease = rm.queue().enqueue(foreman.getQueryId(), queryCost); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java index e789c1dfa..e61814ab3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java @@ -113,7 +113,7 @@ public class PlanSplitter { final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next(); final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null); - final SimpleParallelizer parallelizer = new SplittingParallelizer(queryContext); + final SimpleParallelizer parallelizer = new SplittingParallelizer(plan.getProperties().hasResourcePlan, queryContext); List<PlanFragment> fragments = Lists.newArrayList(); @@ -134,7 +134,7 @@ public class PlanSplitter { } } } else { - final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), + final QueryWorkUnit queryWorkUnit = parallelizer.generateWorkUnit(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(), queryId, queryContext.getActiveEndpoints(), rootFragment, queryContext.getSession(), queryContext.getQueryContextInfo()); planner.visitPhysicalPlan(queryWorkUnit); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java index 0f0bf0df0..7fb7bdbe4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java +++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java @@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory; +import org.apache.drill.exec.planner.cost.PrelCostEstimates; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.store.direct.DirectSubScan; import org.apache.drill.exec.store.mock.MockSubScanPOP; @@ -58,7 +59,7 @@ public class TestOpSerialization { private static PhysicalOperator setupPhysicalOperator(PhysicalOperator operator) { operator.setOperatorId(1); - operator.setCost(1.0); + operator.setCost(new PrelCostEstimates(1.0, 1.0)); operator.setMaxAllocation(1000); return operator; } @@ -66,7 +67,7 @@ public class TestOpSerialization { private static void assertOperator(PhysicalOperator operator) { assertEquals(1, operator.getOperatorId()); - assertEquals(1.0, operator.getCost(), 0.00001); + assertEquals(1.0, operator.getCost().getOutputRowCount(), 0.00001); assertEquals(1000, operator.getMaxAllocation()); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java index b45acfbc8..37cae3e92 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.physical.impl; +import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer; import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -81,7 +82,8 @@ public class 
TestLocalExchange extends PlanTestBase { .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()) .build(); - private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer( + private static final SimpleParallelizer PARALLELIZER = new DefaultQueryParallelizer( + true, 1 /*parallelizationThreshold (slice_count)*/, 6 /*maxWidthPerNode*/, 1000 /*maxGlobalWidth*/, @@ -394,9 +396,8 @@ public class TestLocalExchange extends PlanTestBase { findFragmentsWithPartitionSender(rootFragment, planningSet, deMuxFragments, htrFragments); - final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", - "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); - QueryWorkUnit qwu = PARALLELIZER.getFragments(new OptionList(), drillbitContext.getEndpoint(), + final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); + QueryWorkUnit qwu = PARALLELIZER.generateWorkUnit(new OptionList(), drillbitContext.getEndpoint(), QueryId.getDefaultInstance(), drillbitContext.getBits(), rootFragment, USER_SESSION, queryContextInfo); qwu.applyPlan(planReader); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java index 0986c75d6..7ef94f7d7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java @@ -46,6 +46,8 @@ import org.apache.drill.exec.physical.impl.TopN.TopNBatch; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric; import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator.GeneralExecuteIface; import 
org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.cost.PrelCostEstimates; +import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.PlanningSet; import org.apache.drill.exec.planner.fragment.SimpleParallelizer; @@ -86,7 +88,8 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @Category(OperatorTest.class) public class TestPartitionSender extends PlanTestBase { - private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer( + private static final SimpleParallelizer PARALLELIZER = new DefaultQueryParallelizer( + true, 1 /*parallelizationThreshold (slice_count)*/, 6 /*maxWidthPerNode*/, 1000 /*maxGlobalWidth*/, @@ -178,14 +181,14 @@ public class TestPartitionSender extends PlanTestBase { options.clear(); options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.slice_target", 1, OptionScope.SESSION)); options.add(OptionValue.create(OptionValue.AccessibleScopes.SESSION, "planner.partitioner_sender_max_threads", 10, OptionScope.SESSION)); - hashToRandomExchange.setCost(1000); + hashToRandomExchange.setCost(new PrelCostEstimates(1000, 1000)); testThreadsHelper(hashToRandomExchange, drillbitContext, options, incoming, registry, planReader, planningSet, rootFragment, 10); options.clear(); options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.slice_target", 1000, OptionScope.SESSION)); options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.partitioner_sender_threads_factor",2, OptionScope.SESSION)); - hashToRandomExchange.setCost(14000); + hashToRandomExchange.setCost(new PrelCostEstimates(14000, 14000)); testThreadsHelper(hashToRandomExchange, drillbitContext, options, incoming, registry, planReader, planningSet, rootFragment, 2); } @@ -207,9 +210,8 @@ public class TestPartitionSender extends PlanTestBase { RecordBatch incoming, 
FunctionImplementationRegistry registry, PhysicalPlanReader planReader, PlanningSet planningSet, Fragment rootFragment, int expectedThreadsCount) throws Exception { - final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", - "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); - final QueryWorkUnit qwu = PARALLELIZER.getFragments(options, drillbitContext.getEndpoint(), + final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); + final QueryWorkUnit qwu = PARALLELIZER.generateWorkUnit(options, drillbitContext.getEndpoint(), QueryId.getDefaultInstance(), drillbitContext.getBits(), rootFragment, USER_SESSION, queryContextInfo); qwu.applyPlan(planReader); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java new file mode 100644 index 000000000..4893a36fd --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.exec.planner.rm; + + +import org.apache.drill.PlanTestBase; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.cost.NodeResource; +import org.apache.drill.exec.planner.fragment.Fragment; +import org.apache.drill.exec.planner.fragment.PlanningSet; +import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer; +import org.apache.drill.exec.planner.fragment.SimpleParallelizer; +import org.apache.drill.exec.planner.fragment.Wrapper; +import org.apache.drill.exec.pop.PopUnitTestBase; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.UserProtos; +import org.apache.drill.exec.rpc.user.UserSession; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.shaded.guava.com.google.common.collect.Iterables; +import org.apache.drill.test.ClientFixture; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.junit.AfterClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestMemoryCalculator extends PlanTestBase { + + private static final long DEFAULT_SLICE_TARGET = 100000L; + private static final long DEFAULT_BATCH_SIZE = 16*1024*1024; + + private static final UserSession session = UserSession.Builder.newBuilder() + .withCredentials(UserBitShared.UserCredentials.newBuilder() + .setUserName("foo") + .build()) + 
.withUserProperties(UserProtos.UserProperties.getDefaultInstance()) + .withOptionManager(bits[0].getContext().getOptionManager()) + .build(); + + private static final DrillbitEndpoint N1_EP1 = newDrillbitEndpoint("node1", 30010); + private static final DrillbitEndpoint N1_EP2 = newDrillbitEndpoint("node2", 30011); + private static final DrillbitEndpoint N1_EP3 = newDrillbitEndpoint("node3", 30012); + private static final DrillbitEndpoint N1_EP4 = newDrillbitEndpoint("node4", 30013); + + private static final DrillbitEndpoint[] nodeList = {N1_EP1, N1_EP2, N1_EP3, N1_EP4}; + + private static final DrillbitEndpoint newDrillbitEndpoint(String address, int port) { + return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build(); + } + private static final DrillbitContext drillbitContext = getDrillbitContext(); + private static final QueryContext queryContext = new QueryContext(session, drillbitContext, + UserBitShared.QueryId.getDefaultInstance()); + + @AfterClass + public static void close() throws Exception { + queryContext.close(); + } + + private final Wrapper mockWrapper(Wrapper rootFragment, + Map<DrillbitEndpoint, NodeResource> resourceMap, + List<DrillbitEndpoint> endpoints, + Map<Fragment, Wrapper> originalToMockWrapper ) { + final Wrapper mockWrapper = mock(Wrapper.class); + originalToMockWrapper.put(rootFragment.getNode(), mockWrapper); + List<Wrapper> mockdependencies = new ArrayList<>(); + + for (Wrapper dependency : rootFragment.getFragmentDependencies()) { + mockdependencies.add(mockWrapper(dependency, resourceMap, endpoints, originalToMockWrapper)); + } + + when(mockWrapper.getNode()).thenReturn(rootFragment.getNode()); + when(mockWrapper.getAssignedEndpoints()).thenReturn(endpoints); + when(mockWrapper.getResourceMap()).thenReturn(resourceMap); + when(mockWrapper.getWidth()).thenReturn(endpoints.size()); + when(mockWrapper.getFragmentDependencies()).thenReturn(mockdependencies); + 
when(mockWrapper.isEndpointsAssignmentDone()).thenReturn(true); + return mockWrapper; + } + + private final PlanningSet mockPlanningSet(PlanningSet planningSet, + Map<DrillbitEndpoint, NodeResource> resourceMap, + List<DrillbitEndpoint> endpoints) { + Map<Fragment, Wrapper> wrapperToMockWrapper = new HashMap<>(); + Wrapper rootFragment = mockWrapper( planningSet.getRootWrapper(), resourceMap, + endpoints, wrapperToMockWrapper); + PlanningSet mockPlanningSet = mock(PlanningSet.class); + when(mockPlanningSet.getRootWrapper()).thenReturn(rootFragment); + when(mockPlanningSet.get(any(Fragment.class))).thenAnswer(invocation -> { + return wrapperToMockWrapper.get(invocation.getArgument(0)); + }); + return mockPlanningSet; + } + + private String getPlanForQuery(String query) throws Exception { + return getPlanForQuery(query, DEFAULT_BATCH_SIZE); + } + + private String getPlanForQuery(String query, long outputBatchSize) throws Exception { + return getPlanForQuery(query, outputBatchSize, DEFAULT_SLICE_TARGET); + } + + private String getPlanForQuery(String query, long outputBatchSize, + long slice_target) throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize) + .setOptionDefault(ExecConstants.SLICE_TARGET, slice_target); + String plan; + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + plan = client.queryBuilder() + .sql(query) + .explainJson(); + } + return plan; + } + + private List<DrillbitEndpoint> getEndpoints(int totalMinorFragments, + Set<DrillbitEndpoint> notIn) { + List<DrillbitEndpoint> endpoints = new ArrayList<>(); + Iterator drillbits = Iterables.cycle(nodeList).iterator(); + + while(totalMinorFragments-- > 0) { + DrillbitEndpoint dbit = (DrillbitEndpoint) drillbits.next(); + if (!notIn.contains(dbit)) { + endpoints.add(dbit); + } + } + return endpoints; + } + + private Set<Wrapper> createSet(Wrapper... 
wrappers) { + Set<Wrapper> setOfWrappers = new HashSet<>(); + for (Wrapper wrapper : wrappers) { + setOfWrappers.add(wrapper); + } + return setOfWrappers; + } + + private Fragment getRootFragmentFromPlan(DrillbitContext context, + String plan) throws Exception { + final PhysicalPlanReader planReader = context.getPlanReader(); + return PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan); + } + + private PlanningSet preparePlanningSet(List<DrillbitEndpoint> activeEndpoints, long slice_target, + Map<DrillbitEndpoint, NodeResource> resources, String sql, + SimpleParallelizer parallelizer) throws Exception { + Fragment rootFragment = getRootFragmentFromPlan(drillbitContext, getPlanForQuery(sql, 10, slice_target)); + return mockPlanningSet(parallelizer.prepareFragmentTree(rootFragment), resources, activeEndpoints); + } + + @Test + public void TestSingleMajorFragmentWithProjectAndScan() throws Exception { + List<DrillbitEndpoint> activeEndpoints = getEndpoints(2, new HashSet<>()); + Map<DrillbitEndpoint, NodeResource> resources = activeEndpoints.stream() + .collect(Collectors.toMap(x -> x, + x -> NodeResource.create())); + String sql = "SELECT * from cp.`tpch/nation.parquet`"; + + SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext); + PlanningSet planningSet = preparePlanningSet(activeEndpoints, DEFAULT_SLICE_TARGET, resources, sql, parallelizer); + parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints); + assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 30)); + } + + + @Test + public void TestSingleMajorFragmentWithGroupByProjectAndScan() throws Exception { + List<DrillbitEndpoint> activeEndpoints = getEndpoints(2, new HashSet<>()); + Map<DrillbitEndpoint, NodeResource> resources = activeEndpoints.stream() + .collect(Collectors.toMap(x -> x, + x -> NodeResource.create())); + String sql = "SELECT dept_id, count(*) from 
cp.`tpch/lineitem.parquet` group by dept_id"; + + SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext); + PlanningSet planningSet = preparePlanningSet(activeEndpoints, DEFAULT_SLICE_TARGET, resources, sql, parallelizer); + parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints); + assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 529570)); + } + + + @Test + public void TestTwoMajorFragmentWithSortyProjectAndScan() throws Exception { + List<DrillbitEndpoint> activeEndpoints = getEndpoints(2, new HashSet<>()); + Map<DrillbitEndpoint, NodeResource> resources = activeEndpoints.stream() + .collect(Collectors.toMap(x -> x, + x -> NodeResource.create())); + String sql = "SELECT * from cp.`tpch/lineitem.parquet` order by dept_id"; + + SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext); + PlanningSet planningSet = preparePlanningSet(activeEndpoints, 2, resources, sql, parallelizer); + parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints); + assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 481490)); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java index 7b53bb65c..c01143b26 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.drill.categories.PlannerTest; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory; +import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer; import 
org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.proto.BitControl.QueryContextInformation; @@ -52,7 +53,7 @@ public class TestFragmentChecker extends PopUnitTestBase{ private void print(String fragmentFile, int bitCount, int expectedFragmentCount) throws Exception { PhysicalPlanReader ppr = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(CONFIG); Fragment fragmentRoot = getRootFragment(ppr, fragmentFile); - SimpleParallelizer par = new SimpleParallelizer(1000*1000, 5, 10, 1.2); + SimpleParallelizer par = new DefaultQueryParallelizer(true, 1000*1000, 5, 10, 1.2); List<DrillbitEndpoint> endpoints = Lists.newArrayList(); DrillbitEndpoint localBit = null; for(int i =0; i < bitCount; i++) { @@ -63,9 +64,8 @@ public class TestFragmentChecker extends PopUnitTestBase{ endpoints.add(b1); } - final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", - "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); - QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, fragmentRoot, + final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); + QueryWorkUnit qwu = par.generateWorkUnit(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, fragmentRoot, UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build(), queryContextInfo); qwu.applyPlan(ppr); diff --git a/exec/java-exec/src/test/resources/join/hashJoinExpr.json b/exec/java-exec/src/test/resources/join/hashJoinExpr.json index 386d90e83..13a1c0fa6 100644 --- a/exec/java-exec/src/test/resources/join/hashJoinExpr.json +++ b/exec/java-exec/src/test/resources/join/hashJoinExpr.json @@ -28,7 +28,10 @@ }, "columns" : [ "`r_regionkey`" ], "selectionRoot" : "/tpch/region.parquet", - "cost" : 
5.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 5.0 + } }, { "pop" : "project", "@id" : 3, @@ -39,7 +42,10 @@ "child" : 5, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 5.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 5.0 + } }, { "pop" : "parquet-scan", "@id" : 6, @@ -58,7 +64,10 @@ }, "columns" : [ "`n_nationkey`", "`n_regionkey`" ], "selectionRoot" : "/tpch/nation.parquet", - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "project", "@id" : 4, @@ -72,7 +81,10 @@ "child" : 6, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "hash-join", "@id" : 2, @@ -86,7 +98,10 @@ "joinType" : "INNER", "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "project", "@id" : 1, @@ -97,13 +112,19 @@ "child" : 2, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "screen", "@id" : 0, "child" : 1, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } } ] } diff --git a/exec/java-exec/src/test/resources/join/mergeJoinExpr.json b/exec/java-exec/src/test/resources/join/mergeJoinExpr.json index 1c5111bbb..c0027ea96 100644 --- a/exec/java-exec/src/test/resources/join/mergeJoinExpr.json +++ b/exec/java-exec/src/test/resources/join/mergeJoinExpr.json @@ -33,7 +33,10 @@ }, "columns" : [ "`n_nationkey`", "`n_regionkey`" ], "selectionRoot" : "/tpch/nation.parquet", - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "project", "@id" : 8, @@ -47,7 +50,10 @@ "child" : 9, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, 
+ "outputRowCount" : 25.0 + } }, { "pop" : "external-sort", "@id" : 6, @@ -60,14 +66,20 @@ "reverse" : false, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "selection-vector-remover", "@id" : 4, "child" : 6, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "parquet-scan", "@id" : 7, @@ -86,7 +98,10 @@ }, "columns" : [ "`r_regionkey`" ], "selectionRoot" : "/tpch/region.parquet", - "cost" : 5.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "external-sort", "@id" : 5, @@ -99,14 +114,20 @@ "reverse" : false, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 5.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "selection-vector-remover", "@id" : 3, "child" : 5, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 5.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "merge-join", "@id" : 2, @@ -120,7 +141,10 @@ "joinType" : "INNER", "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "project", "@id" : 1, @@ -131,13 +155,19 @@ "child" : 2, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } }, { "pop" : "screen", "@id" : 0, "child" : 1, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 25.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 25.0 + } } ] } diff --git a/exec/java-exec/src/test/resources/join/merge_join_nullkey.json b/exec/java-exec/src/test/resources/join/merge_join_nullkey.json index b283dda2e..fb6bfc272 100644 --- a/exec/java-exec/src/test/resources/join/merge_join_nullkey.json +++ 
b/exec/java-exec/src/test/resources/join/merge_join_nullkey.json @@ -32,7 +32,10 @@ "type" : "json" }, "selectionRoot" : "/region.json", - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "project", "@id" : 9, @@ -43,7 +46,10 @@ "child" : 11, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "external-sort", "@id" : 7, @@ -56,14 +62,20 @@ "reverse" : false, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "selection-vector-remover", "@id" : 5, "child" : 7, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "fs-scan", "@id" : 10, @@ -86,7 +98,10 @@ "type" : "json" }, "selectionRoot" : "/region.json", - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "project", "@id" : 8, @@ -97,7 +112,10 @@ "child" : 10, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "external-sort", "@id" : 6, @@ -110,14 +128,20 @@ "reverse" : false, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "selection-vector-remover", "@id" : 4, "child" : 6, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "merge-join", "@id" : 3, @@ -131,7 +155,10 @@ "joinType" : "${JOIN_TYPE}", "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "project", "@id" : 2, @@ -145,7 +172,10 @@ "child" : 3, "initialAllocation" : 
1000000, "maxAllocation" : 10000000000, - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "project", "@id" : 1, @@ -159,13 +189,19 @@ "child" : 2, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } }, { "pop" : "screen", "@id" : 0, "child" : 1, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 18.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 18.0 + } } ] } diff --git a/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json b/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json index 60cd2b0d5..b6652071c 100644 --- a/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json +++ b/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json @@ -38,13 +38,19 @@ }, "columns" : [ "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", "`field_4`.`inner_1`", "`non_existent_at_root`", "`non_existent`.`nested`.`field`"], "selectionRoot" : "/store/json/schema_change_int_to_string.json", - "cost" : 0.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 0 + } }, { "pop" : "screen", "@id" : 0, "child" : 1, "initialAllocation" : 1000000, "maxAllocation" : 10000000000, - "cost" : 1.0 + "cost" : { + "memoryCost" : 0, + "outputRowCount" : 1.0 + } } ] -}
\ No newline at end of file +} |