diff options
author | weijie.tong <weijie.tong@alipay.com> | 2018-10-14 19:41:51 +0800 |
---|---|---|
committer | Vitalii Diravka <vitalii.diravka@gmail.com> | 2018-11-29 18:33:23 +0200 |
commit | 9667e92e1e87ce1826f0eac3f2396187dbfa8aaa (patch) | |
tree | cb68cd4bbedf6f84e00168cb0ab300c6dacdb35c /exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical | |
parent | 325fa26b5df1bc29594677a0f3e1360fbb4f8bca (diff) |
DRILL-6792: Find the right probe side fragment wrapper, fix DrillBuf reference count bugs, tune the execution flow, and support left-deep trees
closes #1504
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical')
2 files changed, 77 insertions, 17 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java index 59e1622b5..1729027de 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java @@ -27,25 +27,29 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode; import java.io.IOException; import java.util.List; -public class RuntimeFilterPrel extends SinglePrel{ +public class RuntimeFilterPrel extends SinglePrel { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPrel.class); - public RuntimeFilterPrel(Prel child){ + private long identifier; + + public RuntimeFilterPrel(Prel child, long identifier){ super(child.getCluster(), child.getTraitSet(), child); + this.identifier = identifier; } - public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) { + public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, long identifier) { super(cluster, traits, child); + this.identifier = identifier; } @Override public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) { - return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0)); + return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0), identifier); } @Override public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException { - RuntimeFilterPOP r = new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator)); + RuntimeFilterPOP r = new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator), identifier); return creator.addMetadata(this, r); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java index fcfa2bca1..4d309aea0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.planner.physical.visitor; -import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.calcite.plan.volcano.RelSubset; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.core.JoinInfo; @@ -28,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.util.ImmutableBitSet; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.ops.QueryContext; -import org.apache.drill.exec.physical.base.GroupScan; import org.apache.drill.exec.planner.physical.BroadcastExchangePrel; import org.apache.drill.exec.planner.physical.ExchangePrel; import org.apache.drill.exec.planner.physical.HashAggPrel; @@ -43,11 +41,14 @@ import org.apache.drill.exec.planner.physical.TopNPrel; import org.apache.drill.exec.work.filter.BloomFilter; import org.apache.drill.exec.work.filter.BloomFilterDef; import org.apache.drill.exec.work.filter.RuntimeFilterDef; - +import org.apache.drill.shaded.guava.com.google.common.collect.HashMultimap; +import org.apache.drill.shaded.guava.com.google.common.collect.Multimap; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; /** * This visitor does two major things: @@ -58,9 +59,14 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc private Set<ScanPrel> toAddRuntimeFilter = new HashSet<>(); + private Multimap<ScanPrel, HashJoinPrel> probeSideScan2hj = HashMultimap.create(); + private double fpp; + private int 
bloomFilterMaxSizeInBytesDef; + private static final AtomicLong rfIdCounter = new AtomicLong(); + private RuntimeFilterVisitor(QueryContext queryContext) { this.bloomFilterMaxSizeInBytesDef = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE_KEY).num_val.intValue(); this.fpp = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_KEY).float_val; @@ -76,7 +82,7 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc } public Prel visitPrel(Prel prel, Void value) throws RuntimeException { - List<RelNode> children = Lists.newArrayList(); + List<RelNode> children = new ArrayList<>(); for (Prel child : prel) { child = child.accept(this, value); children.add(child); @@ -100,8 +106,18 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc @Override public Prel visitScan(ScanPrel prel, Void value) throws RuntimeException { if (toAddRuntimeFilter.contains(prel)) { - //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node. - RuntimeFilterPrel runtimeFilterPrel = new RuntimeFilterPrel(prel); + //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node or a runtime filter node. 
+ Collection<HashJoinPrel> hashJoinPrels = probeSideScan2hj.get(prel); + RuntimeFilterPrel runtimeFilterPrel = null; + for (HashJoinPrel hashJoinPrel : hashJoinPrels) { + long identifier = rfIdCounter.incrementAndGet(); + hashJoinPrel.getRuntimeFilterDef().setRuntimeFilterIdentifier(identifier); + if (runtimeFilterPrel == null) { + runtimeFilterPrel = new RuntimeFilterPrel(prel, identifier); + } else { + runtimeFilterPrel = new RuntimeFilterPrel(runtimeFilterPrel, identifier); + } + } return runtimeFilterPrel; } else { return prel; @@ -134,13 +150,24 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc List<BloomFilterDef> bloomFilterDefs = new ArrayList<>(); //find the possible left scan node of the left join key - GroupScan groupScan = null; + ScanPrel probeSideScanPrel = null; RelNode left = hashJoinPrel.getLeft(); + RelNode right = hashJoinPrel.getRight(); + ExchangePrel exchangePrel = findRightExchangePrel(right); + if (exchangePrel == null) { + //Does not support the single fragment mode ,that is the right build side + //can only be BroadcastExchangePrel or HashToRandomExchangePrel + return null; + } List<String> leftFields = left.getRowType().getFieldNames(); + List<String> rightFields = right.getRowType().getFieldNames(); List<Integer> leftKeys = hashJoinPrel.getLeftKeys(); RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery(); + int i = 0; for (Integer leftKey : leftKeys) { String leftFieldName = leftFields.get(leftKey); + String rightFieldName = rightFields.get(i); + i++; //This also avoids the left field of the join condition with a function call. ScanPrel scanPrel = findLeftScanPrel(leftFieldName, left); if (scanPrel != null) { @@ -160,17 +187,17 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc int bloomFilterSizeInBytes = BloomFilter.optimalNumOfBytes(ndv.longValue(), fpp); bloomFilterSizeInBytes = bloomFilterSizeInBytes > bloomFilterMaxSizeInBytesDef ? 
bloomFilterMaxSizeInBytesDef : bloomFilterSizeInBytes; //left the local parameter to be set later. - BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName); + BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName, rightFieldName); bloomFilterDef.setLeftNDV(ndv); bloomFilterDefs.add(bloomFilterDef); toAddRuntimeFilter.add(scanPrel); - groupScan = scanPrel.getGroupScan(); + probeSideScanPrel = scanPrel; } } if (bloomFilterDefs.size() > 0) { //left sendToForeman parameter to be set later. - RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false); - runtimeFilterDef.setProbeSideGroupScan(groupScan); + RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1); + probeSideScan2hj.put(probeSideScanPrel, hashJoinPrel); return runtimeFilterDef; } return null; @@ -265,6 +292,30 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc } } + private ExchangePrel findRightExchangePrel(RelNode rightRelNode) { + if (rightRelNode instanceof ExchangePrel) { + return (ExchangePrel) rightRelNode; + } + if (rightRelNode instanceof ScanPrel) { + return null; + } else if (rightRelNode instanceof RelSubset) { + RelNode bestNode = ((RelSubset) rightRelNode).getBest(); + if (bestNode != null) { + return findRightExchangePrel(bestNode); + } else { + return null; + } + } else { + List<RelNode> relNodes = rightRelNode.getInputs(); + if (relNodes.size() == 1) { + RelNode leftNode = relNodes.get(0); + return findRightExchangePrel(leftNode); + } else { + return null; + } + } + } + private boolean containBlockNode(Prel startNode, Prel endNode) { BlockNodeVisitor blockNodeVisitor = new BlockNodeVisitor(); startNode.accept(blockNodeVisitor, endNode); @@ -311,6 +362,11 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc return null; } + if (currentPrel instanceof HashJoinPrel) { + 
encounteredBlockNode = true; + return null; + } + for (Prel subPrel : currentPrel) { visitPrel(subPrel, endValue); } @@ -349,4 +405,4 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc } } -} +}
\ No newline at end of file |