aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java
diff options
context:
space:
mode:
authorTimothy Farkas <timothyfarkas@apache.org>2018-10-18 16:32:03 -0700
committerTimothy Farkas <timothytiborfarkas@gmail.com>2018-10-23 15:11:42 -0700
commit5859968d525dbb2f65b20a228a7f31dc9e516698 (patch)
tree058f85bba51938baab9d782bb752931cb6c74d31 /exec/java-exec/src/main/java
parent2882b89c2288a9df7c9abd5321cb85c588312b8c (diff)
DRILL-6804: Simplify usage of OperatorPhase in HashAgg.
Diffstat (limited to 'exec/java-exec/src/main/java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java7
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java44
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java37
3 files changed, 59 insertions, 29 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index 80d25edb1..485d36372 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -49,7 +50,6 @@ import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome;
import org.apache.drill.exec.physical.impl.common.Comparator;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
-import org.apache.drill.exec.planner.physical.AggPrelBase;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
@@ -192,9 +192,10 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
long memAvail = oContext.getAllocator().getLimit();
long minBatchesPerPartition = context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
long minBatchesNeeded = 2 * minBatchesPerPartition; // 2 - to cover overheads, etc.
- boolean is2ndPhase = popConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
boolean fallbackEnabled = context.getOptions().getOption(ExecConstants.HASHAGG_FALLBACK_ENABLED_KEY).bool_val;
- if ( is2ndPhase && !fallbackEnabled ) {
+ final AggPrelBase.OperatorPhase phase = popConfig.getAggPhase();
+
+ if ( phase.is2nd() && !fallbackEnabled ) {
minBatchesNeeded *= 2; // 2nd phase (w/o fallback) needs at least 2 partitions
}
if ( configuredBatchSize > memAvail / minBatchesNeeded ) { // no cast - memAvail may be bigger than max-int
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 6709cf6dd..32db9eaf4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -106,9 +106,7 @@ public abstract class HashAggTemplate implements HashAggregator {
private int rowsSpilledReturned = 0;
private int rowsReturnedEarly = 0;
- private boolean isTwoPhase = false; // 1 phase or 2 phase aggr?
- private boolean is2ndPhase = false;
- private boolean is1stPhase = false;
+ private AggPrelBase.OperatorPhase phase;
private boolean canSpill = true; // make it false in case can not spill/return-early
private ChainedHashTable baseHashTable;
private boolean earlyOutput = false; // when 1st phase returns a partition due to no memory
@@ -379,11 +377,8 @@ public abstract class HashAggTemplate implements HashAggregator {
this.outgoing = outgoing;
this.outContainer = outContainer;
this.useMemoryPrediction = context.getOptions().getOption(ExecConstants.HASHAGG_USE_MEMORY_PREDICTION_VALIDATOR);
-
- is2ndPhase = hashAggrConfig.getAggPhase() == AggPrelBase.OperatorPhase.PHASE_2of2;
- isTwoPhase = hashAggrConfig.getAggPhase() != AggPrelBase.OperatorPhase.PHASE_1of1;
- is1stPhase = isTwoPhase && !is2ndPhase;
- canSpill = isTwoPhase; // single phase can not spill
+ this.phase = hashAggrConfig.getAggPhase();
+ canSpill = phase.hasTwo(); // single phase can not spill
// Typically for testing - force a spill after a partition has more than so many batches
minBatchesPerPartition = context.getOptions().getOption(ExecConstants.HASHAGG_MIN_BATCHES_PER_PARTITION_VALIDATOR);
@@ -447,7 +442,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// Set the number of partitions from the configuration (raise to a power of two, if needed)
int numPartitions = (int)context.getOptions().getOption(ExecConstants.HASHAGG_NUM_PARTITIONS_VALIDATOR);
- if ( numPartitions == 1 && is2ndPhase ) { // 1st phase can still do early return with 1 partition
+ if ( numPartitions == 1 && phase.is2nd() ) { // 1st phase can still do early return with 1 partition
canSpill = false;
logger.warn("Spilling is disabled due to configuration setting of num_partitions to 1");
}
@@ -473,7 +468,7 @@ public abstract class HashAggTemplate implements HashAggregator {
while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 2 * 1024 * 1024) > memAvail ) {
numPartitions /= 2;
if ( numPartitions < 2) {
- if (is2ndPhase) {
+ if (phase.is2nd()) {
canSpill = false; // 2nd phase needs at least 2 to make progress
if (fallbackEnabled) {
@@ -492,7 +487,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
}
}
- logger.debug("{} phase. Number of partitions chosen: {}. {} spill", isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
+ logger.debug("{} phase. Number of partitions chosen: {}. {} spill", phase.getName(),
numPartitions, canSpill ? "Can" : "Cannot");
// The following initial safety check should be revisited once we can lower the number of rows in a batch
@@ -616,7 +611,7 @@ public abstract class HashAggTemplate implements HashAggregator {
estOutgoingAllocSize = estValuesBatchSize; // initially assume same size
logger.trace("{} phase. Estimated internal row width: {} Values row width: {} batch size: {} memory limit: {} max column width: {}",
- isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
+ phase.getName(),estRowWidth,estValuesRowWidth,estMaxBatchSize,allocator.getLimit(),maxColumnWidth);
if ( estMaxBatchSize > allocator.getLimit() ) {
logger.warn("HashAggregate: Estimated max batch size {} is larger than the memory limit {}",estMaxBatchSize,allocator.getLimit());
@@ -886,7 +881,7 @@ public abstract class HashAggTemplate implements HashAggregator {
@Override
public void cleanup() {
if ( schema == null ) { return; } // not set up; nothing to clean
- if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+ if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) {
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
(int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
}
@@ -982,7 +977,7 @@ public abstract class HashAggTemplate implements HashAggregator {
* @return The partition (number) chosen to be spilled
*/
private int chooseAPartitionToFlush(int currPart, boolean tryAvoidCurr) {
- if ( is1stPhase && ! tryAvoidCurr) { return currPart; } // 1st phase: just use the current partition
+ if ( phase.is1st() && ! tryAvoidCurr) { return currPart; } // 1st phase: just use the current partition
int currPartSize = batchHolders[currPart].size();
if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
// first find the largest spilled partition
@@ -1188,7 +1183,7 @@ public abstract class HashAggTemplate implements HashAggregator {
if (spilledState.isEmpty()) { // and no spilled partitions
allFlushed = true;
this.outcome = IterOutcome.NONE;
- if ( is2ndPhase && spillSet.getWriteBytes() > 0 ) {
+ if ( phase.is2nd() && spillSet.getWriteBytes() > 0 ) {
stats.setLongStat(Metric.SPILL_MB, // update stats - total MB spilled
(int) Math.round(spillSet.getWriteBytes() / 1024.0D / 1024.0));
}
@@ -1243,7 +1238,7 @@ public abstract class HashAggTemplate implements HashAggregator {
this.outcome = IterOutcome.OK;
- if ( EXTRA_DEBUG_SPILL && is2ndPhase ) {
+ if ( EXTRA_DEBUG_SPILL && phase.is2nd() ) {
logger.debug("So far returned {} + SpilledReturned {} total {} (spilled {})",rowsNotSpilled,rowsSpilledReturned,
rowsNotSpilled+rowsSpilledReturned,
rowsSpilled);
@@ -1322,12 +1317,12 @@ public abstract class HashAggTemplate implements HashAggregator {
*/
private String getOOMErrorMsg(String prefix) {
String errmsg;
- if (!isTwoPhase) {
+ if (!phase.hasTwo()) {
errmsg = "Single Phase Hash Aggregate operator can not spill.";
} else if (!canSpill) { // 2nd phase, with only 1 partition
errmsg = "Too little memory available to operator to facilitate spilling.";
} else { // a bug ?
- errmsg = prefix + " OOM at " + (is2ndPhase ? "Second Phase" : "First Phase") + ". Partitions: " + spilledState.getNumPartitions() +
+ errmsg = prefix + " OOM at " + phase.getName() + " Phase. Partitions: " + spilledState.getNumPartitions() +
". Estimated batch size: " + estMaxBatchSize + ". values size: " + estValuesBatchSize + ". Output alloc size: " + estOutgoingAllocSize;
if ( plannedBatches > 0 ) { errmsg += ". Planned batches: " + plannedBatches; }
if ( rowsSpilled > 0 ) { errmsg += ". Rows spilled so far: " + rowsSpilled; }
@@ -1367,11 +1362,11 @@ public abstract class HashAggTemplate implements HashAggregator {
hashCode >>>= spilledState.getBitsInMask();
HashTable.PutStatus putStatus = null;
long allocatedBeforeHTput = allocator.getAllocatedMemory();
+ String tryingTo = phase.is1st() ? "early return" : "spill";
// Proactive spill - in case there is no reserve memory - spill and retry putting later
if ( reserveValueBatchMemory == 0 && canSpill ) {
- logger.trace("Reserved memory runs short, trying to {} a partition and retry Hash Table put() again.",
- is1stPhase ? "early return" : "spill");
+ logger.trace("Reserved memory runs short, trying to {} a partition and retry Hash Table put() again.", tryingTo);
doSpill(currentPartition); // spill to free some memory
@@ -1389,8 +1384,7 @@ public abstract class HashAggTemplate implements HashAggregator {
} catch (RetryAfterSpillException re) {
if ( ! canSpill ) { throw new OutOfMemoryException(getOOMErrorMsg("Can not spill")); }
- logger.trace("HT put failed with an OOM, trying to {} a partition and retry Hash Table put() again.",
- is1stPhase ? "early return" : "spill");
+ logger.trace("HT put failed with an OOM, trying to {} a partition and retry Hash Table put() again.", tryingTo);
// for debugging - in case there's a leak
long memDiff = allocator.getAllocatedMemory() - allocatedBeforeHTput;
@@ -1493,7 +1487,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// log a detailed debug message explaining why a spill may be needed
logger.trace("MEMORY CHECK: Allocated mem: {}, agg phase: {}, trying to add to partition {} with {} batches. " + "Max memory needed {}, Est batch size {}, mem limit {}",
- allocator.getAllocatedMemory(), isTwoPhase ? (is2ndPhase ? "2ND" : "1ST") : "Single", currentPartition, batchHolders[currentPartition].size(), maxMemoryNeeded,
+ allocator.getAllocatedMemory(), phase.getName(), currentPartition, batchHolders[currentPartition].size(), maxMemoryNeeded,
estMaxBatchSize, allocator.getLimit());
}
//
@@ -1516,7 +1510,7 @@ public abstract class HashAggTemplate implements HashAggregator {
return;
}
- if ( is2ndPhase ) {
+ if ( phase.is2nd() ) {
long before = allocator.getAllocatedMemory();
spillAPartition(victimPartition);
@@ -1583,7 +1577,7 @@ public abstract class HashAggTemplate implements HashAggregator {
this.stats.setLongStat(Metric.RESIZING_TIME_MS, htStats.resizingTime);
this.stats.setLongStat(Metric.NUM_PARTITIONS, spilledState.getNumPartitions());
this.stats.setLongStat(Metric.SPILL_CYCLE, spilledState.getCycle()); // Put 0 in case no spill
- if ( is2ndPhase ) {
+ if ( phase.is2nd() ) {
this.stats.setLongStat(Metric.SPILLED_PARTITIONS, numSpilled);
}
if ( rowsReturnedEarly > 0 ) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
index 84f85ba2f..f3d527e82 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/AggPrelBase.java
@@ -47,7 +47,42 @@ import java.util.List;
public abstract class AggPrelBase extends DrillAggregateRelBase implements Prel {
- public enum OperatorPhase {PHASE_1of1, PHASE_1of2, PHASE_2of2}
+ public enum OperatorPhase {
+ PHASE_1of1(false, false, false, "Single"),
+ PHASE_1of2(true, true, false, "1st"),
+ PHASE_2of2(true, false, true, "2nd");
+
+ private boolean hasTwo;
+ private boolean is1st;
+ private boolean is2nd;
+ private String name;
+
+ OperatorPhase(boolean hasTwo,
+ boolean is1st,
+ boolean is2nd,
+ String name) {
+ this.hasTwo = hasTwo;
+ this.is1st = is1st;
+ this.is2nd = is2nd;
+ this.name = name;
+ }
+
+ public boolean hasTwo() {
+ return hasTwo;
+ }
+
+ public boolean is1st() {
+ return is1st;
+ }
+
+ public boolean is2nd() {
+ return is2nd;
+ }
+
+ public String getName() {
+ return name;
+ }
+ }
protected OperatorPhase operPhase = OperatorPhase.PHASE_1of1; // default phase
protected List<NamedExpression> keys = Lists.newArrayList();