aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical
diff options
context:
space:
mode:
authorweijie.tong <weijie.tong@alipay.com>2018-10-14 19:41:51 +0800
committerVitalii Diravka <vitalii.diravka@gmail.com>2018-11-29 18:33:23 +0200
commit9667e92e1e87ce1826f0eac3f2396187dbfa8aaa (patch)
treecb68cd4bbedf6f84e00168cb0ab300c6dacdb35c /exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical
parent325fa26b5df1bc29594677a0f3e1360fbb4f8bca (diff)
DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree
closes #1504
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java14
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java80
2 files changed, 77 insertions, 17 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
index 59e1622b5..1729027de 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/RuntimeFilterPrel.java
@@ -27,25 +27,29 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import java.io.IOException;
import java.util.List;
-public class RuntimeFilterPrel extends SinglePrel{
+public class RuntimeFilterPrel extends SinglePrel {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RuntimeFilterPrel.class);
- public RuntimeFilterPrel(Prel child){
+ private long identifier;
+
+ public RuntimeFilterPrel(Prel child, long identifier){
super(child.getCluster(), child.getTraitSet(), child);
+ this.identifier = identifier;
}
- public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child) {
+ public RuntimeFilterPrel(RelOptCluster cluster, RelTraitSet traits, RelNode child, long identifier) {
super(cluster, traits, child);
+ this.identifier = identifier;
}
@Override
public RelNode copy(RelTraitSet traitSet, List<RelNode> inputs) {
- return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0));
+ return new RuntimeFilterPrel(this.getCluster(), traitSet, inputs.get(0), identifier);
}
@Override
public PhysicalOperator getPhysicalOperator(PhysicalPlanCreator creator) throws IOException {
- RuntimeFilterPOP r = new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator));
+ RuntimeFilterPOP r = new RuntimeFilterPOP( ((Prel)getInput()).getPhysicalOperator(creator), identifier);
return creator.addMetadata(this, r);
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
index fcfa2bca1..4d309aea0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java
@@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.planner.physical.visitor;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.calcite.plan.volcano.RelSubset;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.JoinInfo;
@@ -28,7 +27,6 @@ import org.apache.calcite.rel.type.RelDataTypeField;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.QueryContext;
-import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.physical.BroadcastExchangePrel;
import org.apache.drill.exec.planner.physical.ExchangePrel;
import org.apache.drill.exec.planner.physical.HashAggPrel;
@@ -43,11 +41,14 @@ import org.apache.drill.exec.planner.physical.TopNPrel;
import org.apache.drill.exec.work.filter.BloomFilter;
import org.apache.drill.exec.work.filter.BloomFilterDef;
import org.apache.drill.exec.work.filter.RuntimeFilterDef;
-
+import org.apache.drill.shaded.guava.com.google.common.collect.HashMultimap;
+import org.apache.drill.shaded.guava.com.google.common.collect.Multimap;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
/**
* This visitor does two major things:
@@ -58,9 +59,14 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
private Set<ScanPrel> toAddRuntimeFilter = new HashSet<>();
+ private Multimap<ScanPrel, HashJoinPrel> probeSideScan2hj = HashMultimap.create();
+
private double fpp;
+
private int bloomFilterMaxSizeInBytesDef;
+ private static final AtomicLong rfIdCounter = new AtomicLong();
+
private RuntimeFilterVisitor(QueryContext queryContext) {
this.bloomFilterMaxSizeInBytesDef = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_MAX_SIZE_KEY).num_val.intValue();
this.fpp = queryContext.getOption(ExecConstants.HASHJOIN_BLOOM_FILTER_FPP_KEY).float_val;
@@ -76,7 +82,7 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
}
public Prel visitPrel(Prel prel, Void value) throws RuntimeException {
- List<RelNode> children = Lists.newArrayList();
+ List<RelNode> children = new ArrayList<>();
for (Prel child : prel) {
child = child.accept(this, value);
children.add(child);
@@ -100,8 +106,18 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
@Override
public Prel visitScan(ScanPrel prel, Void value) throws RuntimeException {
if (toAddRuntimeFilter.contains(prel)) {
- //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node.
- RuntimeFilterPrel runtimeFilterPrel = new RuntimeFilterPrel(prel);
+ //Spawn a fresh RuntimeFilterPrel over the previous identified probe side scan node or a runtime filter node.
+ Collection<HashJoinPrel> hashJoinPrels = probeSideScan2hj.get(prel);
+ RuntimeFilterPrel runtimeFilterPrel = null;
+ for (HashJoinPrel hashJoinPrel : hashJoinPrels) {
+ long identifier = rfIdCounter.incrementAndGet();
+ hashJoinPrel.getRuntimeFilterDef().setRuntimeFilterIdentifier(identifier);
+ if (runtimeFilterPrel == null) {
+ runtimeFilterPrel = new RuntimeFilterPrel(prel, identifier);
+ } else {
+ runtimeFilterPrel = new RuntimeFilterPrel(runtimeFilterPrel, identifier);
+ }
+ }
return runtimeFilterPrel;
} else {
return prel;
@@ -134,13 +150,24 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
List<BloomFilterDef> bloomFilterDefs = new ArrayList<>();
//find the possible left scan node of the left join key
- GroupScan groupScan = null;
+ ScanPrel probeSideScanPrel = null;
RelNode left = hashJoinPrel.getLeft();
+ RelNode right = hashJoinPrel.getRight();
+ ExchangePrel exchangePrel = findRightExchangePrel(right);
+ if (exchangePrel == null) {
+ //Does not support the single fragment mode ,that is the right build side
+ //can only be BroadcastExchangePrel or HashToRandomExchangePrel
+ return null;
+ }
List<String> leftFields = left.getRowType().getFieldNames();
+ List<String> rightFields = right.getRowType().getFieldNames();
List<Integer> leftKeys = hashJoinPrel.getLeftKeys();
RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery();
+ int i = 0;
for (Integer leftKey : leftKeys) {
String leftFieldName = leftFields.get(leftKey);
+ String rightFieldName = rightFields.get(i);
+ i++;
//This also avoids the left field of the join condition with a function call.
ScanPrel scanPrel = findLeftScanPrel(leftFieldName, left);
if (scanPrel != null) {
@@ -160,17 +187,17 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
int bloomFilterSizeInBytes = BloomFilter.optimalNumOfBytes(ndv.longValue(), fpp);
bloomFilterSizeInBytes = bloomFilterSizeInBytes > bloomFilterMaxSizeInBytesDef ? bloomFilterMaxSizeInBytesDef : bloomFilterSizeInBytes;
//left the local parameter to be set later.
- BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName);
+ BloomFilterDef bloomFilterDef = new BloomFilterDef(bloomFilterSizeInBytes, false, leftFieldName, rightFieldName);
bloomFilterDef.setLeftNDV(ndv);
bloomFilterDefs.add(bloomFilterDef);
toAddRuntimeFilter.add(scanPrel);
- groupScan = scanPrel.getGroupScan();
+ probeSideScanPrel = scanPrel;
}
}
if (bloomFilterDefs.size() > 0) {
//left sendToForeman parameter to be set later.
- RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false);
- runtimeFilterDef.setProbeSideGroupScan(groupScan);
+ RuntimeFilterDef runtimeFilterDef = new RuntimeFilterDef(true, false, bloomFilterDefs, false, -1);
+ probeSideScan2hj.put(probeSideScanPrel, hashJoinPrel);
return runtimeFilterDef;
}
return null;
@@ -265,6 +292,30 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
}
}
+ private ExchangePrel findRightExchangePrel(RelNode rightRelNode) {
+ if (rightRelNode instanceof ExchangePrel) {
+ return (ExchangePrel) rightRelNode;
+ }
+ if (rightRelNode instanceof ScanPrel) {
+ return null;
+ } else if (rightRelNode instanceof RelSubset) {
+ RelNode bestNode = ((RelSubset) rightRelNode).getBest();
+ if (bestNode != null) {
+ return findRightExchangePrel(bestNode);
+ } else {
+ return null;
+ }
+ } else {
+ List<RelNode> relNodes = rightRelNode.getInputs();
+ if (relNodes.size() == 1) {
+ RelNode leftNode = relNodes.get(0);
+ return findRightExchangePrel(leftNode);
+ } else {
+ return null;
+ }
+ }
+ }
+
private boolean containBlockNode(Prel startNode, Prel endNode) {
BlockNodeVisitor blockNodeVisitor = new BlockNodeVisitor();
startNode.accept(blockNodeVisitor, endNode);
@@ -311,6 +362,11 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
return null;
}
+ if (currentPrel instanceof HashJoinPrel) {
+ encounteredBlockNode = true;
+ return null;
+ }
+
for (Prel subPrel : currentPrel) {
visitPrel(subPrel, endValue);
}
@@ -349,4 +405,4 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc
}
}
-}
+} \ No newline at end of file