aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java2
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java16
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java17
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java5
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java2
6 files changed, 42 insertions, 7 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
index 6eba65833..13c3c2ac8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java
@@ -93,7 +93,7 @@ public class PhysicalPlan {
public double totalCost() {
double totalCost = 0;
for (final PhysicalOperator ops : getSortedOperators()) {
- totalCost += ops.getCost();
+ totalCost += ops.getCost().getOutputRowCount();
}
return totalCost;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
index 71289b699..aad4e8184 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.base;
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.drill.common.graph.GraphVisitor;
import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.cost.PrelCostEstimates;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
@@ -35,7 +36,7 @@ public abstract class AbstractBase implements PhysicalOperator {
private final String userName;
private int id;
- private double cost;
+ private PrelCostEstimates cost = PrelCostEstimates.ZERO_COST;
public AbstractBase() {
userName = null;
@@ -89,12 +90,12 @@ public abstract class AbstractBase implements PhysicalOperator {
}
@Override
- public double getCost() {
+ public PrelCostEstimates getCost() {
return cost;
}
@Override
- public void setCost(double cost) {
+ public void setCost(PrelCostEstimates cost) {
this.cost = cost;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
index ebd7288a4..4106205f3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java
@@ -130,4 +130,20 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang
public ParallelizationDependency getParallelizationDependency() {
return ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER;
}
+
+ // Memory requirement of the sender for the given receivers and senders in a major fragment.
+ // Currently set to zero but later once batch sizing for Exchanges is completed it will call
+ // appropriate function.
+ @Override
+ public long getSenderMemory(int receivers, int senders) {
+ return 0;
+ }
+
+ // Memory requirement of the receiver for the given receivers and senders in a major fragment.
+ // Currently set to zero but later once batch sizing for Exchanges is completed it will calling
+ // apropriate function.
+ @Override
+ public long getReceiverMemory(int receivers, int senders) {
+ return 0;
+ }
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
index 56e9b9c90..7de915c4c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java
@@ -79,6 +79,23 @@ public interface Exchange extends PhysicalOperator {
Receiver getReceiver(int minorFragmentId);
/**
+ * Returns the memory requirement for the sender side of the exchange operator.
+ * @param receiverCount number of receivers at the receiving end of this exchange operator.
+ * @param senderCount number of senders sending the rows for this exchange operator.
+ * @return Total memory required by this operator.
+ */
+ long getSenderMemory(int receiverCount, int senderCount);
+
+ /**
+ * Returns the memory requirement for the receiver side of the exchange operator.
+ * @param receiverCount number of receivers receiving the rows sent by the sender side of this
+ * exchange operator.
+ * @param senderCount number of senders sending the rows.
+ * @return Total memory required by this operator.
+ */
+ long getReceiverMemory(int receiverCount, int senderCount);
+
+ /**
* Provide parallelization parameters for sender side of the exchange. Output includes min width,
* max width and affinity to Drillbits.
*
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
index 82fb53bf4..3bca02ea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.graph.GraphValue;
import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.cost.PrelCostEstimates;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import com.fasterxml.jackson.annotation.JsonIdentityInfo;
@@ -101,10 +102,10 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> {
void setOperatorId(int id);
@JsonProperty("cost")
- void setCost(double cost);
+ void setCost(PrelCostEstimates cost);
@JsonProperty("cost")
- double getCost();
+ PrelCostEstimates getCost();
/**
* Name of the user whom to impersonate while setting up the implementation (RecordBatch) of this
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index c185ac7ac..5aed88ef8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -121,7 +121,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount);
// Algorithm to figure out number of threads to parallelize output
// numberOfRows/sliceTarget/numReceivers/threadfactor
- this.cost = operator.getChild().getCost();
+ this.cost = operator.getChild().getCost().getOutputRowCount();
final OptionManager optMgr = context.getOptions();
long sliceTarget = optMgr.getOption(ExecConstants.SLICE_TARGET).num_val;
int threadFactor = optMgr.getOption(PlannerSettings.PARTITION_SENDER_THREADS_FACTOR.getOptionName()).num_val.intValue();