aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHanumathRao <hanu.ncr@gmail.com>2019-02-28 00:12:05 -0800
committerSorabh Hamirwasia <sorabh@apache.org>2019-03-14 21:34:06 -0700
commitd22e68b83d1d0cc0539d79ae0cb3aa70ae3242ad (patch)
tree76f816aead22b0d7668d06f1b13ccad5c142960c
parent5aa38a51d90998234b4ca46434ce225df72addc5 (diff)
downloaddrill-d22e68b83d1d0cc0539d79ae0cb3aa70ae3242ad.tar.gz
DRILL-7068: Support memory adjustment framework for resource management with Queues.
closes #1677
-rw-r--r--common/src/main/java/org/apache/drill/common/util/function/CheckedConsumer.java39
-rw-r--r--contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/NodeResource.java73
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PrelCostEstimates.java51
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java66
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java35
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java30
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java160
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java63
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java173
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java218
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java3
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java47
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java28
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java20
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java46
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java21
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java13
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java10
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java4
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java5
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java9
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java14
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java227
-rw-r--r--exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java8
-rw-r--r--exec/java-exec/src/test/resources/join/hashJoinExpr.json35
-rw-r--r--exec/java-exec/src/test/resources/join/mergeJoinExpr.json50
-rw-r--r--exec/java-exec/src/test/resources/join/merge_join_nullkey.json60
-rw-r--r--exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json12
38 files changed, 1375 insertions, 224 deletions
diff --git a/common/src/main/java/org/apache/drill/common/util/function/CheckedConsumer.java b/common/src/main/java/org/apache/drill/common/util/function/CheckedConsumer.java
new file mode 100644
index 000000000..bd9b69b3e
--- /dev/null
+++ b/common/src/main/java/org/apache/drill/common/util/function/CheckedConsumer.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.common.util.function;
+
+import java.util.function.Consumer;
+import org.apache.drill.common.exceptions.ErrorHelper;
+
+@FunctionalInterface
+public interface CheckedConsumer<T, E extends Throwable> {
+ void accept(T t) throws E;
+
+ static <T> Consumer<T> throwingConsumerWrapper(
+ CheckedConsumer<T, Exception> throwingConsumer) {
+
+ return i -> {
+ try {
+ throwingConsumer.accept(i);
+ } catch (Exception ex) {
+ ErrorHelper.sneakyThrow(ex);
+ }
+ };
+ }
+}
+
diff --git a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
index d87473305..2a5cdddf8 100644
--- a/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
+++ b/contrib/storage-kafka/src/test/java/org/apache/drill/exec/store/kafka/KafkaFilterPushdownTest.java
@@ -35,7 +35,7 @@ public class KafkaFilterPushdownTest extends KafkaTestBase {
private static final String expectedSubStr = " \"kafkaScanSpec\" : {\n" +
" \"topicName\" : \"drill-pushdown-topic\"\n" +
" },\n" +
- " \"cost\" : %s.0";
+ " \"cost\"";
@BeforeClass
public static void setup() throws Exception {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
index 6eba65833..13c3c2ac8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
@@ -93,7 +93,7 @@ public class PhysicalPlan {
public double totalCost() {
double totalCost = 0;
for (final PhysicalOperator ops : getSortedOperators()) {
- totalCost += ops.getCost();
+ totalCost += ops.getCost().getOutputRowCount();
}
return totalCost;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index 71289b699..aad4e8184 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.base;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.common.graph.GraphVisitor;
import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.cost.PrelCostEstimates;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -35,7 +36,7 @@ public abstract class AbstractBase implements PhysicalOperator {
private final String userName;
private int id;
- private double cost;
+ private PrelCostEstimates cost = PrelCostEstimates.ZERO_COST;
public AbstractBase() {
userName = null;
@@ -89,12 +90,12 @@ public abstract class AbstractBase implements PhysicalOperator {
}
@Override
- public double getCost() {
+ public PrelCostEstimates getCost() {
return cost;
}
@Override
- public void setCost(double cost) {
+ public void setCost(PrelCostEstimates cost) {
this.cost = cost;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
index ebd7288a4..4106205f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
@@ -130,4 +130,20 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang
public ParallelizationDependency getParallelizationDependency() {
return ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER;
}
+
+ // Memory requirement of the sender for the given receivers and senders in a major fragment.
+ // Currently set to zero, but once batch sizing for Exchanges is completed it will call the
+ // appropriate function.
+ @Override
+ public long getSenderMemory(int receivers, int senders) {
+ return 0;
+ }
+
+ // Memory requirement of the receiver for the given receivers and senders in a major fragment.
+ // Currently set to zero, but once batch sizing for Exchanges is completed it will call the
+ // appropriate function.
+ @Override
+ public long getReceiverMemory(int receivers, int senders) {
+ return 0;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
index 56e9b9c90..7de915c4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
@@ -79,6 +79,23 @@ public interface Exchange extends PhysicalOperator {
Receiver getReceiver(int minorFragmentId);
/**
+ * Returns the memory requirement for the sender side of the exchange operator.
+ * @param receiverCount number of receivers at the receiving end of this exchange operator.
+ * @param senderCount number of senders sending the rows for this exchange operator.
+ * @return Total memory required by this operator.
+ */
+ long getSenderMemory(int receiverCount, int senderCount);
+
+ /**
+ * Returns the memory requirement for the receiver side of the exchange operator.
+ * @param receiverCount number of receivers receiving the rows sent by the sender side of this
+ * exchange operator.
+ * @param senderCount number of senders sending the rows.
+ * @return Total memory required by this operator.
+ */
+ long getReceiverMemory(int receiverCount, int senderCount);
+
+ /**
* Provide parallelization parameters for sender side of the exchange. Output includes min width,
* max width and affinity to Drillbits.
*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 82fb53bf4..3bca02ea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.graph.GraphValue;
import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.cost.PrelCostEstimates;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
@@ -101,10 +102,10 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
void setOperatorId(int id);
@JsonProperty("cost")
- void setCost(double cost);
+ void setCost(PrelCostEstimates cost);
@JsonProperty("cost")
- double getCost();
+ PrelCostEstimates getCost();
/**
* Name of the user whom to impersonate while setting up the implementation (RecordBatch) of this
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index c185ac7ac..5aed88ef8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -121,7 +121,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount);
// Algorithm to figure out number of threads to parallelize output
// numberOfRows/sliceTarget/numReceivers/threadfactor
- this.cost = operator.getChild().getCost();
+ this.cost = operator.getChild().getCost().getOutputRowCount();
final OptionManager optMgr = context.getOptions();
long sliceTarget = optMgr.getOption(ExecConstants.SLICE_TARGET).num_val;
int threadFactor = optMgr.getOption(PlannerSettings.PARTITION_SENDER_THREADS_FACTOR.getOptionName()).num_val.intValue();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
index faf09afe6..8c268e22f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/DrillRelOptCost.java
@@ -24,4 +24,6 @@ public interface DrillRelOptCost extends RelOptCost {
double getNetwork();
+ double getMemory();
+
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/NodeResource.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/NodeResource.java
new file mode 100644
index 000000000..ad7bc2e75
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/NodeResource.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.cost;
+
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import java.util.Map;
+
+/**
+ * This class abstracts resources, such as CPU and memory, that are used by the operators.
+ * In the future, network resources can also be incorporated if required.
+ */
+public class NodeResource {
+ private long cpu;
+ private long memory;
+
+ public NodeResource(long cpu, long memory) {
+ this.cpu = cpu;
+ this.memory = memory;
+ }
+
+ public void add(NodeResource other) {
+ if (other == null) {
+ return;
+ }
+ this.cpu += other.cpu;
+ this.memory += other.memory;
+ }
+
+ public long getMemory() {
+ return memory;
+ }
+
+ // A utility function to merge node resources from one drillbit map into another; only endpoints already in 'to' are updated.
+ public static Map<DrillbitEndpoint, NodeResource> merge(Map<DrillbitEndpoint, NodeResource> to,
+ Map<DrillbitEndpoint, NodeResource> from) {
+ to.entrySet().stream().forEach((toEntry) -> toEntry.getValue().add(from.get(toEntry.getKey())));
+ return to;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("CPU: ").append(cpu).append("Memory: ").append(memory);
+ return sb.toString();
+ }
+
+ public static NodeResource create() {
+ return create(0,0);
+ }
+
+ public static NodeResource create(long cpu) {
+ return create(cpu,0);
+ }
+
+ public static NodeResource create(long cpu, long memory) {
+ return new NodeResource(cpu, memory);
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PrelCostEstimates.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PrelCostEstimates.java
new file mode 100644
index 000000000..7cf1b9e11
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/cost/PrelCostEstimates.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.cost;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+
+/**
+ * Cost estimates per physical relation. These cost estimations are computed
+ * during physical planning by the optimizer. These are also used post physical
+ * planning to compute memory requirements in the minor fragment generation phase.
+ *
+ */
+@JsonTypeName("cost-estimates")
+public class PrelCostEstimates {
+ // memory requirement for an operator.
+ private final double memoryCost;
+ // number of rows that are output by this operator.
+ private final double outputRowCount;
+
+ public static PrelCostEstimates ZERO_COST = new PrelCostEstimates(0,0);
+
+ public PrelCostEstimates(@JsonProperty("memoryCost") double memory,
+ @JsonProperty("outputRowCount") double rowCount) {
+ this.memoryCost = memory;
+ this.outputRowCount = rowCount;
+ }
+
+ public double getOutputRowCount() {
+ return outputRowCount;
+ }
+
+ public double getMemoryCost() {
+ return memoryCost;
+ }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java
new file mode 100644
index 000000000..20875a470
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/DefaultQueryParallelizer.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.util.MemoryAllocationUtilities;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * Non-RM (resource management) version of the parallelizer. The parallelization logic is fully inherited from
+ * SimpleParallelizer. The memory computation of the operators is based on the earlier logic to assign memory for
+ * the buffered operators.
+ */
+public class DefaultQueryParallelizer extends SimpleParallelizer {
+ private final boolean planHasMemory;
+ private final QueryContext queryContext;
+
+ public DefaultQueryParallelizer(boolean memoryAvailableInPlan, QueryContext queryContext) {
+ super(queryContext);
+ this.planHasMemory = memoryAvailableInPlan;
+ this.queryContext = queryContext;
+ }
+
+ public DefaultQueryParallelizer(boolean memoryPlanning, long parallelizationThreshold, int maxWidthPerNode,
+ int maxGlobalWidth, double affinityFactor) {
+ super(parallelizationThreshold, maxWidthPerNode, maxGlobalWidth, affinityFactor);
+ this.planHasMemory = memoryPlanning;
+ this.queryContext = null;
+ }
+
+ @Override
+ public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
+ Collection<DrillbitEndpoint> activeEndpoints) {
+ if (planHasMemory) {
+ return;
+ }
+ List<PhysicalOperator> bufferedOpers = planningSet.getRootWrapper().getNode().getBufferedOperators();
+ MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(planHasMemory, bufferedOpers, queryContext);
+ }
+
+ @Override
+ protected BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
+ return (endpoint, operator) -> operator.getMaxAllocation();
+ }
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
index a662adc6b..824060194 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Fragment.java
@@ -17,9 +17,11 @@
*/
package org.apache.drill.exec.planner.fragment;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
@@ -82,18 +84,14 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
return sendingExchange;
}
-// public <T, V> T accept(FragmentVisitor<T, V> visitor, V extra) {
-// return visitor.visit(this, extra);
-// }
-
public class ExchangeFragmentPair {
private Exchange exchange;
- private Fragment node;
+ private Fragment fragmentXchgTo;
- public ExchangeFragmentPair(Exchange exchange, Fragment node) {
+ public ExchangeFragmentPair(Exchange exchange, Fragment fragXchgTo) {
super();
this.exchange = exchange;
- this.node = node;
+ this.fragmentXchgTo = fragXchgTo;
}
public Exchange getExchange() {
@@ -101,7 +99,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
}
public Fragment getNode() {
- return node;
+ return fragmentXchgTo;
}
@Override
@@ -109,7 +107,7 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
final int prime = 31;
int result = 1;
result = prime * result + ((exchange == null) ? 0 : exchange.hashCode());
- result = prime * result + ((node == null) ? 0 : node.hashCode());
+ result = prime * result + ((fragmentXchgTo == null) ? 0 : fragmentXchgTo.hashCode());
return result;
}
@@ -167,10 +165,27 @@ public class Fragment implements Iterable<Fragment.ExchangeFragmentPair> {
return true;
}
+ public List<PhysicalOperator> getBufferedOperators() {
+ List<PhysicalOperator> bufferedOps = new ArrayList<>();
+ root.accept(new BufferedOpFinder(), bufferedOps);
+ return bufferedOps;
+ }
+
+ protected static class BufferedOpFinder extends AbstractPhysicalVisitor<Void, List<PhysicalOperator>, RuntimeException> {
+ @Override
+ public Void visitOp(PhysicalOperator op, List<PhysicalOperator> value)
+ throws RuntimeException {
+ if (op.isBufferedOperator(null)) {
+ value.add(op);
+ }
+ visitChildren(op, value);
+ return null;
+ }
+ }
+
@Override
public String toString() {
return "FragmentNode [root=" + root + ", sendingExchange=" + sendingExchange + ", receivingExchangePairs="
+ receivingExchangePairs + "]";
}
-
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
index a9be6f398..7a59f4f76 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MakeFragmentsVisitor.java
@@ -26,7 +26,6 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException;
* Responsible for breaking a plan into its constituent Fragments.
*/
public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Fragment, ForemanSetupException> {
- static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MakeFragmentsVisitor.class);
public final static MakeFragmentsVisitor INSTANCE = new MakeFragmentsVisitor();
@@ -34,21 +33,20 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag
}
@Override
- public Fragment visitExchange(Exchange exchange, Fragment value) throws ForemanSetupException {
-// logger.debug("Visiting Exchange {}", exchange);
- if (value == null) {
- throw new ForemanSetupException("The simple fragmenter was called without a FragmentBuilder value. This will only happen if the initial call to SimpleFragmenter is by a Exchange node. This should never happen since an Exchange node should never be the root node of a plan.");
+ public Fragment visitExchange(Exchange exchange, Fragment receivingFragment) throws ForemanSetupException {
+ if (receivingFragment == null) {
+ throw new ForemanSetupException("The simple fragmenter was called without a FragmentBuilder value. This will only happen if the initial call to SimpleFragmenter is by a" +
+ " Exchange node. This should never happen since an Exchange node should never be the root node of a plan.");
}
- Fragment next = getNextBuilder();
- value.addReceiveExchange(exchange, next);
- next.addSendExchange(exchange, value);
- exchange.getChild().accept(this, next);
- return value;
+ Fragment sendingFragment= getNextFragment();
+ receivingFragment.addReceiveExchange(exchange, sendingFragment);
+ sendingFragment.addSendExchange(exchange, receivingFragment);
+ exchange.getChild().accept(this, sendingFragment);
+ return sendingFragment;
}
@Override
public Fragment visitOp(PhysicalOperator op, Fragment value) throws ForemanSetupException{
-// logger.debug("Visiting Other {}", op);
value = ensureBuilder(value);
value.addOperator(op);
for (PhysicalOperator child : op) {
@@ -57,15 +55,15 @@ public class MakeFragmentsVisitor extends AbstractPhysicalVisitor<Fragment, Frag
return value;
}
- private Fragment ensureBuilder(Fragment value) throws ForemanSetupException{
- if (value != null) {
- return value;
+ private Fragment ensureBuilder(Fragment currentFragment) throws ForemanSetupException{
+ if (currentFragment != null) {
+ return currentFragment;
} else {
- return getNextBuilder();
+ return getNextFragment();
}
}
- public Fragment getNextBuilder() {
+ public Fragment getNextFragment() {
return new Fragment();
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
index e33a38afc..7659f90f1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Materializer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.fragment;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
+import java.util.function.BiFunction;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.exception.FragmentSetupException;
@@ -30,7 +31,7 @@ import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Store;
import org.apache.drill.exec.physical.base.SubScan;
-
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.exec.physical.config.LateralJoinPOP;
import org.apache.drill.exec.physical.config.UnnestPOP;
@@ -164,15 +165,21 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
public static class IndexedFragmentNode{
private final Wrapper info;
+ private final BiFunction<Wrapper, Integer, DrillbitEndpoint> endpoint;
private final int minorFragmentId;
+ private final BiFunction<DrillbitEndpoint, PhysicalOperator, Long> memoryPerOperPerDrillbit;
SubScan subScan = null;
private final Deque<UnnestPOP> unnest = new ArrayDeque<>();
- public IndexedFragmentNode(int minorFragmentId, Wrapper info) {
+ public IndexedFragmentNode(int minorFragmentId, Wrapper info,
+ BiFunction<Wrapper, Integer, DrillbitEndpoint> wrapperToEndpoint,
+ BiFunction<DrillbitEndpoint, PhysicalOperator, Long> memoryReqs) {
super();
this.info = info;
+ this.endpoint = wrapperToEndpoint;
this.minorFragmentId = minorFragmentId;
+ this.memoryPerOperPerDrillbit = memoryReqs;
}
public Fragment getNode() {
@@ -188,7 +195,8 @@ public class Materializer extends AbstractPhysicalVisitor<PhysicalOperator, Mate
}
public void addAllocation(PhysicalOperator pop) {
- info.addAllocation(pop);
+ info.addInitialAllocation(pop.getInitialAllocation());
+ info.addMaxAllocation(memoryPerOperPerDrillbit.apply(this.endpoint.apply(info, minorFragmentId), pop));
}
public void addUnnest(UnnestPOP unnest) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java
new file mode 100644
index 000000000..443ab79cd
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/MemoryCalculator.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.base.Exchange;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.physical.config.AbstractMuxExchange;
+import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
+import org.apache.drill.exec.planner.cost.NodeResource;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.HashMap;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * A visitor to compute memory requirements for each operator in a minor fragment.
+ * This visitor will be called for each major fragment. It traverses the physical operators
+ * in that major fragment and computes the memory for each operator per each minor fragment.
+ * The minor fragment memory resources are further grouped into per Drillbit resource
+ * requirements.
+ */
+public class MemoryCalculator extends AbstractOpWrapperVisitor<Void, RuntimeException> {
+
+  private final PlanningSet planningSet;
+  // Buffered operators and their memory requirement, grouped by the drillbit they run on.
+  private final Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> bufferedOperators;
+  private final QueryContext queryContext;
+
+  /**
+   * @param planningSet used to look up the wrapper of the fragment on the other side of an exchange.
+   * @param context query context handed to {@code PhysicalOperator.isBufferedOperator}.
+   */
+  public MemoryCalculator(PlanningSet planningSet, QueryContext context) {
+    this.planningSet = planningSet;
+    this.bufferedOperators = new HashMap<>();
+    this.queryContext = context;
+  }
+
+  /**
+   * Counts the minor fragments of a major fragment per drillbit.
+   *
+   * @param currFragment major fragment whose endpoints are already assigned.
+   * @return map from drillbit endpoint to the number of minor fragments (the width) assigned to it.
+   */
+  private Map<DrillbitEndpoint, Integer> getMinorFragCountPerDrillbit(Wrapper currFragment) {
+    return currFragment.getAssignedEndpoints().stream()
+        .collect(Collectors.groupingBy(Function.identity(), Collectors.summingInt(x -> 1)));
+  }
+
+  /**
+   * Merges the memory of one operator into the per-drillbit resource map of the fragment.
+   *
+   * @param currFrag fragment whose resource map is updated.
+   * @param minorFragsPerDrillBit number of minor fragments per drillbit.
+   * @param getMemory computes the memory needed on a drillbit from its (endpoint, minor fragment count) entry.
+   */
+  private void merge(Wrapper currFrag,
+                     Map<DrillbitEndpoint, Integer> minorFragsPerDrillBit,
+                     Function<Entry<DrillbitEndpoint, Integer>, Long> getMemory) {
+    Map<DrillbitEndpoint, NodeResource> memoryPerDrillbit = minorFragsPerDrillBit.entrySet().stream()
+        .collect(Collectors.toMap(Entry::getKey,
+                                  (x) -> NodeResource.create(0, getMemory.apply(x))));
+    NodeResource.merge(currFrag.getResourceMap(), memoryPerDrillbit);
+  }
+
+  @Override
+  public Void visitSendingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException {
+    Wrapper receivingFragment = planningSet.get(fragment.getNode().getSendingExchangePair().getNode());
+    merge(fragment,
+          getMinorFragCountPerDrillbit(fragment),
+          // get the memory requirements for the sender operator.
+          (x) -> exchange.getSenderMemory(receivingFragment.getWidth(), x.getValue()));
+    return visitOp(exchange, fragment);
+  }
+
+  @Override
+  public Void visitReceivingExchange(Exchange exchange, Wrapper fragment) throws RuntimeException {
+    final List<Fragment.ExchangeFragmentPair> receivingExchangePairs = fragment.getNode().getReceivingExchangePairs();
+    final Map<DrillbitEndpoint, Integer> sendingFragsPerDrillBit = new HashMap<>();
+
+    for (Fragment.ExchangeFragmentPair pair : receivingExchangePairs) {
+      if (pair.getExchange() == exchange) {
+        Wrapper sendingFragment = planningSet.get(pair.getNode());
+        Preconditions.checkArgument(sendingFragment.isEndpointsAssignmentDone());
+        for (DrillbitEndpoint endpoint : sendingFragment.getAssignedEndpoints()) {
+          sendingFragsPerDrillBit.merge(endpoint, 1, Integer::sum);
+        }
+      }
+    }
+    final int totalSendingFrags = sendingFragsPerDrillBit.values().stream()
+        .mapToInt(Integer::intValue).sum();
+    // If the exchange is a MuxExchange then the sending fragments are from that particular drillbit otherwise
+    // sending fragments are from across the cluster.
+    final boolean isMuxExchange = exchange instanceof AbstractMuxExchange;
+    merge(fragment,
+          getMinorFragCountPerDrillbit(fragment),
+          // A drillbit without any local sender counts as 0 senders instead of unboxing a null count.
+          (x) -> exchange.getReceiverMemory(fragment.getWidth(),
+              isMuxExchange ? sendingFragsPerDrillBit.getOrDefault(x.getKey(), 0) : totalSendingFrags));
+    return null;
+  }
+
+  /**
+   * @param endpoint drillbit of interest.
+   * @return the buffered operators recorded for the endpoint with their memory requirement; an empty list
+   *         when nothing was recorded for it.
+   */
+  public List<Pair<PhysicalOperator, Long>> getBufferedOperators(DrillbitEndpoint endpoint) {
+    return this.bufferedOperators.getOrDefault(endpoint, new ArrayList<>());
+  }
+
+  @Override
+  public Void visitOp(PhysicalOperator op, Wrapper fragment) {
+    // Keep the estimate as a long: narrowing it to int clamps every value above Integer.MAX_VALUE.
+    final long memoryCost = (long) Math.ceil(op.getCost().getMemoryCost());
+    final Map<DrillbitEndpoint, Integer> minorFragsPerDrillbit = getMinorFragCountPerDrillbit(fragment);
+    final long memoryPerMinorFrag;
+
+    if (op.isBufferedOperator(queryContext)) {
+      // If the operator is a buffered operator then get the memory estimates of the optimizer.
+      // The memory estimates of the optimizer are for the whole operator spread across all the
+      // minor fragments. Divide this memory estimation by fragment width to get the memory
+      // requirement per minor fragment.
+      final int width = fragment.getAssignedEndpoints().size();
+      // Ceiling division in integer arithmetic; Math.ceil() over an integer quotient never rounds up.
+      memoryPerMinorFrag = memoryCost / width + (memoryCost % width == 0 ? 0 : 1);
+
+      for (Entry<DrillbitEndpoint, Integer> entry : minorFragsPerDrillbit.entrySet()) {
+        bufferedOperators.computeIfAbsent(entry.getKey(), (k) -> new ArrayList<>())
+            .add(Pair.of(op, memoryPerMinorFrag * entry.getValue()));
+      }
+    } else {
+      // Memory requirement for the non-buffered operators is just the batch size.
+      memoryPerMinorFrag = memoryCost;
+    }
+
+    merge(fragment, minorFragsPerDrillbit, (x) -> memoryPerMinorFrag * x.getValue());
+
+    for (PhysicalOperator child : op) {
+      child.accept(this, fragment);
+    }
+    return null;
+  }
+}
+
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java
new file mode 100644
index 000000000..097f4b249
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueryParallelizer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared.QueryId;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.options.OptionList;
+import org.apache.drill.exec.work.QueryWorkUnit;
+
+import java.util.Collection;
+
+/**
+ * This class parallelizes the query plan. Once the optimizer finishes its job by producing an
+ * optimized plan, it is the job of this parallelizer to generate a parallel plan out of the
+ * optimized physical plan. It does so by using the optimizer's estimates for row count etc.
+ * There are two kinds of parallelizers as explained below. Currently the difference in
+ * both of these parallelizers is only in the memory assignment for the physical operators.
+ *
+ * a) Default Parallelizer: It optimistically assumes that the whole cluster is running only the
+ * current query and based on heuristics assigns the optimal memory to the buffered operators.
+ *
+ * b) Queue Parallelizer: This parallelizer computes the memory that can be allocated at best based
+ * on the current cluster state(as to how much memory is available) and also the configuration
+ * of the queue that it can run on.
+ */
+public interface QueryParallelizer extends ParallelizationParameters {
+
+ /**
+ * Parallelizes the plan. This is the only function exposed to the consumer of this parallelizer
+ * (currently Foreman). The caller transforms the plan to a fragment tree and supplies the required
+ * information for the parallelizer to do its job.
+ * @param options List of all options that are set for the current session.
+ * @param foremanNode Endpoint information of the foreman node.
+ * @param queryId Unique ID of the query.
+ * @param activeEndpoints Currently active endpoints on which the plan can run.
+ * @param rootFragment Root of the fragment tree of the transformed physical plan.
+ * @param session User session object.
+ * @param queryContextInfo Query context.
+ * @return Executable query plan which contains all the information like minor frags and major frags.
+ * @throws ExecutionSetupException if the work unit cannot be generated for the given plan.
+ */
+ QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+ Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
+ UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException;
+}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java
new file mode 100644
index 000000000..c986fca29
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/QueueQueryParallelizer.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.fragment;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.drill.common.util.function.CheckedConsumer;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
+import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.cost.NodeResource;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Collection;
+import java.util.Set;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+/**
+ * Parallelizer specialized for managing resources for a query based on Queues. This parallelizer
+ * does not deal with increase/decrease of the parallelization of a query plan based on the current
+ * cluster state. However, the memory assignment for each operator, minor fragment and major
+ * fragment is based on the cluster state and provided queue configuration.
+ */
+public class QueueQueryParallelizer extends SimpleParallelizer {
+  // Placeholder node memory limit used until queue selection supplies the real value
+  // (see the queue-selection note in adjustMemory).
+  private static final int PLACEHOLDER_NODE_MEMORY_LIMIT = 10;
+
+  private final boolean planHasMemory;
+  private final QueryContext queryContext;
+  // Memory computed by adjustMemory() for each buffered operator, per drillbit.
+  private final Map<DrillbitEndpoint, Map<PhysicalOperator, Long>> operators;
+
+  /**
+   * @param memoryPlanning true when the plan already carries memory assignments, in which case
+   *                       {@link #adjustMemory} computes nothing.
+   * @param queryContext context of the query being parallelized.
+   */
+  public QueueQueryParallelizer(boolean memoryPlanning, QueryContext queryContext) {
+    super(queryContext);
+    this.planHasMemory = memoryPlanning;
+    this.queryContext = queryContext;
+    this.operators = new HashMap<>();
+  }
+
+  /**
+   * Returns the function that yields the memory for a physical operator on a drillbit endpoint.
+   * The memory computed by {@link #adjustMemory} is used when it exists. Otherwise the function
+   * falls back to the operator's own max allocation. This covers a plan that already has memory
+   * (adjustMemory computed nothing) and an operator that was never recorded for the endpoint.
+   * NOTE(review): only buffered operators are recorded; confirm max allocation is the wanted
+   * value for the remaining operators.
+   */
+  @Override
+  public BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory() {
+    return (endpoint, operator) -> {
+      if (planHasMemory) {
+        return operator.getMaxAllocation();
+      }
+      Map<PhysicalOperator, Long> memoryPerOperator = operators.get(endpoint);
+      Long adjustedMemory = memoryPerOperator == null ? null : memoryPerOperator.get(operator);
+      return adjustedMemory != null ? adjustedMemory : operator.getMaxAllocation();
+    };
+  }
+
+  /**
+   * Function called by the SimpleParallelizer to adjust the memory post parallelization.
+   * The overall logic is to traverse the fragment tree and call the MemoryCalculator on
+   * each major fragment. Once the memory is computed, resource requirements are accumulated
+   * per drillbit.
+   *
+   * The total resource requirements are used to select a queue. If the selected queue's
+   * resource limit is more/less than the query's requirement then the memory will be re-adjusted.
+   *
+   * Nothing is computed when the plan already has memory.
+   *
+   * @param planningSet context of the fragments.
+   * @param roots root fragments.
+   * @param activeEndpoints currently active endpoints.
+   * @throws PhysicalOperatorSetupException
+   */
+  @Override
+  public void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
+                           Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+    if (planHasMemory) {
+      return;
+    }
+    // total node resources for the query plan maintained per drillbit.
+    final Map<DrillbitEndpoint, NodeResource> totalNodeResources =
+        activeEndpoints.stream().collect(Collectors.toMap(x -> x, x -> NodeResource.create()));
+
+    // list of the buffered physical operators and their memory requirements per drillbit.
+    final Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> bufferedOperatorsPerDrillbit =
+        activeEndpoints.stream().collect(Collectors.toMap(x -> x, x -> new ArrayList<>()));
+
+    for (Wrapper wrapper : roots) {
+      traverse(wrapper, CheckedConsumer.throwingConsumerWrapper((Wrapper fragment) -> {
+        MemoryCalculator calculator = new MemoryCalculator(planningSet, queryContext);
+        fragment.getNode().getRoot().accept(calculator, fragment);
+        NodeResource.merge(totalNodeResources, fragment.getResourceMap());
+        bufferedOperatorsPerDrillbit.forEach((endpoint, operatorMemory) ->
+            operatorMemory.addAll(calculator.getBufferedOperators(endpoint)));
+      }));
+    }
+    //queryrm.selectQueue( pass the max node Resource) returns queue configuration.
+    Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryAdjustedOperators =
+        ensureOperatorMemoryWithinLimits(bufferedOperatorsPerDrillbit, totalNodeResources,
+                                         PLACEHOLDER_NODE_MEMORY_LIMIT);
+    memoryAdjustedOperators.forEach((endpoint, operatorMemory) -> {
+      // The same operator can be listed more than once for a drillbit; add its entries up.
+      Map<PhysicalOperator, Long> memoryPerOperator = operatorMemory.stream()
+          .collect(Collectors.toMap(Pair::getLeft, Pair::getRight, Long::sum));
+      this.operators.put(endpoint, memoryPerOperator);
+    });
+  }
+
+  /**
+   * Helper method to adjust the memory for the buffered operators. Drillbits whose total memory
+   * is within the node limit keep their operator list unchanged. On drillbits above the limit
+   * every operator gets a share of the limit proportional to its share of the total:
+   * (operatorMemory / totalOperatorMemory) * nodeLimit, rounded up.
+   *
+   * @param memoryPerOperator list of physical operators per drillbit
+   * @param nodeResourceMap resources per drillbit.
+   * @param nodeLimit permissible node limit.
+   * @return operators of all drillbits, with memory adjusted on the drillbits above the limit.
+   */
+  private Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>>
+      ensureOperatorMemoryWithinLimits(Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> memoryPerOperator,
+                                       Map<DrillbitEndpoint, NodeResource> nodeResourceMap, int nodeLimit) {
+    Map<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> allDrillbits = new HashMap<>();
+
+    for (Map.Entry<DrillbitEndpoint, List<Pair<PhysicalOperator, Long>>> entry : memoryPerOperator.entrySet()) {
+      final DrillbitEndpoint endpoint = entry.getKey();
+      final List<Pair<PhysicalOperator, Long>> operatorMemory = entry.getValue();
+      final long totalMemory = operatorMemory.stream().mapToLong(Pair::getValue).sum();
+
+      // Within the limit, or nothing to scale: keep the operators exactly as computed.
+      if (nodeResourceMap.get(endpoint).getMemory() <= nodeLimit || totalMemory == 0) {
+        allDrillbits.put(endpoint, operatorMemory);
+        continue;
+      }
+
+      // The ratio has to be computed in floating point; Long / Long truncates it to 0.
+      List<Pair<PhysicalOperator, Long>> adjustedMemory = operatorMemory.stream()
+          .map(operatorAndMemory -> Pair.of(operatorAndMemory.getKey(),
+              (long) Math.ceil((double) operatorAndMemory.getValue() / totalMemory * nodeLimit)))
+          .collect(Collectors.toList());
+      allDrillbits.put(endpoint, adjustedMemory);
+    }
+
+    // At this point allDrillbits contains the operators on all drillbits. The memory also is adjusted
+    // based on the nodeLimit and the ratio of their requirements.
+    return allDrillbits;
+  }
+} \ No newline at end of file
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 39b699fe4..a434bf80b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -21,9 +21,12 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
+import org.apache.drill.common.util.function.CheckedConsumer;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
@@ -61,7 +64,7 @@ import org.apache.drill.exec.work.foreman.ForemanSetupException;
* parallelization for each major fragment will be determined. Once the amount of parallelization is done, assignment
* is done based on round robin assignment ordered by operator affinity (locality) to available execution Drillbits.
*/
-public class SimpleParallelizer implements ParallelizationParameters {
+public abstract class SimpleParallelizer implements QueryParallelizer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SimpleParallelizer.class);
private final long parallelizationThreshold;
@@ -69,7 +72,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
private final int maxGlobalWidth;
private final double affinityFactor;
- public SimpleParallelizer(QueryContext context) {
+ protected SimpleParallelizer(QueryContext context) {
OptionManager optionManager = context.getOptions();
long sliceTarget = optionManager.getOption(ExecConstants.SLICE_TARGET_OPTION);
this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1;
@@ -81,7 +84,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
this.affinityFactor = optionManager.getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue();
}
- public SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) {
+ protected SimpleParallelizer(long parallelizationThreshold, int maxWidthPerNode, int maxGlobalWidth, double affinityFactor) {
this.parallelizationThreshold = parallelizationThreshold;
this.maxWidthPerNode = maxWidthPerNode;
this.maxGlobalWidth = maxGlobalWidth;
@@ -108,27 +111,106 @@ public class SimpleParallelizer implements ParallelizationParameters {
return affinityFactor;
}
+ /**
+  * Finds the root fragments, i.e. the fragments no other fragment lists as a dependency.
+  * The later parallelizer code starts at these roots and walks down to their dependencies.
+  *
+  * @param planningSet set of all fragment wrappers.
+  * @return the wrappers that are not a dependency of any other wrapper.
+  */
+ public Set<Wrapper> getRootFragments(PlanningSet planningSet) {
+   // Start with every fragment as a candidate root.
+   final Set<Wrapper> candidates = Sets.newHashSet();
+   for (Wrapper fragment : planningSet) {
+     candidates.add(fragment);
+   }
+
+   // Drop every fragment that some other fragment depends upon; what remains are the roots.
+   for (Wrapper fragment : planningSet) {
+     final List<Wrapper> dependencies = fragment.getFragmentDependencies();
+     if (dependencies == null || dependencies.isEmpty()) {
+       continue;
+     }
+     for (Wrapper dependency : dependencies) {
+       candidates.remove(dependency);
+     }
+   }
+
+   return candidates;
+ }
+
+ /**
+  * Builds the planning set for a fragment tree: creates the fragment wrappers and then
+  * wires up the dependencies between them.
+  *
+  * @param rootFragment root of the fragment tree.
+  * @return the populated planning set.
+  */
+ public PlanningSet prepareFragmentTree(Fragment rootFragment) {
+   final PlanningSet fragmentWrappers = new PlanningSet();
+   // The wrappers have to exist before the dependency graph over them can be built.
+   initFragmentWrappers(rootFragment, fragmentWrappers);
+   constructFragmentDependencyGraph(rootFragment, fragmentWrappers);
+   return fragmentWrappers;
+ }
+
/**
- * Generate a set of assigned fragments based on the provided fragment tree. Do not allow parallelization stages
- * to go beyond the global max width.
+ * Traverse all the major fragments and parallelize each major fragment based on
+ * collected stats. The children fragments are parallelized before a parent
+ * fragment.
+ * @param planningSet Set of all major fragments and their context.
+ * @param roots Root nodes of the plan.
+ * @param activeEndpoints currently active drillbit endpoints.
+ * @throws PhysicalOperatorSetupException
+ */
+ public void collectStatsAndParallelizeFragments(PlanningSet planningSet, Set<Wrapper> roots,
+     Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
+   for (Wrapper root : roots) {
+     traverse(root, CheckedConsumer.throwingConsumerWrapper((Wrapper currentFragment) -> {
+       // A fragment can already be parallelized when it is reached again, which happens for
+       // fragments that have MUX operators. Only fragments without an assignment are processed.
+       if (!currentFragment.isEndpointsAssignmentDone()) {
+         // Collect the stats of the fragment, then let the parallelizer chosen by its
+         // distribution affinity assign the endpoints.
+         currentFragment.getNode().getRoot().accept(new StatsCollector(planningSet), currentFragment);
+         currentFragment.getStats().getDistributionAffinity().getFragmentParallelizer()
+             .parallelizeFragment(currentFragment, this, activeEndpoints);
+         // Consolidate the cpu resources required by this major fragment per drillbit.
+         currentFragment.computeCpuResources();
+       }
+     }));
+   }
+ }
+
+ /**
+ * Hook for the memory computation of a parallelized plan. generateWorkUnit calls it after
+ * collectStatsAndParallelizeFragments and before the minor fragments are created.
+ * @param planningSet Set of all major fragments and their context.
+ * @param roots Root nodes of the plan.
+ * @param activeEndpoints currently active drillbit endpoints.
+ * @throws PhysicalOperatorSetupException
+ */
+ public abstract void adjustMemory(PlanningSet planningSet, Set<Wrapper> roots,
+ Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException;
+
+ /**
+ * The starting function for the whole parallelization and memory computation logic.
+ * 1) Initially a fragment tree is prepared which contains a wrapper for each fragment.
+ * The topology of this tree is same as that of the major fragment tree.
+ * 2) Traverse this fragment tree to collect stats for each major fragment and then
+ * parallelize each fragment. At this stage minor fragments are not created but all
+ * the required information to create minor fragment are computed.
+ * 3) Memory is computed for each operator and for the minor fragment.
+ * 4) Lastly all the above computed information is used to create the minor fragments
+ * for each major fragment.
*
- * @param options Option list
- * @param foremanNode The driving/foreman node for this query. (this node)
- * @param queryId The queryId for this query.
- * @param activeEndpoints The list of endpoints to consider for inclusion in planning this query.
- * @param rootFragment The root node of the PhysicalPlan that we will be parallelizing.
- * @param session UserSession of user who launched this query.
- * @param queryContextInfo Info related to the context when query has started.
- * @return The list of generated PlanFragment protobuf objects to be assigned out to the individual nodes.
+ * @param options List of options set by the user.
+ * @param foremanNode foreman node for this query plan.
+ * @param queryId Query ID.
+ * @param activeEndpoints currently active endpoints on which this plan will run.
+ * @param rootFragment Root major fragment.
+ * @param session session context.
+ * @param queryContextInfo query context.
+ * @return Executable query work unit built from the parallelized fragments.
* @throws ExecutionSetupException
*/
- public QueryWorkUnit getFragments(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
- Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
- UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+ @Override
+ public final QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
+ Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment,
+ UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+ PlanningSet planningSet = prepareFragmentTree(rootFragment);
+
+ Set<Wrapper> rootFragments = getRootFragments(planningSet);
- final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
- return generateWorkUnit(
- options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
+ collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints);
+
+ adjustMemory(planningSet, rootFragments, activeEndpoints);
+
+ return generateWorkUnit(options, foremanNode, queryId, rootFragment, planningSet, session, queryContextInfo);
}
/**
@@ -151,28 +233,7 @@ public class SimpleParallelizer implements ParallelizationParameters {
throw new UnsupportedOperationException("Use children classes");
}
- /**
- * Helper method to reuse the code for QueryWorkUnit(s) generation
- * @param activeEndpoints
- * @param rootFragment
- * @return A {@link PlanningSet}.
- * @throws ExecutionSetupException
- */
- protected PlanningSet getFragmentsHelper(Collection<DrillbitEndpoint> activeEndpoints, Fragment rootFragment) throws ExecutionSetupException {
-
- PlanningSet planningSet = new PlanningSet();
- initFragmentWrappers(rootFragment, planningSet);
-
- final Set<Wrapper> leafFragments = constructFragmentDependencyGraph(planningSet);
-
- // Start parallelizing from leaf fragments
- for (Wrapper wrapper : leafFragments) {
- parallelizeFragment(wrapper, planningSet, activeEndpoints);
- }
- planningSet.findRootWrapper(rootFragment);
- return planningSet;
- }
// For every fragment, create a Wrapper in PlanningSet.
@VisibleForTesting
@@ -190,77 +251,49 @@ public class SimpleParallelizer implements ParallelizationParameters {
* @param planningSet
* @return Returns a list of leaf fragments in fragment dependency graph.
*/
- private static Set<Wrapper> constructFragmentDependencyGraph(PlanningSet planningSet) {
+ private void constructFragmentDependencyGraph(Fragment rootFragment, PlanningSet planningSet) {
// Set up dependency of fragments based on the affinity of exchange that separates the fragments.
- for (Wrapper currentFragmentWrapper : planningSet) {
- ExchangeFragmentPair sendingExchange = currentFragmentWrapper.getNode().getSendingExchangePair();
- if (sendingExchange != null) {
- ParallelizationDependency dependency = sendingExchange.getExchange().getParallelizationDependency();
- Wrapper receivingFragmentWrapper = planningSet.get(sendingExchange.getNode());
-
+ for(Wrapper currentFragment : planningSet) {
+ ExchangeFragmentPair sendingXchgForCurrFrag = currentFragment.getNode().getSendingExchangePair();
+ if (sendingXchgForCurrFrag != null) {
+ ParallelizationDependency dependency = sendingXchgForCurrFrag.getExchange().getParallelizationDependency();
+ Wrapper receivingFragmentWrapper = planningSet.get(sendingXchgForCurrFrag.getNode());
+
+ //Mostly Receivers of the current fragment depend on the sender of the child fragments. However there is a special case
+ //for DeMux Exchanges where the Sender of the current fragment depends on the receiver of the parent fragment.
if (dependency == ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER) {
- receivingFragmentWrapper.addFragmentDependency(currentFragmentWrapper);
+ receivingFragmentWrapper.addFragmentDependency(currentFragment);
} else if (dependency == ParallelizationDependency.SENDER_DEPENDS_ON_RECEIVER) {
- currentFragmentWrapper.addFragmentDependency(receivingFragmentWrapper);
+ currentFragment.addFragmentDependency(receivingFragmentWrapper);
}
}
}
-
- // Identify leaf fragments. Leaf fragments are fragments that have no other fragments depending on them for
- // parallelization info. First assume all fragments are leaf fragments. Go through the fragments one by one and
- // remove the fragment on which the current fragment depends.
-
- final Set<Wrapper> roots = Sets.newHashSet();
- for (Wrapper w : planningSet) {
- roots.add(w);
- }
-
- for (Wrapper wrapper : planningSet) {
- final List<Wrapper> fragmentDependencies = wrapper.getFragmentDependencies();
- if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
- for (Wrapper dependency : fragmentDependencies) {
- if (roots.contains(dependency)) {
- roots.remove(dependency);
- }
- }
- }
- }
-
- return roots;
+ planningSet.findRootWrapper(rootFragment);
}
+
/**
- * Helper method for parallelizing a given fragment. Dependent fragments are parallelized first before
- * parallelizing the given fragment.
+ * Helper method to call operation on each fragment. Traversal calls operation on child fragments before
+ * calling it on the parent fragment.
*/
- private void parallelizeFragment(Wrapper fragmentWrapper, PlanningSet planningSet,
- Collection<DrillbitEndpoint> activeEndpoints) throws PhysicalOperatorSetupException {
- // If the fragment is already parallelized, return.
- if (fragmentWrapper.isEndpointsAssignmentDone()) {
- return;
- }
+ protected void traverse(Wrapper fragmentWrapper, Consumer<Wrapper> operation) throws PhysicalOperatorSetupException {
- // First parallelize fragments on which this fragment depends.
final List<Wrapper> fragmentDependencies = fragmentWrapper.getFragmentDependencies();
if (fragmentDependencies != null && fragmentDependencies.size() > 0) {
for(Wrapper dependency : fragmentDependencies) {
- parallelizeFragment(dependency, planningSet, activeEndpoints);
+ traverse(dependency, operation);
}
}
-
- // Find stats. Stats include various factors including cost of physical operators, parallelizability of
- // work in physical operator and affinity of physical operator to certain nodes.
- fragmentWrapper.getNode().getRoot().accept(new StatsCollector(planningSet), fragmentWrapper);
-
- fragmentWrapper.getStats().getDistributionAffinity()
- .getFragmentParallelizer()
- .parallelizeFragment(fragmentWrapper, this, activeEndpoints);
+ operation.accept(fragmentWrapper);
}
+ // Returns a function which maps a (drillbit endpoint, physical operator) pair to the memory
+ // assigned to that operator. generateWorkUnit hands this function to every IndexedFragmentNode
+ // it creates.
+ protected abstract BiFunction<DrillbitEndpoint, PhysicalOperator, Long> getMemory();
+
protected QueryWorkUnit generateWorkUnit(OptionList options, DrillbitEndpoint foremanNode, QueryId queryId,
- Fragment rootNode, PlanningSet planningSet,
- UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
+ Fragment rootNode, PlanningSet planningSet, UserSession session,
+ QueryContextInformation queryContextInfo) throws ExecutionSetupException {
List<MinorFragmentDefn> fragmentDefns = new ArrayList<>( );
MinorFragmentDefn rootFragmentDefn = null;
@@ -282,7 +315,8 @@ public class SimpleParallelizer implements ParallelizationParameters {
// Create a minorFragment for each major fragment.
for (int minorFragmentId = 0; minorFragmentId < wrapper.getWidth(); minorFragmentId++) {
- IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
+ IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper,
+ (fragmentWrapper, minorFragment) -> fragmentWrapper.getAssignedEndpoint(minorFragment), getMemory());
wrapper.resetAllocation();
PhysicalOperator op = physicalOperatorRoot.accept(Materializer.INSTANCE, iNode);
Preconditions.checkArgument(op instanceof FragmentRoot);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
index 680e6a566..49af366ce 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
@@ -71,6 +71,7 @@ public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeExcept
for(ExchangeFragmentPair pair : receivingExchangePairs) {
if (pair.getExchange() == exchange) {
+ // This is the child fragment which is sending data to this fragment.
Wrapper sendingFragment = planningSet.get(pair.getNode());
if (sendingFragment.isEndpointsAssignmentDone()) {
sendingEndpoints.addAll(sendingFragment.getAssignedEndpoints());
@@ -105,7 +106,7 @@ public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeExcept
stats.addEndpointAffinities(hasAffinity.getOperatorAffinity());
stats.setDistributionAffinity(hasAffinity.getDistributionAffinity());
}
- stats.addCost(op.getCost());
+ stats.addCost(op.getCost().getOutputRowCount());
for (PhysicalOperator child : op) {
child.accept(this, wrapper);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
index 6da885044..ffa577e41 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/Wrapper.java
@@ -18,7 +18,12 @@
package org.apache.drill.exec.planner.fragment;
import java.util.List;
+import java.util.Map;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.drill.exec.planner.cost.NodeResource;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.drill.exec.physical.PhysicalOperatorSetupException;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
@@ -46,6 +51,11 @@ public class Wrapper {
private boolean endpointsAssigned;
private long initialAllocation = 0;
private long maxAllocation = 0;
+ // Resources (i.e. memory and cpu) are stored per Drillbit in this map.
+ // A Drillbit can run any number of minor fragments of this major fragment;
+ // the NodeResource holds the cumulative resources required by all of those
+ // minor fragments on that Drillbit.
+ private Map<DrillbitEndpoint, NodeResource> nodeResourceMap;
// List of fragments this particular fragment depends on for determining its parallelization and endpoint assignments.
private final List<Wrapper> fragmentDependencies = Lists.newArrayList();
@@ -58,6 +68,7 @@ public class Wrapper {
this.majorFragmentId = majorFragmentId;
this.node = node;
this.stats = new Stats();
+ nodeResourceMap = null;
}
public Stats getStats() {
@@ -94,10 +105,12 @@ public class Wrapper {
return maxAllocation;
}
- public void addAllocation(PhysicalOperator pop) {
- initialAllocation += pop.getInitialAllocation();
-// logger.debug("Incrementing initialAllocation by {} to {}. Pop: {}", pop.getInitialAllocation(), initialAllocation, pop);
- maxAllocation += pop.getMaxAllocation();
+ public void addInitialAllocation(long memory) {
+ initialAllocation += memory;
+ }
+
+ public void addMaxAllocation(long memory) {
+ maxAllocation += memory;
}
private class AssignEndpointsToScanAndStore extends AbstractPhysicalVisitor<Void, List<DrillbitEndpoint>, PhysicalOperatorSetupException>{
@@ -197,4 +210,30 @@ public class Wrapper {
public List<Wrapper> getFragmentDependencies() {
return ImmutableList.copyOf(fragmentDependencies);
}
+
+ /**
+ * Compute the cpu resources required for all the minor fragments of this major fragment.
+ * This information is stored per DrillbitEndpoint. It is assumed that this function is
+ * called only once.
+ */
+ public void computeCpuResources() {
+ Preconditions.checkArgument(nodeResourceMap == null);
+ BinaryOperator<NodeResource> merge = (first, second) -> {
+ NodeResource result = NodeResource.create();
+ result.add(first);
+ result.add(second);
+ return result;
+ };
+
+ Function<DrillbitEndpoint, NodeResource> cpuPerEndpoint = (endpoint) -> new NodeResource(1, 0);
+
+ nodeResourceMap = endpoints.stream()
+ .collect(Collectors.groupingBy(Function.identity(),
+ Collectors.reducing(NodeResource.create(),
+ cpuPerEndpoint, merge)));
+ }
+
+ public Map<DrillbitEndpoint, NodeResource> getResourceMap() {
+ return nodeResourceMap;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
index 70d4e268a..c1250e3ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/contrib/SplittingParallelizer.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.planner.fragment.contrib;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.util.DrillStringUtils;
@@ -28,9 +29,9 @@ import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
-import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.fragment.Wrapper;
import org.apache.drill.exec.planner.fragment.Materializer.IndexedFragmentNode;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -56,12 +57,12 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
* allows not to pollute parent class with non-authentic functionality
*
*/
-public class SplittingParallelizer extends SimpleParallelizer {
+public class SplittingParallelizer extends DefaultQueryParallelizer {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SplittingParallelizer.class);
- public SplittingParallelizer(QueryContext context) {
- super(context);
+ public SplittingParallelizer(boolean doMemoryPlanning, QueryContext context) {
+ super(doMemoryPlanning, context);
}
/**
@@ -81,7 +82,13 @@ public class SplittingParallelizer extends SimpleParallelizer {
Collection<DrillbitEndpoint> activeEndpoints, PhysicalPlanReader reader, Fragment rootFragment,
UserSession session, QueryContextInformation queryContextInfo) throws ExecutionSetupException {
- final PlanningSet planningSet = getFragmentsHelper(activeEndpoints, rootFragment);
+ final PlanningSet planningSet = this.prepareFragmentTree(rootFragment);
+
+ Set<Wrapper> rootFragments = getRootFragments(planningSet);
+
+ collectStatsAndParallelizeFragments(planningSet, rootFragments, activeEndpoints);
+
+ adjustMemory(planningSet, rootFragments, activeEndpoints);
return generateWorkUnits(
options, foremanNode, queryId, reader, rootFragment, planningSet, session, queryContextInfo);
@@ -113,7 +120,7 @@ public class SplittingParallelizer extends SimpleParallelizer {
List<QueryWorkUnit> workUnits = Lists.newArrayList();
int plansCount = 0;
- DrillbitEndpoint[] endPoints = null;
+ DrillbitEndpoint[] leafFragEndpoints = null;
long initialAllocation = 0;
final Iterator<Wrapper> iter = planningSet.iterator();
@@ -130,12 +137,14 @@ public class SplittingParallelizer extends SimpleParallelizer {
// allocation
plansCount = wrapper.getWidth();
initialAllocation = (wrapper.getInitialAllocation() != 0 ) ? wrapper.getInitialAllocation()/plansCount : 0;
- endPoints = new DrillbitEndpoint[plansCount];
+ leafFragEndpoints = new DrillbitEndpoint[plansCount];
for (int mfId = 0; mfId < plansCount; mfId++) {
- endPoints[mfId] = wrapper.getAssignedEndpoint(mfId);
+ leafFragEndpoints[mfId] = wrapper.getAssignedEndpoint(mfId);
}
}
}
+
+ DrillbitEndpoint[] endPoints = leafFragEndpoints;
if ( plansCount == 0 ) {
// no exchange, return list of single QueryWorkUnit
workUnits.add(generateWorkUnit(options, foremanNode, queryId, rootNode, planningSet, session, queryContextInfo));
@@ -174,7 +183,8 @@ public class SplittingParallelizer extends SimpleParallelizer {
MinorFragmentDefn rootFragment = null;
FragmentRoot rootOperator = null;
- IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper);
+ IndexedFragmentNode iNode = new IndexedFragmentNode(minorFragmentId, wrapper,
+ (fragmentWrapper, minorFragment) -> endPoints[minorFragment],getMemory());
wrapper.resetAllocation();
// two visitors here
// 1. To remove exchange
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
index 095a3238b..7d950c187 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/PhysicalPlanCreator.java
@@ -21,13 +21,17 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.drill.common.logical.PlanProperties;
import org.apache.drill.common.logical.PlanProperties.Generator.ResultMode;
import org.apache.drill.common.logical.PlanProperties.PlanPropertiesBuilder;
import org.apache.drill.common.logical.PlanProperties.PlanType;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.cost.DrillCostBase;
+import org.apache.drill.exec.planner.cost.PrelCostEstimates;
import org.apache.drill.exec.planner.physical.explain.PrelSequencer.OpId;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -54,10 +58,24 @@ public class PhysicalPlanCreator {
public PhysicalOperator addMetadata(Prel originalPrel, PhysicalOperator op){
op.setOperatorId(opIdMap.get(originalPrel).getAsSingleInt());
- op.setCost(originalPrel.estimateRowCount(originalPrel.getCluster().getMetadataQuery()));
+ op.setCost(getPrelCostEstimates(originalPrel, op));
return op;
}
+ private PrelCostEstimates getPrelCostEstimates(Prel originalPrel, PhysicalOperator op) {
+ final RelMetadataQuery mq = originalPrel.getCluster().getMetadataQuery();
+ final double estimatedRowCount = originalPrel.estimateRowCount(mq);
+ final DrillCostBase costBase = (DrillCostBase) originalPrel.computeSelfCost(originalPrel.getCluster().getPlanner(),
+ mq);
+ final PrelCostEstimates costEstimates;
+ if (!op.isBufferedOperator(context)) {
+ costEstimates = new PrelCostEstimates(context.getOptions().getLong(ExecConstants.OUTPUT_BATCH_SIZE), estimatedRowCount);
+ } else {
+ costEstimates = new PrelCostEstimates(costBase.getMemory(), estimatedRowCount);
+ }
+ return costEstimates;
+ }
+
public PhysicalPlan build(Prel rootPrel, boolean forceRebuild) {
if (plan != null && !forceRebuild) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
index bec2bcb87..008c9dfc0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/MemoryAllocationUtilities.java
@@ -17,7 +17,7 @@
*/
package org.apache.drill.exec.util;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import org.apache.drill.common.config.DrillConfig;
@@ -35,6 +35,22 @@ public class MemoryAllocationUtilities {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(MemoryAllocationUtilities.class);
+
+ public static void setupBufferedMemoryAllocations(PhysicalPlan plan, final QueryContext queryContext) {
+ setupBufferedOpsMemoryAllocations(plan.getProperties().hasResourcePlan,
+ getBufferedOperators(plan.getSortedOperators(), queryContext), queryContext);
+ }
+
+ public static List<PhysicalOperator> getBufferedOperators(List<PhysicalOperator> operators, QueryContext queryContext) {
+ final List<PhysicalOperator> bufferedOpList = new ArrayList<>();
+ for (final PhysicalOperator op : operators) {
+ if (op.isBufferedOperator(queryContext)) {
+ bufferedOpList.add(op);
+ }
+ }
+ return bufferedOpList;
+ }
+
/**
* Helper method to setup Memory Allocations
* <p>
@@ -54,28 +70,18 @@ public class MemoryAllocationUtilities {
* <p>
* since this method can be used in multiple places adding it in this class
* rather than keeping it in Foreman
- * @param plan
- * @param queryContext
+ * @param planHasMemory whether the plan already contains memory allocations;
+ * if it does, memory planning is skipped.
+ * @param bufferedOperators list of buffered operators in the plan.
+ * @param queryContext context of the query.
*/
- public static void setupBufferedOpsMemoryAllocations(final PhysicalPlan plan, final QueryContext queryContext) {
+ public static void setupBufferedOpsMemoryAllocations(boolean planHasMemory,
+ List<PhysicalOperator> bufferedOperators, final QueryContext queryContext) {
// Test plans may already have a pre-defined memory plan.
// Otherwise, determine memory allocation.
- if (plan.getProperties().hasResourcePlan) {
- return;
- }
- // look for external sorts
- final List<PhysicalOperator> bufferedOpList = new LinkedList<>();
- for (final PhysicalOperator op : plan.getSortedOperators()) {
- if (op.isBufferedOperator(queryContext)) {
- bufferedOpList.add(op);
- }
- }
-
- // if there are any sorts, compute the maximum allocation, and set it on them
- plan.getProperties().hasResourcePlan = true;
- if (bufferedOpList.isEmpty()) {
+ if (planHasMemory || bufferedOperators.isEmpty()) {
return;
}
@@ -91,9 +97,9 @@ public class MemoryAllocationUtilities {
// Now divide up the memory by slices and operators.
- final long opMinMem = computeOperatorMemory(optionManager, maxAllocPerNode, bufferedOpList.size());
+ final long opMinMem = computeOperatorMemory(optionManager, maxAllocPerNode, bufferedOperators.size());
- for(final PhysicalOperator op : bufferedOpList) {
+ for(final PhysicalOperator op : bufferedOperators) {
final long alloc = Math.max(opMinMem, op.getInitialAllocation());
op.setMaxAllocation(alloc);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 37b8a4073..521e50a35 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -38,7 +38,6 @@ import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.MakeFragmentsVisitor;
-import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.BitControl.PlanFragment;
@@ -408,7 +407,7 @@ public class Foreman implements Runnable {
validatePlan(plan);
queryRM.visitAbstractPlan(plan);
- final QueryWorkUnit work = getQueryWorkUnit(plan);
+ final QueryWorkUnit work = getQueryWorkUnit(plan, queryRM);
if (enableRuntimeFilter) {
runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
@@ -463,7 +462,7 @@ public class Foreman implements Runnable {
} catch (IOException e) {
throw new ExecutionSetupException(String.format("Unable to parse FragmentRoot from fragment: %s", rootFragment.getFragmentJson()));
}
- queryRM.setCost(rootOperator.getCost());
+ queryRM.setCost(rootOperator.getCost().getOutputRowCount());
fragmentsRunner.setFragmentsInfo(planFragments, rootFragment, rootOperator);
@@ -561,14 +560,18 @@ public class Foreman implements Runnable {
}
}
- private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan) throws ExecutionSetupException {
+ private QueryWorkUnit getQueryWorkUnit(final PhysicalPlan plan,
+ final QueryResourceManager rm) throws ExecutionSetupException {
+
final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
+
final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
- final SimpleParallelizer parallelizer = new SimpleParallelizer(queryContext);
- return parallelizer.getFragments(
- queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
- queryId, queryContext.getOnlineEndpoints(), rootFragment,
- initiatingClient.getSession(), queryContext.getQueryContextInfo());
+
+ return rm.getParallelizer(plan.getProperties().hasResourcePlan).generateWorkUnit(queryContext.getOptions().getOptionList(),
+ queryContext.getCurrentEndpoint(),
+ queryId, queryContext.getOnlineEndpoints(),
+ rootFragment, initiatingClient.getSession(),
+ queryContext.getQueryContextInfo());
}
private void logWorkUnit(QueryWorkUnit queryWorkUnit) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
index 77957d5a3..7c3cc04d5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/DefaultResourceManager.java
@@ -20,6 +20,8 @@ package org.apache.drill.exec.work.foreman.rm;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ops.QueryContext;
import org.apache.drill.exec.physical.PhysicalPlan;
+import org.apache.drill.exec.planner.fragment.QueryParallelizer;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.exec.util.MemoryAllocationUtilities;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.foreman.Foreman;
@@ -46,12 +48,16 @@ public class DefaultResourceManager implements ResourceManager {
if (plan == null || plan.getProperties().hasResourcePlan) {
return;
}
- MemoryAllocationUtilities.setupBufferedOpsMemoryAllocations(plan, queryContext);
+ MemoryAllocationUtilities.setupBufferedMemoryAllocations(plan, queryContext);
}
@Override
public void visitPhysicalPlan(QueryWorkUnit work) {
}
+
+ public QueryContext getQueryContext() {
+ return queryContext;
+ }
}
public static class DefaultQueryResourceManager extends DefaultResourceAllocator implements QueryResourceManager {
@@ -70,6 +76,11 @@ public class DefaultResourceManager implements ResourceManager {
}
@Override
+ public QueryParallelizer getParallelizer(boolean memoryPlanning){
+ return new DefaultQueryParallelizer(memoryPlanning, this.getQueryContext());
+ }
+
+ @Override
public void admit() {
// No queueing by default
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
index 9e2a3a10f..4b9112194 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/QueryResourceManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.work.foreman.rm;
+import org.apache.drill.exec.planner.fragment.QueryParallelizer;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
@@ -44,6 +45,15 @@ public interface QueryResourceManager extends QueryResourceAllocator {
void setCost(double cost);
/**
+ * Create a parallelizer to parallelize each major fragment of the query into
+ * many minor fragments. The parallelizer encapsulates the logic of how much
+ * memory and parallelism is required for the query.
+ * @param memoryPlanning whether memory planning needs to be done during parallelization
+ * @return a {@link QueryParallelizer} for the query
+ */
+ QueryParallelizer getParallelizer(boolean memoryPlanning);
+
+ /**
* Admit the query into the cluster. Blocks until the query
* can run. (Later revisions may use a more thread-friendly
* approach.)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
index c6eb9d3b5..5d1a1709d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/rm/ThrottledResourceManager.java
@@ -28,6 +28,8 @@ import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
+import org.apache.drill.exec.planner.fragment.QueryParallelizer;
+import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.QueryWorkUnit;
@@ -116,6 +118,10 @@ public class ThrottledResourceManager extends AbstractResourceManager {
}
}
+ public QueryContext getQueryContext() {
+ return queryContext;
+ }
+
private int countBufferingOperators(
Map<String, Collection<PhysicalOperator>> nodeMap) {
int width = 0;
@@ -287,6 +293,12 @@ public class ThrottledResourceManager extends AbstractResourceManager {
}
@Override
+ public QueryParallelizer getParallelizer(boolean planHasMemory) {
+ // Currently memory planning is disabled. Enable it once the RM functionality is fully implemented.
+ return new QueueQueryParallelizer(true || planHasMemory, this.getQueryContext());
+ }
+
+ @Override
public void admit() throws QueueTimeoutException, QueryQueueException {
lease = rm.queue().enqueue(foreman.getQueryId(), queryCost);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
index e789c1dfa..e61814ab3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/user/PlanSplitter.java
@@ -113,7 +113,7 @@ public class PlanSplitter {
final PhysicalOperator rootOperator = plan.getSortedOperators(false).iterator().next();
final Fragment rootFragment = rootOperator.accept(MakeFragmentsVisitor.INSTANCE, null);
- final SimpleParallelizer parallelizer = new SplittingParallelizer(queryContext);
+ final SimpleParallelizer parallelizer = new SplittingParallelizer(plan.getProperties().hasResourcePlan, queryContext);
List<PlanFragment> fragments = Lists.newArrayList();
@@ -134,7 +134,7 @@ public class PlanSplitter {
}
}
} else {
- final QueryWorkUnit queryWorkUnit = parallelizer.getFragments(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
+ final QueryWorkUnit queryWorkUnit = parallelizer.generateWorkUnit(queryContext.getOptions().getOptionList(), queryContext.getCurrentEndpoint(),
queryId, queryContext.getActiveEndpoints(), rootFragment,
queryContext.getSession(), queryContext.getQueryContextInfo());
planner.visitPhysicalPlan(queryWorkUnit);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
index 0f0bf0df0..7fb7bdbe4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.UnionExchange;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
+import org.apache.drill.exec.planner.cost.PrelCostEstimates;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.store.direct.DirectSubScan;
import org.apache.drill.exec.store.mock.MockSubScanPOP;
@@ -58,7 +59,7 @@ public class TestOpSerialization {
private static PhysicalOperator setupPhysicalOperator(PhysicalOperator operator)
{
operator.setOperatorId(1);
- operator.setCost(1.0);
+ operator.setCost(new PrelCostEstimates(1.0, 1.0));
operator.setMaxAllocation(1000);
return operator;
}
@@ -66,7 +67,7 @@ public class TestOpSerialization {
private static void assertOperator(PhysicalOperator operator)
{
assertEquals(1, operator.getOperatorId());
- assertEquals(1.0, operator.getCost(), 0.00001);
+ assertEquals(1.0, operator.getCost().getOutputRowCount(), 0.00001);
assertEquals(1000, operator.getMaxAllocation());
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
index b45acfbc8..37cae3e92 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -81,7 +82,8 @@ public class TestLocalExchange extends PlanTestBase {
.withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
.build();
- private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(
+ private static final SimpleParallelizer PARALLELIZER = new DefaultQueryParallelizer(
+ true,
1 /*parallelizationThreshold (slice_count)*/,
6 /*maxWidthPerNode*/,
1000 /*maxGlobalWidth*/,
@@ -394,9 +396,8 @@ public class TestLocalExchange extends PlanTestBase {
findFragmentsWithPartitionSender(rootFragment, planningSet, deMuxFragments, htrFragments);
- final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName",
- "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
- QueryWorkUnit qwu = PARALLELIZER.getFragments(new OptionList(), drillbitContext.getEndpoint(),
+ final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
+ QueryWorkUnit qwu = PARALLELIZER.generateWorkUnit(new OptionList(), drillbitContext.getEndpoint(),
QueryId.getDefaultInstance(),
drillbitContext.getBits(), rootFragment, USER_SESSION, queryContextInfo);
qwu.applyPlan(planReader);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
index 0986c75d6..7ef94f7d7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
@@ -46,6 +46,8 @@ import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator.GeneralExecuteIface;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.cost.PrelCostEstimates;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
@@ -86,7 +88,8 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@Category(OperatorTest.class)
public class TestPartitionSender extends PlanTestBase {
- private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(
+ private static final SimpleParallelizer PARALLELIZER = new DefaultQueryParallelizer(
+ true,
1 /*parallelizationThreshold (slice_count)*/,
6 /*maxWidthPerNode*/,
1000 /*maxGlobalWidth*/,
@@ -178,14 +181,14 @@ public class TestPartitionSender extends PlanTestBase {
options.clear();
options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.slice_target", 1, OptionScope.SESSION));
options.add(OptionValue.create(OptionValue.AccessibleScopes.SESSION, "planner.partitioner_sender_max_threads", 10, OptionScope.SESSION));
- hashToRandomExchange.setCost(1000);
+ hashToRandomExchange.setCost(new PrelCostEstimates(1000, 1000));
testThreadsHelper(hashToRandomExchange, drillbitContext, options,
incoming, registry, planReader, planningSet, rootFragment, 10);
options.clear();
options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.slice_target", 1000, OptionScope.SESSION));
options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.partitioner_sender_threads_factor",2, OptionScope.SESSION));
- hashToRandomExchange.setCost(14000);
+ hashToRandomExchange.setCost(new PrelCostEstimates(14000, 14000));
testThreadsHelper(hashToRandomExchange, drillbitContext, options,
incoming, registry, planReader, planningSet, rootFragment, 2);
}
@@ -207,9 +210,8 @@ public class TestPartitionSender extends PlanTestBase {
RecordBatch incoming, FunctionImplementationRegistry registry, PhysicalPlanReader planReader, PlanningSet planningSet, Fragment rootFragment,
int expectedThreadsCount) throws Exception {
- final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName",
- "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
- final QueryWorkUnit qwu = PARALLELIZER.getFragments(options, drillbitContext.getEndpoint(),
+ final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
+ final QueryWorkUnit qwu = PARALLELIZER.generateWorkUnit(options, drillbitContext.getEndpoint(),
QueryId.getDefaultInstance(),
drillbitContext.getBits(), rootFragment, USER_SESSION, queryContextInfo);
qwu.applyPlan(planReader);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
new file mode 100644
index 000000000..4893a36fd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.rm;
+
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.cost.NodeResource;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Checks the per-node memory left in each {@link NodeResource} after {@link QueueQueryParallelizer} runs
+ * {@code adjustMemory} on planning sets whose wrappers are replaced by Mockito mocks.
+ */
+public class TestMemoryCalculator extends PlanTestBase {
+
+  private static final long DEFAULT_SLICE_TARGET = 100000L;
+  private static final long DEFAULT_BATCH_SIZE = 16L * 1024 * 1024;
+
+  private static final UserSession session = UserSession.Builder.newBuilder()
+      .withCredentials(UserBitShared.UserCredentials.newBuilder()
+          .setUserName("foo")
+          .build())
+      .withUserProperties(UserProtos.UserProperties.getDefaultInstance())
+      .withOptionManager(bits[0].getContext().getOptionManager())
+      .build();
+
+  private static final DrillbitEndpoint N1_EP1 = newDrillbitEndpoint("node1", 30010);
+  private static final DrillbitEndpoint N1_EP2 = newDrillbitEndpoint("node2", 30011);
+  private static final DrillbitEndpoint N1_EP3 = newDrillbitEndpoint("node3", 30012);
+  private static final DrillbitEndpoint N1_EP4 = newDrillbitEndpoint("node4", 30013);
+
+  private static final DrillbitEndpoint[] nodeList = {N1_EP1, N1_EP2, N1_EP3, N1_EP4};
+
+  private static final DrillbitContext drillbitContext = getDrillbitContext();
+  private static final QueryContext queryContext = new QueryContext(session, drillbitContext,
+      UserBitShared.QueryId.getDefaultInstance());
+
+  private static DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
+    return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
+  }
+
+  /** Closes the query context shared by all tests. */
+  @AfterClass
+  public static void close() throws Exception {
+    queryContext.close();
+  }
+
+  /**
+   * Recursively mirrors {@code original} and its fragment dependencies with Mockito mocks; each mock is
+   * registered in {@code fragmentToMock} under its wrapper's fragment and reports {@code endpoints} as assigned.
+   */
+  private Wrapper mockWrapper(Wrapper original,
+                              Map<DrillbitEndpoint, NodeResource> resourceMap,
+                              List<DrillbitEndpoint> endpoints,
+                              Map<Fragment, Wrapper> fragmentToMock) {
+    final Fragment node = original.getNode();
+    final Wrapper mocked = mock(Wrapper.class);
+    fragmentToMock.put(node, mocked);
+    final List<Wrapper> mockDependencies = new ArrayList<>();
+    for (Wrapper dependency : original.getFragmentDependencies()) {
+      mockDependencies.add(mockWrapper(dependency, resourceMap, endpoints, fragmentToMock));
+    }
+
+    when(mocked.getNode()).thenReturn(node);
+    when(mocked.getAssignedEndpoints()).thenReturn(endpoints);
+    when(mocked.getResourceMap()).thenReturn(resourceMap);
+    when(mocked.getWidth()).thenReturn(endpoints.size());
+    when(mocked.getFragmentDependencies()).thenReturn(mockDependencies);
+    when(mocked.isEndpointsAssignmentDone()).thenReturn(true);
+    return mocked;
+  }
+
+  /**
+   * Builds a mock {@link PlanningSet} whose root wrapper and {@code get(Fragment)} lookups resolve to
+   * mocks of the wrappers reachable from the root wrapper of {@code planningSet}.
+   */
+  private PlanningSet mockPlanningSet(PlanningSet planningSet,
+                                      Map<DrillbitEndpoint, NodeResource> resourceMap,
+                                      List<DrillbitEndpoint> endpoints) {
+    final Map<Fragment, Wrapper> fragmentToMock = new HashMap<>();
+    final Wrapper mockRoot = mockWrapper(planningSet.getRootWrapper(), resourceMap, endpoints, fragmentToMock);
+    final PlanningSet mocked = mock(PlanningSet.class);
+    when(mocked.getRootWrapper()).thenReturn(mockRoot);
+    when(mocked.get(any(Fragment.class))).thenAnswer(invocation -> fragmentToMock.get(invocation.getArgument(0)));
+    return mocked;
+  }
+
+  private String getPlanForQuery(String query) throws Exception {
+    return getPlanForQuery(query, DEFAULT_BATCH_SIZE);
+  }
+
+  private String getPlanForQuery(String query, long outputBatchSize) throws Exception {
+    return getPlanForQuery(query, outputBatchSize, DEFAULT_SLICE_TARGET);
+  }
+
+  /**
+   * Builds a cluster fixture with the given output batch size and slice target as option defaults and
+   * returns what {@code explainJson()} yields for {@code query}; client and cluster are closed on exit.
+   */
+  private String getPlanForQuery(String query, long outputBatchSize, long sliceTarget) throws Exception {
+    final ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+        .setOptionDefault(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize)
+        .setOptionDefault(ExecConstants.SLICE_TARGET, sliceTarget);
+    try (ClusterFixture cluster = builder.build(); ClientFixture client = cluster.clientFixture()) {
+      return client.queryBuilder().sql(query).explainJson();
+    }
+  }
+
+  /**
+   * Cycles through {@code nodeList} for {@code totalMinorFragments} steps, collecting nodes not in
+   * {@code notIn}; a skipped node still uses up a step, so fewer endpoints may be returned.
+   */
+  private List<DrillbitEndpoint> getEndpoints(int totalMinorFragments, Set<DrillbitEndpoint> notIn) {
+    final List<DrillbitEndpoint> endpoints = new ArrayList<>();
+    final Iterator<DrillbitEndpoint> drillbits = Iterables.cycle(nodeList).iterator();
+    for (int step = 0; step < totalMinorFragments; step++) {
+      final DrillbitEndpoint dbit = drillbits.next();
+      if (!notIn.contains(dbit)) {
+        endpoints.add(dbit);
+      }
+    }
+    return endpoints;
+  }
+
+  private Set<Wrapper> createSet(Wrapper... wrappers) {
+    final Set<Wrapper> setOfWrappers = new HashSet<>();
+    for (Wrapper wrapper : wrappers) {
+      setOfWrappers.add(wrapper);
+    }
+    return setOfWrappers;
+  }
+
+  private Fragment getRootFragmentFromPlan(DrillbitContext context, String plan) throws Exception {
+    final PhysicalPlanReader planReader = context.getPlanReader();
+    return PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan);
+  }
+
+  /**
+   * Plans {@code sql} (output batch size 10), has {@code parallelizer} prepare the fragment tree and
+   * returns a mocked planning set whose wrappers all report {@code activeEndpoints} and {@code resources}.
+   */
+  private PlanningSet preparePlanningSet(List<DrillbitEndpoint> activeEndpoints, long sliceTarget,
+                                         Map<DrillbitEndpoint, NodeResource> resources, String sql,
+                                         SimpleParallelizer parallelizer) throws Exception {
+    final Fragment rootFragment = getRootFragmentFromPlan(drillbitContext, getPlanForQuery(sql, 10, sliceTarget));
+    return mockPlanningSet(parallelizer.prepareFragmentTree(rootFragment), resources, activeEndpoints);
+  }
+
+  /**
+   * Plans {@code sql} for two endpoints, runs {@code adjustMemory} on a {@link QueueQueryParallelizer}
+   * and asserts that the {@link NodeResource} of every endpoint reports exactly {@code expectedMemory}.
+   */
+  private void verifyMemoryPerNode(String sql, long sliceTarget, long expectedMemory) throws Exception {
+    final List<DrillbitEndpoint> activeEndpoints = getEndpoints(2, new HashSet<>());
+    final Map<DrillbitEndpoint, NodeResource> resources = activeEndpoints.stream()
+        .collect(Collectors.toMap(x -> x, x -> NodeResource.create()));
+    final SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext);
+    final PlanningSet planningSet = preparePlanningSet(activeEndpoints, sliceTarget, resources, sql, parallelizer);
+    parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints);
+    assertTrue("memory requirement is different",
+        Iterables.all(resources.entrySet(), e -> e.getValue().getMemory() == expectedMemory));
+  }
+
+  @Test
+  public void TestSingleMajorFragmentWithProjectAndScan() throws Exception {
+    verifyMemoryPerNode("SELECT * from cp.`tpch/nation.parquet`", DEFAULT_SLICE_TARGET, 30);
+  }
+
+  @Test
+  public void TestSingleMajorFragmentWithGroupByProjectAndScan() throws Exception {
+    verifyMemoryPerNode("SELECT dept_id, count(*) from cp.`tpch/lineitem.parquet` group by dept_id",
+        DEFAULT_SLICE_TARGET, 529570);
+  }
+
+  @Test
+  public void TestTwoMajorFragmentWithSortyProjectAndScan() throws Exception {
+    verifyMemoryPerNode("SELECT * from cp.`tpch/lineitem.parquet` order by dept_id", 2, 481490);
+  }
+}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 7b53bb65c..c01143b26 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.drill.categories.PlannerTest;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
@@ -52,7 +53,7 @@ public class TestFragmentChecker extends PopUnitTestBase{
private void print(String fragmentFile, int bitCount, int expectedFragmentCount) throws Exception {
PhysicalPlanReader ppr = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(CONFIG);
Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
- SimpleParallelizer par = new SimpleParallelizer(1000*1000, 5, 10, 1.2);
+ SimpleParallelizer par = new DefaultQueryParallelizer(true, 1000*1000, 5, 10, 1.2);
List<DrillbitEndpoint> endpoints = Lists.newArrayList();
DrillbitEndpoint localBit = null;
for(int i =0; i < bitCount; i++) {
@@ -63,9 +64,8 @@ public class TestFragmentChecker extends PopUnitTestBase{
endpoints.add(b1);
}
- final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName",
- "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
- QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, fragmentRoot,
+ final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
+ QueryWorkUnit qwu = par.generateWorkUnit(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, fragmentRoot,
UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build(),
queryContextInfo);
qwu.applyPlan(ppr);
diff --git a/exec/java-exec/src/test/resources/join/hashJoinExpr.json b/exec/java-exec/src/test/resources/join/hashJoinExpr.json
index 386d90e83..13a1c0fa6 100644
--- a/exec/java-exec/src/test/resources/join/hashJoinExpr.json
+++ b/exec/java-exec/src/test/resources/join/hashJoinExpr.json
@@ -28,7 +28,10 @@
},
"columns" : [ "`r_regionkey`" ],
"selectionRoot" : "/tpch/region.parquet",
- "cost" : 5.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 5.0
+ }
}, {
"pop" : "project",
"@id" : 3,
@@ -39,7 +42,10 @@
"child" : 5,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 5.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 5.0
+ }
}, {
"pop" : "parquet-scan",
"@id" : 6,
@@ -58,7 +64,10 @@
},
"columns" : [ "`n_nationkey`", "`n_regionkey`" ],
"selectionRoot" : "/tpch/nation.parquet",
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
}, {
"pop" : "project",
"@id" : 4,
@@ -72,7 +81,10 @@
"child" : 6,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
}, {
"pop" : "hash-join",
"@id" : 2,
@@ -86,7 +98,10 @@
"joinType" : "INNER",
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
}, {
"pop" : "project",
"@id" : 1,
@@ -97,13 +112,19 @@
"child" : 2,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
}, {
"pop" : "screen",
"@id" : 0,
"child" : 1,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
} ]
}
diff --git a/exec/java-exec/src/test/resources/join/mergeJoinExpr.json b/exec/java-exec/src/test/resources/join/mergeJoinExpr.json
index 1c5111bbb..c0027ea96 100644
--- a/exec/java-exec/src/test/resources/join/mergeJoinExpr.json
+++ b/exec/java-exec/src/test/resources/join/mergeJoinExpr.json
@@ -33,7 +33,10 @@
},
"columns" : [ "`n_nationkey`", "`n_regionkey`" ],
"selectionRoot" : "/tpch/nation.parquet",
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
}, {
"pop" : "project",
"@id" : 8,
@@ -47,7 +50,10 @@
"child" : 9,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
}, {
"pop" : "external-sort",
"@id" : 6,
@@ -60,14 +66,20 @@
"reverse" : false,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
}, {
"pop" : "selection-vector-remover",
"@id" : 4,
"child" : 6,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
}, {
"pop" : "parquet-scan",
"@id" : 7,
@@ -86,7 +98,10 @@
},
"columns" : [ "`r_regionkey`" ],
"selectionRoot" : "/tpch/region.parquet",
- "cost" : 5.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 5.0
+ }
}, {
"pop" : "external-sort",
"@id" : 5,
@@ -99,14 +114,20 @@
"reverse" : false,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 5.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 5.0
+ }
}, {
"pop" : "selection-vector-remover",
"@id" : 3,
"child" : 5,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 5.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 5.0
+ }
}, {
"pop" : "merge-join",
"@id" : 2,
@@ -120,7 +141,10 @@
"joinType" : "INNER",
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
}, {
"pop" : "project",
"@id" : 1,
@@ -131,13 +155,19 @@
"child" : 2,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
}, {
"pop" : "screen",
"@id" : 0,
"child" : 1,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 25.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 25.0
+ }
} ]
}
diff --git a/exec/java-exec/src/test/resources/join/merge_join_nullkey.json b/exec/java-exec/src/test/resources/join/merge_join_nullkey.json
index b283dda2e..fb6bfc272 100644
--- a/exec/java-exec/src/test/resources/join/merge_join_nullkey.json
+++ b/exec/java-exec/src/test/resources/join/merge_join_nullkey.json
@@ -32,7 +32,10 @@
"type" : "json"
},
"selectionRoot" : "/region.json",
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "project",
"@id" : 9,
@@ -43,7 +46,10 @@
"child" : 11,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "external-sort",
"@id" : 7,
@@ -56,14 +62,20 @@
"reverse" : false,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "selection-vector-remover",
"@id" : 5,
"child" : 7,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "fs-scan",
"@id" : 10,
@@ -86,7 +98,10 @@
"type" : "json"
},
"selectionRoot" : "/region.json",
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "project",
"@id" : 8,
@@ -97,7 +112,10 @@
"child" : 10,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "external-sort",
"@id" : 6,
@@ -110,14 +128,20 @@
"reverse" : false,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "selection-vector-remover",
"@id" : 4,
"child" : 6,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "merge-join",
"@id" : 3,
@@ -131,7 +155,10 @@
"joinType" : "${JOIN_TYPE}",
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "project",
"@id" : 2,
@@ -145,7 +172,10 @@
"child" : 3,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "project",
"@id" : 1,
@@ -159,13 +189,19 @@
"child" : 2,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
}, {
"pop" : "screen",
"@id" : 0,
"child" : 1,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 18.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 18.0
+ }
} ]
}
diff --git a/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json b/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json
index 60cd2b0d5..b6652071c 100644
--- a/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json
+++ b/exec/java-exec/src/test/resources/store/json/project_pushdown_json_physical_plan.json
@@ -38,13 +38,19 @@
},
"columns" : [ "`field_1`", "`field_3`.`inner_1`", "`field_3`.`inner_2`", "`field_4`.`inner_1`", "`non_existent_at_root`", "`non_existent`.`nested`.`field`"],
"selectionRoot" : "/store/json/schema_change_int_to_string.json",
- "cost" : 0.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 0
+ }
}, {
"pop" : "screen",
"@id" : 0,
"child" : 1,
"initialAllocation" : 1000000,
"maxAllocation" : 10000000000,
- "cost" : 1.0
+ "cost" : {
+ "memoryCost" : 0,
+ "outputRowCount" : 1.0
+ }
} ]
-} \ No newline at end of file
+}