diff options
author | Timothy Farkas <timothyfarkas@apache.org> | 2018-10-18 16:32:03 -0700 |
---|---|---|
committer | Timothy Farkas <timothytiborfarkas@gmail.com> | 2018-10-23 15:11:42 -0700 |
commit | 5859968d525dbb2f65b20a228a7f31dc9e516698 (patch) | |
tree | 058f85bba51938baab9d782bb752931cb6c74d31 /exec/java-exec/src/main/java | |
parent | 2882b89c2288a9df7c9abd5321cb85c588312b8c (diff) |
DRILL-6804: Simplify usage of OperatorPhase in HashAgg.
Diffstat (limited to 'exec/java-exec/src/main/java')
3 files changed, 59 insertions, 29 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java index 80d25edb1..485d36372 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import org.apache.drill.exec.planner.physical.AggPrelBase; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.ErrorCollector; @@ -49,7 +50,6 @@ import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome; import org.apache.drill.exec.physical.impl.common.Comparator; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; -import org.apache.drill.exec.planner.physical.AggPrelBase; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; @@ -192,9 +192,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> { long memAvail = oContext.getAllocator().getLimit(); long minBatchesPerPartition = context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR); long minBatchesNeeded = 2 * minBatchesPerPartition; // 2 - to cover overheads, etc. - boolean is2ndPhase = popConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2; boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val; - if ( is2ndPhase && !fallbackEnabled ) { + final AggPrelBase.OperatorPhase phase = popConfig.getAggPhase(); + + if ( phase.is2nd() && !fallbackEnabled ) { minBatchesNeeded *= 2; // 2nd phase (w/o fallback) needs at least 2 partitions } if ( configuredBatchSize > memAvail / minBatchesNeeded ) { // no cast - memAvail may be bigger than max-int diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index 6709cf6dd..32db9eaf4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -106,9 +106,7 @@ public abstract class HashAggTemplate implements HashAggregator { private int rowsSpilledReturned = 0; private int rowsReturnedEarly = 0; - private boolean isTwoPhase = false; // 1 phase or 2 phase aggr? - private boolean is2ndPhase = false; - private boolean is1stPhase = false; + private AggPrelBase.OperatorPhase phase; private boolean canSpill = true; // make it false in case can not spill/return-early private ChainedHashTable baseHashTable; private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory @@ -379,11 +377,8 @@ public abstract class HashAggTemplate implements HashAggregator { this.outgoing = outgoing; this.outContainer = outContainer; this.useMemoryPrediction = context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR); - - is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2; - isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1; - is1stPhase = isTwoPhase && !is2ndPhase; - canSpill = isTwoPhase; // single phase can not spill + this.phase = hashAggrConfig.getAggPhase(); + canSpill = phase.hasTwo(); // single phase can not spill // Typically for testing - force a spill after a partition has more than so many batches minBatchesPerPartition = context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR); @@ -447,7 +442,7 @@ public abstract class HashAggTemplate implements HashAggregator { // Set the number of partitions from the configuration (raise to a power of two, if needed) int numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR); - if ( numPartitions == 1 && is2ndPhase ) { // 1st phase can still do early return with 1 partition + if ( numPartitions == 1 && phase.is2nd() ) { // 1st phase can still do early return with 1 partition canSpill = false; logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1"); } @@ -473,7 +468,7 @@ public abstract class HashAggTemplate implements HashAggregator { while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 2 * 1024 * 1024) > memAvail ) { numPartitions /= 2; if ( numPartitions < 2) { - if (is2ndPhase) { + if (phase.is2nd()) { canSpill = false; // 2nd phase needs at least 2 to make progress if (fallbackEnabled) { @@ -492,7 +487,7 @@ public abstract class HashAggTemplate implements HashAggregator { } } } - logger.debug("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single", + logger.debug("{} phase. Number of partitions chosen: {}. {} spill", phase.getName(), numPartitions, canSpill ? "Can" : "Cannot"); // The following initial safety check should be revisited once we can lower the number of rows in a batch @@ -616,7 +611,7 @@ public abstract class HashAggTemplate implements HashAggregator { estOutgoingAllocSize = estValuesBatchSize; // initially assume same size logger.trace("{} phase. Estimated internal row width: {} Values row width: {} batch size: {} memory limit: {} max column width: {}", - isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth); + phase.getName(),estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth); if ( estMaxBatchSize > allocator.getLimit() ) { logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,allocator.getLimit()); @@ -886,7 +881,7 @@ public abstract class HashAggTemplate implements HashAggregator { @Override public void cleanup() { if ( schema == null ) { return; } // not set up; nothing to clean - if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) { + if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) { stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0)); } @@ -982,7 +977,7 @@ public abstract class HashAggTemplate implements HashAggregator { * @return The partition (number) chosen to be spilled */ private int chooseAPartitionToFlush(int currPart, boolean tryAvoidCurr) { - if ( is1stPhase && ! tryAvoidCurr) { return currPart; } // 1st phase: just use the current partition + if ( phase.is1st() && ! tryAvoidCurr) { return currPart; } // 1st phase: just use the current partition int currPartSize = batchHolders[currPart].size(); if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1 // first find the largest spilled partition @@ -1188,7 +1183,7 @@ public abstract class HashAggTemplate implements HashAggregator { if (spilledState.isEmpty()) { // and no spilled partitions allFlushed = true; this.outcome = IterOutcome.NONE; - if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) { + if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) { stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled (int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0)); } @@ -1243,7 +1238,7 @@ public abstract class HashAggTemplate implements HashAggregator { this.outcome = IterOutcome.OK; - if ( EXTRA_DEBUG_SPILL && is2ndPhase ) { + if ( EXTRA_DEBUG_SPILL && phase.is2nd() ) { logger.debug("So far returned {} + SpilledReturned {} total {} (spilled {})",rowsNotSpilled,rowsSpilledReturned, rowsNotSpilled+rowsSpilledReturned, rowsSpilled); @@ -1322,12 +1317,12 @@ public abstract class HashAggTemplate implements HashAggregator { */ private String getOOMErrorMsg(String prefix) { String errmsg; - if (!isTwoPhase) { + if (!phase.hasTwo()) { errmsg = "Single Phase Hash Aggregate operator can not spill."; } else if (!canSpill) { // 2nd phase, with only 1 partition errmsg = "Too little memory available to operator to facilitate spilling."; } else { // a bug ? - errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + spilledState.getNumPartitions() + + errmsg = prefix + " OOM at " + phase.getName() + " Phase. Partitions: " + spilledState.getNumPartitions() + ". Estimated batch size: " + estMaxBatchSize + ". values size: " + estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize; if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + plannedBatches; } if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; } @@ -1367,11 +1362,11 @@ public abstract class HashAggTemplate implements HashAggregator { hashCode >>>= spilledState.getBitsInMask(); HashTable.PutStatus putStatus = null; long allocatedBeforeHTput = allocator.getAllocatedMemory(); + String tryingTo = phase.is1st() ? "early return" : "spill"; // Proactive spill - in case there is no reserve memory - spill and retry putting later if ( reserveValueBatchMemory == 0 && canSpill ) { - logger.trace("Reserved memory runs short, trying to {} a partition and retry Hash Table put() again.", - is1stPhase ? "early return" : "spill"); + logger.trace("Reserved memory runs short, trying to {} a partition and retry Hash Table put() again.", tryingTo); doSpill(currentPartition); // spill to free some memory @@ -1389,8 +1384,7 @@ public abstract class HashAggTemplate implements HashAggregator { } catch (RetryAfterSpillException re) { if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can not spill")); } - logger.trace("HT put failed with an OOM, trying to {} a partition and retry Hash Table put() again.", - is1stPhase ? "early return" : "spill"); + logger.trace("HT put failed with an OOM, trying to {} a partition and retry Hash Table put() again.", tryingTo); // for debugging - in case there's a leak long memDiff = allocator.getAllocatedMemory() - allocatedBeforeHTput; @@ -1493,7 +1487,7 @@ public abstract class HashAggTemplate implements HashAggregator { // log a detailed debug message explaining why a spill may be needed logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " + "Max memory needed {}, Est batch size {}, mem limit {}", - allocator.getAllocatedMemory(), isTwoPhase ? (is2ndPhase ? "2ND" : "1ST") : "Single", currentPartition, batchHolders[currentPartition].size(), maxMemoryNeeded, + allocator.getAllocatedMemory(), phase.getName(), currentPartition, batchHolders[currentPartition].size(), maxMemoryNeeded, estMaxBatchSize, allocator.getLimit()); } // @@ -1516,7 +1510,7 @@ public abstract class HashAggTemplate implements HashAggregator { return; } - if ( is2ndPhase ) { + if ( phase.is2nd() ) { long before = allocator.getAllocatedMemory(); spillAPartition(victimPartition); @@ -1583,7 +1577,7 @@ public abstract class HashAggTemplate implements HashAggregator { this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime); this.stats.setLongStat(Metric.NUM_PARTITIONS, spilledState.getNumPartitions()); this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill - if ( is2ndPhase ) { + if ( phase.is2nd() ) { this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled); } if ( rowsReturnedEarly > 0 ) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java index 84f85ba2f..f3d527e82 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java @@ -47,7 +47,42 @@ import java.util.List; public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel { - public enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2} + public enum OperatorPhase { + PHASE_1of1(false, false, false, "Single"), + PHASE_1of2(true, true, false, "1st"), + PHASE_2of2(true, false, true, "2nd"); + + private boolean hasTwo; + private boolean is1st; + private boolean is2nd; + private String name; + + OperatorPhase(boolean hasTwo, + boolean is1st, + boolean is2nd, + String name) { + this.hasTwo = hasTwo; + this.is1st = is1st; + this.is2nd = is2nd; + this.name = name; + } + + public boolean hasTwo() { + return hasTwo; + } + + public boolean is1st() { + return is1st; + } + + public boolean is2nd() { + return is2nd; + } + + public String getName() { + return name; + } + } protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1; // default phase protected List<NamedExpression> keys = Lists.newArrayList(); |