diff options
author | weijie.tong <weijie.tong@alipay.com> | 2018-09-06 19:23:35 +0800 |
---|---|---|
committer | Sorabh Hamirwasia <sorabh@apache.org> | 2018-10-10 09:50:04 -0700 |
commit | 216b1237739935b04c4f54b3f6f05371a4644085 (patch) | |
tree | 50f58c2d7438a7b2e6d86372caf207795d0254c3 /exec/java-exec/src/main/java/org/apache/drill/exec | |
parent | d5146c43986f09f132f4e96966082732a3740181 (diff) |
DRILL-6731: Move the BFs aggregating work from the Foreman to the RuntimeFilter
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec')
11 files changed, 284 insertions, 102 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 608f05c56..88c21d9e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.ExecutorService; +import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting; import org.apache.calcite.schema.SchemaPlus; import org.apache.drill.common.config.DrillConfig; @@ -160,16 +161,15 @@ public interface FragmentContext extends UdfUtilities, AutoCloseable { void close(); /** - * Return null ,if setRuntimeFilter not being called * @return */ - RuntimeFilterWritable getRuntimeFilter(); + RuntimeFilterSink getRuntimeFilterSink(); /** - * Set a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment + * add a RuntimeFilter when the RuntimeFilter receiver belongs to the same MinorFragment * @param runtimeFilter */ - public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter); + public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter); interface ExecutorState { /** diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java index a8980785a..1f9d48970 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContextImpl.java @@ -61,6 +61,7 @@ import org.apache.drill.exec.testing.ExecutionControls; import org.apache.drill.exec.util.ImpersonationUtil; import org.apache.drill.exec.work.batch.IncomingBuffers; +import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.shaded.guava.com.google.common.base.Function; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -136,7 +137,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor /** Stores constants and their holders by type */ private final Map<String, Map<MinorType, ValueHolder>> constantValueHolderCache; - private RuntimeFilterWritable runtimeFilterWritable; + private RuntimeFilterSink runtimeFilterSink; /** * Create a FragmentContext instance for non-root fragment. @@ -208,6 +209,7 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor stats = new FragmentStats(allocator, fragment.getAssignment()); bufferManager = new BufferManagerImpl(this.allocator); constantValueHolderCache = Maps.newHashMap(); + this.runtimeFilterSink = new RuntimeFilterSink(this.allocator); } /** @@ -348,13 +350,13 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor } @Override - public void setRuntimeFilter(RuntimeFilterWritable runtimeFilter) { - this.runtimeFilterWritable = runtimeFilter; + public void addRuntimeFilter(RuntimeFilterWritable runtimeFilter) { + this.runtimeFilterSink.aggregate(runtimeFilter); } @Override - public RuntimeFilterWritable getRuntimeFilter() { - return runtimeFilterWritable; + public RuntimeFilterSink getRuntimeFilterSink() { + return runtimeFilterSink; } /** @@ -470,8 +472,8 @@ public class FragmentContextImpl extends BaseFragmentContext implements Executor for (OperatorContextImpl opContext : contexts) { suppressingClose(opContext); } - if (runtimeFilterWritable != null) { - suppressingClose(runtimeFilterWritable); + if (runtimeFilterSink != null) { + suppressingClose(runtimeFilterSink); } suppressingClose(bufferManager); suppressingClose(allocator); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java index bc21580d3..9248bbc69 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/RuntimeFilterRecordBatch.java @@ -36,7 +36,9 @@ import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.work.filter.BloomFilter; +import org.apache.drill.exec.work.filter.RuntimeFilterSink; import org.apache.drill.exec.work.filter.RuntimeFilterWritable; + import java.util.ArrayList; import java.util.BitSet; import java.util.HashMap; @@ -56,6 +58,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF private Map<String, Integer> field2id = new HashMap<>(); private List<String> toFilterFields; private List<BloomFilter> bloomFilters; + private RuntimeFilterWritable current; + private RuntimeFilterWritable previous; private int originalRecordCount; private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterRecordBatch.class); @@ -102,6 +106,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF sv2.clear(); } super.close(); + if (current != null) { + current.close(); + } } @Override @@ -148,30 +155,36 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF * schema change hash64 should be reset and this method needs to be called again. */ private void setupHashHelper() { - final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter(); - + final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink(); // Check if RuntimeFilterWritable was received by the minor fragment or not - if (runtimeFilterWritable == null) { + if (!runtimeFilterSink.containOne()) { return; } - - // Check if bloomFilters is initialized or not - if (bloomFilters == null) { - bloomFilters = runtimeFilterWritable.unwrap(); + if (runtimeFilterSink.hasFreshOne()) { + RuntimeFilterWritable freshRuntimeFilterWritable = runtimeFilterSink.fetchLatestDuplicatedAggregatedOne(); + if (current == null) { + current = freshRuntimeFilterWritable; + previous = freshRuntimeFilterWritable; + } else { + previous = current; + current = freshRuntimeFilterWritable; + previous.close(); + } + bloomFilters = current.unwrap(); } - // Check if HashHelper is initialized or not if (hash64 == null) { ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(incoming, context); try { //generate hash helper - this.toFilterFields = runtimeFilterWritable.getRuntimeFilterBDef().getProbeFieldsList(); + this.toFilterFields = current.getRuntimeFilterBDef().getProbeFieldsList(); List<LogicalExpression> hashFieldExps = new ArrayList<>(); List<TypedFieldId> typedFieldIds = new ArrayList<>(); for (String toFilterField : toFilterFields) { SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(toFilterField), ExpressionPosition.UNKNOWN); TypedFieldId typedFieldId = container.getValueVectorId(schemaPath); - this.field2id.put(toFilterField, typedFieldId.getFieldIds()[0]); + int[] fieldIds = typedFieldId.getFieldIds(); + this.field2id.put(toFilterField, fieldIds[0]); typedFieldIds.add(typedFieldId); ValueVectorReadExpression toHashFieldExp = new ValueVectorReadExpression(typedFieldId); hashFieldExps.add(toHashFieldExp); @@ -195,11 +208,9 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF sv2.setRecordCount(0); return; } - - final RuntimeFilterWritable runtimeFilterWritable = context.getRuntimeFilter(); + final RuntimeFilterSink runtimeFilterSink = context.getRuntimeFilterSink(); sv2.allocateNew(originalRecordCount); - - if (runtimeFilterWritable == null) { + if (!runtimeFilterSink.containOne()) { // means none of the rows are filtered out hence set all the indexes for (int i = 0; i < originalRecordCount; ++i) { sv2.setIndex(i, i); @@ -207,10 +218,8 @@ public class RuntimeFilterRecordBatch extends AbstractSingleRecordBatch<RuntimeF sv2.setRecordCount(originalRecordCount); return; } - - // Setup a hash helper if need be + // Setup a hash helper if needed setupHashHelper(); - //To make each independent bloom filter work together to construct a final filter result: BitSet. BitSet bitSet = new BitSet(originalRecordCount); for (int i = 0; i < toFilterFields.size(); i++) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 658f03a33..3d456967f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -724,7 +724,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context); RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); //RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the - //RuntimeFilterManager's judgement will have the RuntimeFilterDef. + //RuntimeFilterRouter's judgement will have the RuntimeFilterDef. if (runtimeFilterDef != null) { List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { @@ -944,7 +944,8 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> { if (cycleNum == 0 && enableRuntimeFilter) { if (bloomFilters.size() > 0) { - runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman()); + int hashJoinOpId = this.popConfig.getOperatorId(); + runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java index c31e491a3..bfba5f2c1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java @@ -197,11 +197,8 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc @Override public Void visitExchange(ExchangePrel exchange, RFHelperHolder holder) throws RuntimeException { if (holder != null) { - boolean broadcastExchange = exchange instanceof BroadcastExchangePrel; if (holder.isFromBuildSide()) { - //To the build side ,we need to identify whether the HashJoin's direct children have a Broadcast node to mark - //this HashJoin as BroadcastHashJoin - holder.setEncounteredBroadcastExchange(broadcastExchange); + holder.setBuildSideExchange(exchange); } } return visitPrel(exchange, holder); @@ -224,15 +221,15 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc Prel right = (Prel) hashJoinPrel.getRight(); holder.setFromBuildSide(true); right.accept(this, holder); - boolean buildSideEncountererdBroadcastExchange = holder.isEncounteredBroadcastExchange(); - if (buildSideEncountererdBroadcastExchange) { + boolean routeToForeman = holder.needToRouteToForeman(); + if (!routeToForeman) { runtimeFilterDef.setSendToForeman(false); } else { runtimeFilterDef.setSendToForeman(true); } List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { - if (buildSideEncountererdBroadcastExchange) { + if (!routeToForeman) { bloomFilterDef.setLocal(true); } else { bloomFilterDef.setLocal(false); @@ -338,18 +335,17 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc * RuntimeFilter helper util holder */ private static class RFHelperHolder { - //whether this join operator is a partitioned HashJoin or broadcast HashJoin, - //also single node HashJoin is not expected to do JPPD. - private boolean encounteredBroadcastExchange; private boolean fromBuildSide; - public boolean isEncounteredBroadcastExchange() { - return encounteredBroadcastExchange; + private ExchangePrel exchangePrel; + + public void setBuildSideExchange(ExchangePrel exchange){ + this.exchangePrel = exchange; } - public void setEncounteredBroadcastExchange(boolean encounteredBroadcastExchange) { - this.encounteredBroadcastExchange = encounteredBroadcastExchange; + public boolean needToRouteToForeman() { + return exchangePrel != null && !(exchangePrel instanceof BroadcastExchangePrel); } public boolean isFromBuildSide() { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index bf91ed3cd..0d97e0ac3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -395,7 +395,7 @@ public class WorkManager implements AutoCloseable { final String originalName = currentThread.getName(); currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter"); try { - foreman.getRuntimeFilterManager().registerRuntimeFilter(runtimeFilter); + foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter); } catch (Exception e) { logger.warn("Exception while registering the RuntimeFilter", e); } finally { @@ -413,7 +413,7 @@ public class WorkManager implements AutoCloseable { .setQueryId(queryId).build(); FragmentExecutor fragmentExecutor = runningFragments.get(fragmentHandle); if (fragmentExecutor != null) { - fragmentExecutor.getContext().setRuntimeFilter(runtimeFilter); + fragmentExecutor.getContext().addRuntimeFilter(runtimeFilter); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java index e6ede7a43..6e4a9a8e5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java @@ -39,7 +39,7 @@ public class RuntimeFilterReporter { this.context = context; } - public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman) { + public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) { ExecProtos.FragmentHandle fragmentHandle = context.getHandle(); DrillBuf[] data = new DrillBuf[bloomFilters.size()]; List<Integer> bloomFilterSizeInBytes = new ArrayList<>(); @@ -63,6 +63,7 @@ public class RuntimeFilterReporter { .setMajorFragmentId(majorFragmentId) .setMinorFragmentId(minorFragmentId) .setToForeman(sendToForeman) + .setHjOpId(hashJoinOpId) .addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes) .build(); RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data); @@ -72,7 +73,7 @@ public class RuntimeFilterReporter { AccountingDataTunnel dataTunnel = context.getDataTunnel(foremanEndpoint); dataTunnel.sendRuntimeFilter(runtimeFilterWritable); } else { - context.setRuntimeFilter(runtimeFilterWritable); + context.addRuntimeFilter(runtimeFilterWritable); } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java index e3f89a6e7..5a8c6fc9e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java @@ -18,7 +18,7 @@ package org.apache.drill.exec.work.filter; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import io.netty.buffer.DrillBuf; import org.apache.commons.collections.CollectionUtils; import org.apache.drill.exec.ops.AccountingDataTunnel; import org.apache.drill.exec.ops.Consumer; @@ -35,7 +35,6 @@ import org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.proto.GeneralRPCProtos; import org.apache.drill.exec.proto.UserBitShared; -import org.apache.drill.exec.proto.helper.QueryIdHelper; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.rpc.RpcOutcomeListener; import org.apache.drill.exec.rpc.data.DataTunnel; @@ -60,17 +59,14 @@ import java.util.concurrent.ConcurrentHashMap; * The HashJoinRecordBatch is responsible to generate the RuntimeFilter. * To Partitioned case: * The generated RuntimeFilter will be sent to the Foreman node. The Foreman node receives the RuntimeFilter - * async, aggregates them, broadcasts them the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which - * steps over the Scan node will leverage the received RuntimeFilter to filter out the scanned rows to generate - * the SV2. + * async, broadcasts them to the Scan nodes's MinorFragment. The RuntimeFilterRecordBatch which is downstream + * to the Scan node will aggregate all the received RuntimeFilter and will leverage it to filter out the + * scanned rows to generate the SV2. * To Broadcast case: * The generated RuntimeFilter will be sent to Scan node's RuntimeFilterRecordBatch directly. The working of the * RuntimeFilterRecordBath is the same as the Partitioned one. - * - * - * */ -public class RuntimeFilterManager { +public class RuntimeFilterRouter { private Wrapper rootWrapper; //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints @@ -79,14 +75,12 @@ public class RuntimeFilterManager { private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>(); //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>(); - //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable - private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new ConcurrentHashMap<>(); private DrillbitContext drillbitContext; private SendingAccountor sendingAccountor = new SendingAccountor(); - private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterManager.class); + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class); /** * This class maintains context for the runtime join push down's filter management. It @@ -95,7 +89,7 @@ public class RuntimeFilterManager { * @param workUnit * @param drillbitContext */ - public RuntimeFilterManager(QueryWorkUnit workUnit, DrillbitContext drillbitContext) { + public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) { this.rootWrapper = workUnit.getRootWrapper(); this.drillbitContext = drillbitContext; } @@ -134,32 +128,16 @@ public class RuntimeFilterManager { * @param runtimeFilterWritable */ public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { - BitData.RuntimeFilterBDef runtimeFilterB = runtimeFilterWritable.getRuntimeFilterBDef(); - int majorId = runtimeFilterB.getMajorFragmentId(); - UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); - List<String> probeFields = runtimeFilterB.getProbeFieldsList(); - logger.info("RuntimeFilterManager receives a runtime filter , majorId:{}, queryId:{}", majorId, QueryIdHelper.getQueryId(queryId)); - int size; - synchronized (this) { - size = joinMjId2scanSize.get(majorId); - Preconditions.checkState(size > 0); - RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(majorId); - if (aggregatedRuntimeFilter == null) { - aggregatedRuntimeFilter = runtimeFilterWritable; - } else { - aggregatedRuntimeFilter.aggregate(runtimeFilterWritable); - } - joinMjId2AggregatedRF.put(majorId, aggregatedRuntimeFilter); - size--; - joinMjId2scanSize.put(majorId, size); - } - if (size == 0) { - broadcastAggregatedRuntimeFilter(majorId, queryId, probeFields); - } + broadcastAggregatedRuntimeFilter(runtimeFilterWritable); } - private void broadcastAggregatedRuntimeFilter(int joinMajorId, UserBitShared.QueryId queryId, List<String> probeFields) { + private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) { + BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); + int joinMajorId = runtimeFilterB.getMajorFragmentId(); + UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); + List<String> probeFields = runtimeFilterB.getProbeFieldsList(); + DrillBuf[] data = srcRuntimeFilterWritable.getData(); List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId); int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId); for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) { @@ -172,10 +150,8 @@ public class RuntimeFilterManager { .setMajorFragmentId(scanNodeMjId) .setMinorFragmentId(minorId) .build(); - RuntimeFilterWritable aggregatedRuntimeFilter = joinMjId2AggregatedRF.get(joinMajorId); - RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, aggregatedRuntimeFilter.getData()); + RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data); CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId); - DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint); Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() { @Override @@ -235,8 +211,6 @@ public class RuntimeFilterManager { private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> { - private PhysicalOperator targetOp; - private Fragment fragment; private boolean contain = false; @@ -251,7 +225,6 @@ public class RuntimeFilterManager { public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) { - this.targetOp = targetOp; this.fragment = fragment; this.targetIsGroupScan = targetOp instanceof GroupScan; this.targetIsHashJoin = targetOp instanceof HashJoinPOP; @@ -343,13 +316,10 @@ public class RuntimeFilterManager { private int probeSideScanMajorId; - - private List<CoordinationProtos.DrillbitEndpoint> probeSideScanEndpoints; private RuntimeFilterDef runtimeFilterDef; - public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() { return probeSideScanEndpoints; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java new file mode 100644 index 000000000..8f4c8230e --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.work.filter; + +import org.apache.drill.exec.memory.BufferAllocator; +import org.apache.drill.exec.rpc.NamedThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; + +/** + * This sink receives the RuntimeFilters from the netty thread, + * aggregates them in an async thread, supplies the aggregated + * one to the fragment running thread. + */ +public class RuntimeFilterSink implements AutoCloseable { + + private AtomicInteger currentBookId = new AtomicInteger(0); + + private int staleBookId = 0; + + private RuntimeFilterWritable aggregated = null; + + private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>(); + + private AtomicBoolean running = new AtomicBoolean(true); + + private ReentrantLock aggregatedRFLock = new ReentrantLock(); + + private Thread asyncAggregateThread; + + private BufferAllocator bufferAllocator; + + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); + + + public RuntimeFilterSink(BufferAllocator bufferAllocator) { + this.bufferAllocator = bufferAllocator; + AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); + asyncAggregateThread = new NamedThreadFactory("RFAggregating-").newThread(asyncAggregateWorker); + asyncAggregateThread.start(); + } + + public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { + if (running.get()) { + if (containOne()) { + boolean same = aggregated.same(runtimeFilterWritable); + if (!same) { + //This is to solve the only one fragment case that two RuntimeFilterRecordBatchs + //share the same FragmentContext. + try { + aggregatedRFLock.lock(); + aggregated.close(); + aggregated = null; + } finally { + aggregatedRFLock.unlock(); + } + currentBookId.set(0); + staleBookId = 0; + clearQueued(); + } + } + rfQueue.add(runtimeFilterWritable); + } else { + runtimeFilterWritable.close(); + } + } + + public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() { + try { + aggregatedRFLock.lock(); + return aggregated.duplicate(bufferAllocator); + } finally { + aggregatedRFLock.unlock(); + } + } + + /** + * whether there's a fresh aggregated RuntimeFilter + * + * @return + */ + public boolean hasFreshOne() { + if (currentBookId.get() > staleBookId) { + staleBookId = currentBookId.get(); + return true; + } + return false; + } + + /** + * whether there's a usable RuntimeFilter. + * + * @return + */ + public boolean containOne() { + return aggregated != null; + } + + @Override + public void close() throws Exception { + running.compareAndSet(true, false); + asyncAggregateThread.interrupt(); + if (containOne()) { + try { + aggregatedRFLock.lock(); + aggregated.close(); + } finally { + aggregatedRFLock.unlock(); + } + } + clearQueued(); + } + + private void clearQueued() { + RuntimeFilterWritable toClear; + while ((toClear = rfQueue.poll()) != null) { + toClear.close(); + } + } + + class AsyncAggregateWorker implements Runnable { + + @Override + public void run() { + try { + while (running.get()) { + RuntimeFilterWritable toAggregate = rfQueue.take(); + if (!running.get()) { + toAggregate.close(); + return; + } + if (containOne()) { + try { + aggregatedRFLock.lock(); + aggregated.aggregate(toAggregate); + } finally { + aggregatedRFLock.unlock(); + } + } else { + aggregated = toAggregate; + } + currentBookId.incrementAndGet(); + } + } catch (InterruptedException e) { + logger.info("Thread : {} was interrupted.", asyncAggregateThread.getName(), e); + } + } + } +} + + diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java index 8649e15af..302a4801c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.work.filter; import io.netty.buffer.DrillBuf; import org.apache.drill.common.AutoCloseables; +import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.proto.BitData; import java.util.ArrayList; @@ -29,7 +30,7 @@ import java.util.List; * A binary wire transferable representation of the RuntimeFilter which contains * the runtime filter definition and its corresponding data. */ -public class RuntimeFilterWritable implements AutoCloseables.Closeable { +public class RuntimeFilterWritable implements AutoCloseables.Closeable{ private BitData.RuntimeFilterBDef runtimeFilterBDef; @@ -81,6 +82,37 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable { } } + public RuntimeFilterWritable duplicate(BufferAllocator bufferAllocator) { + int len = data.length; + DrillBuf[] cloned = new DrillBuf[len]; + int i = 0; + for (DrillBuf src : data) { + int capacity = src.readableBytes(); + DrillBuf duplicateOne = bufferAllocator.buffer(capacity); + int readerIndex = src.readerIndex(); + src.readBytes(duplicateOne, 0, capacity); + src.readerIndex(readerIndex); + cloned[i] = duplicateOne; + i++; + } + return new RuntimeFilterWritable(runtimeFilterBDef, cloned); + } + + public boolean same(RuntimeFilterWritable other) { + BitData.RuntimeFilterBDef runtimeFilterDef = other.getRuntimeFilterBDef(); + int otherMajorId = runtimeFilterDef.getMajorFragmentId(); + int otherMinorId = runtimeFilterDef.getMinorFragmentId(); + int otherHashJoinOpId = runtimeFilterDef.getHjOpId(); + int thisMajorId = this.runtimeFilterBDef.getMajorFragmentId(); + int thisMinorId = this.runtimeFilterBDef.getMinorFragmentId(); + int thisHashJoinOpId = this.runtimeFilterBDef.getHjOpId(); + return otherMajorId == thisMajorId && otherMinorId == thisMinorId && otherHashJoinOpId == thisHashJoinOpId; + } + + public String toString() { + return "majorFragmentId:" + runtimeFilterBDef.getMajorFragmentId() + ",minorFragmentId:" + runtimeFilterBDef.getMinorFragmentId() + ", operatorId:" + runtimeFilterBDef.getHjOpId(); + } + @Override public void close() { for (DrillBuf buf : data) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 634e8328c..42b76f278 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.work.foreman; +import org.apache.drill.exec.work.filter.RuntimeFilterRouter; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import com.google.protobuf.InvalidProtocolBufferException; @@ -61,7 +62,6 @@ import org.apache.drill.exec.testing.ControlsInjectorFactory; import org.apache.drill.exec.util.Pointer; import org.apache.drill.exec.work.QueryWorkUnit; import org.apache.drill.exec.work.WorkManager.WorkerBee; -import org.apache.drill.exec.work.filter.RuntimeFilterManager; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueueTimeoutException; import org.apache.drill.exec.work.foreman.rm.QueryQueue.QueryQueueException; import org.apache.drill.exec.work.foreman.rm.QueryResourceManager; @@ -122,7 +122,7 @@ public class Foreman implements Runnable { private String queryText; - private RuntimeFilterManager runtimeFilterManager; + private RuntimeFilterRouter runtimeFilterRouter; private boolean enableRuntimeFilter; /** @@ -410,8 +410,8 @@ public class Foreman implements Runnable { queryRM.visitAbstractPlan(plan); final QueryWorkUnit work = getQueryWorkUnit(plan); if (enableRuntimeFilter) { - runtimeFilterManager = new RuntimeFilterManager(work, drillbitContext); - runtimeFilterManager.collectRuntimeFilterParallelAndControlInfo(); + runtimeFilterRouter = new RuntimeFilterRouter(work, drillbitContext); + runtimeFilterRouter.collectRuntimeFilterParallelAndControlInfo(); } if (textPlan != null) { queryManager.setPlanText(textPlan.value); @@ -734,8 +734,8 @@ public class Foreman implements Runnable { logger.debug(queryIdString + ": cleaning up."); injector.injectPause(queryContext.getExecutionControls(), "foreman-cleanup", logger); - if (enableRuntimeFilter && runtimeFilterManager != null) { - runtimeFilterManager.waitForComplete(); + if (enableRuntimeFilter && runtimeFilterRouter != null) { + runtimeFilterRouter.waitForComplete(); } // remove the channel disconnected listener (doesn't throw) closeFuture.removeListener(closeListener); @@ -866,8 +866,8 @@ public class Foreman implements Runnable { } - public RuntimeFilterManager getRuntimeFilterManager() { - return runtimeFilterManager; + public RuntimeFilterRouter getRuntimeFilterRouter() { + return runtimeFilterRouter; } } |