/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.drill.exec; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.nio.file.Paths; import java.util.List; import org.apache.drill.test.ClientFixture; import org.apache.drill.test.ClusterFixture; import org.apache.drill.test.ClusterFixtureBuilder; import org.apache.drill.test.ClusterTest; import org.apache.drill.test.QueryBuilder.QuerySummary; import org.apache.drill.test.rowSet.RowSet; import org.apache.drill.test.rowSet.RowSetReader; import org.apache.drill.categories.PlannerTest; import org.apache.drill.categories.SlowTest; import org.apache.drill.exec.proto.BitControl.PlanFragment; import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; import org.apache.drill.exec.proto.UserBitShared.QueryType; import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; /** * Class to test different planning use cases (separate from query execution) * */ @Category({SlowTest.class, PlannerTest.class}) public class DrillSeparatePlanningTest extends ClusterTest { @BeforeClass public static void setupFiles() { dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "json")); dirTestWatcher.copyResourceToRoot(Paths.get("multilevel", "csv")); } @Before public void testSetup() throws Exception { ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) .clusterSize(2); startCluster(builder); } @Test(timeout=60_000) public void testSingleFragmentQuery() throws Exception { final String query = "SELECT * FROM cp.`employee.json` where employee_id > 1 and employee_id < 1000"; QueryPlanFragments planFragments = getFragmentsHelper(query); assertNotNull(planFragments); assertEquals(1, planFragments.getFragmentsCount()); assertTrue(planFragments.getFragments(0).getLeafFragment()); QuerySummary summary = client.queryBuilder().plan(planFragments.getFragmentsList()).run(); assertEquals(997, summary.recordCount()); } @Test(timeout=60_000) public void testMultiMinorFragmentSimpleQuery() throws Exception { final String query = "SELECT o_orderkey FROM dfs.`multilevel/json`"; QueryPlanFragments planFragments = getFragmentsHelper(query); assertNotNull(planFragments); assertTrue((planFragments.getFragmentsCount() > 1)); for (PlanFragment planFragment : planFragments.getFragmentsList()) { assertTrue(planFragment.getLeafFragment()); } int rowCount = getResultsHelper(planFragments); assertEquals(120, rowCount); } @Test(timeout=60_000) public void testMultiMinorFragmentComplexQuery() throws Exception { final String query = "SELECT dir0, sum(o_totalprice) FROM dfs.`multilevel/json` group by dir0 order by dir0"; QueryPlanFragments planFragments = getFragmentsHelper(query); assertNotNull(planFragments); assertTrue((planFragments.getFragmentsCount() > 1)); for ( PlanFragment planFragment : planFragments.getFragmentsList()) { assertTrue(planFragment.getLeafFragment()); } int rowCount = getResultsHelper(planFragments); assertEquals(8, rowCount); } @Test(timeout=60_000) public void testPlanningNoSplit() throws Exception { final String query = "SELECT dir0, sum(o_totalprice) FROM dfs.`multilevel/json` group by dir0 order by dir0"; client.alterSession("planner.slice_target", 1); try { final QueryPlanFragments planFragments = client.planQuery(query); assertNotNull(planFragments); assertTrue((planFragments.getFragmentsCount() > 1)); PlanFragment rootFragment = planFragments.getFragments(0); assertFalse(rootFragment.getLeafFragment()); QuerySummary summary = client.queryBuilder().plan(planFragments.getFragmentsList()).run(); assertEquals(3, summary.recordCount()); } finally { client.resetSession("planner.slice_target"); } } @Test(timeout=60_000) public void testPlanningNegative() throws Exception { final String query = "SELECT dir0, sum(o_totalprice) FROM dfs.`multilevel/json` group by dir0 order by dir0"; // LOGICAL is not supported final QueryPlanFragments planFragments = client.planQuery(QueryType.LOGICAL, query, false); assertNotNull(planFragments); assertNotNull(planFragments.getError()); assertTrue(planFragments.getFragmentsCount()==0); } @Test(timeout=60_000) public void testPlanning() throws Exception { final String query = "SELECT dir0, columns[3] FROM dfs.`multilevel/csv` order by dir0"; client.alterSession("planner.slice_target", 1); try { // Original version, but no reason to dump output to test results. // long rows = client.queryBuilder().sql(query).print(Format.TSV, VectorUtil.DEFAULT_COLUMN_WIDTH); QuerySummary summary = client.queryBuilder().sql(query).run(); assertEquals(120, summary.recordCount()); } finally { client.resetSession("planner.slice_target"); } } private QueryPlanFragments getFragmentsHelper(final String query) { client.alterSession("planner.slice_target", 1); try { QueryPlanFragments planFragments = client.planQuery(QueryType.SQL, query, true); return planFragments; } finally { client.resetSession("planner.slice_target"); } } private int getResultsHelper(final QueryPlanFragments planFragments) throws Exception { int totalRows = 0; for (PlanFragment fragment : planFragments.getFragmentsList()) { DrillbitEndpoint assignedNode = fragment.getAssignment(); ClientFixture fragmentClient = cluster.client(assignedNode.getAddress(), assignedNode.getUserPort()); RowSet rowSet = fragmentClient.queryBuilder().sql("select hostname, user_port from sys.drillbits where `current`=true").rowSet(); assertEquals(1, rowSet.rowCount()); RowSetReader reader = rowSet.reader(); assertTrue(reader.next()); String host = reader.scalar("hostname").getString(); int port = reader.scalar("user_port").getInt(); rowSet.clear(); assertEquals(assignedNode.getAddress(), host); assertEquals(assignedNode.getUserPort(), port); List fragmentList = Lists.newArrayList(); fragmentList.add(fragment); QuerySummary summary = fragmentClient.queryBuilder().plan(fragmentList).run(); totalRows += summary.recordCount(); fragmentClient.close(); } return totalRows; } }