diff options
author | Sorabh Hamirwasia <sorabh@apache.org> | 2018-09-26 13:20:02 -0700 |
---|---|---|
committer | Sorabh Hamirwasia <sorabh@apache.org> | 2018-10-10 09:51:02 -0700 |
commit | de76e135316086386e2f7edd04ec1d5ca479bc59 (patch) | |
tree | 0726d0d7bd1273239573244d32fafb44d05bb079 /exec/java-exec/src/main/java/org/apache | |
parent | 216b1237739935b04c4f54b3f6f05371a4644085 (diff) |
DRILL-6731: Resolving race conditions in RuntimeFilterSink
Add condition variable to avoid starvation of producer thread while acquiring queue lock
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache')
3 files changed, 114 insertions, 61 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java index bfba5f2c1..fcfa2bca1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java @@ -222,18 +222,10 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc holder.setFromBuildSide(true); right.accept(this, holder); boolean routeToForeman = holder.needToRouteToForeman(); - if (!routeToForeman) { - runtimeFilterDef.setSendToForeman(false); - } else { - runtimeFilterDef.setSendToForeman(true); - } + runtimeFilterDef.setSendToForeman(routeToForeman); List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { - if (!routeToForeman) { - bloomFilterDef.setLocal(true); - } else { - bloomFilterDef.setLocal(false); - } + bloomFilterDef.setLocal(!routeToForeman); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java index 8f4c8230e..754c68e70 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java @@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /** @@ -38,12 +39,28 @@ public class RuntimeFilterSink implements AutoCloseable { private int staleBookId = 0; + /** + * RuntimeFilterWritable holding the aggregated version of all the received filter + */ private RuntimeFilterWritable aggregated = null; private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>(); + /** + * Flag used by Minor Fragment thread to indicate it has encountered error + */ private AtomicBoolean running = new AtomicBoolean(true); + /** + * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this + * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at + * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to + * indicate producer not to put any new elements in it. + */ + private ReentrantLock queueLock = new ReentrantLock(); + + private Condition notEmpty = queueLock.newCondition(); + private ReentrantLock aggregatedRFLock = new ReentrantLock(); private Thread asyncAggregateThread; @@ -62,24 +79,34 @@ public class RuntimeFilterSink implements AutoCloseable { public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { if (running.get()) { - if (containOne()) { - boolean same = aggregated.same(runtimeFilterWritable); - if (!same) { - //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs - //share the same FragmentContext. - try { - aggregatedRFLock.lock(); + try { + aggregatedRFLock.lock(); + if (containOne()) { + boolean same = aggregated.equals(runtimeFilterWritable); + if (!same) { + // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs + // share the same FragmentContext. aggregated.close(); - aggregated = null; - } finally { - aggregatedRFLock.unlock(); + currentBookId.set(0); + staleBookId = 0; + clearQueued(false); } - currentBookId.set(0); - staleBookId = 0; - clearQueued(); } + } finally { + aggregatedRFLock.unlock(); + } + + try { + queueLock.lock(); + if (rfQueue != null) { + rfQueue.add(runtimeFilterWritable); + notEmpty.signal(); + } else { + runtimeFilterWritable.close(); + } + } finally { + queueLock.unlock(); } - rfQueue.add(runtimeFilterWritable); } else { runtimeFilterWritable.close(); } @@ -116,53 +143,77 @@ public class RuntimeFilterSink implements AutoCloseable { return aggregated != null; } - @Override - public void close() throws Exception { + private void doCleanup() { running.compareAndSet(true, false); - asyncAggregateThread.interrupt(); - if (containOne()) { - try { - aggregatedRFLock.lock(); + try { + aggregatedRFLock.lock(); + if (containOne()) { aggregated.close(); - } finally { - aggregatedRFLock.unlock(); + aggregated = null; } + } finally { + aggregatedRFLock.unlock(); } - clearQueued(); } - private void clearQueued() { + @Override + public void close() throws Exception { + asyncAggregateThread.interrupt(); + doCleanup(); + } + + private void clearQueued(boolean setToNull) { RuntimeFilterWritable toClear; - while ((toClear = rfQueue.poll()) != null) { - toClear.close(); + try { + queueLock.lock(); + while (rfQueue != null && (toClear = rfQueue.poll()) != null) { + toClear.close(); + } + rfQueue = (setToNull) ? null : rfQueue; + } finally { + queueLock.unlock(); } } - class AsyncAggregateWorker implements Runnable { + private class AsyncAggregateWorker implements Runnable { @Override public void run() { try { + RuntimeFilterWritable toAggregate = null; while (running.get()) { - RuntimeFilterWritable toAggregate = rfQueue.take(); - if (!running.get()) { - toAggregate.close(); - return; + try { + queueLock.lock(); + toAggregate = (rfQueue != null) ? rfQueue.poll() : null; + if (toAggregate == null) { + notEmpty.await(); + continue; + } + } finally { + queueLock.unlock(); } - if (containOne()) { - try { - aggregatedRFLock.lock(); + + try { + aggregatedRFLock.lock(); + if (containOne()) { aggregated.aggregate(toAggregate); - } finally { - aggregatedRFLock.unlock(); + + // Release the byteBuf referenced by toAggregate since aggregate will not do it + toAggregate.close(); + } else { + aggregated = toAggregate; } - } else { - aggregated = toAggregate; + } finally { + aggregatedRFLock.unlock(); } currentBookId.incrementAndGet(); } } catch (InterruptedException e) { logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e); + Thread.currentThread().interrupt(); + } finally { + doCleanup(); + clearQueued(true); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java index 302a4801c..9a971e94c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java @@ -36,9 +36,14 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{ private DrillBuf[] data; + private String identifier; + public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf... data) { this.runtimeFilterBDef = runtimeFilterBDef; this.data = data; + this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId(); } @@ -90,7 +95,7 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{ int capacity = src.readableBytes(); DrillBuf duplicateOne = bufferAllocator.buffer(capacity); int readerIndex = src.readerIndex(); - src.readBytes(duplicateOne, 0, capacity); + duplicateOne.writeBytes(src); src.readerIndex(readerIndex); cloned[i] = duplicateOne; i++; @@ -98,19 +103,25 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{ return new RuntimeFilterWritable(runtimeFilterBDef, cloned); } - public boolean same(RuntimeFilterWritable other) { - BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef(); - int otherMajorId = runtimeFilterDef.getMajorFragmentId(); - int otherMinorId = runtimeFilterDef.getMinorFragmentId(); - int otherHashJoinOpId = runtimeFilterDef.getHjOpId(); - int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId(); - int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId(); - int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId(); - return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId; + public String toString() { + return identifier; } - public String toString() { - return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId(); + @Override + public boolean equals(Object other) { + if (other == null) { + return false; + } + if (other instanceof RuntimeFilterWritable) { + RuntimeFilterWritable otherRFW = (RuntimeFilterWritable) other; + return this.identifier.equals(otherRFW.identifier); + } + return false; + } + + @Override + public int hashCode() { + return identifier.hashCode(); } @Override @@ -119,5 +130,4 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{ buf.release(); } } - } |