author     weijie.tong <weijie.tong@alipay.com>          2018-10-14 19:41:51 +0800
committer  Vitalii Diravka <vitalii.diravka@gmail.com>   2018-11-29 18:33:23 +0200
commit     9667e92e1e87ce1826f0eac3f2396187dbfa8aaa
tree       cb68cd4bbedf6f84e00168cb0ab300c6dacdb35c
parent     325fa26b5df1bc29594677a0f3e1360fbb4f8bca
DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree
closes #1504
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java                    10
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java             62
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java          12
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java        21
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java    5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java    280
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java      340
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java   21
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java                 2
9 files changed, 414 insertions, 339 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index 0d97e0ac3..7915843eb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -379,11 +379,16 @@ public class WorkManager implements AutoCloseable {
return runningFragments.get(handle);
}
+ /**
+ * receive the RuntimeFilter through the wire
+ * @param runtimeFilter
+ */
public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) {
BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef();
boolean toForeman = runtimeFilterDef.getToForeman();
QueryId queryId = runtimeFilterDef.getQueryId();
String queryIdStr = QueryIdHelper.getQueryId(queryId);
+ runtimeFilter.retainBuffers(1);
//to foreman
if (toForeman) {
Foreman foreman = queries.get(queryId);
@@ -393,13 +398,14 @@ public class WorkManager implements AutoCloseable {
public void run() {
final Thread currentThread = Thread.currentThread();
final String originalName = currentThread.getName();
- currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter");
+ currentThread.setName(queryIdStr + ":foreman:routeRuntimeFilter");
try {
- foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter);
+ foreman.getRuntimeFilterRouter().register(runtimeFilter);
} catch (Exception e) {
logger.warn("Exception while registering the RuntimeFilter", e);
} finally {
currentThread.setName(originalName);
+ runtimeFilter.close();
}
}
});
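The hunk above pairs a new retainBuffers(1) on receive with a close() in the async task's finally block, so the filter's DrillBuf stays alive across the thread handoff and is released no matter how the task ends. A minimal, stand-alone sketch of that pattern using a plain netty buffer rather than Drill's actual classes (names here are illustrative only):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RetainBeforeHandoff {
  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    ByteBuf filterData = Unpooled.buffer(64);   // stands in for the RuntimeFilterWritable's DrillBuf, refCnt == 1
    filterData.retain();                        // +1 before the async handoff, like runtimeFilter.retainBuffers(1)
    executor.submit(() -> {
      try {
        // route / aggregate the filter here
      } finally {
        filterData.release();                   // matches the retain, like runtimeFilter.close()
      }
    });
    filterData.release();                       // the receiving thread drops its own reference
    executor.shutdown();
  }
}

The retain-before-submit / release-in-finally shape keeps the reference count balanced whether the submitted task completes normally or throws.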
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
index dc6cc2fcf..afbc56a5b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java
@@ -34,6 +34,7 @@ import java.util.Arrays;
public class BloomFilter {
// Bytes in a bucket.
private static final int BYTES_PER_BUCKET = 32;
+
// Minimum bloom filter data size.
private static final int MINIMUM_BLOOM_SIZE_IN_BYTES = 256;
@@ -41,16 +42,14 @@ public class BloomFilter {
private int numBytes;
- private int mask[] = new int[8];
-
- private byte[] tempBucket = new byte[32];
-
+ private int bucketMask[] = new int[8];
public BloomFilter(int numBytes, BufferAllocator bufferAllocator) {
int size = BloomFilter.adjustByteSize(numBytes);
this.byteBuf = bufferAllocator.buffer(size);
this.numBytes = byteBuf.capacity();
- this.byteBuf.writerIndex(numBytes);
+ this.byteBuf.writeZero(this.numBytes);
+ this.byteBuf.writerIndex(this.numBytes);
}
public BloomFilter(int ndv, double fpp, BufferAllocator bufferAllocator) {
@@ -74,26 +73,27 @@ public class BloomFilter {
}
private void setMask(int key) {
- //8 odd numbers act as salt value to participate in the computation of the mask.
+ //8 odd numbers act as salt value to participate in the computation of the bucketMask.
final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};
- Arrays.fill(mask, 0);
+ Arrays.fill(bucketMask, 0);
for (int i = 0; i < 8; ++i) {
- mask[i] = key * SALT[i];
+ bucketMask[i] = key * SALT[i];
}
for (int i = 0; i < 8; ++i) {
- mask[i] = mask[i] >> 27;
+ bucketMask[i] = bucketMask[i] >>> 27;
}
for (int i = 0; i < 8; ++i) {
- mask[i] = 0x1 << mask[i];
+ bucketMask[i] = 0x1 << bucketMask[i];
}
}
/**
* Add an element's hash value to this bloom filter.
+ *
* @param hash hash result of element.
*/
public void insert(long hash) {
@@ -101,16 +101,13 @@ public class BloomFilter {
int key = (int) hash;
setMask(key);
int initialStartIndex = bucketIndex * BYTES_PER_BUCKET;
- byteBuf.getBytes(initialStartIndex, tempBucket);
for (int i = 0; i < 8; i++) {
+ int index = initialStartIndex + i * 4;
//every iterate batch,we set 32 bits
- int bitsetIndex = i * 4;
- tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24));
- tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16));
- tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8));
- tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i]));
+ int a = byteBuf.getInt(index);
+ a |= bucketMask[i];
+ byteBuf.setInt(index, a);
}
- byteBuf.setBytes(initialStartIndex, tempBucket);
}
/**
@@ -123,17 +120,12 @@ public class BloomFilter {
int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1);
int key = (int) hash;
setMask(key);
-
int startIndex = bucketIndex * BYTES_PER_BUCKET;
- byteBuf.getBytes(startIndex, tempBucket);
for (int i = 0; i < 8; i++) {
- byte set = 0;
- int bitsetIndex = i * 4;
- set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24));
- set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16));
- set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8));
- set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]);
- if (0 == set) {
+ int index = startIndex + i * 4;
+ int a = byteBuf.getInt(index);
+ int b = a & bucketMask[i];
+ if (b == 0) {
return false;
}
}
@@ -142,6 +134,7 @@ public class BloomFilter {
/**
* Merge this bloom filter with other one
+ *
* @param other
*/
public void or(BloomFilter other) {
@@ -150,20 +143,19 @@ public class BloomFilter {
Preconditions.checkArgument(otherLength == thisLength);
Preconditions.checkState(otherLength % BYTES_PER_BUCKET == 0);
Preconditions.checkState(thisLength % BYTES_PER_BUCKET == 0);
- byte[] otherTmpBucket = new byte[BYTES_PER_BUCKET];
- for (int i = 0; i < thisLength / BYTES_PER_BUCKET; i++) {
- byteBuf.getBytes(i * BYTES_PER_BUCKET, tempBucket);
- other.byteBuf.getBytes(i * BYTES_PER_BUCKET, otherTmpBucket);
- for (int j = 0; j < BYTES_PER_BUCKET; j++) {
- tempBucket[j] = (byte) (tempBucket[j] | otherTmpBucket[j]);
- }
- this.byteBuf.setBytes(i, tempBucket);
+ for (int i = 0; i < thisLength / 8; i++) {
+ int index = i * 8;
+ long a = byteBuf.getLong(index);
+ long b = other.byteBuf.getLong(index);
+ long c = a | b;
+ byteBuf.setLong(index, c);
}
}
/**
* Calculate optimal size according to the number of distinct values and false positive probability.
* See http://en.wikipedia.org/wiki/Bloom_filter#Probability_of_false_positives for the formula.
+ *
* @param ndv: The number of distinct values.
* @param fpp: The false positive probability.
* @return optimal number of bytes of given ndv and fpp.
@@ -177,7 +169,7 @@ public class BloomFilter {
bits |= bits >> 8;
bits |= bits >> 16;
bits++;
- int bytes = bits/8;
+ int bytes = bits / 8;
return bytes;
}
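Two things change in BloomFilter: the per-bucket byte shuffling through temp arrays is replaced by direct getInt/setInt (and getLong/setLong in or()) on the DrillBuf, and the mask computation now uses the unsigned shift `>>>`, so the top five bits of `key * SALT[i]` always yield a bit index in 0..31 (with `>>`, a negative product sign-extends to a negative shift amount, which Java then reduces to its low five bits). A small self-contained illustration of the bucket-mask step; the salt constants are copied from the patch, everything else is a toy:

public class BucketMaskDemo {
  private static final int[] SALT = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
                                     0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31};

  // One 32-byte bucket is treated as eight ints; each salt selects one bit per int.
  static int[] bucketMask(int key) {
    int[] mask = new int[8];
    for (int i = 0; i < 8; i++) {
      int bit = (key * SALT[i]) >>> 27;   // unsigned shift: always in [0, 31]
      mask[i] = 1 << bit;
    }
    return mask;
  }

  public static void main(String[] args) {
    int key = 0x12345678;
    for (int m : bucketMask(key)) {
      System.out.printf("%08x%n", m);     // each word has exactly one bit set
    }
    // With the old arithmetic shift, a negative product would produce a negative
    // shift amount, and 1 << negative silently uses only its low five bits.
  }
}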
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
index 9a6df57ae..b2a9bd763 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java
@@ -28,6 +28,8 @@ public class BloomFilterDef {
private boolean local;
private String probeField;
+
+ private String buildField;
//TODO
@JsonIgnore
private Double leftNDV;
@@ -37,10 +39,11 @@ public class BloomFilterDef {
@JsonCreator
public BloomFilterDef(@JsonProperty("numBytes") int numBytes, @JsonProperty("local") boolean local, @JsonProperty("probeField")
- String probeField){
+ String probeField, @JsonProperty("buildField") String buildField){
this.numBytes = numBytes;
this.local = local;
this.probeField = probeField;
+ this.buildField = buildField;
}
@@ -61,7 +64,7 @@ public class BloomFilterDef {
}
public String toString() {
- return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + " }";
+ return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + ",buildField= " + buildField + " }";
}
@JsonIgnore
@@ -82,4 +85,9 @@ public class BloomFilterDef {
this.rightNDV = rightNDV;
}
+ public String getBuildField()
+ {
+ return buildField;
+ }
+
}
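A hedged usage sketch of the extended constructor; the column names below are invented for illustration, and only members visible in this diff are used:

import org.apache.drill.exec.work.filter.BloomFilterDef;

public class BloomFilterDefExample {
  public static void main(String[] args) {
    // Hypothetical join columns: the probe side reads l_orderkey, the build side reads o_orderkey.
    BloomFilterDef def = new BloomFilterDef(1024 * 1024, false, "l_orderkey", "o_orderkey");
    System.out.println(def);                 // BF:{numBytes=1048576,send2Foreman=true,probeField= l_orderkey,buildField= o_orderkey }
    System.out.println(def.getBuildField()); // o_orderkey
  }
}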
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
index 5fb51bf24..efe300f3a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java
@@ -18,13 +18,8 @@
package org.apache.drill.exec.work.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-
import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.drill.exec.physical.base.GroupScan;
-
-
import java.util.List;
@JsonIgnoreProperties(ignoreUnknown = true)
@@ -37,17 +32,18 @@ public class RuntimeFilterDef {
private List<BloomFilterDef> bloomFilterDefs;
private boolean sendToForeman;
- @JsonIgnore
- private GroupScan probeSideGroupScan;
+ private long runtimeFilterIdentifier;
@JsonCreator
public RuntimeFilterDef(@JsonProperty("generateBloomFilter") boolean generateBloomFilter, @JsonProperty("generateMinMaxFilter") boolean generateMinMaxFilter,
- @JsonProperty("bloomFilterDefs") List<BloomFilterDef> bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman) {
+ @JsonProperty("bloomFilterDefs") List<BloomFilterDef> bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman,
+ @JsonProperty("runtimeFilterIdentifier") long runtimeFilterIdentifier) {
this.generateBloomFilter = generateBloomFilter;
this.generateMinMaxFilter = generateMinMaxFilter;
this.bloomFilterDefs = bloomFilterDefs;
this.sendToForeman = sendToForeman;
+ this.runtimeFilterIdentifier = runtimeFilterIdentifier;
}
@@ -84,12 +80,11 @@ public class RuntimeFilterDef {
this.sendToForeman = sendToForeman;
}
- @JsonIgnore
- public GroupScan getProbeSideGroupScan() {
- return probeSideGroupScan;
+ public long getRuntimeFilterIdentifier() {
+ return runtimeFilterIdentifier;
}
- public void setProbeSideGroupScan(GroupScan probeSideGroupScan) {
- this.probeSideGroupScan = probeSideGroupScan;
+ public void setRuntimeFilterIdentifier(long runtimeFilterIdentifier) {
+ this.runtimeFilterIdentifier = runtimeFilterIdentifier;
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
index 6e4a9a8e5..93736c5f2 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
@@ -39,7 +39,9 @@ public class RuntimeFilterReporter {
this.context = context;
}
- public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) {
+ public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, RuntimeFilterDef runtimeFilterDef, int hashJoinOpId) {
+ boolean sendToForeman = runtimeFilterDef.isSendToForeman();
+ long rfIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier();
ExecProtos.FragmentHandle fragmentHandle = context.getHandle();
DrillBuf[] data = new DrillBuf[bloomFilters.size()];
List<Integer> bloomFilterSizeInBytes = new ArrayList<>();
@@ -64,6 +66,7 @@ public class RuntimeFilterReporter {
.setMinorFragmentId(minorFragmentId)
.setToForeman(sendToForeman)
.setHjOpId(hashJoinOpId)
+ .setRfIdentifier(rfIdentifier)
.addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes)
.build();
RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
index 5a8c6fc9e..a4946a96c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
@@ -17,39 +17,24 @@
*/
package org.apache.drill.exec.work.filter;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import io.netty.buffer.DrillBuf;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.drill.exec.ops.AccountingDataTunnel;
-import org.apache.drill.exec.ops.Consumer;
import org.apache.drill.exec.ops.SendingAccountor;
-import org.apache.drill.exec.ops.StatusHandler;
import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor;
import org.apache.drill.exec.physical.base.Exchange;
-import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.HashJoinPOP;
+import org.apache.drill.exec.physical.config.RuntimeFilterPOP;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.Wrapper;
-import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
-import org.apache.drill.exec.proto.GeneralRPCProtos;
-import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.rpc.RpcException;
-import org.apache.drill.exec.rpc.RpcOutcomeListener;
-import org.apache.drill.exec.rpc.data.DataTunnel;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
/**
* This class manages the RuntimeFilter routing information of the pushed down join predicate
@@ -69,29 +54,24 @@ import java.util.concurrent.ConcurrentHashMap;
public class RuntimeFilterRouter {
private Wrapper rootWrapper;
- //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
- private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probdeScanEps = new HashMap<>();
- //HashJoin node's major fragment id to its corresponding probe side nodes's number
- private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>();
- //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
- private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
-
- private DrillbitContext drillbitContext;
private SendingAccountor sendingAccountor = new SendingAccountor();
+ private RuntimeFilterSink runtimeFilterSink;
+
private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class);
/**
* This class maintains context for the runtime join push down's filter management. It
* does a traversal of the physical operators by leveraging the root wrapper which indirectly
* holds the global PhysicalOperator tree and contains the minor fragment endpoints.
+ *
* @param workUnit
* @param drillbitContext
*/
public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
this.rootWrapper = workUnit.getRootWrapper();
- this.drillbitContext = drillbitContext;
+ runtimeFilterSink = new RuntimeFilterSink(drillbitContext, sendingAccountor);
}
/**
@@ -99,6 +79,12 @@ public class RuntimeFilterRouter {
* record the relationship between the RuntimeFilter producers and consumers.
*/
public void collectRuntimeFilterParallelAndControlInfo() {
+ //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+ Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>();
+ //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+ Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
+ Map<Integer, Integer> joinMjId2rfNumber = new HashMap<>();
+
RuntimeFilterParallelismCollector runtimeFilterParallelismCollector = new RuntimeFilterParallelismCollector();
rootWrapper.getNode().getRoot().accept(runtimeFilterParallelismCollector, null);
List<RFHelperHolder> holders = runtimeFilterParallelismCollector.getHolders();
@@ -107,67 +93,33 @@ public class RuntimeFilterRouter {
List<CoordinationProtos.DrillbitEndpoint> probeSideEndpoints = holder.getProbeSideScanEndpoints();
int probeSideScanMajorId = holder.getProbeSideScanMajorId();
int joinNodeMajorId = holder.getJoinMajorId();
+ int buildSideRfNumber = holder.getBuildSideRfNumber();
RuntimeFilterDef runtimeFilterDef = holder.getRuntimeFilterDef();
boolean sendToForeman = runtimeFilterDef.isSendToForeman();
if (sendToForeman) {
//send RuntimeFilter to Foreman
- joinMjId2probdeScanEps.put(joinNodeMajorId, probeSideEndpoints);
- joinMjId2scanSize.put(joinNodeMajorId, probeSideEndpoints.size());
+ joinMjId2probeScanEps.put(joinNodeMajorId, probeSideEndpoints);
joinMjId2ScanMjId.put(joinNodeMajorId, probeSideScanMajorId);
+ joinMjId2rfNumber.put(joinNodeMajorId, buildSideRfNumber);
}
}
+ runtimeFilterSink.setJoinMjId2probeScanEps(joinMjId2probeScanEps);
+ runtimeFilterSink.setJoinMjId2rfNumber(joinMjId2rfNumber);
+ runtimeFilterSink.setJoinMjId2ScanMjId(joinMjId2ScanMjId);
}
-
public void waitForComplete() {
sendingAccountor.waitForSendComplete();
+ runtimeFilterSink.close();
}
/**
* This method is passively invoked by receiving a runtime filter from the network
- * @param runtimeFilterWritable
+ *
+ * @param srcRuntimeFilterWritable
*/
- public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
- broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
- }
-
-
- private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) {
- BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
- int joinMajorId = runtimeFilterB.getMajorFragmentId();
- UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
- List<String> probeFields = runtimeFilterB.getProbeFieldsList();
- DrillBuf[] data = srcRuntimeFilterWritable.getData();
- List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId);
- int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
- for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
- BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder();
- for (String probeField : probeFields) {
- builder.addProbeFields(probeField);
- }
- BitData.RuntimeFilterBDef runtimeFilterBDef = builder
- .setQueryId(queryId)
- .setMajorFragmentId(scanNodeMjId)
- .setMinorFragmentId(minorId)
- .build();
- RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
- CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
- DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
- Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
- @Override
- public void accept(final RpcException e) {
- logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
- }
-
- @Override
- public void interrupt(final InterruptedException e) {
- logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
- }
- };
- RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
- AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler);
- accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable);
- }
+ public void register(RuntimeFilterWritable srcRuntimeFilterWritable) {
+ runtimeFilterSink.add(srcRuntimeFilterWritable);
}
/**
@@ -183,18 +135,29 @@ public class RuntimeFilterRouter {
boolean isHashJoinOp = op instanceof HashJoinPOP;
if (isHashJoinOp) {
HashJoinPOP hashJoinPOP = (HashJoinPOP) op;
+ int hashJoinOpId = hashJoinPOP.getOperatorId();
RuntimeFilterDef runtimeFilterDef = hashJoinPOP.getRuntimeFilterDef();
- if (runtimeFilterDef != null) {
- if (holder == null) {
- holder = new RFHelperHolder();
+ if (runtimeFilterDef != null && runtimeFilterDef.isSendToForeman()) {
+ if (holder == null || holder.getJoinOpId() != hashJoinOpId) {
+ holder = new RFHelperHolder(hashJoinOpId);
holders.add(holder);
}
holder.setRuntimeFilterDef(runtimeFilterDef);
- GroupScan probeSideScanOp = runtimeFilterDef.getProbeSideGroupScan();
- Wrapper container = findPhysicalOpContainer(rootWrapper, hashJoinPOP);
+ long runtimeFilterIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier();
+ WrapperOperatorsVisitor operatorsVisitor = new WrapperOperatorsVisitor(hashJoinPOP);
+ Wrapper container = findTargetWrapper(rootWrapper, operatorsVisitor);
+ if (container == null) {
+ throw new IllegalStateException(String.format("No valid Wrapper found for HashJoinPOP with id=%d", hashJoinPOP.getOperatorId()));
+ }
+ int buildSideRFNumber = container.getAssignedEndpoints().size();
+ holder.setBuildSideRfNumber(buildSideRFNumber);
int majorFragmentId = container.getMajorFragmentId();
holder.setJoinMajorId(majorFragmentId);
- Wrapper probeSideScanContainer = findPhysicalOpContainer(rootWrapper, probeSideScanOp);
+ WrapperRuntimeFilterOperatorsVisitor runtimeFilterOperatorsVisitor = new WrapperRuntimeFilterOperatorsVisitor(runtimeFilterIdentifier);
+ Wrapper probeSideScanContainer = findTargetWrapper(container, runtimeFilterOperatorsVisitor);
+ if (probeSideScanContainer == null) {
+ throw new IllegalStateException(String.format("No valid Wrapper found for RuntimeFilterPOP with id=%d", op.getOperatorId()));
+ }
int probeSideScanMjId = probeSideScanContainer.getMajorFragmentId();
List<CoordinationProtos.DrillbitEndpoint> probeSideScanEps = probeSideScanContainer.getAssignedEndpoints();
holder.setProbeSideScanEndpoints(probeSideScanEps);
@@ -209,59 +172,63 @@ public class RuntimeFilterRouter {
}
}
- private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> {
+ private Wrapper findTargetWrapper(Wrapper wrapper, TargetPhysicalOperatorVisitor targetOpVisitor) {
+ targetOpVisitor.setCurrentFragment(wrapper.getNode());
+ wrapper.getNode().getRoot().accept(targetOpVisitor, null);
+ boolean contain = targetOpVisitor.isContain();
+ if (contain) {
+ return wrapper;
+ }
+ List<Wrapper> dependencies = wrapper.getFragmentDependencies();
+ if (CollectionUtils.isEmpty(dependencies)) {
+ return null;
+ }
+ for (Wrapper dependencyWrapper : dependencies) {
+ Wrapper opContainer = findTargetWrapper(dependencyWrapper, targetOpVisitor);
+ if (opContainer != null) {
+ return opContainer;
+ }
+ }
+ return null;
+ }
- private Fragment fragment;
+ private abstract class TargetPhysicalOperatorVisitor<T, X, E extends Throwable> extends AbstractPhysicalVisitor<T, X, E> {
- private boolean contain = false;
+ protected Exchange sendingExchange;
- private boolean targetIsGroupScan;
+ public void setCurrentFragment(Fragment fragment) {
+ sendingExchange = fragment.getSendingExchange();
+ }
- private boolean targetIsHashJoin;
+ public abstract boolean isContain();
+ }
- private String targetGroupScanDigest;
+ private class WrapperOperatorsVisitor extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> {
- private String targetHashJoinJson;
+ private boolean contain = false;
+ private PhysicalOperator targetOp;
- public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) {
- this.fragment = fragment;
- this.targetIsGroupScan = targetOp instanceof GroupScan;
- this.targetIsHashJoin = targetOp instanceof HashJoinPOP;
- this.targetGroupScanDigest = targetIsGroupScan ? ((GroupScan) targetOp).getDigest() : null;
- this.targetHashJoinJson = targetIsHashJoin ? jsonOfPhysicalOp(targetOp) : null;
+ public WrapperOperatorsVisitor(PhysicalOperator targetOp) {
+ this.targetOp = targetOp;
}
@Override
public Void visitExchange(Exchange exchange, Void value) throws RuntimeException {
- List<Fragment.ExchangeFragmentPair> exchangeFragmentPairs = fragment.getReceivingExchangePairs();
- for (Fragment.ExchangeFragmentPair exchangeFragmentPair : exchangeFragmentPairs) {
- boolean same = exchange == exchangeFragmentPair.getExchange();
- if (same) {
- return null;
- }
+ if (exchange != sendingExchange) {
+ return null;
}
return exchange.getChild().accept(this, value);
}
@Override
public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
- boolean same = false;
- if (targetIsGroupScan && op instanceof GroupScan) {
- //Since GroupScan may be rewrite during the planing, here we use the digest to identify it.
- String currentDigest = ((GroupScan) op).getDigest();
- same = targetGroupScanDigest.equals(currentDigest);
- }
- if (targetIsHashJoin && op instanceof HashJoinPOP) {
- String currentOpJson = jsonOfPhysicalOp(op);
- same = targetHashJoinJson.equals(currentOpJson);
- }
- if (!same) {
+ if (op == targetOp) {
+ contain = true;
+ } else {
for (PhysicalOperator child : op) {
child.accept(this, value);
}
- } else {
- contain = true;
}
return null;
}
@@ -269,42 +236,57 @@ public class RuntimeFilterRouter {
public boolean isContain() {
return contain;
}
+ }
+
+ private class WrapperRuntimeFilterOperatorsVisitor extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> {
+
+ private boolean contain = false;
+
+ private long identifier;
+
+
+ public WrapperRuntimeFilterOperatorsVisitor(long identifier) {
+ this.identifier = identifier;
+ }
- public String jsonOfPhysicalOp(PhysicalOperator operator) {
- try {
- ObjectMapper objectMapper = new ObjectMapper();
- StringWriter stringWriter = new StringWriter();
- objectMapper.writeValue(stringWriter, operator);
- return stringWriter.toString();
- } catch (IOException e) {
- throw new RuntimeException(e);
+ @Override
+ public Void visitExchange(Exchange exchange, Void value) throws RuntimeException {
+ if (exchange != sendingExchange) {
+ return null;
}
+ return exchange.getChild().accept(this, value);
}
- }
- private boolean containsPhysicalOperator(Wrapper wrapper, PhysicalOperator op) {
- WrapperOperatorsVisitor wrapperOpsVistitor = new WrapperOperatorsVisitor(op, wrapper.getNode());
- wrapper.getNode().getRoot().accept(wrapperOpsVistitor, null);
- return wrapperOpsVistitor.isContain();
- }
+ @Override
+ public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException {
+ boolean same;
+ boolean isRuntimeFilterPop = op instanceof RuntimeFilterPOP;
+ boolean isHashJoinPop = op instanceof HashJoinPOP;
- private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalOperator op) {
- boolean contain = containsPhysicalOperator(wrapper, op);
- if (contain) {
- return wrapper;
- }
- List<Wrapper> dependencies = wrapper.getFragmentDependencies();
- if (CollectionUtils.isEmpty(dependencies)) {
+ if (isHashJoinPop) {
+ HashJoinPOP hashJoinPOP = (HashJoinPOP) op;
+ PhysicalOperator leftPop = hashJoinPOP.getLeft();
+ leftPop.accept(this, value);
+ return null;
+ }
+
+ if (isRuntimeFilterPop) {
+ RuntimeFilterPOP runtimeFilterPOP = (RuntimeFilterPOP) op;
+ same = this.identifier == runtimeFilterPOP.getIdentifier();
+ if (same) {
+ contain = true;
+ return null;
+ }
+ }
+ for (PhysicalOperator child : op) {
+ child.accept(this, value);
+ }
return null;
}
- for (Wrapper dependencyWrapper : dependencies) {
- Wrapper opContainer = findPhysicalOpContainer(dependencyWrapper, op);
- if (opContainer != null) {
- return opContainer;
- }
+
+ public boolean isContain() {
+ return contain;
}
- //should not be here
- throw new IllegalStateException(String.format("No valid Wrapper found for physicalOperator with id=%d", op.getOperatorId()));
}
/**
@@ -320,6 +302,22 @@ public class RuntimeFilterRouter {
private RuntimeFilterDef runtimeFilterDef;
+ private int joinOpId;
+
+ private int buildSideRfNumber;
+
+ public RFHelperHolder(int joinOpId) {
+ this.joinOpId = joinOpId;
+ }
+
+ public int getJoinOpId() {
+ return joinOpId;
+ }
+
+ public void setJoinOpId(int joinOpId) {
+ this.joinOpId = joinOpId;
+ }
+
public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() {
return probeSideScanEndpoints;
}
@@ -352,5 +350,13 @@ public class RuntimeFilterRouter {
public void setRuntimeFilterDef(RuntimeFilterDef runtimeFilterDef) {
this.runtimeFilterDef = runtimeFilterDef;
}
+
+ public int getBuildSideRfNumber() {
+ return buildSideRfNumber;
+ }
+
+ public void setBuildSideRfNumber(int buildSideRfNumber) {
+ this.buildSideRfNumber = buildSideRfNumber;
+ }
}
-}
+}
\ No newline at end of file
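The new WrapperRuntimeFilterOperatorsVisitor locates the probe-side fragment by descending only into the left (probe) child of any HashJoinPOP it meets and stopping at the RuntimeFilterPOP whose identifier matches the join's RuntimeFilterDef; that traversal rule is what makes left-deep join trees resolve to the right fragment wrapper. A simplified, self-contained analog of the rule, using toy classes rather than Drill's operators:

import java.util.Arrays;
import java.util.List;

public class LeftDeepSearchDemo {
  // Stand-in for a physical operator node; not a real Drill class.
  static class Op {
    final String name;
    final long rfIdentifier;          // only meaningful for "runtimeFilter" nodes
    final List<Op> children;
    Op(String name, long rfIdentifier, Op... children) {
      this.name = name;
      this.rfIdentifier = rfIdentifier;
      this.children = Arrays.asList(children);
    }
  }

  // Mirrors the visitor: for a hash join, look only at the left (probe) child.
  static boolean containsRuntimeFilter(Op op, long identifier) {
    if (op.name.equals("hashJoin")) {
      return !op.children.isEmpty() && containsRuntimeFilter(op.children.get(0), identifier);
    }
    if (op.name.equals("runtimeFilter") && op.rfIdentifier == identifier) {
      return true;
    }
    for (Op child : op.children) {
      if (containsRuntimeFilter(child, identifier)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Left-deep tree: hashJoin(hashJoin(runtimeFilter#1 -> scan, scan), scan)
    Op tree = new Op("hashJoin", 0,
        new Op("hashJoin", 0,
            new Op("runtimeFilter", 1, new Op("scan", 0)),
            new Op("scan", 0)),
        new Op("scan", 0));
    System.out.println(containsRuntimeFilter(tree, 1));  // true: found along the probe (left) path
    System.out.println(containsRuntimeFilter(tree, 2));  // false: no matching identifier
  }
}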
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
index 14686254f..f69a44ef7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -17,206 +17,250 @@
*/
package org.apache.drill.exec.work.filter;
-import org.apache.drill.exec.memory.BufferAllocator;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ops.AccountingDataTunnel;
+import org.apache.drill.exec.ops.Consumer;
+import org.apache.drill.exec.ops.SendingAccountor;
+import org.apache.drill.exec.ops.StatusHandler;
+import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.CoordinationProtos;
+import org.apache.drill.exec.proto.GeneralRPCProtos;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.rpc.RpcException;
+import org.apache.drill.exec.rpc.RpcOutcomeListener;
+import org.apache.drill.exec.rpc.data.DataTunnel;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
/**
* This sink receives the RuntimeFilters from the netty thread,
- * aggregates them in an async thread, supplies the aggregated
- * one to the fragment running thread.
+ * aggregates them in an async thread, broadcast the final aggregated
+ * one to the RuntimeFilterRecordBatch.
*/
-public class RuntimeFilterSink implements AutoCloseable {
-
- private AtomicInteger currentBookId = new AtomicInteger(0);
+public class RuntimeFilterSink implements Closeable
+{
- private int staleBookId = 0;
+ private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
- /**
- * RuntimeFilterWritable holding the aggregated version of all the received filter
- */
- private RuntimeFilterWritable aggregated = null;
+ private Map<Integer, Integer> joinMjId2rfNumber;
- private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
+ //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
+ private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>();
- /**
- * Flag used by Minor Fragment thread to indicate it has encountered error
- */
- private AtomicBoolean running = new AtomicBoolean(true);
+ //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
+ private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
- /**
- * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this
- * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at
- * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to
- * indicate producer not to put any new elements in it.
- */
- private ReentrantLock queueLock = new ReentrantLock();
+ //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
+ private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new HashMap<>();
+ //for debug usage
+ private Map<Integer, Stopwatch> joinMjId2Stopwatch = new HashMap<>();
- private Condition notEmpty = queueLock.newCondition();
+ private DrillbitContext drillbitContext;
- private ReentrantLock aggregatedRFLock = new ReentrantLock();
+ private SendingAccountor sendingAccountor;
- private BufferAllocator bufferAllocator;
+ private AsyncAggregateWorker asyncAggregateWorker;
- private Future future;
+ private AtomicBoolean running = new AtomicBoolean(true);
private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
- public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) {
- this.bufferAllocator = bufferAllocator;
- AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
- future = executorService.submit(asyncAggregateWorker);
+ public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor)
+ {
+ this.drillbitContext = drillbitContext;
+ this.sendingAccountor = sendingAccountor;
+ asyncAggregateWorker = new AsyncAggregateWorker();
+ drillbitContext.getExecutor().submit(asyncAggregateWorker);
}
- public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
- if (running.get()) {
- try {
- aggregatedRFLock.lock();
- if (containOne()) {
- boolean same = aggregated.equals(runtimeFilterWritable);
- if (!same) {
- // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs
- // share the same FragmentContext.
- aggregated.close();
- currentBookId.set(0);
- staleBookId = 0;
- clearQueued(false);
- }
- }
- } finally {
- aggregatedRFLock.unlock();
- }
+ public void add(RuntimeFilterWritable runtimeFilterWritable)
+ {
+ if (!running.get()) {
+ runtimeFilterWritable.close();
+ return;
+ }
+ runtimeFilterWritable.retainBuffers(1);
+ int joinMjId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId();
+ if (joinMjId2Stopwatch.get(joinMjId) == null) {
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ joinMjId2Stopwatch.put(joinMjId, stopwatch);
+ }
+ synchronized (rfQueue) {
+ rfQueue.add(runtimeFilterWritable);
+ rfQueue.notify();
+ }
+ }
+ public void close() {
+ running.set(false);
+ if (asyncAggregateWorker != null) {
+ synchronized (rfQueue) {
+ rfQueue.notify();
+ }
+ }
+ while (!asyncAggregateWorker.over.get()) {
try {
- queueLock.lock();
- if (rfQueue != null) {
- rfQueue.add(runtimeFilterWritable);
- notEmpty.signal();
- } else {
- runtimeFilterWritable.close();
- }
- } finally {
- queueLock.unlock();
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ logger.error("interrupted while sleeping to wait for the aggregating worker thread to exit", e);
}
- } else {
+ }
+ for (RuntimeFilterWritable runtimeFilterWritable : joinMjId2AggregatedRF.values()) {
runtimeFilterWritable.close();
}
}
- public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
- try {
- aggregatedRFLock.lock();
- return aggregated.duplicate(bufferAllocator);
- } finally {
- aggregatedRFLock.unlock();
+ private void aggregate(RuntimeFilterWritable srcRuntimeFilterWritable)
+ {
+ BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+ int joinMajorId = runtimeFilterB.getMajorFragmentId();
+ int buildSideRfNumber;
+ RuntimeFilterWritable toAggregated = null;
+ buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId);
+ buildSideRfNumber--;
+ joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber);
+ toAggregated = joinMjId2AggregatedRF.get(joinMajorId);
+ if (toAggregated == null) {
+ toAggregated = srcRuntimeFilterWritable;
+ toAggregated.retainBuffers(1);
+ } else {
+ toAggregated.aggregate(srcRuntimeFilterWritable);
}
- }
-
- /**
- * whether there's a fresh aggregated RuntimeFilter
- *
- * @return
- */
- public boolean hasFreshOne() {
- if (currentBookId.get() > staleBookId) {
- staleBookId = currentBookId.get();
- return true;
+ joinMjId2AggregatedRF.put(joinMajorId, toAggregated);
+ if (buildSideRfNumber == 0) {
+ joinMjId2AggregatedRF.remove(joinMajorId);
+ route(toAggregated);
+ joinMjId2rfNumber.remove(joinMajorId);
+ Stopwatch stopwatch = joinMjId2Stopwatch.get(joinMajorId);
+ logger.info(
+ "received all the RFWs belonging to the majorId {}'s HashJoin nodes and flushed aggregated RFW out elapsed {} ms",
+ joinMajorId,
+ stopwatch.elapsed(TimeUnit.MILLISECONDS)
+ );
}
- return false;
- }
-
- /**
- * whether there's a usable RuntimeFilter.
- *
- * @return
- */
- public boolean containOne() {
- return aggregated != null;
}
- private void doCleanup() {
- running.compareAndSet(true, false);
- try {
- aggregatedRFLock.lock();
- if (containOne()) {
- aggregated.close();
- aggregated = null;
+ private void route(RuntimeFilterWritable srcRuntimeFilterWritable)
+ {
+ BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+ int joinMajorId = runtimeFilterB.getMajorFragmentId();
+ UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
+ List<String> probeFields = runtimeFilterB.getProbeFieldsList();
+ List<Integer> sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList();
+ long rfIdentifier = runtimeFilterB.getRfIdentifier();
+ DrillBuf[] data = srcRuntimeFilterWritable.getData();
+ List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probeScanEps.get(joinMajorId);
+ int scanNodeSize = scanNodeEps.size();
+ srcRuntimeFilterWritable.retainBuffers(scanNodeSize - 1);
+ int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
+ for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
+ BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder();
+ for (String probeField : probeFields) {
+ builder.addProbeFields(probeField);
}
- } finally {
- aggregatedRFLock.unlock();
+ BitData.RuntimeFilterBDef runtimeFilterBDef = builder.setQueryId(queryId)
+ .setMajorFragmentId(scanNodeMjId)
+ .setMinorFragmentId(minorId)
+ .setToForeman(false)
+ .setRfIdentifier(rfIdentifier)
+ .addAllBloomFilterSizeInBytes(sizeInBytes)
+ .build();
+ RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
+ CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
+
+ DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
+ Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>()
+ {
+ @Override
+ public void accept(final RpcException e)
+ {
+ logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
+ }
+
+ @Override
+ public void interrupt(final InterruptedException e)
+ {
+ logger.warn("fail to broadcast a runtime filter to the probe side scan node", e);
+ }
+ };
+ RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor);
+ AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler);
+ accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable);
}
}
- @Override
- public void close() throws Exception {
- future.cancel(true);
- doCleanup();
+ public void setJoinMjId2rfNumber(Map<Integer, Integer> joinMjId2rfNumber)
+ {
+ this.joinMjId2rfNumber = joinMjId2rfNumber;
}
- private void clearQueued(boolean setToNull) {
- RuntimeFilterWritable toClear;
- try {
- queueLock.lock();
- while (rfQueue != null && (toClear = rfQueue.poll()) != null) {
- toClear.close();
- }
- rfQueue = (setToNull) ? null : rfQueue;
- } finally {
- queueLock.unlock();
- }
+ public void setJoinMjId2probeScanEps(Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps)
+ {
+ this.joinMjId2probeScanEps = joinMjId2probeScanEps;
}
- private class AsyncAggregateWorker implements Runnable {
+ public void setJoinMjId2ScanMjId(Map<Integer, Integer> joinMjId2ScanMjId)
+ {
+ this.joinMjId2ScanMjId = joinMjId2ScanMjId;
+ }
+
+ private class AsyncAggregateWorker implements Runnable
+ {
+ private AtomicBoolean over = new AtomicBoolean(false);
@Override
- public void run() {
- try {
+ public void run()
+ {
+ while ((joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty() ) && running.get()) {
RuntimeFilterWritable toAggregate = null;
- while (running.get()) {
+ synchronized (rfQueue) {
try {
- queueLock.lock();
- toAggregate = (rfQueue != null) ? rfQueue.poll() : null;
- if (toAggregate == null) {
- notEmpty.await();
- continue;
+ toAggregate = rfQueue.poll();
+ while (toAggregate == null && running.get()) {
+ rfQueue.wait();
+ toAggregate = rfQueue.poll();
}
- } finally {
- queueLock.unlock();
+ } catch (InterruptedException ex) {
+ logger.error("RFW_Aggregator thread being interrupted", ex);
+ continue;
}
-
- try {
- aggregatedRFLock.lock();
- if (containOne()) {
- aggregated.aggregate(toAggregate);
-
- // Release the byteBuf referenced by toAggregate since aggregate will not do it
- toAggregate.close();
- } else {
- aggregated = toAggregate;
- }
- } finally {
- aggregatedRFLock.unlock();
+ }
+ if (toAggregate == null) {
+ continue;
+ }
+ // perform aggregate outside the sync block.
+ try {
+ aggregate(toAggregate);
+ } catch (Exception ex) {
+ logger.error("Failed to aggregate or route the RFW", ex);
+ throw new DrillRuntimeException(ex);
+ } finally {
+ if (toAggregate != null) {
+ toAggregate.close();
}
- currentBookId.incrementAndGet();
}
- } catch (InterruptedException e) {
- logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName());
- Thread.currentThread().interrupt();
- } finally {
- doCleanup();
- clearQueued(true);
}
+
+ if (!running.get()) {
+ RuntimeFilterWritable toClose;
+ while ((toClose = rfQueue.poll()) != null) {
+ toClose.close();
+ }
+ }
+ over.set(true);
}
}
}
-
-
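Stripped of the buffer plumbing, the sink's aggregate()/route() pair is a per-join countdown: each join major fragment expects a known number of partial filters (joinMjId2rfNumber), arriving filters are OR-merged into a single aggregate, and the result is broadcast exactly once, when the count reaches zero. A stand-alone sketch of that control flow, with plain longs standing in for bloom-filter buffers:

import java.util.HashMap;
import java.util.Map;

public class CountdownAggregateDemo {
  private final Map<Integer, Integer> remaining = new HashMap<>();  // joinMjId -> partial filters still expected
  private final Map<Integer, Long> aggregated = new HashMap<>();    // joinMjId -> OR of received "filters"

  CountdownAggregateDemo(Map<Integer, Integer> expectedPerJoin) {
    remaining.putAll(expectedPerJoin);
  }

  void add(int joinMjId, long partialFilter) {
    int left = remaining.get(joinMjId) - 1;
    remaining.put(joinMjId, left);
    aggregated.merge(joinMjId, partialFilter, (a, b) -> a | b);     // stands in for BloomFilter.or()
    if (left == 0) {                                                // every build-side fragment has reported
      long finalFilter = aggregated.remove(joinMjId);
      remaining.remove(joinMjId);
      route(joinMjId, finalFilter);
    }
  }

  void route(int joinMjId, long filter) {
    System.out.printf("join %d -> broadcast %x to the probe-side scans%n", joinMjId, filter);
  }

  public static void main(String[] args) {
    Map<Integer, Integer> expected = new HashMap<>();
    expected.put(3, 2);                     // join major fragment 3 has two build-side minor fragments
    CountdownAggregateDemo sink = new CountdownAggregateDemo(expected);
    sink.add(3, 0x00f0L);
    sink.add(3, 0x0f00L);                   // second arrival triggers route(3, 0xff0)
  }
}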
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 9a971e94c..f8c2701b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -103,6 +103,27 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{
return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
}
+ public void retainBuffers(final int increment) {
+ if (increment <= 0) {
+ return;
+ }
+ for (final DrillBuf buf : data) {
+ buf.retain(increment);
+ }
+ }
+ //TODO: Not used currently because of DRILL-6826
+ public RuntimeFilterWritable newRuntimeFilterWritable(BufferAllocator bufferAllocator) {
+ int bufNum = data.length;
+ DrillBuf [] newBufs = new DrillBuf[bufNum];
+ int i = 0;
+ for (DrillBuf buf : data) {
+ DrillBuf transferredBuffer = buf.transferOwnership(bufferAllocator).buffer;
+ newBufs[i] = transferredBuffer;
+ i++;
+ }
+ return new RuntimeFilterWritable(this.runtimeFilterBDef, newBufs);
+ }
+
public String toString() {
return identifier;
}
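retainBuffers() exists because the sink fans one set of DrillBufs out to several consumers; route() retains scanNodeSize - 1 extra references so each probe-side send path can release one when its RPC completes. The bookkeeping follows the usual netty rule of one live reference per consumer, sketched here with a plain netty buffer instead of Drill's wrapper:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class FanOutRefCountDemo {
  public static void main(String[] args) {
    ByteBuf filter = Unpooled.buffer(32);   // arrives with refCnt == 1, the sink's own reference
    int consumers = 3;                      // e.g. three probe-side scan endpoints
    filter.retain(consumers - 1);           // the existing reference already covers the first consumer
    System.out.println(filter.refCnt());    // 3
    for (int i = 0; i < consumers; i++) {
      filter.release();                     // each send path releases once its transfer is done
    }
    System.out.println(filter.refCnt());    // 0 -> buffer returned to the allocator
  }
}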
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 42b76f278..a379db175 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -151,7 +151,7 @@ public class Foreman implements Runnable {
this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this);
this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult());
this.profileOption = setProfileOption(queryContext.getOptions());
- this.enableRuntimeFilter = drillbitContext.getOptionManager().getBoolean(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY);
+ this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val;
}