author:    weijie.tong <weijie.tong@alipay.com>  2018-09-06 19:23:35 +0800
committer: Sorabh Hamirwasia <sorabh@apache.org>  2018-10-10 09:50:04 -0700
commit:    216b1237739935b04c4f54b3f6f05371a4644085 (patch)
tree:      50f58c2d7438a7b2e6d86372caf207795d0254c3 /exec/java-exec/src
parent:    d5146c43986f09f132f4e96966082732a3740181 (diff)
DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFilter
Diffstat (limited to 'exec/java-exec/src')
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java | 8
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java | 16
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java | 43
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | 5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java | 24
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java | 4
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java | 5
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java (renamed from exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java) | 60
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java | 171
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java | 34
-rw-r--r--  exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java | 16
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java | 11
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java | 12
13 files changed, 298 insertions(+), 111 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
index 608f05c56..88c21d9e9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.drill.common.config.DrillConfig;
@@ -160,16 +161,15 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable {
void close();
/**
- * Return null ,if setRuntimeFilter not being called
* @return
*/
- RuntimeFilterWritable getRuntimeFilter();
+ RuntimeFilterSink getRuntimeFilterSink();
/**
- * Set a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
+ * Add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment
* @param runtimeFilter
*/
- public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter);
+ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter);
interface ExecutorState {
/**
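The interface now hands out a RuntimeFilterSink rather than a single RuntimeFilterWritable: producers add filters, and readers ask the sink for the latest aggregate. A minimal sketch of that contract with simplified stand-in types (hypothetical names, plain JDK, not Drill's API; not part of this patch):

import java.util.Optional;

// Hypothetical, simplified stand-ins for the contract introduced above: the fragment
// context owns a sink; writers feed filters into it, readers poll it for the latest
// aggregate instead of reading back a single stored filter.
interface FilterSink extends AutoCloseable {
  void aggregate(byte[] filterPayload);     // merge one incoming filter into the aggregate
  Optional<byte[]> latest();                // current aggregated filter, if any has arrived
}

interface FilterOwningContext extends AutoCloseable {
  FilterSink getFilterSink();               // analogous to getRuntimeFilterSink()

  // analogous to addRuntimeFilter(...): adding just delegates to the sink
  default void addFilter(byte[] filterPayload) {
    getFilterSink().aggregate(filterPayload);
  }
}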
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
index a8980785a..1f9d48970 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java
@@ -61,6 +61,7 @@ import org.apache.drill.exec.testing.ExecutionControls;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -136,7 +137,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
/** Stores constants and their holders by type */
private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache;
- private RuntimeFilterWritable runtimeFilterWritable;
+ private RuntimeFilterSink runtimeFilterSink;
/**
* Create a FragmentContext instance for non-root fragment.
@@ -208,6 +209,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
stats = new FragmentStats(allocator, fragment.getAssignment());
bufferManager = new BufferManagerImpl(this.allocator);
constantValueHolderCache = Maps.newHashMap();
+ this.runtimeFilterSink = new RuntimeFilterSink(this.allocator);
}
/**
@@ -348,13 +350,13 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
}
@Override
- public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
- this.runtimeFilterWritable = runtimeFilter;
+ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+ this.runtimeFilterSink.aggregate(runtimeFilter);
}
@Override
- public RuntimeFilterWritable getRuntimeFilter() {
- return runtimeFilterWritable;
+ public RuntimeFilterSink getRuntimeFilterSink() {
+ return runtimeFilterSink;
}
/**
@@ -470,8 +472,8 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor
for (OperatorContextImpl opContext : contexts) {
suppressingClose(opContext);
}
- if (runtimeFilterWritable != null) {
- suppressingClose(runtimeFilterWritable);
+ if (runtimeFilterSink != null) {
+ suppressingClose(runtimeFilterSink);
}
suppressingClose(bufferManager);
suppressingClose(allocator);
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
index bc21580d3..9248bbc69 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java
@@ -36,7 +36,9 @@ import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.work.filter.BloomFilter;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
+
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
@@ -56,6 +58,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
private Map<String, Integer> field2id = new HashMap<>();
private List<String> toFilterFields;
private List<BloomFilter> bloomFilters;
+ private RuntimeFilterWritable current;
+ private RuntimeFilterWritable previous;
private int originalRecordCount;
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class);
@@ -102,6 +106,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
sv2.clear();
}
super.close();
+ if (current != null) {
+ current.close();
+ }
}
@Override
@@ -148,30 +155,36 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
* schema change hash64 should be reset and this method needs to be called again.
*/
private void setupHashHelper() {
- final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
-
+ final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
// Check if RuntimeFilterWritable was received by the minor fragment or not
- if (runtimeFilterWritable == null) {
+ if (!runtimeFilterSink.containOne()) {
return;
}
-
- // Check if bloomFilters is initialized or not
- if (bloomFilters == null) {
- bloomFilters = runtimeFilterWritable.unwrap();
+ if (runtimeFilterSink.hasFreshOne()) {
+ RuntimeFilterWritable freshRuntimeFilterWritable = runtimeFilterSink.fetchLatestDuplicatedAggregatedOne();
+ if (current == null) {
+ current = freshRuntimeFilterWritable;
+ previous = freshRuntimeFilterWritable;
+ } else {
+ previous = current;
+ current = freshRuntimeFilterWritable;
+ previous.close();
+ }
+ bloomFilters = current.unwrap();
}
-
// Check if HashHelper is initialized or not
if (hash64 == null) {
ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(incoming, context);
try {
//generate hash helper
- this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList();
+ this.toFilterFields = current.getRuntimeFilterBDef().getProbeFieldsList();
List<LogicalExpression> hashFieldExps = new ArrayList<>();
List<TypedFieldId> typedFieldIds = new ArrayList<>();
for (String toFilterField : toFilterFields) {
SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN);
TypedFieldId typedFieldId = container.getValueVectorId(schemaPath);
- this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]);
+ int[] fieldIds = typedFieldId.getFieldIds();
+ this.field2id.put(toFilterField, fieldIds[0]);
typedFieldIds.add(typedFieldId);
ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId);
hashFieldExps.add(toHashFieldExp);
@@ -195,11 +208,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
sv2.setRecordCount(0);
return;
}
-
- final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter();
+ final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink();
sv2.allocateNew(originalRecordCount);
-
- if (runtimeFilterWritable == null) {
+ if (!runtimeFilterSink.containOne()) {
// means none of the rows are filtered out hence set all the indexes
for (int i = 0; i < originalRecordCount; ++i) {
sv2.setIndex(i, i);
@@ -207,10 +218,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF
sv2.setRecordCount(originalRecordCount);
return;
}
-
- // Setup a hash helper if need be
+ // Setup a hash helper if needed
setupHashHelper();
-
//To make each independent bloom filter work together to construct a final filter result: BitSet.
BitSet bitSet = new BitSet(originalRecordCount);
for (int i = 0; i < toFilterFields.size(); i++) {
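setupHashHelper() above now pulls a duplicated aggregated filter from the sink whenever a fresh one is available, keeps it as current, and closes the copy it replaces (close() also releases the last current copy). A minimal sketch of that keep-latest pattern (plain JDK, hypothetical type; not part of this patch):

// Keep the newest value, release the value it supersedes, and release the last
// value on close, mirroring the current/previous handling above.
final class LatestHolder<T extends AutoCloseable> implements AutoCloseable {
  private T current;

  void accept(T fresh) throws Exception {
    T previous = current;
    current = fresh;
    if (previous != null && previous != fresh) {
      previous.close();                  // release the superseded copy
    }
  }

  T current() {
    return current;
  }

  @Override
  public void close() throws Exception {
    if (current != null) {
      current.close();                   // like RuntimeFilterRecordBatch.close() closing 'current'
    }
  }
}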
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 658f03a33..3d456967f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -724,7 +724,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
//RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the
- //RuntimeFilterManager's judgement will have the RuntimeFilterDef.
+ //RuntimeFilterRouter's judgement will have the RuntimeFilterDef.
if (runtimeFilterDef != null) {
List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
@@ -944,7 +944,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> {
if (cycleNum == 0 && enableRuntimeFilter) {
if (bloomFilters.size() > 0) {
- runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman());
+ int hashJoinOpId = this.popConfig.getOperatorId();
+ runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index c31e491a3..bfba5f2c1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -197,11 +197,8 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
@Override
public Void visitExchange(ExchangePrel exchange, RFHelperHolder holder) throws RuntimeException {
if (holder != null) {
- boolean broadcastExchange = exchange instanceof BroadcastExchangePrel;
if (holder.isFromBuildSide()) {
- //To the build side ,we need to identify whether the HashJoin's direct children have a Broadcast node to mark
- //this HashJoin as BroadcastHashJoin
- holder.setEncounteredBroadcastExchange(broadcastExchange);
+ holder.setBuildSideExchange(exchange);
}
}
return visitPrel(exchange, holder);
@@ -224,15 +221,15 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
Prel right = (Prel) hashJoinPrel.getRight();
holder.setFromBuildSide(true);
right.accept(this, holder);
- boolean buildSideEncountererdBroadcastExchange = holder.isEncounteredBroadcastExchange();
- if (buildSideEncountererdBroadcastExchange) {
+ boolean routeToForeman = holder.needToRouteToForeman();
+ if (!routeToForeman) {
runtimeFilterDef.setSendToForeman(false);
} else {
runtimeFilterDef.setSendToForeman(true);
}
List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
- if (buildSideEncountererdBroadcastExchange) {
+ if (!routeToForeman) {
bloomFilterDef.setLocal(true);
} else {
bloomFilterDef.setLocal(false);
@@ -338,18 +335,17 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
* RuntimeFilter helper util holder
*/
private static class RFHelperHolder {
- //whether this join operator is a partitioned HashJoin or broadcast HashJoin,
- //also single node HashJoin is not expected to do JPPD.
- private boolean encounteredBroadcastExchange;
private boolean fromBuildSide;
- public boolean isEncounteredBroadcastExchange() {
- return encounteredBroadcastExchange;
+ private ExchangePrel exchangePrel;
+
+ public void setBuildSideExchange(ExchangePrel exchange){
+ this.exchangePrel = exchange;
}
- public void setEncounteredBroadcastExchange(boolean encounteredBroadcastExchange) {
- this.encounteredBroadcastExchange = encounteredBroadcastExchange;
+ public boolean needToRouteToForeman() {
+ return exchangePrel != null && !(exchangePrel instanceof BroadcastExchangePrel);
}
public boolean isFromBuildSide() {
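The reworked RFHelperHolder keeps the build-side exchange it saw and derives the routing from it: no exchange or a broadcast exchange means the filter stays local to the HashJoin's fragment, while a partitioned exchange means it must travel through the Foreman. A small illustrative sketch of that decision (hypothetical names; not part of this patch):

// Route to the Foreman only for a non-broadcast (partitioned) build-side exchange,
// which is what needToRouteToForeman() encodes above.
enum FilterRoute { LOCAL, VIA_FOREMAN }

final class RouteDecision {
  static FilterRoute decide(boolean sawBuildSideExchange, boolean exchangeIsBroadcast) {
    if (!sawBuildSideExchange) {
      return FilterRoute.LOCAL;          // single-fragment case: filter stays local
    }
    return exchangeIsBroadcast ? FilterRoute.LOCAL : FilterRoute.VIA_FOREMAN;
  }
}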
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
index bf91ed3cd..0d97e0ac3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java
@@ -395,7 +395,7 @@ public class WorkManager implements AutoCloseable {
final String originalName = currentThread.getName();
currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter");
try {
- foreman.getRuntimeFilterManager().registerRuntimeFilter(runtimeFilter);
+ foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter);
} catch (Exception e) {
logger.warn("Exception while registering the RuntimeFilter", e);
} finally {
@@ -413,7 +413,7 @@ public class WorkManager implements AutoCloseable {
.setQueryId(queryId).build();
FragmentExecutor fragmentExecutor = runningFragments.get(fragmentHandle);
if (fragmentExecutor != null) {
- fragmentExecutor.getContext().setRuntimeFilter(runtimeFilter);
+ fragmentExecutor.getContext().addRuntimeFilter(runtimeFilter);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
index e6ede7a43..6e4a9a8e5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java
@@ -39,7 +39,7 @@ public class RuntimeFilterReporter {
this.context = context;
}
- public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman) {
+ public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) {
ExecProtos.FragmentHandle fragmentHandle = context.getHandle();
DrillBuf[] data = new DrillBuf[bloomFilters.size()];
List<Integer> bloomFilterSizeInBytes = new ArrayList<>();
@@ -63,6 +63,7 @@ public class RuntimeFilterReporter {
.setMajorFragmentId(majorFragmentId)
.setMinorFragmentId(minorFragmentId)
.setToForeman(sendToForeman)
+ .setHjOpId(hashJoinOpId)
.addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes)
.build();
RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data);
@@ -72,7 +73,7 @@ public class RuntimeFilterReporter {
AccountingDataTunnel dataTunnel = context.getDataTunnel(foremanEndpoint);
dataTunnel.sendRuntimeFilter(runtimeFilterWritable);
} else {
- context.setRuntimeFilter(runtimeFilterWritable);
+ context.addRuntimeFilter(runtimeFilterWritable);
}
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
index e3f89a6e7..5a8c6fc9e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java
@@ -18,7 +18,7 @@
package org.apache.drill.exec.work.filter;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import io.netty.buffer.DrillBuf;
import org.apache.commons.collections.CollectionUtils;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.Consumer;
@@ -35,7 +35,6 @@ import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
-import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.data.DataTunnel;
@@ -60,17 +59,14 @@ import java.util.concurrent.ConcurrentHashMap;
* The HashJoinRecordBatch is responsible to generate the RuntimeFilter.
* To Partitioned case:
* The generated RuntimeFilter will be sent to the Foreman node. The Foreman node receives the RuntimeFilter
- * async, aggregates them, broadcasts them the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which
- * steps over the Scan node will leverage the received RuntimeFilter to filter out the scanned rows to generate
- * the SV2.
+ * async and broadcasts them to the Scan nodes' MinorFragments. The RuntimeFilterRecordBatch downstream
+ * of the Scan node will aggregate all the received RuntimeFilters and leverage them to filter out the
+ * scanned rows to generate the SV2.
* To Broadcast case:
* The generated RuntimeFilter will be sent to Scan node's RuntimeFilterRecordBatch directly. The working of the
* RuntimeFilterRecordBath is the same as the Partitioned one.
- *
- *
- *
*/
-public class RuntimeFilterManager {
+public class RuntimeFilterRouter {
private Wrapper rootWrapper;
//HashJoin node's major fragment id to its corresponding probe side nodes's endpoints
@@ -79,14 +75,12 @@ public class RuntimeFilterManager {
private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>();
//HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id
private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>();
- //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable
- private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new ConcurrentHashMap<>();
private DrillbitContext drillbitContext;
private SendingAccountor sendingAccountor = new SendingAccountor();
- private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class);
+ private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class);
/**
* This class maintains context for the runtime join push down's filter management. It
@@ -95,7 +89,7 @@ public class RuntimeFilterManager {
* @param workUnit
* @param drillbitContext
*/
- public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
+ public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) {
this.rootWrapper = workUnit.getRootWrapper();
this.drillbitContext = drillbitContext;
}
@@ -134,32 +128,16 @@ public class RuntimeFilterManager {
* @param runtimeFilterWritable
*/
public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) {
- BitData.RuntimeFilterBDef runtimeFilterB = runtimeFilterWritable.getRuntimeFilterBDef();
- int majorId = runtimeFilterB.getMajorFragmentId();
- UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
- List<String> probeFields = runtimeFilterB.getProbeFieldsList();
- logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId));
- int size;
- synchronized (this) {
- size = joinMjId2scanSize.get(majorId);
- Preconditions.checkState(size > 0);
- RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId);
- if (aggregatedRuntimeFilter == null) {
- aggregatedRuntimeFilter = runtimeFilterWritable;
- } else {
- aggregatedRuntimeFilter.aggregate(runtimeFilterWritable);
- }
- joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter);
- size--;
- joinMjId2scanSize.put(majorId, size);
- }
- if (size == 0) {
- broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields);
- }
+ broadcastAggregatedRuntimeFilter(runtimeFilterWritable);
}
- private void broadcastAggregatedRuntimeFilter(int joinMajorId, UserBitShared.QueryId queryId, List<String> probeFields) {
+ private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) {
+ BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef();
+ int joinMajorId = runtimeFilterB.getMajorFragmentId();
+ UserBitShared.QueryId queryId = runtimeFilterB.getQueryId();
+ List<String> probeFields = runtimeFilterB.getProbeFieldsList();
+ DrillBuf[] data = srcRuntimeFilterWritable.getData();
List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId);
int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId);
for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) {
@@ -172,10 +150,8 @@ public class RuntimeFilterManager {
.setMajorFragmentId(scanNodeMjId)
.setMinorFragmentId(minorId)
.build();
- RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(joinMajorId);
- RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, aggregatedRuntimeFilter.getData());
+ RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data);
CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId);
-
DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint);
Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() {
@Override
@@ -235,8 +211,6 @@ public class RuntimeFilterManager {
private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> {
- private PhysicalOperator targetOp;
-
private Fragment fragment;
private boolean contain = false;
@@ -251,7 +225,6 @@ public class RuntimeFilterManager {
public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) {
- this.targetOp = targetOp;
this.fragment = fragment;
this.targetIsGroupScan = targetOp instanceof GroupScan;
this.targetIsHashJoin = targetOp instanceof HashJoinPOP;
@@ -343,13 +316,10 @@ public class RuntimeFilterManager {
private int probeSideScanMajorId;
-
-
private List<CoordinationProtos.DrillbitEndpoint> probeSideScanEndpoints;
private RuntimeFilterDef runtimeFilterDef;
-
public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() {
return probeSideScanEndpoints;
}
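With aggregation moved off the Foreman, the router's registerRuntimeFilter() reduces to a fan-out: it re-addresses the received filter to the probe-side scan's major fragment id and each target minor fragment id, wrapping the same buffers for every endpoint. A simplified sketch of that loop with hypothetical stand-in types (not Drill's API; not part of this patch):

import java.util.List;

// One received filter payload is re-addressed per probe-side scan minor fragment
// and sent to that fragment's endpoint; only the addressing changes per target.
final class FanOutSketch {
  interface Tunnel {
    void send(String endpoint, int scanMajorId, int scanMinorId, byte[] payload);
  }

  static void broadcast(byte[] payload, int scanMajorId, List<String> scanEndpoints, Tunnel tunnel) {
    for (int minorId = 0; minorId < scanEndpoints.size(); minorId++) {
      // same payload for every target, like the router wrapping the same DrillBuf[] data
      tunnel.send(scanEndpoints.get(minorId), scanMajorId, minorId, payload);
    }
  }
}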
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
new file mode 100644
index 000000000..8f4c8230e
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.filter;
+
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.rpc.NamedThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * This sink receives RuntimeFilters from the netty thread,
+ * aggregates them in an async thread, and supplies the aggregated
+ * one to the fragment's running thread.
+ */
+public class RuntimeFilterSink implements AutoCloseable {
+
+ private AtomicInteger currentBookId = new AtomicInteger(0);
+
+ private int staleBookId = 0;
+
+ private RuntimeFilterWritable aggregated = null;
+
+ private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>();
+
+ private AtomicBoolean running = new AtomicBoolean(true);
+
+ private ReentrantLock aggregatedRFLock = new ReentrantLock();
+
+ private Thread asyncAggregateThread;
+
+ private BufferAllocator bufferAllocator;
+
+ private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class);
+
+
+ public RuntimeFilterSink(BufferAllocator bufferAllocator) {
+ this.bufferAllocator = bufferAllocator;
+ AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker();
+ asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker);
+ asyncAggregateThread.start();
+ }
+
+ public void aggregate(RuntimeFilterWritable runtimeFilterWritable) {
+ if (running.get()) {
+ if (containOne()) {
+ boolean same = aggregated.same(runtimeFilterWritable);
+ if (!same) {
+ //This handles the single-fragment case in which two RuntimeFilterRecordBatches
+ //share the same FragmentContext.
+ try {
+ aggregatedRFLock.lock();
+ aggregated.close();
+ aggregated = null;
+ } finally {
+ aggregatedRFLock.unlock();
+ }
+ currentBookId.set(0);
+ staleBookId = 0;
+ clearQueued();
+ }
+ }
+ rfQueue.add(runtimeFilterWritable);
+ } else {
+ runtimeFilterWritable.close();
+ }
+ }
+
+ public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() {
+ try {
+ aggregatedRFLock.lock();
+ return aggregated.duplicate(bufferAllocator);
+ } finally {
+ aggregatedRFLock.unlock();
+ }
+ }
+
+ /**
+ * whether there's a fresh aggregated RuntimeFilter
+ *
+ * @return
+ */
+ public boolean hasFreshOne() {
+ if (currentBookId.get() > staleBookId) {
+ staleBookId = currentBookId.get();
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * whether there's a usable RuntimeFilter.
+ *
+ * @return
+ */
+ public boolean containOne() {
+ return aggregated != null;
+ }
+
+ @Override
+ public void close() throws Exception {
+ running.compareAndSet(true, false);
+ asyncAggregateThread.interrupt();
+ if (containOne()) {
+ try {
+ aggregatedRFLock.lock();
+ aggregated.close();
+ } finally {
+ aggregatedRFLock.unlock();
+ }
+ }
+ clearQueued();
+ }
+
+ private void clearQueued() {
+ RuntimeFilterWritable toClear;
+ while ((toClear = rfQueue.poll()) != null) {
+ toClear.close();
+ }
+ }
+
+ class AsyncAggregateWorker implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ while (running.get()) {
+ RuntimeFilterWritable toAggregate = rfQueue.take();
+ if (!running.get()) {
+ toAggregate.close();
+ return;
+ }
+ if (containOne()) {
+ try {
+ aggregatedRFLock.lock();
+ aggregated.aggregate(toAggregate);
+ } finally {
+ aggregatedRFLock.unlock();
+ }
+ } else {
+ aggregated = toAggregate;
+ }
+ currentBookId.incrementAndGet();
+ }
+ } catch (InterruptedException e) {
+ logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e);
+ }
+ }
+ }
+}
+
+
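The bookkeeping above pairs an AtomicInteger written by the aggregating thread with a plain int read only by the fragment thread: a fresh aggregate exists exactly when the counter has advanced past the last value the reader recorded. A self-contained sketch of that pattern (plain JDK, hypothetical names; not part of this patch):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// A background worker folds queued items into an aggregate and bumps a counter;
// the reader compares that counter with the value it last consumed.
final class FreshnessDemo {
  private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
  private final AtomicInteger currentBookId = new AtomicInteger(0);
  private int staleBookId = 0;            // touched only by the reading thread
  private volatile int aggregated = 0;    // stand-in for the merged BloomFilters

  FreshnessDemo() {
    Thread worker = new Thread(() -> {
      try {
        while (true) {
          int next = queue.take();
          aggregated += next;             // stand-in for aggregated.aggregate(toAggregate)
          currentBookId.incrementAndGet();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }, "aggregate-worker");
    worker.setDaemon(true);
    worker.start();
  }

  void add(int value) {
    queue.add(value);
  }

  // Reader-side check, analogous to hasFreshOne(): true only if the worker has
  // merged something since the reader last looked.
  boolean hasFreshOne() {
    int seen = currentBookId.get();
    if (seen > staleBookId) {
      staleBookId = seen;
      return true;
    }
    return false;
  }

  int latest() {
    return aggregated;
  }
}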
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
index 8649e15af..302a4801c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.work.filter;
import io.netty.buffer.DrillBuf;
import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitData;
import java.util.ArrayList;
@@ -29,7 +30,7 @@ import java.util.List;
* A binary wire transferable representation of the RuntimeFilter which contains
* the runtime filter definition and its corresponding data.
*/
-public class RuntimeFilterWritable implements AutoCloseables.Closeable {
+public class RuntimeFilterWritable implements AutoCloseables.Closeable{
private BitData.RuntimeFilterBDef runtimeFilterBDef;
@@ -81,6 +82,37 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable {
}
}
+ public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) {
+ int len = data.length;
+ DrillBuf[] cloned = new DrillBuf[len];
+ int i = 0;
+ for (DrillBuf src : data) {
+ int capacity = src.readableBytes();
+ DrillBuf duplicateOne = bufferAllocator.buffer(capacity);
+ int readerIndex = src.readerIndex();
+ src.readBytes(duplicateOne, 0, capacity);
+ src.readerIndex(readerIndex);
+ cloned[i] = duplicateOne;
+ i++;
+ }
+ return new RuntimeFilterWritable(runtimeFilterBDef, cloned);
+ }
+
+ public boolean same(RuntimeFilterWritable other) {
+ BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef();
+ int otherMajorId = runtimeFilterDef.getMajorFragmentId();
+ int otherMinorId = runtimeFilterDef.getMinorFragmentId();
+ int otherHashJoinOpId = runtimeFilterDef.getHjOpId();
+ int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId();
+ int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId();
+ int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId();
+ return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId;
+ }
+
+ public String toString() {
+ return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId();
+ }
+
@Override
public void close() {
for (DrillBuf buf : data) {
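duplicate() above copies each DrillBuf's readable bytes into freshly allocated memory and restores the source reader index, so the copy can be closed independently of the original. A plain-JDK analogue of that copy discipline using ByteBuffer (hypothetical; not part of this patch):

import java.nio.ByteBuffer;

// Copy the readable bytes into newly allocated memory without disturbing the
// source's read position, so the copy owns its own memory.
final class BufferCopy {
  static ByteBuffer duplicateReadable(ByteBuffer src) {
    ByteBuffer copy = ByteBuffer.allocate(src.remaining());
    int savedPosition = src.position();   // analogous to saving readerIndex()
    copy.put(src);                        // advances src.position() while copying
    src.position(savedPosition);          // restore, like src.readerIndex(readerIndex)
    copy.flip();                          // make the copy readable from the start
    return copy;
  }
}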
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
index 634e8328c..42b76f278 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.work.foreman;
+import org.apache.drill.exec.work.filter.RuntimeFilterRouter;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import com.google.protobuf.InvalidProtocolBufferException;
@@ -61,7 +62,6 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.Pointer;
import org.apache.drill.exec.work.QueryWorkUnit;
import org.apache.drill.exec.work.WorkManager.WorkerBee;
-import org.apache.drill.exec.work.filter.RuntimeFilterManager;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException;
import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException;
import org.apache.drill.exec.work.foreman.rm.QueryResourceManager;
@@ -122,7 +122,7 @@ public class Foreman implements Runnable {
private String queryText;
- private RuntimeFilterManager runtimeFilterManager;
+ private RuntimeFilterRouter runtimeFilterRouter;
private boolean enableRuntimeFilter;
/**
@@ -410,8 +410,8 @@ public class Foreman implements Runnable {
queryRM.visitAbstractPlan(plan);
final QueryWorkUnit work = getQueryWorkUnit(plan);
if (enableRuntimeFilter) {
- runtimeFilterManager = new RuntimeFilterManager(work, drillbitContext);
- runtimeFilterManager.collectRuntimeFilterParallelAndControlInfo();
+ runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext);
+ runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo();
}
if (textPlan != null) {
queryManager.setPlanText(textPlan.value);
@@ -734,8 +734,8 @@ public class Foreman implements Runnable {
logger.debug(queryIdString + ": cleaning up.");
injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger);
- if (enableRuntimeFilter && runtimeFilterManager != null) {
- runtimeFilterManager.waitForComplete();
+ if (enableRuntimeFilter && runtimeFilterRouter != null) {
+ runtimeFilterRouter.waitForComplete();
}
// remove the channel disconnected listener (doesn't throw)
closeFuture.removeListener(closeListener);
@@ -866,8 +866,8 @@ public class Foreman implements Runnable {
}
- public RuntimeFilterManager getRuntimeFilterManager() {
- return runtimeFilterManager;
+ public RuntimeFilterRouter getRuntimeFilterRouter() {
+ return runtimeFilterRouter;
}
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
index f86701572..81d0d1a6a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/OperatorFixture.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.test;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.shaded.guava.com.google.common.base.Function;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -180,6 +181,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
private ExecutorState executorState = new OperatorFixture.MockExecutorState();
private ExecutionControls controls;
+ private RuntimeFilterSink runtimeFilterSink;
public MockFragmentContext(final DrillConfig config,
final OptionManager options,
@@ -195,6 +197,7 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
this.controls = new ExecutionControls(options);
compiler = new CodeCompiler(config, options);
bufferManager = new BufferManagerImpl(allocator);
+ this.runtimeFilterSink = new RuntimeFilterSink(allocator);
}
private static FunctionImplementationRegistry newFunctionRegistry(
@@ -315,13 +318,13 @@ public class OperatorFixture extends BaseFixture implements AutoCloseable {
}
@Override
- public RuntimeFilterWritable getRuntimeFilter() {
- return null;
+ public RuntimeFilterSink getRuntimeFilterSink() {
+ return runtimeFilterSink;
}
@Override
- public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
-
+ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+ runtimeFilterSink.aggregate(runtimeFilter);
}
@Override
diff --git a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
index 1c4779c3e..559f7f4df 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/test/PhysicalOpUnitTestBase.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.server.QueryProfileStoreContext;
import org.apache.drill.exec.store.dfs.DrillFileSystem;
import org.apache.drill.exec.store.easy.json.JSONRecordReader;
import org.apache.drill.exec.work.batch.IncomingBuffers;
+import org.apache.drill.exec.work.filter.RuntimeFilterSink;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -203,11 +204,12 @@ public class PhysicalOpUnitTestBase extends ExecTest {
* </p>
*/
protected static class MockExecutorFragmentContext extends OperatorFixture.MockFragmentContext implements ExecutorFragmentContext {
- private RuntimeFilterWritable runtimeFilterWritable;
+ private RuntimeFilterSink runtimeFilterSink;
public MockExecutorFragmentContext(final FragmentContext fragmentContext) {
super(fragmentContext.getConfig(), fragmentContext.getOptions(), fragmentContext.getAllocator(),
fragmentContext.getScanExecutor(), fragmentContext.getScanDecodeExecutor());
+ this.runtimeFilterSink = new RuntimeFilterSink(fragmentContext.getAllocator());
}
@Override
@@ -304,13 +306,13 @@ public class PhysicalOpUnitTestBase extends ExecTest {
}
@Override
- public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
- this.runtimeFilterWritable = runtimeFilter;
+ public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) {
+ this.runtimeFilterSink.aggregate(runtimeFilter);
}
@Override
- public RuntimeFilterWritable getRuntimeFilter() {
- return runtimeFilterWritable;
+ public RuntimeFilterSink getRuntimeFilterSink() {
+ return runtimeFilterSink;
}
}