aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache
diff options
context:
space:
mode:
authorSorabh Hamirwasia <sorabh@apache.org>2018-09-26 13:20:02 -0700
committerSorabh Hamirwasia <sorabh@apache.org>2018-10-10 09:51:02 -0700
commitde76e135316086386e2f7edd04ec1d5ca479bc59 (patch)
tree0726d0d7bd1273239573244d32fafb44d05bb079 /exec/java-exec/src/main/java/org/apache
parent216b1237739935b04c4f54b3f6f05371a4644085 (diff)
DRILL-6731: Resolving race conditions in RuntimeFilterSink
Add condition variable to avoid starvation of producer thread while acquiring queue lock
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java12
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java127
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java36
3 files changed, 114 insertions, 61 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index bfba5f2c1..fcfa2bca1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -222,18 +222,10 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
holder.setFromBuildSide(true);
right.accept(this, holder);
boolean routeToForeman = holder.needToRouteToForeman();
- if (!routeToForeman) {
- runtimeFilterDef.setSendToForeman(false);
- } else {
- runtimeFilterDef.setSendToForeman(true);
- }
+ runtimeFilterDef.setSendToForeman(routeToForeman);
List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
- if (!routeToForeman) {
- bloomFilterDef.setLocal(true);
- } else {
- bloomFilterDef.setLocal(false);
- }
+ bloomFilterDef.setLocal(!routeToForeman);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index 8f4c8230e..754c68e70 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -25,6 +25,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -38,12 +39,28 @@ public class RuntimeFilterSink implements AutoCloseable {
private int staleBookId = 0;
+ /**
+ * RuntimeFilterWritable holding the aggregated version of all the received filter
+ */
private RuntimeFilterWritable aggregated = null;
private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
+ /**
+ * Flag used by Minor Fragment thread to indicate it has encountered error
+ */
private AtomicBoolean running = new AtomicBoolean(true);
+ /**
+ * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this
+ * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at
+ * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to
+ * indicate producer not to put any new elements in it.
+ */
+ private ReentrantLock queueLock = new ReentrantLock();
+
+ private Condition notEmpty = queueLock.newCondition();
+
private ReentrantLock aggregatedRFLock = new ReentrantLock();
private Thread asyncAggregateThread;
@@ -62,24 +79,34 @@ public class RuntimeFilterSink implements AutoCloseable {
public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
if (running.get()) {
- if (containOne()) {
- boolean same = aggregated.same(runtimeFilterWritable);
- if (!same) {
- //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
- //share the same FragmentContext.
- try {
- aggregatedRFLock.lock();
+ try {
+ aggregatedRFLock.lock();
+ if (containOne()) {
+ boolean same = aggregated.equals(runtimeFilterWritable);
+ if (!same) {
+ // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
+ // share the same FragmentContext.
aggregated.close();
- aggregated = null;
- } finally {
- aggregatedRFLock.unlock();
+ currentBookId.set(0);
+ staleBookId = 0;
+ clearQueued(false);
}
- currentBookId.set(0);
- staleBookId = 0;
- clearQueued();
}
+ } finally {
+ aggregatedRFLock.unlock();
+ }
+
+ try {
+ queueLock.lock();
+ if (rfQueue != null) {
+ rfQueue.add(runtimeFilterWritable);
+ notEmpty.signal();
+ } else {
+ runtimeFilterWritable.close();
+ }
+ } finally {
+ queueLock.unlock();
}
- rfQueue.add(runtimeFilterWritable);
} else {
runtimeFilterWritable.close();
}
@@ -116,53 +143,77 @@ public class RuntimeFilterSink implements AutoCloseable {
return aggregated != null;
}
- @Override
- public void close() throws Exception {
+ private void doCleanup() {
running.compareAndSet(true, false);
- asyncAggregateThread.interrupt();
- if (containOne()) {
- try {
- aggregatedRFLock.lock();
+ try {
+ aggregatedRFLock.lock();
+ if (containOne()) {
aggregated.close();
- } finally {
- aggregatedRFLock.unlock();
+ aggregated = null;
}
+ } finally {
+ aggregatedRFLock.unlock();
}
- clearQueued();
}
- private void clearQueued() {
+ @Override
+ public void close() throws Exception {
+ asyncAggregateThread.interrupt();
+ doCleanup();
+ }
+
+ private void clearQueued(boolean setToNull) {
RuntimeFilterWritable toClear;
- while ((toClear = rfQueue.poll()) != null) {
- toClear.close();
+ try {
+ queueLock.lock();
+ while (rfQueue != null && (toClear = rfQueue.poll()) != null) {
+ toClear.close();
+ }
+ rfQueue = (setToNull) ? null : rfQueue;
+ } finally {
+ queueLock.unlock();
}
}
- class AsyncAggregateWorker implements Runnable {
+ private class AsyncAggregateWorker implements Runnable {
@Override
public void run() {
try {
+ RuntimeFilterWritable toAggregate = null;
while (running.get()) {
- RuntimeFilterWritable toAggregate = rfQueue.take();
- if (!running.get()) {
- toAggregate.close();
- return;
+ try {
+ queueLock.lock();
+ toAggregate = (rfQueue != null) ? rfQueue.poll() : null;
+ if (toAggregate == null) {
+ notEmpty.await();
+ continue;
+ }
+ } finally {
+ queueLock.unlock();
}
- if (containOne()) {
- try {
- aggregatedRFLock.lock();
+
+ try {
+ aggregatedRFLock.lock();
+ if (containOne()) {
aggregated.aggregate(toAggregate);
- } finally {
- aggregatedRFLock.unlock();
+
+ // Release the byteBuf referenced by toAggregate since aggregate will not do it
+ toAggregate.close();
+ } else {
+ aggregated = toAggregate;
}
- } else {
- aggregated = toAggregate;
+ } finally {
+ aggregatedRFLock.unlock();
}
currentBookId.incrementAndGet();
}
} catch (InterruptedException e) {
logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e);
+ Thread.currentThread().interrupt();
+ } finally {
+ doCleanup();
+ clearQueued(true);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 302a4801c..9a971e94c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -36,9 +36,14 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
private DrillBuf[] data;
+ private String identifier;
+
public RuntimeFilterWritable(BitData.RuntimeFilterBDef runtimeFilterBDef, DrillBuf... data) {
this.runtimeFilterBDef = runtimeFilterBDef;
this.data = data;
+ this.identifier = "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId()
+ + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId()
+ + ", srcOperatorId:" + runtimeFilterBDef.getHjOpId();
}
@@ -90,7 +95,7 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
int capacity = src.readableBytes();
DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
int readerIndex = src.readerIndex();
- src.readBytes(duplicateOne, 0, capacity);
+ duplicateOne.writeBytes(src);
src.readerIndex(readerIndex);
cloned[i] = duplicateOne;
i++;
@@ -98,19 +103,25 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
}
- public boolean same(RuntimeFilterWritable other) {
- BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef();
- int otherMajorId = runtimeFilterDef.getMajorFragmentId();
- int otherMinorId = runtimeFilterDef.getMinorFragmentId();
- int otherHashJoinOpId = runtimeFilterDef.getHjOpId();
- int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId();
- int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId();
- int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId();
- return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId;
+ public String toString() {
+ return identifier;
}
- public String toString() {
- return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId();
+ @Override
+ public boolean equals(Object other) {
+ if (other == null) {
+ return false;
+ }
+ if (other instanceof RuntimeFilterWritable) {
+ RuntimeFilterWritable otherRFW = (RuntimeFilterWritable) other;
+ return this.identifier.equals(otherRFW.identifier);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return identifier.hashCode();
}
@Override
@@ -119,5 +130,4 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
buf.release();
}
}
-
}