diff options
author | Sorabh Hamirwasia <sorabh@apache.org> | 2019-01-29 21:09:55 -0800 |
---|---|---|
committer | Aman Sinha <asinha@maprtech.com> | 2019-02-01 16:15:07 -0800 |
commit | 35b42ebdfd645b92330ad5dcac363aa0036696a7 (patch) | |
tree | 4c5e2646395580ab177d9d81cc3f241aabedfacd | |
parent | b5fe06534cf8a391caed65ae274ad144b10b580e (diff) |
DRILL-7016: Wrong query result with RuntimeFilter enabled when order of join and filter condition is swapped
close apache/drill#1628
2 files changed, 183 insertions, 3 deletions
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java index 4d309aea0..acc597705 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/physical/visitor/RuntimeFilterVisitor.java @@ -162,12 +162,14 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc List<String> leftFields = left.getRowType().getFieldNames(); List<String> rightFields = right.getRowType().getFieldNames(); List<Integer> leftKeys = hashJoinPrel.getLeftKeys(); + List<Integer> rightKeys = hashJoinPrel.getRightKeys(); RelMetadataQuery metadataQuery = left.getCluster().getMetadataQuery(); int i = 0; for (Integer leftKey : leftKeys) { String leftFieldName = leftFields.get(leftKey); - String rightFieldName = rightFields.get(i); - i++; + Integer rightKey = rightKeys.get(i++); + String rightFieldName = rightFields.get(rightKey); + //This also avoids the left field of the join condition with a function call. ScanPrel scanPrel = findLeftScanPrel(leftFieldName, left); if (scanPrel != null) { @@ -405,4 +407,4 @@ public class RuntimeFilterVisitor extends BasePrelVisitor<Prel, Void, RuntimeExc } } -}
\ No newline at end of file +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDCorrectness.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDCorrectness.java new file mode 100644 index 000000000..042a081cb --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/join/TestHashJoinJPPDCorrectness.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.join; + +import org.apache.drill.categories.SqlTest; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertThat; +import java.nio.file.Paths; + +@Category(SqlTest.class) +public class TestHashJoinJPPDCorrectness extends ClusterTest { + + private static final String ALTER_RUNTIME_FILTER_OPTION_COMMAND = "ALTER SESSION SET `" + + ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY + "` = %s"; + + private static final String ALTER_RUNTIME_FILTER_WAITING_OPTION_COMMAND = "ALTER SESSION SET `" + + ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY + "` = %s"; + + private static final String ALTER_RUNTIME_FILTER_WAIT_TIME_OPTION_COMMAND = "ALTER SESSION SET `" + + ExecConstants.HASHJOIN_RUNTIME_FILTER_MAX_WAITING_TIME_KEY + "` = %d"; + + private static final String ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND = + String.format("%s;%s;%s", ALTER_RUNTIME_FILTER_OPTION_COMMAND, ALTER_RUNTIME_FILTER_WAITING_OPTION_COMMAND, + ALTER_RUNTIME_FILTER_WAIT_TIME_OPTION_COMMAND); + + @BeforeClass + public static void setUp() throws Exception { + dirTestWatcher.copyResourceToRoot(Paths.get("tpchmulti")); + // Reduce the slice target so that there are multiple minor fragments with exchange, otherwise RuntimeFilter + // will not be inserted in the plan + startCluster(ClusterFixture.builder(dirTestWatcher) + .clusterSize(2) + .maxParallelization(1) + .systemOption(ExecConstants.SLICE_TARGET, 10)); + } + + @After + public void tearDown() { + client.resetSession(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY); + client.resetSession(ExecConstants.HASHJOIN_RUNTIME_FILTER_WAITING_ENABLE_KEY); + } + + /** + * Test to make sure runtime filter is inserted in the plan. This is to ensure that with current cluster setup and + * system options distributed plan is generated rather than single fragment plan. Since in later case RuntimeFilter + * will not be inserted. + */ + @Test + public void testRuntimeFilterPresentInPlan() throws Exception { + String sql = "SELECT l.n_name, r.r_name FROM dfs.`tpchmulti/nation` l, dfs.`tpchmulti/region/` r where " + + "l.n_regionkey = r.r_regionkey"; + client.alterSession(ExecConstants.HASHJOIN_ENABLE_RUNTIME_FILTER_KEY, true); + String queryPlan = queryBuilder().sql(sql).explainText(); + assertThat("Query plan doesn't contain RuntimeFilter. This may happen if plan is not distributed and has no " + + "exchange operator in it.", queryPlan, containsString("RuntimeFilter")); + } + + /** + * Verifies that result of a query with join condition doesn't changes with and without Runtime Filter for correctness + */ + @Test + public void testHashJoinCorrectnessWithRuntimeFilter() throws Exception { + String sql = "SELECT l.n_name, r.r_name FROM dfs.`tpchmulti/nation` l, dfs.`tpchmulti/region/` r where " + + "l.n_regionkey = r.r_regionkey"; + + testBuilder() + .unOrdered() + .sqlQuery(sql) + .optionSettingQueriesForTestQuery(ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND, "true", "true", 6000) + .sqlBaselineQuery(sql) + .optionSettingQueriesForBaseline(ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND, "false", "false", 1) + .go(); + } + + /** + * Verifies that result of query with join condition and filter condition is same with or without RuntimeFilter. In + * this case order of join and filter condition is same for both test and baseline query + */ + @Test + public void testSameOrderOfJoinAndFilterConditionProduceSameResult() throws Exception { + String testAndBaselineQuery = "SELECT count(*) FROM dfs.`tpchmulti/nation` l, dfs.`tpchmulti/region/` r where " + + "l.n_regionkey = r.r_regionkey and r.r_name = 'AMERICA'"; + + testBuilder() + .unOrdered() + .sqlQuery(testAndBaselineQuery) + .optionSettingQueriesForTestQuery(ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND, "true", "true", 6000) + .sqlBaselineQuery(testAndBaselineQuery) + .optionSettingQueriesForBaseline(ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND, "false", "false", 1) + .go(); + } + + /** + * Verifies that result of query with join condition and filter condition is same with or without RuntimeFilter. In + * this case order of join and filter condition is swapped between test and baseline query. Also the filter + * condition is second in case of test query + */ + @Test + public void testDifferentOrderOfJoinAndFilterCondition_filterConditionSecond() throws Exception { + String testQuery = "SELECT count(*) FROM dfs.`tpchmulti/nation` l, dfs.`tpchmulti/region/` r where " + + "l.n_regionkey = r.r_regionkey and r.r_name = 'AMERICA'"; + String baselineQuery = "SELECT count(*) FROM dfs.`tpchmulti/nation` l, dfs.`tpchmulti/region/` r where " + + "r.r_name = 'AMERICA' and l.n_regionkey = r.r_regionkey"; + + testBuilder() + .unOrdered() + .sqlQuery(testQuery) + .optionSettingQueriesForTestQuery(ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND, "true", "true", 6000) + .sqlBaselineQuery(baselineQuery) + .optionSettingQueriesForBaseline(ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND, "false", "false", 1) + .go(); + } + + /** + * Verifies that result of query doesn't change if filter and join condition order is different in test query with + * RuntimeFilter and baseline query without RuntimeFilter. In this case the filter condition is first condition in + * test query with RuntimeFilter + */ + @Test + public void testDifferentOrderOfJoinAndFilterCondition_filterConditionFirst() throws Exception { + String testQuery = "SELECT count(*) FROM dfs.`tpchmulti/nation` l, dfs.`tpchmulti/region/` r where " + + "r.r_name = 'AMERICA' and l.n_regionkey = r.r_regionkey"; + String baselineQuery = "SELECT count(*) FROM dfs.`tpchmulti/nation` l, dfs.`tpchmulti/region/` r where " + + "l.n_regionkey = r.r_regionkey and r.r_name = 'AMERICA'"; + + testBuilder() + .unOrdered() + .sqlQuery(testQuery) + .optionSettingQueriesForTestQuery(ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND, "true", "true", 6000) + .sqlBaselineQuery(baselineQuery) + .optionSettingQueriesForBaseline(ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND, "false", "false", 1) + .go(); + } + + /** + * Verifies that result of query doesn't change if filter and join condition order is different in test query with + * RuntimeFilter and baseline query with RuntimeFilter. In this case both test and baseline query has RuntimeFilter + * in it. + */ + @Test + public void testDifferentOrderOfJoinAndFilterConditionAndRTF() throws Exception { + String testQuery = "SELECT count(*) FROM dfs.`tpchmulti/nation` l, dfs.`tpchmulti/region/` r where " + + "l.n_regionkey = r.r_regionkey and r.r_name = 'AMERICA'"; + String baselineQuery = "SELECT count(*) FROM dfs.`tpchmulti/nation` l, dfs.`tpchmulti/region/` r where " + + "r.r_name = 'AMERICA' and l.n_regionkey = r.r_regionkey"; + + testBuilder() + .unOrdered() + .sqlQuery(testQuery) + .optionSettingQueriesForTestQuery(ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND, "true", "true", 6000) + .sqlBaselineQuery(baselineQuery) + .optionSettingQueriesForBaseline(ALTER_RUNTIME_FILTER_ENABLE_AND_WAIT_OPTION_COMMAND, "true", "true", 6000) + .go(); + } +} |