aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
diff options
context:
space:
mode:
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java43
1 files changed, 34 insertions, 9 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
index 8476b5345..74849c2e2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/AtomicRemainder.java
@@ -17,6 +17,9 @@
*/
package org.apache.drill.exec.memory;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -35,6 +38,7 @@ public class AtomicRemainder {
private final long initTotal;
private final long initShared;
private final long initPrivate;
+ private boolean closed = false;
public AtomicRemainder(AtomicRemainder parent, long max, long pre) {
this.parent = parent;
@@ -43,6 +47,7 @@ public class AtomicRemainder {
this.initTotal = max;
this.initShared = max - pre;
this.initPrivate = pre;
+// logger.info("new AtomicRemainder. a.s. {} a.p. {} hashcode {}", availableShared, availablePrivate, hashCode(), new Exception());
}
public long getRemainder() {
@@ -60,25 +65,36 @@ public class AtomicRemainder {
* @param size
*/
public void forceGet(long size) {
- if (DEBUG)
- logger.info("Force get {}", size);
- availableShared.addAndGet(size);
+ long newAvailableShared = availableShared.addAndGet(size);
+// if (DEBUG)
+// logger.info("Force get {}. a.s. {} a.p. {} hashcode: {}", size, availableShared, availablePrivate, hashCode(), new Exception());
+// assert newAvailableShared <= initShared;
if (parent != null)
parent.forceGet(size);
}
public boolean get(long size) {
- if (DEBUG)
- logger.info("Get {}", size);
if (availablePrivate.get() < 1) {
// if there is no preallocated memory, we can operate normally.
+ // if there is a parent allocator, check it before allocating.
+ if (parent != null && !parent.get(size)) {
+ return false;
+ }
+
// attempt to get shared memory, if fails, return false.
long outcome = availableShared.addAndGet(-size);
+// assert outcome <= initShared;
if (outcome < 0) {
- availableShared.addAndGet(size);
+ long newAvailableShared = availableShared.addAndGet(size);
+ assert newAvailableShared <= initShared;
+ if (parent != null) {
+ parent.returnAllocation(size);
+ }
return false;
} else {
+// if (DEBUG)
+// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
return true;
}
@@ -86,6 +102,8 @@ public class AtomicRemainder {
// if there is preallocated memory, use that first.
long unaccount = availablePrivate.addAndGet(-size);
if (unaccount >= 0) {
+// if (DEBUG)
+// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
return true;
} else {
@@ -102,6 +120,8 @@ public class AtomicRemainder {
if (account >= 0) {
// we were succesful, move private back to zero (since we allocated using shared).
availablePrivate.addAndGet(additionalSpaceNeeded);
+// if (DEBUG)
+// logger.info("Get {}. a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
return true;
} else {
// we failed to get space from available shared. Return allocations to initial state.
@@ -122,26 +142,31 @@ public class AtomicRemainder {
* @param size
*/
public void returnAllocation(long size) {
- if (DEBUG)
- logger.info("Return allocation {}", size);
long privateSize = availablePrivate.get();
long privateChange = Math.min(size, initPrivate - privateSize);
long sharedChange = size - privateChange;
availablePrivate.addAndGet(privateChange);
availableShared.addAndGet(sharedChange);
+// if (DEBUG)
+// logger.info("Return allocation {}, a.s. {} a.p. {} hashcode {}", size, availableShared, availablePrivate, hashCode(), new Exception());
if (parent != null) {
parent.returnAllocation(sharedChange);
}
+ assert getUsed() <= initTotal;
}
public void close() {
-
+ if (closed) {
+ logger.warn("Tried to close remainder, but it has already been closed", new Exception());
+ return;
+ }
if (availablePrivate.get() != initPrivate || availableShared.get() != initShared)
throw new IllegalStateException(
String
.format(ERROR, initPrivate, availablePrivate.get(), initPrivate - availablePrivate.get(), initShared, availableShared.get(), initShared - availableShared.get()));
if(parent != null) parent.returnAllocation(initPrivate);
+ closed = true;
}
static final String ERROR = "Failure while closing accountor. Expected private and shared pools to be set to initial values. However, one or more were not. Stats are\n\tzone\tinit\tallocated\tdelta \n\tprivate\t%d\t%d\t%d \n\tshared\t%d\t%d\t%d.";