From d22e68b83d1d0cc0539d79ae0cb3aa70ae3242ad Mon Sep 17 00:00:00 2001 From: HanumathRao Date: Thu, 28 Feb 2019 00:12:05 -0800 Subject: DRILL-7068: Support memory adjustment framework for resource management with Queues. closes #1677 --- .../org/apache/drill/exec/TestOpSerialization.java | 5 +- .../exec/physical/impl/TestLocalExchange.java | 9 +- .../impl/partitionsender/TestPartitionSender.java | 14 +- .../exec/planner/rm/TestMemoryCalculator.java | 227 +++++++++++++++++++++ .../apache/drill/exec/pop/TestFragmentChecker.java | 8 +- 5 files changed, 247 insertions(+), 16 deletions(-) create mode 100644 exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java (limited to 'exec/java-exec/src/test/java/org') diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java index 0f0bf0df0..7fb7bdbe4 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java @@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.config.Screen; import org.apache.drill.exec.physical.config.UnionExchange; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory; +import org.apache.drill.exec.planner.cost.PrelCostEstimates; import org.apache.drill.exec.proto.CoordinationProtos; import org.apache.drill.exec.store.direct.DirectSubScan; import org.apache.drill.exec.store.mock.MockSubScanPOP; @@ -58,7 +59,7 @@ public class TestOpSerialization { private static PhysicalOperator setupPhysicalOperator(PhysicalOperator operator) { operator.setOperatorId(1); - operator.setCost(1.0); + operator.setCost(new PrelCostEstimates(1.0, 1.0)); operator.setMaxAllocation(1000); return operator; } @@ -66,7 +67,7 @@ public class TestOpSerialization { private static void assertOperator(PhysicalOperator operator) { assertEquals(1, operator.getOperatorId()); - assertEquals(1.0, operator.getCost(), 0.00001); + assertEquals(1.0, operator.getCost().getOutputRowCount(), 0.00001); assertEquals(1000, operator.getMaxAllocation()); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java index b45acfbc8..37cae3e92 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.physical.impl; +import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer; import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @@ -81,7 +82,8 @@ public class TestLocalExchange extends PlanTestBase { .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()) .build(); - private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer( + private static final SimpleParallelizer PARALLELIZER = new DefaultQueryParallelizer( + true, 1 /*parallelizationThreshold (slice_count)*/, 6 /*maxWidthPerNode*/, 1000 /*maxGlobalWidth*/, @@ -394,9 +396,8 @@ public class TestLocalExchange extends PlanTestBase { findFragmentsWithPartitionSender(rootFragment, planningSet, deMuxFragments, htrFragments); - final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", - "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); - QueryWorkUnit qwu = PARALLELIZER.getFragments(new OptionList(), drillbitContext.getEndpoint(), + final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); + QueryWorkUnit qwu = PARALLELIZER.generateWorkUnit(new OptionList(), drillbitContext.getEndpoint(), QueryId.getDefaultInstance(), drillbitContext.getBits(), rootFragment, USER_SESSION, queryContextInfo); qwu.applyPlan(planReader); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java index 0986c75d6..7ef94f7d7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java @@ -46,6 +46,8 @@ import org.apache.drill.exec.physical.impl.TopN.TopNBatch; import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric; import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator.GeneralExecuteIface; import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.cost.PrelCostEstimates; +import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.PlanningSet; import org.apache.drill.exec.planner.fragment.SimpleParallelizer; @@ -86,7 +88,8 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists; @Category(OperatorTest.class) public class TestPartitionSender extends PlanTestBase { - private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer( + private static final SimpleParallelizer PARALLELIZER = new DefaultQueryParallelizer( + true, 1 /*parallelizationThreshold (slice_count)*/, 6 /*maxWidthPerNode*/, 1000 /*maxGlobalWidth*/, @@ -178,14 +181,14 @@ public class TestPartitionSender extends PlanTestBase { options.clear(); options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.slice_target", 1, OptionScope.SESSION)); options.add(OptionValue.create(OptionValue.AccessibleScopes.SESSION, "planner.partitioner_sender_max_threads", 10, OptionScope.SESSION)); - hashToRandomExchange.setCost(1000); + hashToRandomExchange.setCost(new PrelCostEstimates(1000, 1000)); testThreadsHelper(hashToRandomExchange, drillbitContext, options, incoming, registry, planReader, planningSet, rootFragment, 10); options.clear(); options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.slice_target", 1000, OptionScope.SESSION)); options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.partitioner_sender_threads_factor",2, OptionScope.SESSION)); - hashToRandomExchange.setCost(14000); + hashToRandomExchange.setCost(new PrelCostEstimates(14000, 14000)); testThreadsHelper(hashToRandomExchange, drillbitContext, options, incoming, registry, planReader, planningSet, rootFragment, 2); } @@ -207,9 +210,8 @@ public class TestPartitionSender extends PlanTestBase { RecordBatch incoming, FunctionImplementationRegistry registry, PhysicalPlanReader planReader, PlanningSet planningSet, Fragment rootFragment, int expectedThreadsCount) throws Exception { - final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", - "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); - final QueryWorkUnit qwu = PARALLELIZER.getFragments(options, drillbitContext.getEndpoint(), + final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); + final QueryWorkUnit qwu = PARALLELIZER.generateWorkUnit(options, drillbitContext.getEndpoint(), QueryId.getDefaultInstance(), drillbitContext.getBits(), rootFragment, USER_SESSION, queryContextInfo); qwu.applyPlan(planReader); diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java new file mode 100644 index 000000000..4893a36fd --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java @@ -0,0 +1,227 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.planner.rm; + + +import org.apache.drill.PlanTestBase; +import org.apache.drill.exec.ExecConstants; +import org.apache.drill.exec.ops.QueryContext; +import org.apache.drill.exec.planner.PhysicalPlanReader; +import org.apache.drill.exec.planner.cost.NodeResource; +import org.apache.drill.exec.planner.fragment.Fragment; +import org.apache.drill.exec.planner.fragment.PlanningSet; +import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer; +import org.apache.drill.exec.planner.fragment.SimpleParallelizer; +import org.apache.drill.exec.planner.fragment.Wrapper; +import org.apache.drill.exec.pop.PopUnitTestBase; +import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint; +import org.apache.drill.exec.proto.UserBitShared; +import org.apache.drill.exec.proto.UserProtos; +import org.apache.drill.exec.rpc.user.UserSession; +import org.apache.drill.exec.server.DrillbitContext; +import org.apache.drill.shaded.guava.com.google.common.collect.Iterables; +import org.apache.drill.test.ClientFixture; +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterFixtureBuilder; +import org.junit.AfterClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.HashMap; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestMemoryCalculator extends PlanTestBase { + + private static final long DEFAULT_SLICE_TARGET = 100000L; + private static final long DEFAULT_BATCH_SIZE = 16*1024*1024; + + private static final UserSession session = UserSession.Builder.newBuilder() + .withCredentials(UserBitShared.UserCredentials.newBuilder() + .setUserName("foo") + .build()) + .withUserProperties(UserProtos.UserProperties.getDefaultInstance()) + .withOptionManager(bits[0].getContext().getOptionManager()) + .build(); + + private static final DrillbitEndpoint N1_EP1 = newDrillbitEndpoint("node1", 30010); + private static final DrillbitEndpoint N1_EP2 = newDrillbitEndpoint("node2", 30011); + private static final DrillbitEndpoint N1_EP3 = newDrillbitEndpoint("node3", 30012); + private static final DrillbitEndpoint N1_EP4 = newDrillbitEndpoint("node4", 30013); + + private static final DrillbitEndpoint[] nodeList = {N1_EP1, N1_EP2, N1_EP3, N1_EP4}; + + private static final DrillbitEndpoint newDrillbitEndpoint(String address, int port) { + return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build(); + } + private static final DrillbitContext drillbitContext = getDrillbitContext(); + private static final QueryContext queryContext = new QueryContext(session, drillbitContext, + UserBitShared.QueryId.getDefaultInstance()); + + @AfterClass + public static void close() throws Exception { + queryContext.close(); + } + + private final Wrapper mockWrapper(Wrapper rootFragment, + Map resourceMap, + List endpoints, + Map originalToMockWrapper ) { + final Wrapper mockWrapper = mock(Wrapper.class); + originalToMockWrapper.put(rootFragment.getNode(), mockWrapper); + List mockdependencies = new ArrayList<>(); + + for (Wrapper dependency : rootFragment.getFragmentDependencies()) { + mockdependencies.add(mockWrapper(dependency, resourceMap, endpoints, originalToMockWrapper)); + } + + when(mockWrapper.getNode()).thenReturn(rootFragment.getNode()); + when(mockWrapper.getAssignedEndpoints()).thenReturn(endpoints); + when(mockWrapper.getResourceMap()).thenReturn(resourceMap); + when(mockWrapper.getWidth()).thenReturn(endpoints.size()); + when(mockWrapper.getFragmentDependencies()).thenReturn(mockdependencies); + when(mockWrapper.isEndpointsAssignmentDone()).thenReturn(true); + return mockWrapper; + } + + private final PlanningSet mockPlanningSet(PlanningSet planningSet, + Map resourceMap, + List endpoints) { + Map wrapperToMockWrapper = new HashMap<>(); + Wrapper rootFragment = mockWrapper( planningSet.getRootWrapper(), resourceMap, + endpoints, wrapperToMockWrapper); + PlanningSet mockPlanningSet = mock(PlanningSet.class); + when(mockPlanningSet.getRootWrapper()).thenReturn(rootFragment); + when(mockPlanningSet.get(any(Fragment.class))).thenAnswer(invocation -> { + return wrapperToMockWrapper.get(invocation.getArgument(0)); + }); + return mockPlanningSet; + } + + private String getPlanForQuery(String query) throws Exception { + return getPlanForQuery(query, DEFAULT_BATCH_SIZE); + } + + private String getPlanForQuery(String query, long outputBatchSize) throws Exception { + return getPlanForQuery(query, outputBatchSize, DEFAULT_SLICE_TARGET); + } + + private String getPlanForQuery(String query, long outputBatchSize, + long slice_target) throws Exception { + ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher) + .setOptionDefault(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize) + .setOptionDefault(ExecConstants.SLICE_TARGET, slice_target); + String plan; + + try (ClusterFixture cluster = builder.build(); + ClientFixture client = cluster.clientFixture()) { + plan = client.queryBuilder() + .sql(query) + .explainJson(); + } + return plan; + } + + private List getEndpoints(int totalMinorFragments, + Set notIn) { + List endpoints = new ArrayList<>(); + Iterator drillbits = Iterables.cycle(nodeList).iterator(); + + while(totalMinorFragments-- > 0) { + DrillbitEndpoint dbit = (DrillbitEndpoint) drillbits.next(); + if (!notIn.contains(dbit)) { + endpoints.add(dbit); + } + } + return endpoints; + } + + private Set createSet(Wrapper... wrappers) { + Set setOfWrappers = new HashSet<>(); + for (Wrapper wrapper : wrappers) { + setOfWrappers.add(wrapper); + } + return setOfWrappers; + } + + private Fragment getRootFragmentFromPlan(DrillbitContext context, + String plan) throws Exception { + final PhysicalPlanReader planReader = context.getPlanReader(); + return PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan); + } + + private PlanningSet preparePlanningSet(List activeEndpoints, long slice_target, + Map resources, String sql, + SimpleParallelizer parallelizer) throws Exception { + Fragment rootFragment = getRootFragmentFromPlan(drillbitContext, getPlanForQuery(sql, 10, slice_target)); + return mockPlanningSet(parallelizer.prepareFragmentTree(rootFragment), resources, activeEndpoints); + } + + @Test + public void TestSingleMajorFragmentWithProjectAndScan() throws Exception { + List activeEndpoints = getEndpoints(2, new HashSet<>()); + Map resources = activeEndpoints.stream() + .collect(Collectors.toMap(x -> x, + x -> NodeResource.create())); + String sql = "SELECT * from cp.`tpch/nation.parquet`"; + + SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext); + PlanningSet planningSet = preparePlanningSet(activeEndpoints, DEFAULT_SLICE_TARGET, resources, sql, parallelizer); + parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints); + assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 30)); + } + + + @Test + public void TestSingleMajorFragmentWithGroupByProjectAndScan() throws Exception { + List activeEndpoints = getEndpoints(2, new HashSet<>()); + Map resources = activeEndpoints.stream() + .collect(Collectors.toMap(x -> x, + x -> NodeResource.create())); + String sql = "SELECT dept_id, count(*) from cp.`tpch/lineitem.parquet` group by dept_id"; + + SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext); + PlanningSet planningSet = preparePlanningSet(activeEndpoints, DEFAULT_SLICE_TARGET, resources, sql, parallelizer); + parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints); + assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 529570)); + } + + + @Test + public void TestTwoMajorFragmentWithSortyProjectAndScan() throws Exception { + List activeEndpoints = getEndpoints(2, new HashSet<>()); + Map resources = activeEndpoints.stream() + .collect(Collectors.toMap(x -> x, + x -> NodeResource.create())); + String sql = "SELECT * from cp.`tpch/lineitem.parquet` order by dept_id"; + + SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext); + PlanningSet planningSet = preparePlanningSet(activeEndpoints, 2, resources, sql, parallelizer); + parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints); + assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 481490)); + } +} diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java index 7b53bb65c..c01143b26 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.drill.categories.PlannerTest; import org.apache.drill.exec.planner.PhysicalPlanReader; import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory; +import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer; import org.apache.drill.exec.planner.fragment.Fragment; import org.apache.drill.exec.planner.fragment.SimpleParallelizer; import org.apache.drill.exec.proto.BitControl.QueryContextInformation; @@ -52,7 +53,7 @@ public class TestFragmentChecker extends PopUnitTestBase{ private void print(String fragmentFile, int bitCount, int expectedFragmentCount) throws Exception { PhysicalPlanReader ppr = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(CONFIG); Fragment fragmentRoot = getRootFragment(ppr, fragmentFile); - SimpleParallelizer par = new SimpleParallelizer(1000*1000, 5, 10, 1.2); + SimpleParallelizer par = new DefaultQueryParallelizer(true, 1000*1000, 5, 10, 1.2); List endpoints = Lists.newArrayList(); DrillbitEndpoint localBit = null; for(int i =0; i < bitCount; i++) { @@ -63,9 +64,8 @@ public class TestFragmentChecker extends PopUnitTestBase{ endpoints.add(b1); } - final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", - "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); - QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, fragmentRoot, + final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e"); + QueryWorkUnit qwu = par.generateWorkUnit(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, fragmentRoot, UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build(), queryContextInfo); qwu.applyPlan(ppr); -- cgit v1.2.3