aboutsummaryrefslogtreecommitdiff
path: root/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
diff options
context:
space:
mode:
authorweijie.tong <weijie.tong@alipay.com>2018-10-14 19:41:51 +0800
committerVitalii Diravka <vitalii.diravka@gmail.com>2018-11-29 18:33:23 +0200
commit9667e92e1e87ce1826f0eac3f2396187dbfa8aaa (patch)
treecb68cd4bbedf6f84e00168cb0ab300c6dacdb35c /exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
parent325fa26b5df1bc29594677a0f3e1360fbb4f8bca (diff)
DRILL-6792: Find the right probe side fragment wrapper & fix DrillBuf reference count bugs & tune the execution flow & support left deep tree
closes #1504
Diffstat (limited to 'exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java')
-rw-r--r--exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java43
1 files changed, 32 insertions, 11 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 88eadf291..0ac0809d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -19,10 +19,13 @@ package org.apache.drill.exec.physical.impl.join;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.drill.common.expression.ExpressionPosition;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
@@ -203,11 +206,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
private int originalPartition = -1; // the partition a secondary reads from
IntVector read_right_HV_vector; // HV vector that was read from the spilled batch
private int maxBatchesInMemory;
- private List<BloomFilter> bloomFilters = new ArrayList<>();
private List<String> probeFields = new ArrayList<>(); // keep the same sequence with the bloomFilters
private boolean enableRuntimeFilter;
private RuntimeFilterReporter runtimeFilterReporter;
private ValueVectorHashHelper.Hash64 hash64;
+ private Map<BloomFilter, Integer> bloomFilter2buildId = new HashMap<>();
+ private Map<BloomFilterDef, Integer> bloomFilterDef2buildId = new HashMap<>();
+ private List<BloomFilter> bloomFilters = new ArrayList<>();
/**
* This holds information about the spilled partitions for the build and probe side.
@@ -757,6 +762,24 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
enableRuntimeFilter = false;
return;
}
+ RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
+ List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
+ for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
+ String buildField = bloomFilterDef.getBuildField();
+ SchemaPath schemaPath = new SchemaPath(new PathSegment.NameSegment(buildField), ExpressionPosition.UNKNOWN);
+ TypedFieldId typedFieldId = buildBatch.getValueVectorId(schemaPath);
+ if (typedFieldId == null) {
+ missingField = true;
+ break;
+ }
+ int fieldId = typedFieldId.getFieldIds()[0];
+ bloomFilterDef2buildId.put(bloomFilterDef, fieldId);
+ }
+ if (missingField) {
+ logger.info("As some build side join key fields not found, runtime filter was disabled");
+ enableRuntimeFilter = false;
+ return;
+ }
ValueVectorHashHelper hashHelper = new ValueVectorHashHelper(buildBatch, context);
try {
hash64 = hashHelper.getHash64(keyExprsBuild, buildSideTypeFieldIds);
@@ -799,9 +822,6 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
if (!enableRuntimeFilter) {
return;
}
- if (runtimeFilterReporter != null) {
- return;
- }
runtimeFilterReporter = new RuntimeFilterReporter((ExecutorFragmentContext) context);
RuntimeFilterDef runtimeFilterDef = popConfig.getRuntimeFilterDef();
//RuntimeFilter is not a necessary part of a HashJoin operator, only the query which satisfy the
@@ -809,11 +829,13 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
if (runtimeFilterDef != null) {
List<BloomFilterDef> bloomFilterDefs = runtimeFilterDef.getBloomFilterDefs();
for (BloomFilterDef bloomFilterDef : bloomFilterDefs) {
+ int buildFieldId = bloomFilterDef2buildId.get(bloomFilterDef);
int numBytes = bloomFilterDef.getNumBytes();
String probeField = bloomFilterDef.getProbeField();
probeFields.add(probeField);
BloomFilter bloomFilter = new BloomFilter(numBytes, context.getAllocator());
bloomFilters.add(bloomFilter);
+ bloomFilter2buildId.put(bloomFilter, buildFieldId);
}
}
}
@@ -992,13 +1014,12 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
//create runtime filter
if (spilledState.isFirstCycle() && enableRuntimeFilter) {
//create runtime filter and send out async
- int condFieldIndex = 0;
- for (BloomFilter bloomFilter : bloomFilters) {
+ for (BloomFilter bloomFilter : bloomFilter2buildId.keySet()) {
+ int fieldId = bloomFilter2buildId.get(bloomFilter);
for (int ind = 0; ind < currentRecordCount; ind++) {
- long hashCode = hash64.hash64Code(ind, 0, condFieldIndex);
+ long hashCode = hash64.hash64Code(ind, 0, fieldId);
bloomFilter.insert(hashCode);
}
- condFieldIndex++;
}
}
@@ -1027,9 +1048,9 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
}
if (spilledState.isFirstCycle() && enableRuntimeFilter) {
- if (bloomFilters.size() > 0) {
+ if (bloomFilter2buildId.size() > 0) {
int hashJoinOpId = this.popConfig.getOperatorId();
- runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef().isSendToForeman(), hashJoinOpId);
+ runtimeFilterReporter.sendOut(bloomFilters, probeFields, this.popConfig.getRuntimeFilterDef(), hashJoinOpId);
}
}
@@ -1237,7 +1258,7 @@ public class HashJoinBatch extends AbstractBinaryRecordBatch<HashJoinPOP> implem
RecordBatchStats.logRecordBatchStats(getRecordBatchStatsContext(),
"configured output batch size: %d", configuredBatchSize);
- enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER);
+ enableRuntimeFilter = context.getOptions().getOption(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER) && popConfig.getRuntimeFilterDef() != null;
}
/**