author     Marcelo Vanzin <vanzin@cloudera.com>    2017-01-25 08:18:41 -0600
committer  Tom Graves <tgraves@yahoo-inc.com>      2017-01-25 08:18:41 -0600
commit     76db394f2baedc2c7b7a52c05314a64ec9068263 (patch)
tree       6a19097f99077145c34441af5322005259303c41 /resource-managers
parent     3fdce814348fae34df379a6ab9655dbbb2c3427c (diff)
[SPARK-18750][YARN] Avoid using "mapValues" when allocating containers.
That method is prone to stack overflows when the input map is really large;
instead, use plain "map". Also includes a unit test that reproduces the stack
overflow when run without the fix.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #16667 from vanzin/SPARK-18750.
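For context, a minimal standalone sketch (not part of this patch; assumes
Scala 2.11/2.12 collection semantics, where "mapValues" returns a lazy view)
of how repeated mapValues calls stack up wrappers until the first read
overflows:

object MapValuesOverflowSketch {
  def main(args: Array[String]): Unit = {
    var ratios: Map[String, Int] = Map("host_1" -> 100000)

    // Each mapValues call wraps the previous map in another lazy view
    // instead of materializing a new map.
    for (_ <- 0 until 100000) {
      ratios = ratios.mapValues(_ - 1)
    }

    // Reading a value descends through every nested view at once and
    // throws StackOverflowError on a default-sized stack.
    println(ratios("host_1"))
  }
}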
Diffstat (limited to 'resource-managers')
-rw-r--r--  resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala | 11
-rw-r--r--  resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala              | 87
2 files changed, 93 insertions(+), 5 deletions(-)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
index fb2d61f621..f2b6324db6 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala
@@ -129,9 +129,9 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
     val largestRatio = updatedHostToContainerCount.values.max
     // Round the ratio of preferred locality to the number of locality required container
     // number, which is used for locality preferred host calculating.
-    var preferredLocalityRatio = updatedHostToContainerCount.mapValues { ratio =>
+    var preferredLocalityRatio = updatedHostToContainerCount.map { case (k, ratio) =>
       val adjustedRatio = ratio.toDouble * requiredLocalityAwareContainerNum / largestRatio
-      adjustedRatio.ceil.toInt
+      (k, adjustedRatio.ceil.toInt)
     }
 
     for (i <- 0 until requiredLocalityAwareContainerNum) {
@@ -145,7 +145,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
         // Minus 1 each time when the host is used. When the current ratio is 0,
         // which means all the required ratio is satisfied, this host will not be allocated again.
-        preferredLocalityRatio = preferredLocalityRatio.mapValues(_ - 1)
+        preferredLocalityRatio = preferredLocalityRatio.map { case (k, v) => (k, v - 1) }
       }
     }
@@ -218,7 +218,8 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy(
     val possibleTotalContainerNum = pendingHostToContainerCount.values.sum
     val localityMatchedPendingNum = localityMatchedPendingAllocations.size.toDouble
-    pendingHostToContainerCount.mapValues(_ * localityMatchedPendingNum / possibleTotalContainerNum)
-      .toMap
+    pendingHostToContainerCount.map { case (k, v) =>
+      (k, v * localityMatchedPendingNum / possibleTotalContainerNum)
+    }.toMap
   }
 }
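For reference, a standalone sketch (not part of this patch; host names made
up) of why the eager rewrite tolerates reassignment in a loop: each "map"
call builds a new, fully materialized Map, so no chain of lazy views
accumulates across iterations.

object EagerMapSketch {
  def main(args: Array[String]): Unit = {
    var ratios: Map[String, Int] = Map("host_1" -> 3, "host_2" -> 5)

    // map evaluates eagerly: after every iteration ratios is a plain Map
    // again, so lookups take constant stack depth no matter how many
    // times the loop reassigns it.
    for (_ <- 0 until 100000) {
      ratios = ratios.map { case (k, v) => (k, v - 1) }
    }

    println(ratios("host_1"))
  }
}

The trade-off is an eager copy of the map on every iteration, which is cheap
here because the map holds at most one entry per candidate host.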
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
new file mode 100644
index 0000000000..fb80ff9f31
--- /dev/null
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/LocalityPlacementStrategySuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.{HashMap, HashSet, Set}
+
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.DNSToSwitchMapping
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.mockito.Mockito._
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class LocalityPlacementStrategySuite extends SparkFunSuite {
+
+  test("handle large number of containers and tasks (SPARK-18750)") {
+    // Run the test in a thread with a small stack size, since the original issue
+    // surfaced as a StackOverflowError.
+    var error: Throwable = null
+
+    val runnable = new Runnable() {
+      override def run(): Unit = try {
+        runTest()
+      } catch {
+        case e: Throwable => error = e
+      }
+    }
+
+    val thread = new Thread(new ThreadGroup("test"), runnable, "test-thread", 32 * 1024)
+    thread.start()
+    thread.join()
+
+    assert(error === null)
+  }
+
+  private def runTest(): Unit = {
+    val yarnConf = new YarnConfiguration()
+    yarnConf.setClass(
+      CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+      classOf[MockResolver], classOf[DNSToSwitchMapping])
+
+    // The numbers below have been chosen to balance being large enough to replicate the
+    // original issue while not taking too long to run when the issue is fixed. The main
+    // goal is to create enough requests for localized containers (so there should be many
+    // tasks on several hosts that have no allocated containers).
+
+    val resource = Resource.newInstance(8 * 1024, 4)
+    val strategy = new LocalityPreferredContainerPlacementStrategy(new SparkConf(),
+      yarnConf, resource)
+
+    val totalTasks = 32 * 1024
+    val totalContainers = totalTasks / 16
+    val totalHosts = totalContainers / 16
+
+    val mockId = mock(classOf[ContainerId])
+    val hosts = (1 to totalHosts).map { i => (s"host_$i", totalTasks % i) }.toMap
+    val containers = (1 to totalContainers).map { i => mockId }
+    val count = containers.size / hosts.size / 2
+
+    val hostToContainerMap = new HashMap[String, Set[ContainerId]]()
+    hosts.keys.take(hosts.size / 2).zipWithIndex.foreach { case (host, i) =>
+      val hostContainers = new HashSet[ContainerId]()
+      containers.drop(count * i).take(i).foreach { c => hostContainers += c }
+      hostToContainerMap(host) = hostContainers
+    }
+
+    strategy.localityOfRequestedContainers(containers.size * 2, totalTasks, hosts,
+      hostToContainerMap, Nil)
+  }
+
+}
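As a usage note, the small-stack-thread technique in this suite generalizes.
A sketch of a hypothetical helper (runWithSmallStack is not part of the
patch) that any JVM test could use to make stack-depth regressions fail fast:

object SmallStackRunner {
  // Hypothetical helper, not part of the patch: run `body` on a thread with
  // a deliberately small stack so excessive recursion fails quickly instead
  // of passing on a machine with a generous default stack size.
  def runWithSmallStack(stackSizeBytes: Long = 32 * 1024)(body: => Unit): Unit = {
    var error: Throwable = null
    val runnable = new Runnable {
      // Catch Throwable, not just Exception: StackOverflowError is an Error.
      override def run(): Unit = try body catch { case e: Throwable => error = e }
    }
    // The four-argument Thread constructor is the standard JVM way to request
    // a stack size; per its Javadoc the value is a hint the VM may ignore.
    val thread = new Thread(new ThreadGroup("small-stack"), runnable,
      "small-stack-thread", stackSizeBytes)
    thread.start()
    thread.join()
    if (error != null) throw error
  }
}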