diff options
author | Steven Phillips <sphillips@maprtech.com> | 2015-04-02 23:37:16 -0700 |
---|---|---|
committer | Steven Phillips <smp@apache.org> | 2015-04-28 11:52:21 -0700 |
commit | 5f1d6d7bbe58d4e40abf911483fa1f24ecd71050 (patch) | |
tree | 367b53585f124b703daf952ace015958c38c16fa /exec/java-exec | |
parent | 927d1998f22bff1e142bb2eb9110d197f2da53fc (diff) |
DRILL-2725: Faster work assignment logic
Diffstat (limited to 'exec/java-exec')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java | 248 |
1 files changed, 170 insertions, 78 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index 7e9c4c993..b5eee5d77 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -17,15 +17,20 @@
  */
 package org.apache.drill.exec.store.schedule;
 
-import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
+import com.carrotsearch.hppc.cursors.ObjectLongCursor;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Maps;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
-import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.ListMultimap;
@@ -37,104 +42,191 @@ import com.google.common.collect.Lists;
 public class AssignmentCreator<T extends CompleteWork> {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AssignmentCreator.class);
 
-  static final double[] ASSIGNMENT_CUTOFFS = { 0.99, 0.50, 0.25, 0.00 };
-  private final ArrayListMultimap<Integer, T> mappings;
-  private final List<DrillbitEndpoint> endpoints;
+  /**
+   * Comparator used to sort in order of decreasing affinity; Long.compare avoids the overflow of casting a long difference to int
+   */
+  private static final Comparator<Entry<DrillbitEndpoint,Long>> comparator = new Comparator<Entry<DrillbitEndpoint,Long>>() {
+    @Override
+    public int compare(Entry<DrillbitEndpoint, Long> o1, Entry<DrillbitEndpoint,Long> o2) {
+      return Long.compare(o2.getValue(), o1.getValue());
+    }
+  };
+
+  /**
+   * the maximum number of work units to assign to any minor fragment
+   */
+  private int maxWork;
+
+  /**
+   * The units of work to be assigned
+   */
+  private List<T> units;
+
+  /**
+   * A list of DrillbitEndpoints, where the index in the list corresponds to the minor fragment id
+   */
+  private List<DrillbitEndpoint> incomingEndpoints;
 
+  private AssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+    this.incomingEndpoints = incomingEndpoints;
+    this.units = units;
+  }
 
 
 
   /**
-   * Given a set of endpoints to assign work to, attempt to evenly assign work based on affinity of work units to
-   * Drillbits.
+   * Assign each unit of work to a minor fragment, given a list of DrillbitEndpoints whose index in the list corresponds
+   * to the minor fragment id for each fragment. A given DrillbitEndpoint can appear multiple times in this list. This method
+   * will try to assign work based on the affinity of each work unit, but will also evenly distribute the work units among
+   * all of the minor fragments
    *
-   * @param incomingEndpoints
-   *          The set of nodes to assign work to. Note that nodes can be listed multiple times if we want to have
-   *          multiple slices on a node working on the task simultaneously.
-   * @param units
-   *          The work units to assign.
-   * @return ListMultimap of Integer > List<CompleteWork> (based on their incoming order) to with
+   * @param incomingEndpoints The list of incomingEndpoints, indexed by minor fragment id
+   * @param units the list of work units to be assigned
+   * @return A multimap that maps each minor fragment id to a list of work units
    */
