aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/StatsCollector.java
blob: 49af366ce2eff25498179c6009321e5da03b2713 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.planner.fragment;

import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.exec.physical.base.Exchange;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.HasAffinity;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.Store;
import org.apache.drill.exec.planner.AbstractOpWrapperVisitor;
import org.apache.drill.exec.planner.fragment.Fragment.ExchangeFragmentPair;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;

import java.util.Collections;
import java.util.List;

/**
 * Visitor to collect stats such as cost and parallelization info of operators within a fragment.
 *
 * All operators have cost associated with them, but only few type of operators such as scan,
 * store and exchanges (both sending and receiving) have parallelization info associated with them.
 */
public class StatsCollector extends AbstractOpWrapperVisitor<Void, RuntimeException> {
  private final PlanningSet planningSet;

  public StatsCollector(final PlanningSet planningSet) {
    this.planningSet = planningSet;
  }

  @Override
  public Void visitSendingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException {
    // Handle the sending side exchange
    Wrapper receivingFragment = planningSet.get(wrapper.getNode().getSendingExchangePair().getNode());

    // List to contain the endpoints where the fragment that receive data to this fragment are running.
    List<DrillbitEndpoint> receiverEndpoints;
    if (receivingFragment.isEndpointsAssignmentDone()) {
      receiverEndpoints = receivingFragment.getAssignedEndpoints();
    } else {
      receiverEndpoints = Collections.emptyList();
    }

    wrapper.getStats().addParallelizationInfo(exchange.getSenderParallelizationInfo(receiverEndpoints));
    return visitOp(exchange, wrapper);
  }

  @Override
  public Void visitReceivingExchange(Exchange exchange, Wrapper wrapper) throws RuntimeException {
    // Handle the receiving side Exchange

    final List<ExchangeFragmentPair> receivingExchangePairs = wrapper.getNode().getReceivingExchangePairs();

    // List to contain the endpoints where the fragment that send dat to this fragment are running.
    final List<DrillbitEndpoint> sendingEndpoints = Lists.newArrayList();

    for(ExchangeFragmentPair pair : receivingExchangePairs) {
      if (pair.getExchange() == exchange) {
        //This is the child fragment which is sending data to this fragment.
        Wrapper sendingFragment = planningSet.get(pair.getNode());
        if (sendingFragment.isEndpointsAssignmentDone()) {
          sendingEndpoints.addAll(sendingFragment.getAssignedEndpoints());
        }
      }
    }

    wrapper.getStats().addParallelizationInfo(exchange.getReceiverParallelizationInfo(sendingEndpoints));
    // no traversal since it would cross current fragment boundary.
    return null;
  }

  @Override
  public Void visitGroupScan(GroupScan groupScan, Wrapper wrapper) {
    final Stats stats = wrapper.getStats();
    stats.addMaxWidth(groupScan.getMaxParallelizationWidth());
    stats.addMinWidth(groupScan.getMinParallelizationWidth());
    return super.visitGroupScan(groupScan, wrapper);
  }

  @Override
  public Void visitStore(Store store, Wrapper wrapper) {
    wrapper.getStats().addMaxWidth(store.getMaxWidth());
    return super.visitStore(store, wrapper);
  }

  @Override
  public Void visitOp(PhysicalOperator op, Wrapper wrapper) {
    final Stats stats = wrapper.getStats();
    if (op instanceof HasAffinity) {
      final HasAffinity hasAffinity = (HasAffinity)op;
      stats.addEndpointAffinities(hasAffinity.getOperatorAffinity());
      stats.setDistributionAffinity(hasAffinity.getDistributionAffinity());
    }
    stats.addCost(op.getCost().getOutputRowCount());
    for (PhysicalOperator child : op) {
      child.accept(this, wrapper);
    }
    return null;
  }
}