diff options
author | Vlad Rozov <vrozov@apache.org> | 2018-04-11 10:12:07 -0700 |
---|---|---|
committer | Parth Chandra <parthc@apache.org> | 2018-04-17 18:18:57 -0700 |
commit | 931b43ec54bf47dcbb4aa9ae4499f37a8f21b408 (patch) | |
tree | f3042dd426abd4be5678823b3b06cbfb68b630cd | |
parent | fdc69783a0f0014e331fe853b623750ba88c34e0 (diff) | |
DRILL-6295: PartitionerDecorator may close partitioners while CustomRunnable tasks are active during query cancellation
This closes #1208
4 files changed, 221 insertions, 140 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java index 7e7623818..034d6c23d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java @@ -19,6 +19,7 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.io.IOException; import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; @@ -167,10 +168,10 @@ public class PartitionSenderRootExec extends BaseRootExec { } else { sendEmptyBatch(true); } - } catch (IOException e) { + } catch (ExecutionException e) { incoming.kill(false); logger.error("Error while creating partitioning sender or flushing outgoing batches", e); - context.getExecutorState().fail(e); + context.getExecutorState().fail(e.getCause()); } return false; @@ -197,10 +198,10 @@ public class PartitionSenderRootExec extends BaseRootExec { first = false; sendEmptyBatch(false); } - } catch (IOException e) { + } catch (ExecutionException e) { incoming.kill(false); logger.error("Error while flushing outgoing batches", e); - context.getExecutorState().fail(e); + context.getExecutorState().fail(e.getCause()); return false; } catch (SchemaChangeException e) { incoming.kill(false); @@ -211,8 +212,8 @@ public class PartitionSenderRootExec extends BaseRootExec { case OK: try { partitioner.partitionBatch(incoming); - } catch (IOException e) { - context.getExecutorState().fail(e); + } catch (ExecutionException e) { + context.getExecutorState().fail(e.getCause()); incoming.kill(false); return false; } diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java index bb69f39a1..e9838f79f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerDecorator.java @@ -18,10 +18,15 @@ package org.apache.drill.exec.physical.impl.partitionsender; import java.io.IOException; +import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorStats; @@ -31,7 +36,8 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory; import org.apache.drill.exec.testing.CountDownLatchInjection; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.Lists; +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.MoreExecutors; /** * Decorator class to hide multiple Partitioner existence from the caller @@ -41,34 +47,37 @@ import com.google.common.collect.Lists; * The algorithm to figure out processing versus wait time is based on following formula: * totalWaitTime = totalAllPartitionersProcessingTime - max(sum(processingTime) by partitioner) */ -public class PartitionerDecorator { +public final class PartitionerDecorator { private static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(PartitionerDecorator.class); private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PartitionerDecorator.class); private List<Partitioner> partitioners; private final OperatorStats stats; - private final String tName; - private final String childThreadPrefix; private final ExecutorService executor; private final FragmentContext context; + private final Thread thread; + private final boolean enableParallelTaskExecution; + PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) { + this(partitioners, stats, context, partitioners.size() > 1); + } - public PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) { + PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context, boolean enableParallelTaskExecution) { this.partitioners = partitioners; this.stats = stats; this.context = context; - this.executor = context.getExecutor(); - this.tName = Thread.currentThread().getName(); - this.childThreadPrefix = "Partitioner-" + tName + "-"; + this.enableParallelTaskExecution = enableParallelTaskExecution; + executor = enableParallelTaskExecution ? 
context.getExecutor() : MoreExecutors.newDirectExecutorService(); + thread = Thread.currentThread(); } /** * partitionBatch - decorator method to call real Partitioner(s) to process incoming batch * uses either threading or not threading approach based on number Partitioners * @param incoming - * @throws IOException + * @throws ExecutionException */ - public void partitionBatch(final RecordBatch incoming) throws IOException { + public void partitionBatch(final RecordBatch incoming) throws ExecutionException { executeMethodLogic(new PartitionBatchHandlingClass(incoming)); } @@ -76,9 +85,9 @@ public class PartitionerDecorator { * flushOutgoingBatches - decorator to call real Partitioner(s) flushOutgoingBatches * @param isLastBatch * @param schemaChanged - * @throws IOException + * @throws ExecutionException */ - public void flushOutgoingBatches(final boolean isLastBatch, final boolean schemaChanged) throws IOException { + public void flushOutgoingBatches(final boolean isLastBatch, final boolean schemaChanged) throws ExecutionException { executeMethodLogic(new FlushBatchesHandlingClass(isLastBatch, schemaChanged)); } @@ -118,105 +127,115 @@ public class PartitionerDecorator { return null; } - @VisibleForTesting - protected List<Partitioner> getPartitioners() { + List<Partitioner> getPartitioners() { return partitioners; } /** * Helper to execute the different methods wrapped into same logic * @param iface - * @throws IOException + * @throws ExecutionException */ - protected void executeMethodLogic(final GeneralExecuteIface iface) throws IOException { - if (partitioners.size() == 1 ) { - // no need for threads - final OperatorStats localStatsSingle = partitioners.get(0).getStats(); - localStatsSingle.clear(); - localStatsSingle.startProcessing(); + @VisibleForTesting + void executeMethodLogic(final GeneralExecuteIface iface) throws ExecutionException { + // To simulate interruption of main fragment thread and interrupting the partition threads, create a + // 
CountDownInject latch. Partitioner threads await on the latch and main fragment thread counts down or + // interrupts waiting threads. This makes sure that we are actually interrupting the blocked partitioner threads. + try (CountDownLatchInjection testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch")) { + testCountDownLatch.initialize(1); + final AtomicInteger count = new AtomicInteger(); + List<PartitionerTask> partitionerTasks = new ArrayList<>(partitioners.size()); + ExecutionException executionException = null; + // start waiting on main stats to adjust by sum(max(processing)) at the end + startWait(); try { - iface.execute(partitioners.get(0)); + partitioners.forEach(partitioner -> createAndExecute(iface, testCountDownLatch, count, partitionerTasks, partitioner)); + // Wait for main fragment interruption. + injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger); + testCountDownLatch.countDown(); + } catch (InterruptedException e) { + logger.warn("fragment thread interrupted", e); + } catch (RejectedExecutionException e) { + logger.warn("Failed to execute partitioner tasks. 
Execution service down?", e); + executionException = new ExecutionException(e); } finally { - localStatsSingle.stopProcessing(); - stats.mergeMetrics(localStatsSingle); - // since main stats did not have any wait time - adjust based of partitioner stats wait time - // main stats processing time started recording in BaseRootExec - stats.adjustWaitNanos(localStatsSingle.getWaitNanos()); + await(count, partitionerTasks); + stopWait(); + processPartitionerTasks(partitionerTasks, executionException); } - return; } + } - long maxProcessTime = 0l; - // start waiting on main stats to adjust by sum(max(processing)) at the end - stats.startWait(); - final CountDownLatch latch = new CountDownLatch(partitioners.size()); - final List<CustomRunnable> runnables = Lists.newArrayList(); - final List<Future<?>> taskFutures = Lists.newArrayList(); - CountDownLatchInjection testCountDownLatch = null; - try { - // To simulate interruption of main fragment thread and interrupting the partition threads, create a - // CountDownInject patch. Partitioner threads await on the latch and main fragment thread counts down or - // interrupts waiting threads. This makes sures that we are actually interrupting the blocked partitioner threads. 
- testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch"); - testCountDownLatch.initialize(1); - for (final Partitioner part : partitioners) { - final CustomRunnable runnable = new CustomRunnable(childThreadPrefix, latch, iface, part, testCountDownLatch); - runnables.add(runnable); - taskFutures.add(executor.submit(runnable)); + private void createAndExecute(GeneralExecuteIface iface, CountDownLatchInjection testCountDownLatch, AtomicInteger count, + List<PartitionerTask> partitionerTasks, Partitioner partitioner) { + PartitionerTask partitionerTask = new PartitionerTask(this, iface, partitioner, count, testCountDownLatch); + executor.execute(partitionerTask); + partitionerTasks.add(partitionerTask); + count.incrementAndGet(); + } + + /** + * Wait for completion of all partitioner tasks. + * @param count current number of task not yet completed + * @param partitionerTasks list of partitioner tasks submitted for execution + */ + private void await(AtomicInteger count, List<PartitionerTask> partitionerTasks) { + boolean cancelled = false; + while (count.get() > 0) { + if (context.getExecutorState().shouldContinue() || cancelled) { + LockSupport.park(); + } else { + logger.warn("Cancelling fragment {} partitioner tasks...", context.getFragIdString()); + partitionerTasks.forEach(partitionerTask -> partitionerTask.cancel(true)); + cancelled = true; } + } + } - while (true) { - try { - // Wait for main fragment interruption. - injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger); - - // If there is no pause inserted at site "wait-for-fragment-interrupt", release the latch. 
- injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch").countDown(); - - latch.await(); - break; - } catch (final InterruptedException e) { - // If the fragment state says we shouldn't continue, cancel or interrupt partitioner threads - if (!context.getExecutorState().shouldContinue()) { - logger.debug("Interrupting partioner threads. Fragment thread {}", tName); - for(Future<?> f : taskFutures) { - f.cancel(true); - } + private void startWait() { + if (enableParallelTaskExecution) { + stats.startWait(); + } + } - break; - } - } - } + private void stopWait() { + if (enableParallelTaskExecution) { + stats.stopWait(); + } + } - IOException excep = null; - for (final CustomRunnable runnable : runnables ) { - IOException myException = runnable.getException(); - if ( myException != null ) { - if ( excep == null ) { - excep = myException; - } else { - excep.addSuppressed(myException); - } + private void processPartitionerTasks(List<PartitionerTask> partitionerTasks, ExecutionException executionException) throws ExecutionException { + long maxProcessTime = 0l; + for (PartitionerTask partitionerTask : partitionerTasks) { + ExecutionException e = partitionerTask.getException(); + if (e != null) { + if (executionException == null) { + executionException = e; + } else { + executionException.getCause().addSuppressed(e.getCause()); } - final OperatorStats localStats = runnable.getPart().getStats(); - long currentProcessingNanos = localStats.getProcessingNanos(); + } + if (executionException == null) { + final OperatorStats localStats = partitionerTask.getStats(); // find out max Partitioner processing time - maxProcessTime = (currentProcessingNanos > maxProcessTime) ? currentProcessingNanos : maxProcessTime; + if (enableParallelTaskExecution) { + long currentProcessingNanos = localStats.getProcessingNanos(); + maxProcessTime = (currentProcessingNanos > maxProcessTime) ? 
currentProcessingNanos : maxProcessTime; + } else { + maxProcessTime += localStats.getWaitNanos(); + } stats.mergeMetrics(localStats); } - if ( excep != null ) { - throw excep; - } - } finally { - stats.stopWait(); - // scale down main stats wait time based on calculated processing time - // since we did not wait for whole duration of above execution + } + if (executionException != null) { + throw executionException; + } + // scale down main stats wait time based on calculated processing time + // since we did not wait for whole duration of above execution + if (enableParallelTaskExecution) { stats.adjustWaitNanos(-maxProcessTime); - - // Done with the latch, close it. - if (testCountDownLatch != null) { - testCountDownLatch.close(); - } + } else { + stats.adjustWaitNanos(maxProcessTime); } } @@ -237,7 +256,7 @@ public class PartitionerDecorator { private final RecordBatch incoming; - public PartitionBatchHandlingClass(RecordBatch incoming) { + PartitionBatchHandlingClass(RecordBatch incoming) { this.incoming = incoming; } @@ -268,62 +287,116 @@ public class PartitionerDecorator { } /** - * Helper class to wrap Runnable with customized naming - * Exception handling + * Helper class to wrap Runnable with cancellation and waiting for completion support * */ - private static class CustomRunnable implements Runnable { + private static class PartitionerTask implements Runnable { + + private enum STATE { + NEW, + COMPLETING, + NORMAL, + EXCEPTIONAL, + CANCELLED, + INTERRUPTING, + INTERRUPTED + } + + private final AtomicReference<STATE> state; + private final AtomicReference<Thread> runner; + private final PartitionerDecorator partitionerDecorator; + private final AtomicInteger count; - private final String parentThreadName; - private final CountDownLatch latch; private final GeneralExecuteIface iface; - private final Partitioner part; + private final Partitioner partitioner; private CountDownLatchInjection testCountDownLatch; - private volatile IOException exp; + private 
volatile ExecutionException exception; - public CustomRunnable(final String parentThreadName, final CountDownLatch latch, final GeneralExecuteIface iface, - final Partitioner part, CountDownLatchInjection testCountDownLatch) { - this.parentThreadName = parentThreadName; - this.latch = latch; + public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) { + state = new AtomicReference<>(STATE.NEW); + runner = new AtomicReference<>(); + this.partitionerDecorator = partitionerDecorator; this.iface = iface; - this.part = part; + this.partitioner = partitioner; + this.count = count; this.testCountDownLatch = testCountDownLatch; } @Override public void run() { - // Test only - Pause until interrupted by fragment thread - try { - testCountDownLatch.await(); - } catch (final InterruptedException e) { - logger.debug("Test only: partitioner thread is interrupted in test countdown latch await()", e); - } - - final Thread currThread = Thread.currentThread(); - final String currThreadName = currThread.getName(); - final OperatorStats localStats = part.getStats(); - try { - final String newThreadName = parentThreadName + currThread.getId(); - currThread.setName(newThreadName); + final Thread thread = Thread.currentThread(); + if (runner.compareAndSet(null, thread)) { + final String name = thread.getName(); + thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId())); + final OperatorStats localStats = partitioner.getStats(); localStats.clear(); localStats.startProcessing(); - iface.execute(part); - } catch (IOException e) { - exp = e; - } finally { - localStats.stopProcessing(); - currThread.setName(currThreadName); - latch.countDown(); + ExecutionException executionException = null; + try { + // Test only - Pause until interrupted by fragment thread + testCountDownLatch.await(); + if (state.get() == STATE.NEW) 
{ + iface.execute(partitioner); + } + } catch (InterruptedException e) { + if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) { + logger.warn("Partitioner Task interrupted during the run", e); + } + } catch (Throwable t) { + executionException = new ExecutionException(t); + } finally { + if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) { + if (executionException == null) { + localStats.stopProcessing(); + state.lazySet(STATE.NORMAL); + } else { + exception = executionException; + state.lazySet(STATE.EXCEPTIONAL); + } + } + if (count.decrementAndGet() == 0) { + LockSupport.unpark(partitionerDecorator.thread); + } + thread.setName(name); + while (state.get() == STATE.INTERRUPTING) { + Thread.yield(); + } + // Clear interrupt flag + Thread.interrupted(); + } + } + } + + void cancel(boolean mayInterruptIfRunning) { + Preconditions.checkState(Thread.currentThread() == partitionerDecorator.thread, + String.format("PartitionerTask can be cancelled only from the main %s thread", partitionerDecorator.thread.getName())); + if (runner.compareAndSet(null, partitionerDecorator.thread)) { + if (partitionerDecorator.executor instanceof ThreadPoolExecutor) { + ((ThreadPoolExecutor)partitionerDecorator.executor).remove(this); + } + count.decrementAndGet(); + } else { + if (mayInterruptIfRunning) { + if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTING)) { + try { + runner.get().interrupt(); + } finally { + state.lazySet(STATE.INTERRUPTED); + } + } + } else { + state.compareAndSet(STATE.NEW, STATE.CANCELLED); + } } } - public IOException getException() { - return this.exp; + public ExecutionException getException() { + return this.exception; } - public Partitioner getPart() { - return part; + public OperatorStats getStats() { + return partitioner.getStats(); } } - } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java index 
957d79b9f..4103ee62c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/testing/CountDownLatchInjection.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.testing; +import org.apache.drill.common.AutoCloseables.Closeable; + /** * This class is used internally for tracking injected countdown latches. These latches are specified via * {@link org.apache.drill.exec.ExecConstants#DRILLBIT_CONTROL_INJECTIONS} session option. @@ -25,7 +27,7 @@ package org.apache.drill.exec.testing; * with the expected number of countdown and awaits. The child threads count down on the same latch (same site class * and same descriptor), and once there are enough, the parent thread continues. */ -public interface CountDownLatchInjection { +public interface CountDownLatchInjection extends Closeable { /** * Initializes the underlying latch diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java index c431fea74..eaaa87d89 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java @@ -28,6 +28,7 @@ import java.io.PrintWriter; import java.nio.file.Paths; import java.util.List; import java.util.Random; +import java.util.concurrent.ExecutionException; import org.apache.drill.categories.OperatorTest; import org.apache.drill.PlanTestBase; @@ -68,6 +69,8 @@ import org.apache.drill.exec.server.options.OptionValue.AccessibleScopes; import org.apache.drill.exec.server.options.OptionValue.OptionScope; import org.apache.drill.exec.util.Utilities; import org.apache.drill.exec.work.QueryWorkUnit; +import org.apache.drill.test.OperatorFixture.MockExecutorState; 
+ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -221,6 +224,7 @@ public class TestPartitionSender extends PlanTestBase { FragmentContextImpl context = null; try { context = new FragmentContextImpl(drillbitContext, planFragment, null, registry); + context.setExecutorState(new MockExecutorState()); final int majorFragmentId = planFragment.getHandle().getMajorFragmentId(); final HashPartitionSender partSender = new HashPartitionSender(majorFragmentId, hashToRandomExchange, hashToRandomExchange.getExpression(), mfEndPoints); partionSenderRootExec = new MockPartitionSenderRootExec(context, incoming, partSender); @@ -285,8 +289,8 @@ public class TestPartitionSender extends PlanTestBase { partionSenderRootExec.getStats().startProcessing(); try { partDecor.executeMethodLogic(new InjectExceptionTest()); - fail("Should throw IOException here"); - } catch (IOException ioe) { + fail("executeMethodLogic should throw an exception."); + } catch (ExecutionException e) { final OperatorProfile.Builder oPBuilder = OperatorProfile.newBuilder(); partionSenderRootExec.getStats().addAllMetrics(oPBuilder); final List<MetricValue> metrics = oPBuilder.getMetricList(); @@ -298,7 +302,8 @@ public class TestPartitionSender extends PlanTestBase { assertEquals(actualThreads, metric.getLongValue()); } } - assertEquals(actualThreads-1, ioe.getSuppressed().length); + assertTrue(e.getCause() instanceof IOException); + assertEquals(actualThreads-1, e.getCause().getSuppressed().length); } finally { partionSenderRootExec.getStats().stopProcessing(); } |