-  public static <T extends CompleteWork> ListMultimap<Integer, T> getMappings(List<DrillbitEndpoint> incomingEndpoints,
-      List<T> units) {
-    AssignmentCreator<T> creator = new AssignmentCreator<T>(incomingEndpoints, units);
-    return creator.mappings;
+  public static <T extends CompleteWork> ListMultimap<Integer,T> getMappings(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
+    AssignmentCreator<T> creator = new AssignmentCreator<T>(incomingEndpoints, units);
+    return creator.getMappings();
   }
 
-  private AssignmentCreator(List<DrillbitEndpoint> incomingEndpoints, List<T> units) {
-    logger.debug("Assigning {} units to {} endpoints", units.size(), incomingEndpoints.size());
+  /**
+   * Does the work of creating the mappings for this AssignmentCreator
+   * @return the minor fragment id to work units mapping
+   */
+  private ListMultimap<Integer, T> getMappings() {
     Stopwatch watch = new Stopwatch();
+    watch.start();
+    maxWork = (int) Math.ceil(units.size() / ((double) incomingEndpoints.size())); // double: float loses integer precision above 2^24
+    LinkedList<WorkEndpointListPair<T>> workList = getWorkList();
+    LinkedList<WorkEndpointListPair<T>> unassignedWorkList = Lists.newLinkedList();
+    Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators = getEndpointIterators();
+    ArrayListMultimap<Integer, T> mappings = ArrayListMultimap.create();
+
+    outer: for (WorkEndpointListPair<T> workPair : workList) {
+      List<DrillbitEndpoint> endpoints = workPair.sortedEndpoints;
+      for (DrillbitEndpoint endpoint : endpoints) {
+        FragIteratorWrapper iteratorWrapper = endpointIterators.get(endpoint);
+        if (iteratorWrapper == null) {
+          continue;
+        }
+        if (iteratorWrapper.count < iteratorWrapper.maxCount) {
+          Integer assignment = iteratorWrapper.iter.next();
+          iteratorWrapper.count++;
+          mappings.put(assignment, workPair.work);
+          continue outer;
+        }
+      }
+      unassignedWorkList.add(workPair);
+    }
 
-    Preconditions.checkArgument(incomingEndpoints.size() <= units.size(), String.format("Incoming endpoints %d "
-        + "is greater than number of row groups %d", incomingEndpoints.size(), units.size()));
-    this.mappings = ArrayListMultimap.create();
-    this.endpoints = Lists.newLinkedList(incomingEndpoints);
-
-    ArrayList<T> rowGroupList = new ArrayList<>(units);
-    for (double cutoff : ASSIGNMENT_CUTOFFS) {
-      scanAndAssign(rowGroupList, cutoff, false, false);
+    outer: for (FragIteratorWrapper iteratorWrapper : endpointIterators.values()) {
+      while (iteratorWrapper.count < iteratorWrapper.maxCount) {
+        WorkEndpointListPair<T> workPair = unassignedWorkList.poll();
+        if (workPair == null) {
+          break outer;
+        }
+        Integer assignment = iteratorWrapper.iter.next();
+        iteratorWrapper.count++;
+        mappings.put(assignment, workPair.work);
+      }
     }
-    scanAndAssign(rowGroupList, 0.0, true, false);
-    scanAndAssign(rowGroupList, 0.0, true, true);
 
-    logger.debug("Took {} ms to apply assignments", watch.elapsed(TimeUnit.MILLISECONDS));
-    Preconditions.checkState(rowGroupList.isEmpty(), "All readEntries should be assigned by now, but some are still unassigned");
-    Preconditions.checkState(!units.isEmpty());
+    logger.debug("Took {} ms to assign {} work units to {} fragments", watch.elapsed(TimeUnit.MILLISECONDS), units.size(), incomingEndpoints.size());
+    return mappings;
+  }
 
+  /**
+   * Builds the list of WorkEndpointListPairs, which pair a work unit with a list of endpoints sorted by affinity
+   * @return the list of WorkEndpointListPairs
+   */
+  private LinkedList<WorkEndpointListPair<T>> getWorkList() {
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    LinkedList<WorkEndpointListPair<T>> workList = Lists.newLinkedList();
+    for (T work : units) {
+      List<Map.Entry<DrillbitEndpoint,Long>> entries = Lists.newArrayList();
+      for (ObjectLongCursor<DrillbitEndpoint> cursor : work.getByteMap()) {
+        final DrillbitEndpoint ep = cursor.key;
+        final Long val = cursor.value;
+        Map.Entry<DrillbitEndpoint,Long> entry = new Entry<DrillbitEndpoint,Long>() {
+
+          @Override
+          public DrillbitEndpoint getKey() {
+            return ep;
+          }
+
+          @Override
+          public Long getValue() {
+            return val;
+          }
+
+          @Override
+          public Long setValue(Long value) {
+            throw new UnsupportedOperationException();
+          }
+        };
+        entries.add(entry);
+      }
+      Collections.sort(entries, comparator);
+      List<DrillbitEndpoint> sortedEndpoints = Lists.newArrayList();
+      for (Entry<DrillbitEndpoint,Long> entry : entries) {
+        sortedEndpoints.add(entry.getKey());
+      }
+      workList.add(new WorkEndpointListPair<T>(work, sortedEndpoints));
+    }
+    return workList;
   }
 
 
   /**
-   *
-   * @param mappings
-   *          the mapping between fragment/endpoint and rowGroup
-   * @param endpoints
-   *          the list of drillbits, ordered by the corresponding fragment
-   * @param workunits
-   *          the list of rowGroups to assign
-   * @param requiredPercentage
-   *          the percentage of max bytes required to make an assignment
-   * @param assignAll
-   *          if true, will assign even if no affinity
+   * A wrapper class around a work unit and its associated sorted list of Endpoints (sorted by affinity in decreasing order)
    */
-  private void scanAndAssign(List<T> workunits, double requiredPercentage, boolean assignAllToEmpty, boolean assignAll) {
-    Collections.sort(workunits);
-    int fragmentPointer = 0;
-    final boolean requireAffinity = requiredPercentage > 0;
-    int maxAssignments = (int) (workunits.size() / endpoints.size());
-
-    if (maxAssignments < 1) {
-      maxAssignments = 1;
+  private static class WorkEndpointListPair<T> {
+    T work;
+    List<DrillbitEndpoint> sortedEndpoints;
+
+    WorkEndpointListPair(T work, List<DrillbitEndpoint> sortedEndpoints) {
+      this.work = work;
+      this.sortedEndpoints = sortedEndpoints;
     }
+  }
 
-    for (Iterator<T> iter = workunits.iterator(); iter.hasNext();) {
-      T unit = iter.next();
-      for (int i = 0; i < endpoints.size(); i++) {
-        int minorFragmentId = (fragmentPointer + i) % endpoints.size();
-        DrillbitEndpoint currentEndpoint = endpoints.get(minorFragmentId);
-        EndpointByteMap endpointByteMap = unit.getByteMap();
-        boolean haveAffinity = endpointByteMap.isSet(currentEndpoint);
-
-        if (assignAll
-            || (assignAllToEmpty && !mappings.containsKey(minorFragmentId))
-            || (!endpointByteMap.isEmpty() && (!requireAffinity || haveAffinity)
-                && (!mappings.containsKey(minorFragmentId) || mappings.get(minorFragmentId).size() < maxAssignments) && (!requireAffinity || endpointByteMap
-                .get(currentEndpoint) >= endpointByteMap.getMaxBytes() * requiredPercentage))) {
-
-          mappings.put(minorFragmentId, unit);
-          logger.debug("Assigned unit: {} to minorFragmentId: {}", unit, minorFragmentId);
-          // logger.debug("Assigned rowGroup {} to minorFragmentId {} endpoint {}", rowGroupInfo.getRowGroupIndex(),
-          // minorFragmentId, endpoints.get(minorFragmentId).getAddress());
-          // if (bytesPerEndpoint.get(currentEndpoint) != null) {
-          // // assignmentAffinityStats.update(bytesPerEndpoint.get(currentEndpoint) / rowGroupInfo.getLength());
-          // } else {
-          // // assignmentAffinityStats.update(0);
-          // }
-          iter.remove();
-          fragmentPointer = (minorFragmentId + 1) % endpoints.size();
-          break;
-        }
+  /**
+   * Groups minor fragments together by corresponding endpoint, and creates an iterator that can be used to evenly
+   * distribute work assigned to a given endpoint to all corresponding minor fragments
+   *
+   * @return a map from each endpoint to the wrapper that cycles over that endpoint's minor fragment ids
+   */
+  private Map<DrillbitEndpoint,FragIteratorWrapper> getEndpointIterators() {
+    Stopwatch watch = new Stopwatch();
+    watch.start();
+    Map<DrillbitEndpoint,FragIteratorWrapper> map = Maps.newLinkedHashMap();
+    Map<DrillbitEndpoint,List<Integer>> mmap = Maps.newLinkedHashMap();
+    for (int i = 0; i < incomingEndpoints.size(); i++) {
+      DrillbitEndpoint endpoint = incomingEndpoints.get(i);
+      List<Integer> intList = mmap.get(incomingEndpoints.get(i));
+      if (intList == null) {
+        intList = Lists.newArrayList();
       }
+      intList.add(Integer.valueOf(i));
+      mmap.put(endpoint, intList);
+    }
+    for (DrillbitEndpoint endpoint : mmap.keySet()) {
+      FragIteratorWrapper wrapper = new FragIteratorWrapper();
+      wrapper.iter = Iterators.cycle(mmap.get(endpoint));
+      wrapper.maxCount = maxWork * mmap.get(endpoint).size();
+      map.put(endpoint, wrapper);
     }
+    return map;
+  }
+
+  /**
+   * A struct that holds a fragment iterator and keeps track of how many units have been assigned, as well as the maximum
+   * number of assignments it will accept
+   */
+  private static class FragIteratorWrapper {
+    int count = 0;
+    int maxCount;
+    Iterator<Integer> iter;
   }
 }