diff options
author | Jacques Nadeau <jacques@apache.org> | 2014-08-24 10:04:47 -0700 |
---|---|---|
committer | Jacques Nadeau <jacques@apache.org> | 2014-08-24 10:42:50 -0700 |
commit | 15eeb9d38af517bc294d6c37c3f7dae82a33665f (patch) | |
tree | be1f7a9840fd117644acebbb9bd8a67b3ae29d19 /exec | |
parent | 4216e0e2c60cf17caa678cee685cbfc2ca4e819a (diff) |
Fix issue introduced by DRILL-1202 where allocators are being closed after reporting success.
Update ScreenRoot to cleanup before returning success.
Update ScanBatch to cleanup reader in case of limit query to avoid memory leak in ParquetReader.
Update allocators so that we don't have memory leak when using debug options.
Update project record batch so that it doesn't try to return a released remainder.
Diffstat (limited to 'exec')
7 files changed, 73 insertions, 20 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index 098922ca6..d89c8926b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -98,7 +98,7 @@ public class TopLevelAllocator implements BufferAllocator { throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, acct.getCapacity() - acct.getAllocation())); }; logger.debug("New child allocator with initial reservation {}", initialReservation); - ChildAllocator allocator = new ChildAllocator(handle, acct, maximumReservation, initialReservation); + ChildAllocator allocator = new ChildAllocator(handle, acct, maximumReservation, initialReservation, childrenMap); if(ENABLE_ACCOUNTING) childrenMap.put(allocator, Thread.currentThread().getStackTrace()); return allocator; @@ -130,11 +130,13 @@ public class TopLevelAllocator implements BufferAllocator { private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>(); private boolean closed = false; private FragmentHandle handle; + private Map<ChildAllocator, StackTraceElement[]> thisMap; - public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre) throws OutOfMemoryException{ + public ChildAllocator(FragmentHandle handle, Accountor parentAccountor, long max, long pre, Map<ChildAllocator, StackTraceElement[]> map) throws OutOfMemoryException{ assert max >= pre; childAcct = new Accountor(errorOnLeak, handle, parentAccountor, max, pre); this.handle = handle; + thisMap = map; } @Override @@ -171,7 +173,7 @@ public class TopLevelAllocator implements BufferAllocator { throw new OutOfMemoryException(String.format("You attempted to create a new child allocator with initial reservation %d but only %d bytes of memory were available.", initialReservation, childAcct.getAvailable())); }; logger.debug("New child allocator with initial reservation {}", initialReservation); - ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, maximumReservation, initialReservation); + ChildAllocator newChildAllocator = new ChildAllocator(handle, childAcct, maximumReservation, initialReservation, null); this.children.put(newChildAllocator, Thread.currentThread().getStackTrace()); return newChildAllocator; } @@ -183,6 +185,7 @@ public class TopLevelAllocator implements BufferAllocator { @Override public void close() { if (ENABLE_ACCOUNTING) { + if(thisMap != null) thisMap.remove(this); for (ChildAllocator child : children.keySet()) { if (!child.isClosed()) { StringBuilder sb = new StringBuilder(); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 8a670fb07..6fccb3b07 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -111,7 +111,7 @@ public class FragmentContext implements Closeable { } public void fail(Throwable cause) { - logger.error("Fragment Context received failure. {}", cause); + logger.error("Fragment Context received failure.", cause); failed = true; failureCause = cause; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index 4b7e33ef9..947262753 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -129,6 +129,10 @@ public class ScanBatch implements RecordBatch { @Override public void kill(boolean sendUpstream) { + if(currentReader != null){ + currentReader.cleanup(); + } + if (sendUpstream) { done = true; } else { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java index 279374f4c..d96bdf356 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScreenCreator.java @@ -99,7 +99,7 @@ public class ScreenCreator implements RootCreator<Screen>{ // logger.debug("Screen Outcome {}", outcome); switch(outcome){ case STOP: { - sendCount.waitForSendComplete(); + this.internalStop(); boolean verbose = context.getOptions().getOption(ExecConstants.ENABLE_VERBOSE_ERRORS_KEY).bool_val; QueryResult header = QueryResult.newBuilder() // .setQueryId(context.getHandle().getQueryId()) // @@ -122,8 +122,7 @@ public class ScreenCreator implements RootCreator<Screen>{ return false; } case NONE: { - sendCount.waitForSendComplete(); -// context.getStats().batchesCompleted.inc(1); + this.internalStop(); QueryWritableBatch batch; if (!first) { QueryResult header = QueryResult.newBuilder() // @@ -168,22 +167,30 @@ public class ScreenCreator implements RootCreator<Screen>{ throw new UnsupportedOperationException(); } } - + public void updateStats(QueryWritableBatch queryBatch) { stats.addLongStat(Metric.BYTES_SENT, queryBatch.getByteCount()); } - @Override - public void stop() { + + private void internalStop(){ sendCount.waitForSendComplete(); oContext.close(); incoming.cleanup(); } + @Override + public void stop() { + if(!oContext.isClosed()){ + internalStop(); + } + sendCount.waitForSendComplete(); + } + private SendListener listener = new SendListener(); private class SendListener extends BaseRpcOutcomeListener<Ack>{ - volatile RpcException ex; + volatile RpcException ex; @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java index 4fdb71da1..a7e136066 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/project/ProjectRecordBatch.java @@ -111,6 +111,14 @@ public class ProjectRecordBatch extends AbstractSingleRecordBatch<Project>{ return recordCount; } + + @Override + protected void killIncoming(boolean sendUpstream) { + super.killIncoming(sendUpstream); + hasRemainder = false; + } + + @Override public IterOutcome innerNext() { if (hasRemainder) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java index e4941d086..2a8db6784 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java @@ -17,9 +17,9 @@ */ package org.apache.drill.exec.work.fragment; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.physical.base.FragmentRoot; import org.apache.drill.exec.physical.impl.ImplCreator; @@ -31,8 +31,6 @@ import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.user.UserServer.UserClientConnection; import org.apache.drill.exec.work.CancelableQuery; import org.apache.drill.exec.work.StatusProvider; - -import com.codahale.metrics.Timer; import org.apache.drill.exec.work.WorkManager.WorkerBee; /** @@ -49,6 +47,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid private final WorkerBee bee; private final StatusReporter listener; private Thread executionThread; + private AtomicBoolean closed = new AtomicBoolean(false); public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener){ this.context = context; @@ -104,10 +103,14 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid while (state.get() == FragmentState.RUNNING_VALUE) { if (!root.next()) { if (context.isFailed()){ - updateState(FragmentState.RUNNING, FragmentState.FAILED, false); + internalFail(context.getFailureCause()); + closeOutResources(false); } else { + closeOutResources(true); // make sure to close out resources before we report success. updateState(FragmentState.RUNNING, FragmentState.FINISHED, false); } + + break; } } } catch (AssertionError | Exception e) { @@ -116,17 +119,32 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid internalFail(e); } finally { bee.removeFragment(context.getHandle()); - if (context.isFailed()) { - internalFail(context.getFailureCause()); - } - root.stop(); // stop root executor & clean-up resources - context.close(); logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId()); Thread.currentThread().setName(originalThread); } } + private void closeOutResources(boolean throwFailure){ + if(closed.get()) return; + + try{ + root.stop(); + }catch(RuntimeException e){ + if(throwFailure) throw e; + logger.warn("Failure while closing out resources.", e); + } + + try{ + context.close(); + }catch(RuntimeException e){ + if(throwFailure) throw e; + logger.warn("Failure while closing out resources.", e); + } + + closed.set(true); + } + private void internalFail(Throwable excep){ state.set(FragmentState.FAILED_VALUE); listener.fail(context.getHandle(), "Failure while running fragment.", excep); diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java index 0ac2c0960..d9e8b205f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java +++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java @@ -47,6 +47,19 @@ public class TestExampleQueries extends BaseTestQuery{ } @Test + public void testJoinMerge() throws Exception{ + test("alter session set `planner.enable_hashjoin` = false"); + test("select count(*) \n" + + " from (select l.l_orderkey as x, c.c_custkey as y \n" + + " from cp.`tpch/lineitem.parquet` l \n" + + " left outer join cp.`tpch/customer.parquet` c \n" + + " on l.l_orderkey = c.c_custkey) as foo\n" + + " where x < 10000\n" + + ""); + test("alter session set `planner.enable_hashjoin` = true"); + } + + @Test public void testSelStarOrderBy() throws Exception{ test("select * from cp.`employee.json` order by last_name"); } |