Diffstat (limited to 'exec/java-exec/src/test/java/org/apache')
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java                               |   5
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java                   |   9
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java |  14
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java                   | 227
-rw-r--r--  exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java                           |   8
5 files changed, 247 insertions(+), 16 deletions(-)
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
index 0f0bf0df0..7fb7bdbe4 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/TestOpSerialization.java
@@ -38,6 +38,7 @@ import org.apache.drill.exec.physical.config.Screen;
import org.apache.drill.exec.physical.config.UnionExchange;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
+import org.apache.drill.exec.planner.cost.PrelCostEstimates;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.store.direct.DirectSubScan;
import org.apache.drill.exec.store.mock.MockSubScanPOP;
@@ -58,7 +59,7 @@ public class TestOpSerialization {
private static PhysicalOperator setupPhysicalOperator(PhysicalOperator operator)
{
operator.setOperatorId(1);
- operator.setCost(1.0);
+ operator.setCost(new PrelCostEstimates(1.0, 1.0));
operator.setMaxAllocation(1000);
return operator;
}
@@ -66,7 +67,7 @@ public class TestOpSerialization {
private static void assertOperator(PhysicalOperator operator)
{
assertEquals(1, operator.getOperatorId());
- assertEquals(1.0, operator.getCost(), 0.00001);
+ assertEquals(1.0, operator.getCost().getOutputRowCount(), 0.00001);
assertEquals(1000, operator.getMaxAllocation());
}
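
The hunks above track the cost API change this commit adapts the tests to: PhysicalOperator.setCost() now takes a PrelCostEstimates object instead of a raw double, and getCost() returns that object, so the expected row count is read through getOutputRowCount(). A minimal round trip, using only the calls visible in this diff (which of the two constructor arguments is the row count and which is the cost figure is not spelled out here, so the sketch simply reuses the test's values):

  // Sketch only, not part of the commit; mirrors setupPhysicalOperator()/assertOperator() above.
  private static void roundTripCost(PhysicalOperator operator) {
    operator.setCost(new PrelCostEstimates(1.0, 1.0));                  // was: operator.setCost(1.0)
    assertEquals(1.0, operator.getCost().getOutputRowCount(), 0.00001); // was: assertEquals(1.0, operator.getCost(), 0.00001)
  }
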
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
index b45acfbc8..37cae3e92 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/TestLocalExchange.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.physical.impl;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.shaded.guava.com.google.common.collect.ArrayListMultimap;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableSet;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@@ -81,7 +82,8 @@ public class TestLocalExchange extends PlanTestBase {
.withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
.build();
- private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(
+ private static final SimpleParallelizer PARALLELIZER = new DefaultQueryParallelizer(
+ true,
1 /*parallelizationThreshold (slice_count)*/,
6 /*maxWidthPerNode*/,
1000 /*maxGlobalWidth*/,
@@ -394,9 +396,8 @@ public class TestLocalExchange extends PlanTestBase {
findFragmentsWithPartitionSender(rootFragment, planningSet, deMuxFragments, htrFragments);
- final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName",
- "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
- QueryWorkUnit qwu = PARALLELIZER.getFragments(new OptionList(), drillbitContext.getEndpoint(),
+ final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
+ QueryWorkUnit qwu = PARALLELIZER.generateWorkUnit(new OptionList(), drillbitContext.getEndpoint(),
QueryId.getDefaultInstance(),
drillbitContext.getBits(), rootFragment, USER_SESSION, queryContextInfo);
qwu.applyPlan(planReader);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
index 0986c75d6..7ef94f7d7 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/partitionsender/TestPartitionSender.java
@@ -46,6 +46,8 @@ import org.apache.drill.exec.physical.impl.TopN.TopNBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.Metric;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator.GeneralExecuteIface;
import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.cost.PrelCostEstimates;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.PlanningSet;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
@@ -86,7 +88,8 @@ import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
@Category(OperatorTest.class)
public class TestPartitionSender extends PlanTestBase {
- private static final SimpleParallelizer PARALLELIZER = new SimpleParallelizer(
+ private static final SimpleParallelizer PARALLELIZER = new DefaultQueryParallelizer(
+ true,
1 /*parallelizationThreshold (slice_count)*/,
6 /*maxWidthPerNode*/,
1000 /*maxGlobalWidth*/,
@@ -178,14 +181,14 @@ public class TestPartitionSender extends PlanTestBase {
options.clear();
options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.slice_target", 1, OptionScope.SESSION));
options.add(OptionValue.create(OptionValue.AccessibleScopes.SESSION, "planner.partitioner_sender_max_threads", 10, OptionScope.SESSION));
- hashToRandomExchange.setCost(1000);
+ hashToRandomExchange.setCost(new PrelCostEstimates(1000, 1000));
testThreadsHelper(hashToRandomExchange, drillbitContext, options,
incoming, registry, planReader, planningSet, rootFragment, 10);
options.clear();
options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.slice_target", 1000, OptionScope.SESSION));
options.add(OptionValue.create(AccessibleScopes.SESSION, "planner.partitioner_sender_threads_factor",2, OptionScope.SESSION));
- hashToRandomExchange.setCost(14000);
+ hashToRandomExchange.setCost(new PrelCostEstimates(14000, 14000));
testThreadsHelper(hashToRandomExchange, drillbitContext, options,
incoming, registry, planReader, planningSet, rootFragment, 2);
}
@@ -207,9 +210,8 @@ public class TestPartitionSender extends PlanTestBase {
RecordBatch incoming, FunctionImplementationRegistry registry, PhysicalPlanReader planReader, PlanningSet planningSet, Fragment rootFragment,
int expectedThreadsCount) throws Exception {
- final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName",
- "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
- final QueryWorkUnit qwu = PARALLELIZER.getFragments(options, drillbitContext.getEndpoint(),
+ final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
+ final QueryWorkUnit qwu = PARALLELIZER.generateWorkUnit(options, drillbitContext.getEndpoint(),
QueryId.getDefaultInstance(),
drillbitContext.getBits(), rootFragment, USER_SESSION, queryContextInfo);
qwu.applyPlan(planReader);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
new file mode 100644
index 000000000..4893a36fd
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/planner/rm/TestMemoryCalculator.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.planner.rm;
+
+
+import org.apache.drill.PlanTestBase;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.QueryContext;
+import org.apache.drill.exec.planner.PhysicalPlanReader;
+import org.apache.drill.exec.planner.cost.NodeResource;
+import org.apache.drill.exec.planner.fragment.Fragment;
+import org.apache.drill.exec.planner.fragment.PlanningSet;
+import org.apache.drill.exec.planner.fragment.QueueQueryParallelizer;
+import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
+import org.apache.drill.exec.planner.fragment.Wrapper;
+import org.apache.drill.exec.pop.PopUnitTestBase;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.exec.proto.UserProtos;
+import org.apache.drill.exec.rpc.user.UserSession;
+import org.apache.drill.exec.server.DrillbitContext;
+import org.apache.drill.shaded.guava.com.google.common.collect.Iterables;
+import org.apache.drill.test.ClientFixture;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterFixtureBuilder;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestMemoryCalculator extends PlanTestBase {
+
+ private static final long DEFAULT_SLICE_TARGET = 100000L;
+ private static final long DEFAULT_BATCH_SIZE = 16*1024*1024;
+
+ private static final UserSession session = UserSession.Builder.newBuilder()
+ .withCredentials(UserBitShared.UserCredentials.newBuilder()
+ .setUserName("foo")
+ .build())
+ .withUserProperties(UserProtos.UserProperties.getDefaultInstance())
+ .withOptionManager(bits[0].getContext().getOptionManager())
+ .build();
+
+ private static final DrillbitEndpoint N1_EP1 = newDrillbitEndpoint("node1", 30010);
+ private static final DrillbitEndpoint N1_EP2 = newDrillbitEndpoint("node2", 30011);
+ private static final DrillbitEndpoint N1_EP3 = newDrillbitEndpoint("node3", 30012);
+ private static final DrillbitEndpoint N1_EP4 = newDrillbitEndpoint("node4", 30013);
+
+ private static final DrillbitEndpoint[] nodeList = {N1_EP1, N1_EP2, N1_EP3, N1_EP4};
+
+ private static final DrillbitEndpoint newDrillbitEndpoint(String address, int port) {
+ return DrillbitEndpoint.newBuilder().setAddress(address).setControlPort(port).build();
+ }
+ private static final DrillbitContext drillbitContext = getDrillbitContext();
+ private static final QueryContext queryContext = new QueryContext(session, drillbitContext,
+ UserBitShared.QueryId.getDefaultInstance());
+
+ @AfterClass
+ public static void close() throws Exception {
+ queryContext.close();
+ }
+
+ private final Wrapper mockWrapper(Wrapper rootFragment,
+ Map<DrillbitEndpoint, NodeResource> resourceMap,
+ List<DrillbitEndpoint> endpoints,
+ Map<Fragment, Wrapper> originalToMockWrapper ) {
+ final Wrapper mockWrapper = mock(Wrapper.class);
+ originalToMockWrapper.put(rootFragment.getNode(), mockWrapper);
+ List<Wrapper> mockdependencies = new ArrayList<>();
+
+ for (Wrapper dependency : rootFragment.getFragmentDependencies()) {
+ mockdependencies.add(mockWrapper(dependency, resourceMap, endpoints, originalToMockWrapper));
+ }
+
+ when(mockWrapper.getNode()).thenReturn(rootFragment.getNode());
+ when(mockWrapper.getAssignedEndpoints()).thenReturn(endpoints);
+ when(mockWrapper.getResourceMap()).thenReturn(resourceMap);
+ when(mockWrapper.getWidth()).thenReturn(endpoints.size());
+ when(mockWrapper.getFragmentDependencies()).thenReturn(mockdependencies);
+ when(mockWrapper.isEndpointsAssignmentDone()).thenReturn(true);
+ return mockWrapper;
+ }
+
+ private final PlanningSet mockPlanningSet(PlanningSet planningSet,
+ Map<DrillbitEndpoint, NodeResource> resourceMap,
+ List<DrillbitEndpoint> endpoints) {
+ Map<Fragment, Wrapper> wrapperToMockWrapper = new HashMap<>();
+ Wrapper rootFragment = mockWrapper( planningSet.getRootWrapper(), resourceMap,
+ endpoints, wrapperToMockWrapper);
+ PlanningSet mockPlanningSet = mock(PlanningSet.class);
+ when(mockPlanningSet.getRootWrapper()).thenReturn(rootFragment);
+ when(mockPlanningSet.get(any(Fragment.class))).thenAnswer(invocation -> {
+ return wrapperToMockWrapper.get(invocation.getArgument(0));
+ });
+ return mockPlanningSet;
+ }
+
+ private String getPlanForQuery(String query) throws Exception {
+ return getPlanForQuery(query, DEFAULT_BATCH_SIZE);
+ }
+
+ private String getPlanForQuery(String query, long outputBatchSize) throws Exception {
+ return getPlanForQuery(query, outputBatchSize, DEFAULT_SLICE_TARGET);
+ }
+
+ private String getPlanForQuery(String query, long outputBatchSize,
+ long slice_target) throws Exception {
+ ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
+ .setOptionDefault(ExecConstants.OUTPUT_BATCH_SIZE, outputBatchSize)
+ .setOptionDefault(ExecConstants.SLICE_TARGET, slice_target);
+ String plan;
+
+ try (ClusterFixture cluster = builder.build();
+ ClientFixture client = cluster.clientFixture()) {
+ plan = client.queryBuilder()
+ .sql(query)
+ .explainJson();
+ }
+ return plan;
+ }
+
+ private List<DrillbitEndpoint> getEndpoints(int totalMinorFragments,
+ Set<DrillbitEndpoint> notIn) {
+ List<DrillbitEndpoint> endpoints = new ArrayList<>();
+ Iterator drillbits = Iterables.cycle(nodeList).iterator();
+
+ while(totalMinorFragments-- > 0) {
+ DrillbitEndpoint dbit = (DrillbitEndpoint) drillbits.next();
+ if (!notIn.contains(dbit)) {
+ endpoints.add(dbit);
+ }
+ }
+ return endpoints;
+ }
+
+ private Set<Wrapper> createSet(Wrapper... wrappers) {
+ Set<Wrapper> setOfWrappers = new HashSet<>();
+ for (Wrapper wrapper : wrappers) {
+ setOfWrappers.add(wrapper);
+ }
+ return setOfWrappers;
+ }
+
+ private Fragment getRootFragmentFromPlan(DrillbitContext context,
+ String plan) throws Exception {
+ final PhysicalPlanReader planReader = context.getPlanReader();
+ return PopUnitTestBase.getRootFragmentFromPlanString(planReader, plan);
+ }
+
+ private PlanningSet preparePlanningSet(List<DrillbitEndpoint> activeEndpoints, long slice_target,
+ Map<DrillbitEndpoint, NodeResource> resources, String sql,
+ SimpleParallelizer parallelizer) throws Exception {
+ Fragment rootFragment = getRootFragmentFromPlan(drillbitContext, getPlanForQuery(sql, 10, slice_target));
+ return mockPlanningSet(parallelizer.prepareFragmentTree(rootFragment), resources, activeEndpoints);
+ }
+
+ @Test
+ public void TestSingleMajorFragmentWithProjectAndScan() throws Exception {
+ List<DrillbitEndpoint> activeEndpoints = getEndpoints(2, new HashSet<>());
+ Map<DrillbitEndpoint, NodeResource> resources = activeEndpoints.stream()
+ .collect(Collectors.toMap(x -> x,
+ x -> NodeResource.create()));
+ String sql = "SELECT * from cp.`tpch/nation.parquet`";
+
+ SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext);
+ PlanningSet planningSet = preparePlanningSet(activeEndpoints, DEFAULT_SLICE_TARGET, resources, sql, parallelizer);
+ parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints);
+ assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 30));
+ }
+
+
+ @Test
+ public void TestSingleMajorFragmentWithGroupByProjectAndScan() throws Exception {
+ List<DrillbitEndpoint> activeEndpoints = getEndpoints(2, new HashSet<>());
+ Map<DrillbitEndpoint, NodeResource> resources = activeEndpoints.stream()
+ .collect(Collectors.toMap(x -> x,
+ x -> NodeResource.create()));
+ String sql = "SELECT dept_id, count(*) from cp.`tpch/lineitem.parquet` group by dept_id";
+
+ SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext);
+ PlanningSet planningSet = preparePlanningSet(activeEndpoints, DEFAULT_SLICE_TARGET, resources, sql, parallelizer);
+ parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints);
+ assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 529570));
+ }
+
+
+ @Test
+ public void TestTwoMajorFragmentWithSortyProjectAndScan() throws Exception {
+ List<DrillbitEndpoint> activeEndpoints = getEndpoints(2, new HashSet<>());
+ Map<DrillbitEndpoint, NodeResource> resources = activeEndpoints.stream()
+ .collect(Collectors.toMap(x -> x,
+ x -> NodeResource.create()));
+ String sql = "SELECT * from cp.`tpch/lineitem.parquet` order by dept_id";
+
+ SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext);
+ PlanningSet planningSet = preparePlanningSet(activeEndpoints, 2, resources, sql, parallelizer);
+ parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints);
+ assertTrue("memory requirement is different", Iterables.all(resources.entrySet(), (e) -> e.getValue().getMemory() == 481490));
+ }
+}
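
The new TestMemoryCalculator exercises the queue-based memory calculator through three calls that all appear above: prepareFragmentTree() builds a PlanningSet from the root Fragment, adjustMemory() walks it and accumulates per-endpoint NodeResource figures, and the assertions read those figures back with getMemory(). Stripped of the Mockito wiring (in the test, the resources map is handed to the mocked Wrapper through getResourceMap()), the flow is roughly the following sketch:

  // Sketch only, not part of the commit; condensed from preparePlanningSet() and the test bodies above.
  String sql = "SELECT * from cp.`tpch/nation.parquet`";
  List<DrillbitEndpoint> activeEndpoints = getEndpoints(2, new HashSet<>());
  Map<DrillbitEndpoint, NodeResource> resources = activeEndpoints.stream()
      .collect(Collectors.toMap(x -> x, x -> NodeResource.create()));
  SimpleParallelizer parallelizer = new QueueQueryParallelizer(false, queryContext);
  PlanningSet planningSet = preparePlanningSet(activeEndpoints, DEFAULT_SLICE_TARGET, resources, sql, parallelizer);
  parallelizer.adjustMemory(planningSet, createSet(planningSet.getRootWrapper()), activeEndpoints);
  // Each test then compares the accumulated per-node figure against an expected value.
  long perNodeMemory = resources.get(activeEndpoints.get(0)).getMemory();
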
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
index 7b53bb65c..c01143b26 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/pop/TestFragmentChecker.java
@@ -22,6 +22,7 @@ import java.util.List;
import org.apache.drill.categories.PlannerTest;
import org.apache.drill.exec.planner.PhysicalPlanReader;
import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
+import org.apache.drill.exec.planner.fragment.DefaultQueryParallelizer;
import org.apache.drill.exec.planner.fragment.Fragment;
import org.apache.drill.exec.planner.fragment.SimpleParallelizer;
import org.apache.drill.exec.proto.BitControl.QueryContextInformation;
@@ -52,7 +53,7 @@ public class TestFragmentChecker extends PopUnitTestBase{
private void print(String fragmentFile, int bitCount, int expectedFragmentCount) throws Exception {
PhysicalPlanReader ppr = PhysicalPlanReaderTestFactory.defaultPhysicalPlanReader(CONFIG);
Fragment fragmentRoot = getRootFragment(ppr, fragmentFile);
- SimpleParallelizer par = new SimpleParallelizer(1000*1000, 5, 10, 1.2);
+ SimpleParallelizer par = new DefaultQueryParallelizer(true, 1000*1000, 5, 10, 1.2);
List<DrillbitEndpoint> endpoints = Lists.newArrayList();
DrillbitEndpoint localBit = null;
for(int i =0; i < bitCount; i++) {
@@ -63,9 +64,8 @@ public class TestFragmentChecker extends PopUnitTestBase{
endpoints.add(b1);
}
- final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName",
- "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
- QueryWorkUnit qwu = par.getFragments(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, fragmentRoot,
+ final QueryContextInformation queryContextInfo = Utilities.createQueryContextInfo("dummySchemaName", "938ea2d9-7cb9-4baf-9414-a5a0b7777e8e");
+ QueryWorkUnit qwu = par.generateWorkUnit(new OptionList(), localBit, QueryId.getDefaultInstance(), endpoints, fragmentRoot,
UserSession.Builder.newBuilder().withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build()).build(),
queryContextInfo);
qwu.applyPlan(ppr);
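
Across TestLocalExchange, TestPartitionSender and TestFragmentChecker the pattern of this change is the same: the parallelizer is now constructed through the DefaultQueryParallelizer subclass, which takes a leading boolean ahead of the existing threshold/width arguments, and work units come from generateWorkUnit() instead of getFragments(), with an otherwise unchanged parameter list. A condensed sketch of the new call shape, using only arguments visible in the TestFragmentChecker hunk above:

  // Sketch only, not part of the commit; values are the ones TestFragmentChecker passes above.
  UserSession session = UserSession.Builder.newBuilder()
      .withCredentials(UserBitShared.UserCredentials.newBuilder().setUserName("foo").build())
      .build();
  SimpleParallelizer par = new DefaultQueryParallelizer(true, 1000*1000, 5, 10, 1.2);
  QueryWorkUnit qwu = par.generateWorkUnit(new OptionList(), localBit, QueryId.getDefaultInstance(),
      endpoints, fragmentRoot, session, queryContextInfo);   // formerly par.getFragments(...)
  qwu.applyPlan(ppr);
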