diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical')
6 files changed, 42 insertions, 7 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java index 6eba65833..13c3c2ac8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/PhysicalPlan.java @@ -93,7 +93,7 @@ public class PhysicalPlan { public double totalCost() { double totalCost = 0; for (final PhysicalOperator ops : getSortedOperators()) { - totalCost += ops.getCost(); + totalCost += ops.getCost().getOutputRowCount(); } return totalCost; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java index 71289b699..aad4e8184 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractBase.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.physical.base; import com.fasterxml.jackson.annotation.JsonIgnore; import org.apache.drill.common.graph.GraphVisitor; import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.planner.cost.PrelCostEstimates; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; @@ -35,7 +36,7 @@ public abstract class AbstractBase implements PhysicalOperator { private final String userName; private int id; - private double cost; + private PrelCostEstimates cost = PrelCostEstimates.ZERO_COST; public AbstractBase() { userName = null; @@ -89,12 +90,12 @@ public abstract class AbstractBase implements PhysicalOperator { } @Override - public double getCost() { + public PrelCostEstimates getCost() { return cost; } @Override - public void setCost(double cost) { + public void setCost(PrelCostEstimates cost) { this.cost = cost; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java index ebd7288a4..4106205f3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractExchange.java @@ -130,4 +130,20 @@ public abstract class AbstractExchange extends AbstractSingle implements Exchang public ParallelizationDependency getParallelizationDependency() { return ParallelizationDependency.RECEIVER_DEPENDS_ON_SENDER; } + + // Memory requirement of the sender for the given receivers and senders in a major fragment. + // Currently set to zero but later once batch sizing for Exchanges is completed it will call + // appropriate function. + @Override + public long getSenderMemory(int receivers, int senders) { + return 0; + } + + // Memory requirement of the receiver for the given receivers and senders in a major fragment. + // Currently set to zero but later once batch sizing for Exchanges is completed it will calling + // apropriate function. + @Override + public long getReceiverMemory(int receivers, int senders) { + return 0; + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java index 56e9b9c90..7de915c4c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Exchange.java @@ -79,6 +79,23 @@ public interface Exchange extends PhysicalOperator { Receiver getReceiver(int minorFragmentId); /** + * Returns the memory requirement for the sender side of the exchange operator. + * @param receiverCount number of receivers at the receiving end of this exchange operator. + * @param senderCount number of senders sending the rows for this exchange operator. + * @return Total memory required by this operator. + */ + long getSenderMemory(int receiverCount, int senderCount); + + /** + * Returns the memory requirement for the receiver side of the exchange operator. + * @param receiverCount number of receivers receiving the rows sent by the sender side of this + * exchange operator. + * @param senderCount number of senders sending the rows. + * @return Total memory required by this operator. + */ + long getReceiverMemory(int receiverCount, int senderCount); + + /** * Provide parallelization parameters for sender side of the exchange. Output includes min width, * max width and affinity to Drillbits. * diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java index 82fb53bf4..3bca02ea0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/PhysicalOperator.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.graph.GraphValue; import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.planner.cost.PrelCostEstimates; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import com.fasterxml.jackson.annotation.JsonIdentityInfo; @@ -101,10 +102,10 @@ public interface PhysicalOperator extends GraphValue<PhysicalOperator> { void setOperatorId(int id); @JsonProperty("cost") - void setCost(double cost); + void setCost(PrelCostEstimates cost); @JsonProperty("cost") - double getCost(); + PrelCostEstimates getCost(); /** * Name of the user whom to impersonate while setting up the implementation (RecordBatch) of this diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index c185ac7ac..5aed88ef8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -121,7 +121,7 @@ public class PartitionSenderRootExec extends BaseRootExec { stats.setLongStat(Metric.N_RECEIVERS, outGoingBatchCount); // Algorithm to figure out number of threads to parallelize output // numberOfRows/sliceTarget/numReceivers/threadfactor - this.cost = operator.getChild().getCost(); + this.cost = operator.getChild().getCost().getOutputRowCount(); final OptionManager optMgr = context.getOptions(); long sliceTarget = optMgr.getOption(ExecConstants.SLICE_TARGET).num_val; int threadFactor = optMgr.getOption(PlannerSettings.PARTITION_SENDER_THREADS_FACTOR.getOptionName()).num_val.intValue(); |