diff options
author | weijie.tong <weijie.tong@alipay.com> | 2018-10-14 19:41:51 +0800 |
---|---|---|
committer | Vitalii Diravka <vitalii.diravka@gmail.com> | 2018-11-29 18:33:23 +0200 |
commit | 9667e92e1e87ce1826f0eac3f2396187dbfa8aaa (patch) | |
tree | cb68cd4bbedf6f84e00168cb0ab300c6dacdb35c /exec/java-exec/src/main/java/org/apache/drill/exec/work | |
parent | 325fa26b5df1bc29594677a0f3e1360fbb4f8bca (diff) |
DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree
closes #1504
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/work')
9 files changed, 414 insertions, 339 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java index 0d97e0ac3..7915843eb 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/WorkManager.java @@ -379,11 +379,16 @@ public class WorkManager implements AutoCloseable { return runningFragments.get(handle); } + /** + * receive the RuntimeFilter thorough the wire + * @param runtimeFilter + */ public void receiveRuntimeFilter(final RuntimeFilterWritable runtimeFilter) { BitData.RuntimeFilterBDef runtimeFilterDef = runtimeFilter.getRuntimeFilterBDef(); boolean toForeman = runtimeFilterDef.getToForeman(); QueryId queryId = runtimeFilterDef.getQueryId(); String queryIdStr = QueryIdHelper.getQueryId(queryId); + runtimeFilter.retainBuffers(1); //to foreman if (toForeman) { Foreman foreman = queries.get(queryId); @@ -393,13 +398,14 @@ public class WorkManager implements AutoCloseable { public void run() { final Thread currentThread = Thread.currentThread(); final String originalName = currentThread.getName(); - currentThread.setName(queryIdStr + ":foreman:registerRuntimeFilter"); + currentThread.setName(queryIdStr + ":foreman:routeRuntimeFilter"); try { - foreman.getRuntimeFilterRouter().registerRuntimeFilter(runtimeFilter); + foreman.getRuntimeFilterRouter().register(runtimeFilter); } catch (Exception e) { logger.warn("Exception while registering the RuntimeFilter", e); } finally { currentThread.setName(originalName); + runtimeFilter.close(); } } }); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java index dc6cc2fcf..afbc56a5b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilter.java @@ -34,6 +34,7 @@ import java.util.Arrays; public class BloomFilter { // Bytes in a bucket. private static final int BYTES_PER_BUCKET = 32; + // Minimum bloom filter data size. private static final int MINIMUM_BLOOM_SIZE_IN_BYTES = 256; @@ -41,16 +42,14 @@ public class BloomFilter { private int numBytes; - private int mask[] = new int[8]; - - private byte[] tempBucket = new byte[32]; - + private int bucketMask[] = new int[8]; public BloomFilter(int numBytes, BufferAllocator bufferAllocator) { int size = BloomFilter.adjustByteSize(numBytes); this.byteBuf = bufferAllocator.buffer(size); this.numBytes = byteBuf.capacity(); - this.byteBuf.writerIndex(numBytes); + this.byteBuf.writeZero(this.numBytes); + this.byteBuf.writerIndex(this.numBytes); } public BloomFilter(int ndv, double fpp, BufferAllocator bufferAllocator) { @@ -74,26 +73,27 @@ public class BloomFilter { } private void setMask(int key) { - //8 odd numbers act as salt value to participate in the computation of the mask. + //8 odd numbers act as salt value to participate in the computation of the bucketMask. final int SALT[] = {0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d, 0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31}; - Arrays.fill(mask, 0); + Arrays.fill(bucketMask, 0); for (int i = 0; i < 8; ++i) { - mask[i] = key * SALT[i]; + bucketMask[i] = key * SALT[i]; } for (int i = 0; i < 8; ++i) { - mask[i] = mask[i] >> 27; + bucketMask[i] = bucketMask[i] >>> 27; } for (int i = 0; i < 8; ++i) { - mask[i] = 0x1 << mask[i]; + bucketMask[i] = 0x1 << bucketMask[i]; } } /** * Add an element's hash value to this bloom filter. + * * @param hash hash result of element. */ public void insert(long hash) { @@ -101,16 +101,13 @@ public class BloomFilter { int key = (int) hash; setMask(key); int initialStartIndex = bucketIndex * BYTES_PER_BUCKET; - byteBuf.getBytes(initialStartIndex, tempBucket); for (int i = 0; i < 8; i++) { + int index = initialStartIndex + i * 4; //every iterate batch,we set 32 bits - int bitsetIndex = i * 4; - tempBucket[bitsetIndex] = (byte) (tempBucket[bitsetIndex] | (byte) (mask[i] >>> 24)); - tempBucket[bitsetIndex + 1] = (byte) (tempBucket[(bitsetIndex) + 1] | (byte) (mask[i] >>> 16)); - tempBucket[bitsetIndex + 2] = (byte) (tempBucket[(bitsetIndex) + 2] | (byte) (mask[i] >>> 8)); - tempBucket[bitsetIndex + 3] = (byte) (tempBucket[(bitsetIndex) + 3] | (byte) (mask[i])); + int a = byteBuf.getInt(index); + a |= bucketMask[i]; + byteBuf.setInt(index, a); } - byteBuf.setBytes(initialStartIndex, tempBucket); } /** @@ -123,17 +120,12 @@ public class BloomFilter { int bucketIndex = (int) (hash >> 32) & (numBytes / BYTES_PER_BUCKET - 1); int key = (int) hash; setMask(key); - int startIndex = bucketIndex * BYTES_PER_BUCKET; - byteBuf.getBytes(startIndex, tempBucket); for (int i = 0; i < 8; i++) { - byte set = 0; - int bitsetIndex = i * 4; - set |= tempBucket[bitsetIndex] & ((byte) (mask[i] >>> 24)); - set |= tempBucket[(bitsetIndex + 1)] & ((byte) (mask[i] >>> 16)); - set |= tempBucket[(bitsetIndex + 2)] & ((byte) (mask[i] >>> 8)); - set |= tempBucket[(bitsetIndex + 3)] & ((byte) mask[i]); - if (0 == set) { + int index = startIndex + i * 4; + int a = byteBuf.getInt(index); + int b = a & bucketMask[i]; + if (b == 0) { return false; } } @@ -142,6 +134,7 @@ public class BloomFilter { /** * Merge this bloom filter with other one + * * @param other */ public void or(BloomFilter other) { @@ -150,20 +143,19 @@ public class BloomFilter { Preconditions.checkArgument(otherLength == thisLength); Preconditions.checkState(otherLength % BYTES_PER_BUCKET == 0); Preconditions.checkState(thisLength % BYTES_PER_BUCKET == 0); - byte[] otherTmpBucket = new byte[BYTES_PER_BUCKET]; - for (int i = 0; i < thisLength / BYTES_PER_BUCKET; i++) { - byteBuf.getBytes(i * BYTES_PER_BUCKET, tempBucket); - other.byteBuf.getBytes(i * BYTES_PER_BUCKET, otherTmpBucket); - for (int j = 0; j < BYTES_PER_BUCKET; j++) { - tempBucket[j] = (byte) (tempBucket[j] | otherTmpBucket[j]); - } - this.byteBuf.setBytes(i, tempBucket); + for (int i = 0; i < thisLength / 8; i++) { + int index = i * 8; + long a = byteBuf.getLong(index); + long b = other.byteBuf.getLong(index); + long c = a | b; + byteBuf.setLong(index, c); } } /** * Calculate optimal size according to the number of distinct values and false positive probability. * See http://en.wikipedia.org/wiki/Bloom_filter#Probability_of_false_positives for the formula. + * * @param ndv: The number of distinct values. * @param fpp: The false positive probability. * @return optimal number of bytes of given ndv and fpp. @@ -177,7 +169,7 @@ public class BloomFilter { bits |= bits >> 8; bits |= bits >> 16; bits++; - int bytes = bits/8; + int bytes = bits / 8; return bytes; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java index 9a6df57ae..b2a9bd763 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/BloomFilterDef.java @@ -28,6 +28,8 @@ public class BloomFilterDef { private boolean local; private String probeField; + + private String buildField; //TODO @JsonIgnore private Double leftNDV; @@ -37,10 +39,11 @@ public class BloomFilterDef { @JsonCreator public BloomFilterDef(@JsonProperty("numBytes") int numBytes, @JsonProperty("local") boolean local, @JsonProperty("probeField") - String probeField){ + String probeField, @JsonProperty("buildField") String buildField){ this.numBytes = numBytes; this.local = local; this.probeField = probeField; + this.buildField = buildField; } @@ -61,7 +64,7 @@ public class BloomFilterDef { } public String toString() { - return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + " }"; + return "BF:{numBytes=" + numBytes + ",send2Foreman=" + !local + ",probeField= " + probeField + ",buildField= " + buildField + " }"; } @JsonIgnore @@ -82,4 +85,9 @@ public class BloomFilterDef { this.rightNDV = rightNDV; } + public String getBuildField() + { + return buildField; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java index 5fb51bf24..efe300f3a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterDef.java @@ -18,13 +18,8 @@ package org.apache.drill.exec.work.filter; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; - import com.fasterxml.jackson.annotation.JsonProperty; -import org.apache.drill.exec.physical.base.GroupScan; - - import java.util.List; @JsonIgnoreProperties(ignoreUnknown = true) @@ -37,17 +32,18 @@ public class RuntimeFilterDef { private List<BloomFilterDef> bloomFilterDefs; private boolean sendToForeman; - @JsonIgnore - private GroupScan probeSideGroupScan; + private long runtimeFilterIdentifier; @JsonCreator public RuntimeFilterDef(@JsonProperty("generateBloomFilter") boolean generateBloomFilter, @JsonProperty("generateMinMaxFilter") boolean generateMinMaxFilter, - @JsonProperty("bloomFilterDefs") List<BloomFilterDef> bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman) { + @JsonProperty("bloomFilterDefs") List<BloomFilterDef> bloomFilterDefs, @JsonProperty("sendToForeman") boolean sendToForeman, + @JsonProperty("runtimeFilterIdentifier") long runtimeFilterIdentifier) { this.generateBloomFilter = generateBloomFilter; this.generateMinMaxFilter = generateMinMaxFilter; this.bloomFilterDefs = bloomFilterDefs; this.sendToForeman = sendToForeman; + this.runtimeFilterIdentifier = runtimeFilterIdentifier; } @@ -84,12 +80,11 @@ public class RuntimeFilterDef { this.sendToForeman = sendToForeman; } - @JsonIgnore - public GroupScan getProbeSideGroupScan() { - return probeSideGroupScan; + public long getRuntimeFilterIdentifier() { + return runtimeFilterIdentifier; } - public void setProbeSideGroupScan(GroupScan probeSideGroupScan) { - this.probeSideGroupScan = probeSideGroupScan; + public void setRuntimeFilterIdentifier(long runtimeFilterIdentifier) { + this.runtimeFilterIdentifier = runtimeFilterIdentifier; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java index 6e4a9a8e5..93736c5f2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterReporter.java @@ -39,7 +39,9 @@ public class RuntimeFilterReporter { this.context = context; } - public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, boolean sendToForeman, int hashJoinOpId) { + public void sendOut(List<BloomFilter> bloomFilters, List<String> probeFields, RuntimeFilterDef runtimeFilterDef, int hashJoinOpId) { + boolean sendToForeman = runtimeFilterDef.isSendToForeman(); + long rfIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier(); ExecProtos.FragmentHandle fragmentHandle = context.getHandle(); DrillBuf[] data = new DrillBuf[bloomFilters.size()]; List<Integer> bloomFilterSizeInBytes = new ArrayList<>(); @@ -64,6 +66,7 @@ public class RuntimeFilterReporter { .setMinorFragmentId(minorFragmentId) .setToForeman(sendToForeman) .setHjOpId(hashJoinOpId) + .setRfIdentifier(rfIdentifier) .addAllBloomFilterSizeInBytes(bloomFilterSizeInBytes) .build(); RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterB, data); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java index 5a8c6fc9e..a4946a96c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterRouter.java @@ -17,39 +17,24 @@ */ package org.apache.drill.exec.work.filter; -import com.fasterxml.jackson.databind.ObjectMapper; -import io.netty.buffer.DrillBuf; import org.apache.commons.collections.CollectionUtils; -import org.apache.drill.exec.ops.AccountingDataTunnel; -import org.apache.drill.exec.ops.Consumer; import org.apache.drill.exec.ops.SendingAccountor; -import org.apache.drill.exec.ops.StatusHandler; import org.apache.drill.exec.physical.base.AbstractPhysicalVisitor; import org.apache.drill.exec.physical.base.Exchange; -import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.physical.base.PhysicalOperator; import org.apache.drill.exec.physical.config.HashJoinPOP; +import org.apache.drill.exec.physical.config.RuntimeFilterPOP; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.Wrapper; -import org.apache.drill.exec.proto.BitData; import org.apache.drill.exec.proto.CoordinationProtos; -import org.apache.drill.exec.proto.GeneralRPCProtos; -import org.apache.drill.exec.proto.UserBitShared; -import org.apache.drill.exec.rpc.RpcException; -import org.apache.drill.exec.rpc.RpcOutcomeListener; -import org.apache.drill.exec.rpc.data.DataTunnel; import org.apache.drill.exec.server.DrillbitContext; import org.apache.drill.exec.work.QueryWorkUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; /** * This class manages the RuntimeFilter routing information of the pushed down join predicate @@ -69,29 +54,24 @@ import java.util.concurrent.ConcurrentHashMap; public class RuntimeFilterRouter { private Wrapper rootWrapper; - //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints - private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probdeScanEps = new HashMap<>(); - //HashJoin node's major fragment id to its corresponding probe side nodes's number - private Map<Integer, Integer> joinMjId2scanSize = new ConcurrentHashMap<>(); - //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id - private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>(); - - private DrillbitContext drillbitContext; private SendingAccountor sendingAccountor = new SendingAccountor(); + private RuntimeFilterSink runtimeFilterSink; + private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterRouter.class); /** * This class maintains context for the runtime join push down's filter management. It * does a traversal of the physical operators by leveraging the root wrapper which indirectly * holds the global PhysicalOperator tree and contains the minor fragment endpoints. + * * @param workUnit * @param drillbitContext */ public RuntimeFilterRouter(QueryWorkUnit workUnit, DrillbitContext drillbitContext) { this.rootWrapper = workUnit.getRootWrapper(); - this.drillbitContext = drillbitContext; + runtimeFilterSink = new RuntimeFilterSink(drillbitContext, sendingAccountor); } /** @@ -99,6 +79,12 @@ public class RuntimeFilterRouter { * record the relationship between the RuntimeFilter producers and consumers. */ public void collectRuntimeFilterParallelAndControlInfo() { + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>(); + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>(); + Map<Integer, Integer> joinMjId2rfNumber = new HashMap<>(); + RuntimeFilterParallelismCollector runtimeFilterParallelismCollector = new RuntimeFilterParallelismCollector(); rootWrapper.getNode().getRoot().accept(runtimeFilterParallelismCollector, null); List<RFHelperHolder> holders = runtimeFilterParallelismCollector.getHolders(); @@ -107,67 +93,33 @@ public class RuntimeFilterRouter { List<CoordinationProtos.DrillbitEndpoint> probeSideEndpoints = holder.getProbeSideScanEndpoints(); int probeSideScanMajorId = holder.getProbeSideScanMajorId(); int joinNodeMajorId = holder.getJoinMajorId(); + int buildSideRfNumber = holder.getBuildSideRfNumber(); RuntimeFilterDef runtimeFilterDef = holder.getRuntimeFilterDef(); boolean sendToForeman = runtimeFilterDef.isSendToForeman(); if (sendToForeman) { //send RuntimeFilter to Foreman - joinMjId2probdeScanEps.put(joinNodeMajorId, probeSideEndpoints); - joinMjId2scanSize.put(joinNodeMajorId, probeSideEndpoints.size()); + joinMjId2probeScanEps.put(joinNodeMajorId, probeSideEndpoints); joinMjId2ScanMjId.put(joinNodeMajorId, probeSideScanMajorId); + joinMjId2rfNumber.put(joinNodeMajorId, buildSideRfNumber); } } + runtimeFilterSink.setJoinMjId2probeScanEps(joinMjId2probeScanEps); + runtimeFilterSink.setJoinMjId2rfNumber(joinMjId2rfNumber); + runtimeFilterSink.setJoinMjId2ScanMjId(joinMjId2ScanMjId); } - public void waitForComplete() { sendingAccountor.waitForSendComplete(); + runtimeFilterSink.close(); } /** * This method is passively invoked by receiving a runtime filter from the network - * @param runtimeFilterWritable + * + * @param srcRuntimeFilterWritable */ - public void registerRuntimeFilter(RuntimeFilterWritable runtimeFilterWritable) { - broadcastAggregatedRuntimeFilter(runtimeFilterWritable); - } - - - private void broadcastAggregatedRuntimeFilter(RuntimeFilterWritable srcRuntimeFilterWritable) { - BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); - int joinMajorId = runtimeFilterB.getMajorFragmentId(); - UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); - List<String> probeFields = runtimeFilterB.getProbeFieldsList(); - DrillBuf[] data = srcRuntimeFilterWritable.getData(); - List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probdeScanEps.get(joinMajorId); - int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId); - for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) { - BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder(); - for (String probeField : probeFields) { - builder.addProbeFields(probeField); - } - BitData.RuntimeFilterBDef runtimeFilterBDef = builder - .setQueryId(queryId) - .setMajorFragmentId(scanNodeMjId) - .setMinorFragmentId(minorId) - .build(); - RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data); - CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId); - DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint); - Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() { - @Override - public void accept(final RpcException e) { - logger.warn("fail to broadcast a runtime filter to the probe side scan node", e); - } - - @Override - public void interrupt(final InterruptedException e) { - logger.warn("fail to broadcast a runtime filter to the probe side scan node", e); - } - }; - RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); - AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler); - accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable); - } + public void register(RuntimeFilterWritable srcRuntimeFilterWritable) { + runtimeFilterSink.add(srcRuntimeFilterWritable); } /** @@ -183,18 +135,29 @@ public class RuntimeFilterRouter { boolean isHashJoinOp = op instanceof HashJoinPOP; if (isHashJoinOp) { HashJoinPOP hashJoinPOP = (HashJoinPOP) op; + int hashJoinOpId = hashJoinPOP.getOperatorId(); RuntimeFilterDef runtimeFilterDef = hashJoinPOP.getRuntimeFilterDef(); - if (runtimeFilterDef != null) { - if (holder == null) { - holder = new RFHelperHolder(); + if (runtimeFilterDef != null && runtimeFilterDef.isSendToForeman()) { + if (holder == null || holder.getJoinOpId() != hashJoinOpId) { + holder = new RFHelperHolder(hashJoinOpId); holders.add(holder); } holder.setRuntimeFilterDef(runtimeFilterDef); - GroupScan probeSideScanOp = runtimeFilterDef.getProbeSideGroupScan(); - Wrapper container = findPhysicalOpContainer(rootWrapper, hashJoinPOP); + long runtimeFilterIdentifier = runtimeFilterDef.getRuntimeFilterIdentifier(); + WrapperOperatorsVisitor operatorsVisitor = new WrapperOperatorsVisitor(hashJoinPOP); + Wrapper container = findTargetWrapper(rootWrapper, operatorsVisitor); + if (container == null) { + throw new IllegalStateException(String.format("No valid Wrapper found for HashJoinPOP with id=%d", hashJoinPOP.getOperatorId())); + } + int buildSideRFNumber = container.getAssignedEndpoints().size(); + holder.setBuildSideRfNumber(buildSideRFNumber); int majorFragmentId = container.getMajorFragmentId(); holder.setJoinMajorId(majorFragmentId); - Wrapper probeSideScanContainer = findPhysicalOpContainer(rootWrapper, probeSideScanOp); + WrapperRuntimeFilterOperatorsVisitor runtimeFilterOperatorsVisitor = new WrapperRuntimeFilterOperatorsVisitor(runtimeFilterIdentifier); + Wrapper probeSideScanContainer = findTargetWrapper(container, runtimeFilterOperatorsVisitor); + if (probeSideScanContainer == null) { + throw new IllegalStateException(String.format("No valid Wrapper found for RuntimeFilterPOP with id=%d", op.getOperatorId())); + } int probeSideScanMjId = probeSideScanContainer.getMajorFragmentId(); List<CoordinationProtos.DrillbitEndpoint> probeSideScanEps = probeSideScanContainer.getAssignedEndpoints(); holder.setProbeSideScanEndpoints(probeSideScanEps); @@ -209,59 +172,63 @@ public class RuntimeFilterRouter { } } - private class WrapperOperatorsVisitor extends AbstractPhysicalVisitor<Void, Void, RuntimeException> { + private Wrapper findTargetWrapper(Wrapper wrapper, TargetPhysicalOperatorVisitor targetOpVisitor) { + targetOpVisitor.setCurrentFragment(wrapper.getNode()); + wrapper.getNode().getRoot().accept(targetOpVisitor, null); + boolean contain = targetOpVisitor.isContain(); + if (contain) { + return wrapper; + } + List<Wrapper> dependencies = wrapper.getFragmentDependencies(); + if (CollectionUtils.isEmpty(dependencies)) { + return null; + } + for (Wrapper dependencyWrapper : dependencies) { + Wrapper opContainer = findTargetWrapper(dependencyWrapper, targetOpVisitor); + if (opContainer != null) { + return opContainer; + } + } + return null; + } - private Fragment fragment; + private abstract class TargetPhysicalOperatorVisitor<T, X, E extends Throwable> extends AbstractPhysicalVisitor<T, X, E> { - private boolean contain = false; + protected Exchange sendingExchange; - private boolean targetIsGroupScan; + public void setCurrentFragment(Fragment fragment) { + sendingExchange = fragment.getSendingExchange(); + } - private boolean targetIsHashJoin; + public abstract boolean isContain(); + } - private String targetGroupScanDigest; + private class WrapperOperatorsVisitor extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> { - private String targetHashJoinJson; + private boolean contain = false; + private PhysicalOperator targetOp; - public WrapperOperatorsVisitor(PhysicalOperator targetOp, Fragment fragment) { - this.fragment = fragment; - this.targetIsGroupScan = targetOp instanceof GroupScan; - this.targetIsHashJoin = targetOp instanceof HashJoinPOP; - this.targetGroupScanDigest = targetIsGroupScan ? ((GroupScan) targetOp).getDigest() : null; - this.targetHashJoinJson = targetIsHashJoin ? jsonOfPhysicalOp(targetOp) : null; + public WrapperOperatorsVisitor(PhysicalOperator targetOp) { + this.targetOp = targetOp; } @Override public Void visitExchange(Exchange exchange, Void value) throws RuntimeException { - List<Fragment.ExchangeFragmentPair> exchangeFragmentPairs = fragment.getReceivingExchangePairs(); - for (Fragment.ExchangeFragmentPair exchangeFragmentPair : exchangeFragmentPairs) { - boolean same = exchange == exchangeFragmentPair.getExchange(); - if (same) { - return null; - } + if (exchange != sendingExchange) { + return null; } return exchange.getChild().accept(this, value); } @Override public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException { - boolean same = false; - if (targetIsGroupScan && op instanceof GroupScan) { - //Since GroupScan may be rewrite during the planing, here we use the digest to identify it. - String currentDigest = ((GroupScan) op).getDigest(); - same = targetGroupScanDigest.equals(currentDigest); - } - if (targetIsHashJoin && op instanceof HashJoinPOP) { - String currentOpJson = jsonOfPhysicalOp(op); - same = targetHashJoinJson.equals(currentOpJson); - } - if (!same) { + if (op == targetOp) { + contain = true; + } else { for (PhysicalOperator child : op) { child.accept(this, value); } - } else { - contain = true; } return null; } @@ -269,42 +236,57 @@ public class RuntimeFilterRouter { public boolean isContain() { return contain; } + } + + private class WrapperRuntimeFilterOperatorsVisitor extends TargetPhysicalOperatorVisitor<Void, Void, RuntimeException> { + + private boolean contain = false; + + private long identifier; + + + public WrapperRuntimeFilterOperatorsVisitor(long identifier) { + this.identifier = identifier; + } - public String jsonOfPhysicalOp(PhysicalOperator operator) { - try { - ObjectMapper objectMapper = new ObjectMapper(); - StringWriter stringWriter = new StringWriter(); - objectMapper.writeValue(stringWriter, operator); - return stringWriter.toString(); - } catch (IOException e) { - throw new RuntimeException(e); + @Override + public Void visitExchange(Exchange exchange, Void value) throws RuntimeException { + if (exchange != sendingExchange) { + return null; } + return exchange.getChild().accept(this, value); } - } - private boolean containsPhysicalOperator(Wrapper wrapper, PhysicalOperator op) { - WrapperOperatorsVisitor wrapperOpsVistitor = new WrapperOperatorsVisitor(op, wrapper.getNode()); - wrapper.getNode().getRoot().accept(wrapperOpsVistitor, null); - return wrapperOpsVistitor.isContain(); - } + @Override + public Void visitOp(PhysicalOperator op, Void value) throws RuntimeException { + boolean same; + boolean isRuntimeFilterPop = op instanceof RuntimeFilterPOP; + boolean isHashJoinPop = op instanceof HashJoinPOP; - private Wrapper findPhysicalOpContainer(Wrapper wrapper, PhysicalOperator op) { - boolean contain = containsPhysicalOperator(wrapper, op); - if (contain) { - return wrapper; - } - List<Wrapper> dependencies = wrapper.getFragmentDependencies(); - if (CollectionUtils.isEmpty(dependencies)) { + if (isHashJoinPop) { + HashJoinPOP hashJoinPOP = (HashJoinPOP) op; + PhysicalOperator leftPop = hashJoinPOP.getLeft(); + leftPop.accept(this, value); + return null; + } + + if (isRuntimeFilterPop) { + RuntimeFilterPOP runtimeFilterPOP = (RuntimeFilterPOP) op; + same = this.identifier == runtimeFilterPOP.getIdentifier(); + if (same) { + contain = true; + return null; + } + } + for (PhysicalOperator child : op) { + child.accept(this, value); + } return null; } - for (Wrapper dependencyWrapper : dependencies) { - Wrapper opContainer = findPhysicalOpContainer(dependencyWrapper, op); - if (opContainer != null) { - return opContainer; - } + + public boolean isContain() { + return contain; } - //should not be here - throw new IllegalStateException(String.format("No valid Wrapper found for physicalOperator with id=%d", op.getOperatorId())); } /** @@ -320,6 +302,22 @@ public class RuntimeFilterRouter { private RuntimeFilterDef runtimeFilterDef; + private int joinOpId; + + private int buildSideRfNumber; + + public RFHelperHolder(int joinOpId) { + this.joinOpId = joinOpId; + } + + public int getJoinOpId() { + return joinOpId; + } + + public void setJoinOpId(int joinOpId) { + this.joinOpId = joinOpId; + } + public List<CoordinationProtos.DrillbitEndpoint> getProbeSideScanEndpoints() { return probeSideScanEndpoints; } @@ -352,5 +350,13 @@ public class RuntimeFilterRouter { public void setRuntimeFilterDef(RuntimeFilterDef runtimeFilterDef) { this.runtimeFilterDef = runtimeFilterDef; } + + public int getBuildSideRfNumber() { + return buildSideRfNumber; + } + + public void setBuildSideRfNumber(int buildSideRfNumber) { + this.buildSideRfNumber = buildSideRfNumber; + } } -} +}
\ No newline at end of file diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java index 14686254f..f69a44ef7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterSink.java @@ -17,206 +17,250 @@ */ package org.apache.drill.exec.work.filter; -import org.apache.drill.exec.memory.BufferAllocator; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.DrillRuntimeException; +import org.apache.drill.exec.ops.AccountingDataTunnel; +import org.apache.drill.exec.ops.Consumer; +import org.apache.drill.exec.ops.SendingAccountor; +import org.apache.drill.exec.ops.StatusHandler; +import org.apache.drill.exec.proto.BitData; +import org.apache.drill.exec.proto.CoordinationProtos; +import org.apache.drill.exec.proto.GeneralRPCProtos; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.rpc.RpcException; +import org.apache.drill.exec.rpc.RpcOutcomeListener; +import org.apache.drill.exec.rpc.data.DataTunnel; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; /** * This sink receives the RuntimeFilters from the netty thread, - * aggregates them in an async thread, supplies the aggregated - * one to the fragment running thread. + * aggregates them in an async thread, broadcast the final aggregated + * one to the RuntimeFilterRecordBatch. */ -public class RuntimeFilterSink implements AutoCloseable { - - private AtomicInteger currentBookId = new AtomicInteger(0); +public class RuntimeFilterSink implements Closeable +{ - private int staleBookId = 0; + private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>(); - /** - * RuntimeFilterWritable holding the aggregated version of all the received filter - */ - private RuntimeFilterWritable aggregated = null; + private Map<Integer, Integer> joinMjId2rfNumber; - private BlockingQueue<RuntimeFilterWritable> rfQueue = new LinkedBlockingQueue<>(); + //HashJoin node's major fragment id to its corresponding probe side nodes's endpoints + private Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps = new HashMap<>(); - /** - * Flag used by Minor Fragment thread to indicate it has encountered error - */ - private AtomicBoolean running = new AtomicBoolean(true); + //HashJoin node's major fragment id to its corresponding probe side scan node's belonging major fragment id + private Map<Integer, Integer> joinMjId2ScanMjId = new HashMap<>(); - /** - * Lock used to synchronize between producer (Netty Thread) and consumer (AsyncAggregateThread) of elements of this - * queue. This is needed because in error condition running flag can be consumed by producer and consumer thread at - * different times. Whoever sees it first will take this lock and clear all elements and set the queue to null to - * indicate producer not to put any new elements in it. - */ - private ReentrantLock queueLock = new ReentrantLock(); + //HashJoin node's major fragment id to its aggregated RuntimeFilterWritable + private Map<Integer, RuntimeFilterWritable> joinMjId2AggregatedRF = new HashMap<>(); + //for debug usage + private Map<Integer, Stopwatch> joinMjId2Stopwatch = new HashMap<>(); - private Condition notEmpty = queueLock.newCondition(); + private DrillbitContext drillbitContext; - private ReentrantLock aggregatedRFLock = new ReentrantLock(); + private SendingAccountor sendingAccountor; - private BufferAllocator bufferAllocator; + private AsyncAggregateWorker asyncAggregateWorker; - private Future future; + private AtomicBoolean running = new AtomicBoolean(true); private static final Logger logger = LoggerFactory.getLogger(RuntimeFilterSink.class); - public RuntimeFilterSink(BufferAllocator bufferAllocator, ExecutorService executorService) { - this.bufferAllocator = bufferAllocator; - AsyncAggregateWorker asyncAggregateWorker = new AsyncAggregateWorker(); - future = executorService.submit(asyncAggregateWorker); + public RuntimeFilterSink(DrillbitContext drillbitContext, SendingAccountor sendingAccountor) + { + this.drillbitContext = drillbitContext; + this.sendingAccountor = sendingAccountor; + asyncAggregateWorker = new AsyncAggregateWorker(); + drillbitContext.getExecutor().submit(asyncAggregateWorker); } - public void aggregate(RuntimeFilterWritable runtimeFilterWritable) { - if (running.get()) { - try { - aggregatedRFLock.lock(); - if (containOne()) { - boolean same = aggregated.equals(runtimeFilterWritable); - if (!same) { - // This is to solve the only one fragment case that two RuntimeFilterRecordBatchs - // share the same FragmentContext. - aggregated.close(); - currentBookId.set(0); - staleBookId = 0; - clearQueued(false); - } - } - } finally { - aggregatedRFLock.unlock(); - } + public void add(RuntimeFilterWritable runtimeFilterWritable) + { + if (!running.get()) { + runtimeFilterWritable.close(); + return; + } + runtimeFilterWritable.retainBuffers(1); + int joinMjId = runtimeFilterWritable.getRuntimeFilterBDef().getMajorFragmentId(); + if (joinMjId2Stopwatch.get(joinMjId) == null) { + Stopwatch stopwatch = Stopwatch.createStarted(); + joinMjId2Stopwatch.put(joinMjId, stopwatch); + } + synchronized (rfQueue) { + rfQueue.add(runtimeFilterWritable); + rfQueue.notify(); + } + } + public void close() { + running.set(false); + if (asyncAggregateWorker != null) { + synchronized (rfQueue) { + rfQueue.notify(); + } + } + while (!asyncAggregateWorker.over.get()) { try { - queueLock.lock(); - if (rfQueue != null) { - rfQueue.add(runtimeFilterWritable); - notEmpty.signal(); - } else { - runtimeFilterWritable.close(); - } - } finally { - queueLock.unlock(); + Thread.sleep(100); + } catch (InterruptedException e) { + logger.error("interrupted while sleeping to wait for the aggregating worker thread to exit", e); } - } else { + } + for (RuntimeFilterWritable runtimeFilterWritable : joinMjId2AggregatedRF.values()) { runtimeFilterWritable.close(); } } - public RuntimeFilterWritable fetchLatestDuplicatedAggregatedOne() { - try { - aggregatedRFLock.lock(); - return aggregated.duplicate(bufferAllocator); - } finally { - aggregatedRFLock.unlock(); + private void aggregate(RuntimeFilterWritable srcRuntimeFilterWritable) + { + BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); + int joinMajorId = runtimeFilterB.getMajorFragmentId(); + int buildSideRfNumber; + RuntimeFilterWritable toAggregated = null; + buildSideRfNumber = joinMjId2rfNumber.get(joinMajorId); + buildSideRfNumber--; + joinMjId2rfNumber.put(joinMajorId, buildSideRfNumber); + toAggregated = joinMjId2AggregatedRF.get(joinMajorId); + if (toAggregated == null) { + toAggregated = srcRuntimeFilterWritable; + toAggregated.retainBuffers(1); + } else { + toAggregated.aggregate(srcRuntimeFilterWritable); } - } - - /** - * whether there's a fresh aggregated RuntimeFilter - * - * @return - */ - public boolean hasFreshOne() { - if (currentBookId.get() > staleBookId) { - staleBookId = currentBookId.get(); - return true; + joinMjId2AggregatedRF.put(joinMajorId, toAggregated); + if (buildSideRfNumber == 0) { + joinMjId2AggregatedRF.remove(joinMajorId); + route(toAggregated); + joinMjId2rfNumber.remove(joinMajorId); + Stopwatch stopwatch = joinMjId2Stopwatch.get(joinMajorId); + logger.info( + "received all the RFWs belonging to the majorId {}'s HashJoin nodes and flushed aggregated RFW out elapsed {} ms", + joinMajorId, + stopwatch.elapsed(TimeUnit.MILLISECONDS) + ); } - return false; - } - - /** - * whether there's a usable RuntimeFilter. - * - * @return - */ - public boolean containOne() { - return aggregated != null; } - private void doCleanup() { - running.compareAndSet(true, false); - try { - aggregatedRFLock.lock(); - if (containOne()) { - aggregated.close(); - aggregated = null; + private void route(RuntimeFilterWritable srcRuntimeFilterWritable) + { + BitData.RuntimeFilterBDef runtimeFilterB = srcRuntimeFilterWritable.getRuntimeFilterBDef(); + int joinMajorId = runtimeFilterB.getMajorFragmentId(); + UserBitShared.QueryId queryId = runtimeFilterB.getQueryId(); + List<String> probeFields = runtimeFilterB.getProbeFieldsList(); + List<Integer> sizeInBytes = runtimeFilterB.getBloomFilterSizeInBytesList(); + long rfIdentifier = runtimeFilterB.getRfIdentifier(); + DrillBuf[] data = srcRuntimeFilterWritable.getData(); + List<CoordinationProtos.DrillbitEndpoint> scanNodeEps = joinMjId2probeScanEps.get(joinMajorId); + int scanNodeSize = scanNodeEps.size(); + srcRuntimeFilterWritable.retainBuffers(scanNodeSize - 1); + int scanNodeMjId = joinMjId2ScanMjId.get(joinMajorId); + for (int minorId = 0; minorId < scanNodeEps.size(); minorId++) { + BitData.RuntimeFilterBDef.Builder builder = BitData.RuntimeFilterBDef.newBuilder(); + for (String probeField : probeFields) { + builder.addProbeFields(probeField); } - } finally { - aggregatedRFLock.unlock(); + BitData.RuntimeFilterBDef runtimeFilterBDef = builder.setQueryId(queryId) + .setMajorFragmentId(scanNodeMjId) + .setMinorFragmentId(minorId) + .setToForeman(false) + .setRfIdentifier(rfIdentifier) + .addAllBloomFilterSizeInBytes(sizeInBytes) + .build(); + RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, data); + CoordinationProtos.DrillbitEndpoint drillbitEndpoint = scanNodeEps.get(minorId); + + DataTunnel dataTunnel = drillbitContext.getDataConnectionsPool().getTunnel(drillbitEndpoint); + Consumer<RpcException> exceptionConsumer = new Consumer<RpcException>() + { + @Override + public void accept(final RpcException e) + { + logger.warn("fail to broadcast a runtime filter to the probe side scan node", e); + } + + @Override + public void interrupt(final InterruptedException e) + { + logger.warn("fail to broadcast a runtime filter to the probe side scan node", e); + } + }; + RpcOutcomeListener<GeneralRPCProtos.Ack> statusHandler = new StatusHandler(exceptionConsumer, sendingAccountor); + AccountingDataTunnel accountingDataTunnel = new AccountingDataTunnel(dataTunnel, sendingAccountor, statusHandler); + accountingDataTunnel.sendRuntimeFilter(runtimeFilterWritable); } } - @Override - public void close() throws Exception { - future.cancel(true); - doCleanup(); + public void setJoinMjId2rfNumber(Map<Integer, Integer> joinMjId2rfNumber) + { + this.joinMjId2rfNumber = joinMjId2rfNumber; } - private void clearQueued(boolean setToNull) { - RuntimeFilterWritable toClear; - try { - queueLock.lock(); - while (rfQueue != null && (toClear = rfQueue.poll()) != null) { - toClear.close(); - } - rfQueue = (setToNull) ? null : rfQueue; - } finally { - queueLock.unlock(); - } + public void setJoinMjId2probeScanEps(Map<Integer, List<CoordinationProtos.DrillbitEndpoint>> joinMjId2probeScanEps) + { + this.joinMjId2probeScanEps = joinMjId2probeScanEps; } - private class AsyncAggregateWorker implements Runnable { + public void setJoinMjId2ScanMjId(Map<Integer, Integer> joinMjId2ScanMjId) + { + this.joinMjId2ScanMjId = joinMjId2ScanMjId; + } + + private class AsyncAggregateWorker implements Runnable + { + private AtomicBoolean over = new AtomicBoolean(false); @Override - public void run() { - try { + public void run() + { + while ((joinMjId2rfNumber == null || !joinMjId2rfNumber.isEmpty() ) && running.get()) { RuntimeFilterWritable toAggregate = null; - while (running.get()) { + synchronized (rfQueue) { try { - queueLock.lock(); - toAggregate = (rfQueue != null) ? rfQueue.poll() : null; - if (toAggregate == null) { - notEmpty.await(); - continue; + toAggregate = rfQueue.poll(); + while (toAggregate == null && running.get()) { + rfQueue.wait(); + toAggregate = rfQueue.poll(); } - } finally { - queueLock.unlock(); + } catch (InterruptedException ex) { + logger.error("RFW_Aggregator thread being interrupted", ex); + continue; } - - try { - aggregatedRFLock.lock(); - if (containOne()) { - aggregated.aggregate(toAggregate); - - // Release the byteBuf referenced by toAggregate since aggregate will not do it - toAggregate.close(); - } else { - aggregated = toAggregate; - } - } finally { - aggregatedRFLock.unlock(); + } + if (toAggregate == null) { + continue; + } + // perform aggregate outside the sync block. + try { + aggregate(toAggregate); + } catch (Exception ex) { + logger.error("Failed to aggregate or route the RFW", ex); + throw new DrillRuntimeException(ex); + } finally { + if (toAggregate != null) { + toAggregate.close(); } - currentBookId.incrementAndGet(); } - } catch (InterruptedException e) { - logger.info("RFAggregating Thread : {} was interrupted.", Thread.currentThread().getName()); - Thread.currentThread().interrupt(); - } finally { - doCleanup(); - clearQueued(true); } + + if (!running.get()) { + RuntimeFilterWritable toClose; + while ((toClose = rfQueue.poll()) != null) { + toClose.close(); + } + } + over.set(true); } } } - - diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java index 9a971e94c..f8c2701b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/filter/RuntimeFilterWritable.java @@ -103,6 +103,27 @@ public class RuntimeFilterWritable implements AutoCloseables.Closeable{ return new RuntimeFilterWritable(runtimeFilterBDef, cloned); } + public void retainBuffers(final int increment) { + if (increment <= 0) { + return; + } + for (final DrillBuf buf : data) { + buf.retain(increment); + } + } + //TODO: Not used currently because of DRILL-6826 + public RuntimeFilterWritable newRuntimeFilterWritable(BufferAllocator bufferAllocator) { + int bufNum = data.length; + DrillBuf [] newBufs = new DrillBuf[bufNum]; + int i = 0; + for (DrillBuf buf : data) { + DrillBuf transferredBuffer = buf.transferOwnership(bufferAllocator).buffer; + newBufs[i] = transferredBuffer; + i++; + } + return new RuntimeFilterWritable(this.runtimeFilterBDef, newBufs); + } + public String toString() { return identifier; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java index 42b76f278..a379db175 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/foreman/Foreman.java @@ -151,7 +151,7 @@ public class Foreman implements Runnable { this.fragmentsRunner = new FragmentsRunner(bee, initiatingClient, drillbitContext, this); this.queryStateProcessor = new QueryStateProcessor(queryIdString, queryManager, drillbitContext, new ForemanResult()); this.profileOption = setProfileOption(queryContext.getOptions()); - this.enableRuntimeFilter = drillbitContext.getOptionManager().getBoolean(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY); + this.enableRuntimeFilter = queryContext.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY).bool_val; } |