aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec
diff options
context:
space:
mode:
author Steven Phillips <sphillips@maprtech.com> 2015-04-02 23:37:16 -0700
committer Steven Phillips <smp@apache.org> 2015-04-28 11:52:21 -0700
commit 5f1d6d7bbe58d4e40abf911483fa1f24ecd71050 (patch)
tree 367b53585f124b703daf952ace015958c38c16fa /exec/java-exec
parent 927d1998f22bff1e142bb2eb9110d197f2da53fc (diff)
DRILL-2725: Faster work assignment logic
Diffstat (limited to 'exec/java-exec')
-rw-r--r-- exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java 248
1 files changed, 170 insertions, 78 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 7e9c4c993..b5eee5d77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -17,15 +17,20 @@
*/
package org.apache.drill.exec.store.schedule;
-import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
@@ -37,104 +42,191 @@ import com.google.common.collect.Lists;
public class AssignmentCreator<T extends CompleteWork> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssignmentCreator.class);
- static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 };
- private final ArrayListMultimap<Integer, T> mappings;
- private final List<DrillbitEndpoint> endpoints;
+  /**
+   * Comparator used to sort in order of decreasing affinity (entry with the largest value first)
+   */
+  private static Comparator<Entry<DrillbitEndpoint,Long>> comparator = new Comparator<Entry<DrillbitEndpoint,Long>>() {
+    @Override
+    public int compare(Entry<DrillbitEndpoint, Long> o1, Entry<DrillbitEndpoint,Long> o2) {
+      return o2.getValue().compareTo(o1.getValue()); // o2 first => descending; avoids truncating a long difference to int
+    }
+  };
+
+ /**
+ * the maximum number of work units to assign to any minor fragment
+ */
+ private int maxWork;
+
+ /**
+ * The units of work to be assigned
+ */
+ private List<T> units;
+
+ /**
+ * A list of DrillbitEndpoints, where the index in the list corresponds to the minor fragment id
+ */
+ private List<DrillbitEndpoint> incomingEndpoints;
+ private AssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) { // only stores the inputs; the assignment is computed later by getMappings()
+ this.incomingEndpoints = incomingEndpoints;
+ this.units = units;
+ }
/**
- * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to
- * Drillbits.
+ * Assign each unit of work to a minor fragment, given a list of DrillbitEndpoints whose index in the list corresponds
+ * to the minor fragment id for each fragment. A given DrillbitEndpoint can appear multiple times in this list. This method
+ * will try to assign work based on the affinity of each work unit, but will also evenly distribute the work units among
+ * all of the minor fragments.
*
- * @param incomingEndpoints
- * The set of nodes to assign work to. Note that nodes can be listed multiple times if we want to have
- * multiple slices on a node working on the task simultaneously.
- * @param units
- * The work units to assign.
- * @return ListMultimap of Integer > List<CompleteWork> (based on their incoming order) to with
+ * @param incomingEndpoints The list of incomingEndpoints, indexed by minor fragment id
+ * @param units the list of work units to be assigned
+ * @return A multimap that maps each minor fragment id to a list of work units
*/
- public static <T extends CompleteWork> ListMultimap<Integer, T> getMappings(List<DrillbitEndpoint> incomingEndpoints,
- List<T> units) {
- AssignmentCreator<T> creator = new AssignmentCreator<T>(incomingEndpoints, units);
- return creator.mappings;
+  public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+    AssignmentCreator<T> creator = new AssignmentCreator<T>(incomingEndpoints, units); // parameterized, not raw: no unchecked conversion
+    return creator.getMappings();
   }
