about summary refs log tree commit diff
path: root/exec
diff options
context:
space:
mode:
author    Jacques Nadeau <jacques@apache.org>  2014-08-24 10:04:47 -0700
committer Jacques Nadeau <jacques@apache.org>  2014-08-24 10:42:50 -0700
commit 15eeb9d38af517bc294d6c37c3f7dae82a33665f (patch)
tree   be1f7a9840fd117644acebbb9bd8a67b3ae29d19 /exec
parent 4216e0e2c60cf17caa678cee685cbfc2ca4e819a (diff)
Fix issue introduced by DRILL-1202 where allocators are being closed after reporting success.
Update ScreenRoot to cleanup before returning success. Update ScanBatch to cleanup reader in case of limit query to avoid memory leak in ParquetReader. Update allocators so that we don't have memory leak when using debug options. Update project record batch so that it doesn't try to return a released remainder.
Diffstat (limited to 'exec')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java             9
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java                  2
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java              4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java          21
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java  8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java       36
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java                        13
7 files changed, 73 insertions, 20 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
index 098922ca6..d89c8926b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java
@@ -98,7 +98,7 @@ public class TopLevelAllocator implements BufferAllocator {
throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation()));
};
logger.debug("New child allocator with initial reservation {}", initialReservation);
- ChildAllocator allocator = new ChildAllocator(handle, acct, maximumReservation, initialReservation);
+ ChildAllocator allocator = new ChildAllocator(handle, acct, maximumReservation, initialReservation, childrenMap);
if(ENABLE_ACCOUNTING) childrenMap.put(allocator, Thread.currentThread().getStackTrace());
return allocator;
@@ -130,11 +130,13 @@ public class TopLevelAllocator implements BufferAllocator {
private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>();
private boolean closed = false;
private FragmentHandle handle;
+ private Map<ChildAllocator, StackTraceElement[]> thisMap;
- public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre) throws OutOfMemoryException{
+ public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre, Map<ChildAllocator, StackTraceElement[]> map) throws OutOfMemoryException{
assert max >= pre;
childAcct = new Accountor(errorOnLeak, handle, parentAccountor, max, pre);
this.handle = handle;
+ thisMap = map;
}
@Override
@@ -171,7 +173,7 @@ public class TopLevelAllocator implements BufferAllocator {
throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getAvailable()));
};
logger.debug("New child allocator with initial reservation {}", initialReservation);
- ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, maximumReservation, initialReservation);
+ ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, maximumReservation, initialReservation, null);
this.children.put(newChildAllocator, Thread.currentThread().getStackTrace());
return newChildAllocator;
}
@@ -183,6 +185,7 @@ public class TopLevelAllocator implements BufferAllocator {
@Override
public void close() {
if (ENABLE_ACCOUNTING) {
+ if(thisMap != null) thisMap.remove(this);
for (ChildAllocator child : children.keySet()) {
if (!child.isClosed()) {
StringBuilder sb = new StringBuilder();
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 8a670fb07..6fccb3b07 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -111,7 +111,7 @@ public class FragmentContext implements Closeable {
}
public void fail(Throwable cause) {
- logger.error("Fragment Context received failure. {}", cause);
+ logger.error("Fragment Context received failure.", cause);
failed = true;
failureCause = cause;
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 4b7e33ef9..947262753 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -129,6 +129,10 @@ public class ScanBatch implements RecordBatch {
@Override
public void kill(boolean sendUpstream) {
+ if(currentReader != null){
+ currentReader.cleanup();
+ }
+
if (sendUpstream) {
done = true;
} else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
index 279374f4c..d96bdf356 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java
@@ -99,7 +99,7 @@ public class ScreenCreator implements RootCreator<Screen>{
// logger.debug("Screen Outcome {}", outcome);
switch(outcome){
case STOP: {
- sendCount.waitForSendComplete();
+ this.internalStop();
boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val;
QueryResult header = QueryResult.newBuilder() //
.setQueryId(context.getHandle().getQueryId()) //
@@ -122,8 +122,7 @@ public class ScreenCreator implements RootCreator<Screen>{
return false;
}
case NONE: {
- sendCount.waitForSendComplete();
-// context.getStats().batchesCompleted.inc(1);
+ this.internalStop();
QueryWritableBatch batch;
if (!first) {
QueryResult header = QueryResult.newBuilder() //
@@ -168,22 +167,30 @@ public class ScreenCreator implements RootCreator<Screen>{
throw new UnsupportedOperationException();
}
}
-
+
public void updateStats(QueryWritableBatch queryBatch) {
stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount());
}
- @Override
- public void stop() {
+
+ private void internalStop(){
sendCount.waitForSendComplete();
oContext.close();
incoming.cleanup();
}
+ @Override
+ public void stop() {
+ if(!oContext.isClosed()){
+ internalStop();
+ }
+ sendCount.waitForSendComplete();
+ }
+
private SendListener listener = new SendListener();
private class SendListener extends BaseRpcOutcomeListener<Ack>{
- volatile RpcException ex;
+ volatile RpcException ex;
@Override
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
index 4fdb71da1..a7e136066 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java
@@ -111,6 +111,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{
return recordCount;
}
+
+ @Override
+ protected void killIncoming(boolean sendUpstream) {
+ super.killIncoming(sendUpstream);
+ hasRemainder = false;
+ }
+
+
@Override
public IterOutcome innerNext() {
if (hasRemainder) {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index e4941d086..2a8db6784 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -17,9 +17,9 @@
*/
package org.apache.drill.exec.work.fragment;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.impl.ImplCreator;
@@ -31,8 +31,6 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection;
import org.apache.drill.exec.work.CancelableQuery;
import org.apache.drill.exec.work.StatusProvider;
-
-import com.codahale.metrics.Timer;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
/**
@@ -49,6 +47,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
private final WorkerBee bee;
private final StatusReporter listener;
private Thread executionThread;
+ private AtomicBoolean closed = new AtomicBoolean(false);
public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener){
this.context = context;
@@ -104,10 +103,14 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
while (state.get() == FragmentState.RUNNING_VALUE) {
if (!root.next()) {
if (context.isFailed()){
- updateState(FragmentState.RUNNING, FragmentState.FAILED, false);
+ internalFail(context.getFailureCause());
+ closeOutResources(false);
} else {
+ closeOutResources(true); // make sure to close out resources before we report success.
updateState(FragmentState.RUNNING, FragmentState.FINISHED, false);
}
+
+ break;
}
}
} catch (AssertionError | Exception e) {
@@ -116,17 +119,32 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
internalFail(e);
} finally {
bee.removeFragment(context.getHandle());
- if (context.isFailed()) {
- internalFail(context.getFailureCause());
- }
- root.stop(); // stop root executor & clean-up resources
- context.close();
logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());
Thread.currentThread().setName(originalThread);
}
}
+ private void closeOutResources(boolean throwFailure){
+ if(closed.get()) return;
+
+ try{
+ root.stop();
+ }catch(RuntimeException e){
+ if(throwFailure) throw e;
+ logger.warn("Failure while closing out resources.", e);
+ }
+
+ try{
+ context.close();
+ }catch(RuntimeException e){
+ if(throwFailure) throw e;
+ logger.warn("Failure while closing out resources.", e);
+ }
+
+ closed.set(true);
+ }
+
private void internalFail(Throwable excep){
state.set(FragmentState.FAILED_VALUE);
listener.fail(context.getHandle(), "Failure while running fragment.", excep);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index 0ac2c0960..d9e8b205f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -47,6 +47,19 @@ public class TestExampleQueries extends BaseTestQuery{
}
@Test
+ public void testJoinMerge() throws Exception{
+ test("alter session set `planner.enable_hashjoin` = false");
+ test("select count(*) \n" +
+ " from (select l.l_orderkey as x, c.c_custkey as y \n" +
+ " from cp.`tpch/lineitem.parquet` l \n" +
+ " left outer join cp.`tpch/customer.parquet` c \n" +
+ " on l.l_orderkey = c.c_custkey) as foo\n" +
+ " where x < 10000\n" +
+ "");
+ test("alter session set `planner.enable_hashjoin` = true");
+ }
+
+ @Test
public void testSelStarOrderBy() throws Exception{
test("select * from cp.`employee.json` order by last_name");
}