diff options
author | weijie.tong <weijie.tong@alipay.com> | 2018-10-14 19:41:51 +0800 |
---|---|---|
committer | Vitalii Diravka <vitalii.diravka@gmail.com> | 2018-11-29 18:33:23 +0200 |
commit | 9667e92e1e87ce1826f0eac3f2396187dbfa8aaa (patch) | |
tree | cb68cd4bbedf6f84e00168cb0ab300c6dacdb35c /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | |
parent | 325fa26b5df1bc29594677a0f3e1360fbb4f8bca (diff) |
DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree
closes #1504
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java')
-rw-r--r-- | exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java | 43 |
1 file changed, 32 insertions, 11 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 88eadf291..0ac0809d8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -19,10 +19,13 @@ package org.apache.drill.exec.physical.impl.join; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; +import org.apache.drill.common.expression.ExpressionPosition; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; import org.apache.drill.shaded.guava.com.google.common.collect.Sets; @@ -203,11 +206,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem private int originalPartition = -1; // the partition a secondary reads from IntVector read_right_HV_vector; // HV vector that was read from the spilled batch private int maxBatchesInMemory; - private List<BloomFilter> bloomFilters = new ArrayList<>(); private List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters private boolean enableRuntimeFilter; private RuntimeFilterReporter runtimeFilterReporter; private ValueVectorHashHelper.Hash64 hash64; + private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>(); + private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>(); + private List<BloomFilter> bloomFilters = new ArrayList<>(); /** * This holds information about the spilled partitions for the build and probe side. 
@@ -757,6 +762,24 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem enableRuntimeFilter = false; return; } + RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); + List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); + for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { + String buildField = bloomFilterDef.getBuildField(); + SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN); + TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath); + if (typedFieldId == null) { + missingField = true; + break; + } + int fieldId = typedFieldId.getFieldIds()[0]; + bloomFilterDef2buildId.put(bloomFilterDef, fieldId); + } + if (missingField) { + logger.info("As some build side join key fields not found, runtime filter was disabled"); + enableRuntimeFilter = false; + return; + } ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context); try { hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds); @@ -799,9 +822,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem if (!enableRuntimeFilter) { return; } - if (runtimeFilterReporter != null) { - return; - } runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context); RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef(); //RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the @@ -809,11 +829,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem if (runtimeFilterDef != null) { List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs(); for (BloomFilterDef bloomFilterDef : bloomFilterDefs) { + int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef); int numBytes = bloomFilterDef.getNumBytes(); String probeField = bloomFilterDef.getProbeField(); probeFields.add(probeField); BloomFilter 
bloomFilter = new BloomFilter(numBytes, context.getAllocator()); bloomFilters.add(bloomFilter); + bloomFilter2buildId.put(bloomFilter, buildFieldId); } } } @@ -992,13 +1014,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem //create runtime filter if (spilledState.isFirstCycle() && enableRuntimeFilter) { //create runtime filter and send out async - int condFieldIndex = 0; - for (BloomFilter bloomFilter : bloomFilters) { + for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) { + int fieldId = bloomFilter2buildId.get(bloomFilter); for (int ind = 0; ind < currentRecordCount; ind++) { - long hashCode = hash64.hash64Code(ind, 0, condFieldIndex); + long hashCode = hash64.hash64Code(ind, 0, fieldId); bloomFilter.insert(hashCode); } - condFieldIndex++; } } @@ -1027,9 +1048,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem } if (spilledState.isFirstCycle() && enableRuntimeFilter) { - if (bloomFilters.size() > 0) { + if (bloomFilter2buildId.size() > 0) { int hashJoinOpId = this.popConfig.getOperatorId(); - runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId); + runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId); } } @@ -1237,7 +1258,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(), "configured output batch size: %d", configuredBatchSize); - enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER); + enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null; } /** |