- private AssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
- logger.debug("Assigning {} units to {} endpoints", units.size(), incomingEndpoints.size());
+ /**
+ * Does the work of creating the mappings for this AssignmentCreator: units are first placed by endpoint preference, then leftovers fill any remaining capacity
+ * @return the minor fragment id to work units mapping
+ */
+ private ListMultimap<Integer, T> getMappings() {
Stopwatch watch = new Stopwatch();
+ watch.start();
+ maxWork = (int) Math.ceil(units.size() / ((float) incomingEndpoints.size())); // per-fragment cap: units divided by fragments, rounded up
+ LinkedList<WorkEndpointListPair<T>> workList = getWorkList();
+ LinkedList<WorkEndpointListPair<T>> unassignedWorkList = Lists.newLinkedList();
+ Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators = getEndpointIterators();
+ ArrayListMultimap<Integer, T> mappings = ArrayListMultimap.create();
+ // First pass: give each unit to the first endpoint in its sorted list that still has capacity
+ outer: for (WorkEndpointListPair<T> workPair : workList) {
+ List<DrillbitEndpoint> endpoints = workPair.sortedEndpoints;
+ for (DrillbitEndpoint endpoint : endpoints) {
+ FragIteratorWrapper iteratorWrapper = endpointIterators.get(endpoint);
+ if (iteratorWrapper == null) {
+ continue; // this endpoint has no minor fragment in incomingEndpoints
+ }
+ if (iteratorWrapper.count < iteratorWrapper.maxCount) {
+ Integer assignment = iteratorWrapper.iter.next(); // next minor fragment id for this endpoint
+ iteratorWrapper.count++;
+ mappings.put(assignment, workPair.work);
+ continue outer;
+ }
+ }
+ unassignedWorkList.add(workPair); // no listed endpoint had capacity; deferred to the second pass
+ }
- Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("Incoming endpoints %d "
- + "is greater than number of row groups %d", incomingEndpoints.size(), units.size()));
- this.mappings = ArrayListMultimap.create();
- this.endpoints = Lists.newLinkedList(incomingEndpoints);
-
- ArrayList<T> rowGroupList = new ArrayList<>(units);
- for (double cutoff : ASSIGNMENT_CUTOFFS) {
- scanAndAssign(rowGroupList, cutoff, false, false);
+ outer: for (FragIteratorWrapper iteratorWrapper : endpointIterators.values()) { // second pass: hand leftover units to any wrapper below its cap
+ while (iteratorWrapper.count < iteratorWrapper.maxCount) {
+ WorkEndpointListPair<T> workPair = unassignedWorkList.poll();
+ if (workPair == null) {
+ break outer; // nothing left to assign
+ }
+ Integer assignment = iteratorWrapper.iter.next();
+ iteratorWrapper.count++;
+ mappings.put(assignment, workPair.work);
+ }
}
- scanAndAssign(rowGroupList, 0.0, true, false);
- scanAndAssign(rowGroupList, 0.0, true, true);
- logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
- Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
- Preconditions.checkState(!units.isEmpty());
+ logger.debug("Took {} ms to assign {} work units to {} fragments", watch.elapsed(TimeUnit.MILLISECONDS), units.size(), incomingEndpoints.size());
+ return mappings;
+ }
+  /**
+   * Builds the list of WorkEndpointListPairs, which pair a work unit with a list of endpoints sorted by affinity
+   * @return the list of WorkEndpointListPairs, one per work unit and in the same order as the work units
+   */
+  private LinkedList<WorkEndpointListPair<T>> getWorkList() {
+    LinkedList<WorkEndpointListPair<T>> workList = Lists.newLinkedList();
+    for (T work : units) {
+      // Collect the (endpoint, value) pairs of this unit's byte map so they can be sorted with the comparator
+      List<Map.Entry<DrillbitEndpoint,Long>> entries = Lists.newArrayList();
+      for (ObjectLongCursor<DrillbitEndpoint> cursor : work.getByteMap()) {
+        // Copy the cursor fields into final locals so the anonymous entry below can capture them
+        final DrillbitEndpoint ep = cursor.key;
+        final Long val = cursor.value;
+        Map.Entry<DrillbitEndpoint,Long> entry = new Entry<DrillbitEndpoint,Long>() {
+
+          @Override
+          public DrillbitEndpoint getKey() {
+            return ep;
+          }
+
+          @Override
+          public Long getValue() {
+            return val;
+          }
+
+          @Override
+          public Long setValue(Long value) {
+            throw new UnsupportedOperationException();
+          }
+        };
+        entries.add(entry);
+      }
+      Collections.sort(entries, comparator);
+      List<DrillbitEndpoint> sortedEndpoints = Lists.newArrayList();
+      for (Entry<DrillbitEndpoint,Long> entry : entries) {
+        sortedEndpoints.add(entry.getKey());
+      }
+      workList.add(new WorkEndpointListPair<T>(work, sortedEndpoints));
+    }
+    return workList;
+  }
/**
- *
- * @param mappings
- * the mapping between fragment/endpoint and rowGroup
- * @param endpoints
- * the list of drillbits, ordered by the corresponding fragment
- * @param workunits
- * the list of rowGroups to assign
- * @param requiredPercentage
- * the percentage of max bytes required to make an assignment
- * @param assignAll
- * if true, will assign even if no affinity
+ * A wrapper class around a work unit and its associated sorted list of Endpoints (sorted by affinity in decreasing order)
*/
- private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) {
- Collections.sort(workunits);
- int fragmentPointer = 0;
- final boolean requireAffinity = requiredPercentage > 0;
- int maxAssignments = (int) (workunits.size() / endpoints.size());
-
- if (maxAssignments < 1) {
- maxAssignments = 1;
+  private static class WorkEndpointListPair<T> {
+    final T work; // the work unit to be assigned
+    final List<DrillbitEndpoint> sortedEndpoints; // candidate endpoints for the unit, in the order they should be tried
+
+    WorkEndpointListPair(T work, List<DrillbitEndpoint> sortedEndpoints) {
+      this.work = work;
+      this.sortedEndpoints = sortedEndpoints;
+    }
+  }
- for (Iterator<T> iter = workunits.iterator(); iter.hasNext();) {
- T unit = iter.next();
- for (int i = 0; i < endpoints.size(); i++) {
- int minorFragmentId = (fragmentPointer + i) % endpoints.size();
- DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
- EndpointByteMap endpointByteMap = unit.getByteMap();
- boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
-
- if (assignAll
- || (assignAllToEmpty && !mappings.containsKey(minorFragmentId))
- || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity)
- && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap
- .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {
-
- mappings.put(minorFragmentId, unit);
- logger.debug("Assigned unit: {} to minorFragmentId: {}", unit, minorFragmentId);
- // logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(),
- // minorFragmentId, endpoints.get(minorFragmentId).getAddress());
- // if (bytesPerEndpoint.get(currentEndpoint) != null) {
- // // assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
- // } else {
- // // assignmentAffinityStats.update(0);
- // }
- iter.remove();
- fragmentPointer = (minorFragmentId + 1) % endpoints.size();
- break;
- }
+  /**
+   * Groups minor fragments together by corresponding endpoint, and creates an iterator that can be used to
+   * distribute work assigned to a given endpoint evenly among all corresponding minor fragments
+   *
+   * @return a map from each distinct endpoint (in order of first appearance) to the wrapper cycling over its minor fragment ids
+   */
+  private Map<DrillbitEndpoint,FragIteratorWrapper> getEndpointIterators() {
+    // map:  endpoint -> wrapper holding a cycling iterator over that endpoint's minor fragment ids
+    // mmap: endpoint -> the minor fragment ids (indices into incomingEndpoints) at which the endpoint appears
+    Map<DrillbitEndpoint,FragIteratorWrapper> map = Maps.newLinkedHashMap();
+    Map<DrillbitEndpoint,List<Integer>> mmap = Maps.newLinkedHashMap();
+    for (int i = 0; i < incomingEndpoints.size(); i++) {
+      DrillbitEndpoint endpoint = incomingEndpoints.get(i);
+      List<Integer> intList = mmap.get(endpoint);
+      if (intList == null) {
+        intList = Lists.newArrayList();
       }
+      intList.add(Integer.valueOf(i));
+      mmap.put(endpoint, intList);
+    }
+    for (DrillbitEndpoint endpoint : mmap.keySet()) {
+      FragIteratorWrapper wrapper = new FragIteratorWrapper();
+      wrapper.iter = Iterators.cycle(mmap.get(endpoint));
+      wrapper.maxCount = maxWork * mmap.get(endpoint).size(); // maxWork units for each minor fragment on this endpoint
+      map.put(endpoint, wrapper);
     }
+    return map;
+  }
+
+ /**
+ * A struct that holds a fragment iterator and keeps track of how many units have been assigned, as well as the maximum
+ * number of assignments it will accept
+ */
+ private static class FragIteratorWrapper {
+ int count = 0; // number of work units handed out through this wrapper so far
+ int maxCount; // upper bound for count; filled in by the code that builds the wrapper
+ Iterator<Integer> iter; // supplies the minor fragment id for each assignment
}
}