diff options
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java | 43 |
1 files changed, 34 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java index 8476b5345..74849c2e2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java @@ -17,6 +17,9 @@ */ package org.apache.drill.exec.memory; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.common.exceptions.ExecutionSetupException; + import java.util.concurrent.atomic.AtomicLong; /** @@ -35,6 +38,7 @@ public class AtomicRemainder { private final long initTotal; private final long initShared; private final long initPrivate; + private boolean closed = false; public AtomicRemainder(AtomicRemainder parent, long max, long pre) { this.parent = parent; @@ -43,6 +47,7 @@ public class AtomicRemainder { this.initTotal = max; this.initShared = max - pre; this.initPrivate = pre; +// logger.info("new AtomicRemainder. a.s. {} a.p. {} hashcode {}", availableShared, availablePrivate, hashCode(), new Exception()); } public long getRemainder() { @@ -60,25 +65,36 @@ public class AtomicRemainder { * @param size */ public void forceGet(long size) { - if (DEBUG) - logger.info("Force get {}", size); - availableShared.addAndGet(size); + long newAvailableShared = availableShared.addAndGet(size); +// if (DEBUG) +// logger.info("Force get {}. a.s. {} a.p. {} hashcode: {}", size, availableShared, availablePrivate, hashCode(), new Exception()); +// assert newAvailableShared <= initShared; if (parent != null) parent.forceGet(size); } public boolean get(long size) { - if (DEBUG) - logger.info("Get {}", size); if (availablePrivate.get() < 1) { // if there is no preallocated memory, we can operate normally. + // if there is a parent allocator, check it before allocating. + if (parent != null && !parent.get(size)) { + return false; + } + // attempt to get shared memory, if fails, return false. long outcome = availableShared.addAndGet(-size); +// assert outcome <= initShared; if (outcome < 0) { - availableShared.addAndGet(size); + long newAvailableShared = availableShared.addAndGet(size); + assert newAvailableShared <= initShared; + if (parent != null) { + parent.returnAllocation(size); + } return false; } else { +// if (DEBUG) +// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception()); return true; } @@ -86,6 +102,8 @@ public class AtomicRemainder { // if there is preallocated memory, use that first. long unaccount = availablePrivate.addAndGet(-size); if (unaccount >= 0) { +// if (DEBUG) +// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception()); return true; } else { @@ -102,6 +120,8 @@ public class AtomicRemainder { if (account >= 0) { // we were succesful, move private back to zero (since we allocated using shared). availablePrivate.addAndGet(additionalSpaceNeeded); +// if (DEBUG) +// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception()); return true; } else { // we failed to get space from available shared. Return allocations to initial state. @@ -122,26 +142,31 @@ public class AtomicRemainder { * @param size */ public void returnAllocation(long size) { - if (DEBUG) - logger.info("Return allocation {}", size); long privateSize = availablePrivate.get(); long privateChange = Math.min(size, initPrivate - privateSize); long sharedChange = size - privateChange; availablePrivate.addAndGet(privateChange); availableShared.addAndGet(sharedChange); +// if (DEBUG) +// logger.info("Return allocation {}, a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception()); if (parent != null) { parent.returnAllocation(sharedChange); } + assert getUsed() <= initTotal; } public void close() { - + if (closed) { + logger.warn("Tried to close remainder, but it has already been closed", new Exception()); + return; + } if (availablePrivate.get() != initPrivate || availableShared.get() != initShared) throw new IllegalStateException( String .format(ERROR, initPrivate, availablePrivate.get(), initPrivate - availablePrivate.get(), initShared, availableShared.get(), initShared - availableShared.get())); if(parent != null) parent.returnAllocation(initPrivate); + closed = true; } static final String ERROR = "Failure while closing accountor. Expected private and shared pools to be set to initial values. However, one or more were not. Stats are\n\tzone\tinit\tallocated\tdelta \n\tprivate\t%d\t%d\t%d \n\tshared\t%d\t%d\t%d."; |