author     Xiaofeng Lin <xlin@twilio.com>       2017-08-31 08:57:15 +0800
committer  jerryshao <sshao@hortonworks.com>    2017-08-31 08:57:15 +0800
commit     cd5d0f3379b1a9fa0940ffd98bfff33f8cbcdeb0 (patch)
tree       471e583ea8448fadf2a77f79bff7ce458210ac8d
parent     313c6ca43593e247ab8cedac15c77d13e2830d6b (diff)
[SPARK-11574][CORE] Add metrics StatsD sink
This patch adds a StatsD sink to the current metrics system in Spark core.

Author: Xiaofeng Lin <xlin@twilio.com>

Closes #9518 from xflin/statsd.

Change-Id: Ib8720e86223d4a650df53f51ceb963cd95b49a44
-rw-r--r--  conf/metrics.properties.template                                         |  12
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala   | 163
-rw-r--r--  core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala       |  75
-rw-r--r--  core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala  | 161
-rw-r--r--  docs/monitoring.md                                                        |   1
5 files changed, 412 insertions(+), 0 deletions(-)
diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index aeb76c9b2f..4c008a1360 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -118,6 +118,14 @@
# prefix EMPTY STRING Prefix to prepend to every metric's name
# protocol tcp Protocol ("tcp" or "udp") to use
+# org.apache.spark.metrics.sink.StatsdSink
+# Name: Default: Description:
+# host 127.0.0.1 Hostname or IP of StatsD server
+# port 8125 Port of StatsD server
+# period 10 Poll period
+# unit seconds Units of poll period
+# prefix EMPTY STRING Prefix to prepend to metric name
+
## Examples
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
@@ -125,6 +133,10 @@
# Enable ConsoleSink for all instances by class name
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
+# Enable StatsdSink for all instances by class name
+#*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
+#*.sink.statsd.prefix=spark
+
# Polling period for the ConsoleSink
#*.sink.console.period=10
# Unit of the polling period for the ConsoleSink
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
new file mode 100644
index 0000000000..ba75aa1c65
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdReporter.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.io.IOException
+import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.SortedMap
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import com.codahale.metrics._
+import org.apache.hadoop.net.NetUtils
+
+import org.apache.spark.internal.Logging
+
+/**
+ * @see <a href="https://github.com/etsy/statsd/blob/master/docs/metric_types.md">
+ * StatsD metric types</a>
+ */
+private[spark] object StatsdMetricType {
+ val COUNTER = "c"
+ val GAUGE = "g"
+ val TIMER = "ms"
+ val SET = "s"
+}
+
+private[spark] class StatsdReporter(
+ registry: MetricRegistry,
+ host: String = "127.0.0.1",
+ port: Int = 8125,
+ prefix: String = "",
+ filter: MetricFilter = MetricFilter.ALL,
+ rateUnit: TimeUnit = TimeUnit.SECONDS,
+ durationUnit: TimeUnit = TimeUnit.MILLISECONDS)
+ extends ScheduledReporter(registry, "statsd-reporter", filter, rateUnit, durationUnit)
+ with Logging {
+
+ import StatsdMetricType._
+
+ private val address = new InetSocketAddress(host, port)
+ private val whitespace = "[\\s]+".r
+
+ override def report(
+ gauges: SortedMap[String, Gauge[_]],
+ counters: SortedMap[String, Counter],
+ histograms: SortedMap[String, Histogram],
+ meters: SortedMap[String, Meter],
+ timers: SortedMap[String, Timer]): Unit =
+ Try(new DatagramSocket) match {
+ case Failure(ioe: IOException) => logWarning("StatsD datagram socket construction failed",
+ NetUtils.wrapException(host, port, NetUtils.getHostname(), 0, ioe))
+ case Failure(e) => logWarning("StatsD datagram socket construction failed", e)
+ case Success(s) =>
+ implicit val socket = s
+ val localAddress = Try(socket.getLocalAddress).map(_.getHostAddress).getOrElse(null)
+ val localPort = socket.getLocalPort
+ Try {
+ gauges.entrySet.asScala.foreach(e => reportGauge(e.getKey, e.getValue))
+ counters.entrySet.asScala.foreach(e => reportCounter(e.getKey, e.getValue))
+ histograms.entrySet.asScala.foreach(e => reportHistogram(e.getKey, e.getValue))
+ meters.entrySet.asScala.foreach(e => reportMetered(e.getKey, e.getValue))
+ timers.entrySet.asScala.foreach(e => reportTimer(e.getKey, e.getValue))
+ } recover {
+ case ioe: IOException =>
+ logDebug(s"Unable to send packets to StatsD", NetUtils.wrapException(
+ address.getHostString, address.getPort, localAddress, localPort, ioe))
+ case e: Throwable => logDebug(s"Unable to send packets to StatsD at '$host:$port'", e)
+ }
+ Try(socket.close()) recover {
+ case ioe: IOException =>
+ logDebug("Error when close socket to StatsD", NetUtils.wrapException(
+ address.getHostString, address.getPort, localAddress, localPort, ioe))
+ case e: Throwable => logDebug("Error when close socket to StatsD", e)
+ }
+ }
+
+ private def reportGauge(name: String, gauge: Gauge[_])(implicit socket: DatagramSocket): Unit =
+ formatAny(gauge.getValue).foreach(v => send(fullName(name), v, GAUGE))
+
+ private def reportCounter(name: String, counter: Counter)(implicit socket: DatagramSocket): Unit =
+ send(fullName(name), format(counter.getCount), COUNTER)
+
+ private def reportHistogram(name: String, histogram: Histogram)
+ (implicit socket: DatagramSocket): Unit = {
+ val snapshot = histogram.getSnapshot
+ send(fullName(name, "count"), format(histogram.getCount), GAUGE)
+ send(fullName(name, "max"), format(snapshot.getMax), TIMER)
+ send(fullName(name, "mean"), format(snapshot.getMean), TIMER)
+ send(fullName(name, "min"), format(snapshot.getMin), TIMER)
+ send(fullName(name, "stddev"), format(snapshot.getStdDev), TIMER)
+ send(fullName(name, "p50"), format(snapshot.getMedian), TIMER)
+ send(fullName(name, "p75"), format(snapshot.get75thPercentile), TIMER)
+ send(fullName(name, "p95"), format(snapshot.get95thPercentile), TIMER)
+ send(fullName(name, "p98"), format(snapshot.get98thPercentile), TIMER)
+ send(fullName(name, "p99"), format(snapshot.get99thPercentile), TIMER)
+ send(fullName(name, "p999"), format(snapshot.get999thPercentile), TIMER)
+ }
+
+ private def reportMetered(name: String, meter: Metered)(implicit socket: DatagramSocket): Unit = {
+ send(fullName(name, "count"), format(meter.getCount), GAUGE)
+ send(fullName(name, "m1_rate"), format(convertRate(meter.getOneMinuteRate)), TIMER)
+ send(fullName(name, "m5_rate"), format(convertRate(meter.getFiveMinuteRate)), TIMER)
+ send(fullName(name, "m15_rate"), format(convertRate(meter.getFifteenMinuteRate)), TIMER)
+ send(fullName(name, "mean_rate"), format(convertRate(meter.getMeanRate)), TIMER)
+ }
+
+ private def reportTimer(name: String, timer: Timer)(implicit socket: DatagramSocket): Unit = {
+ val snapshot = timer.getSnapshot
+ send(fullName(name, "max"), format(convertDuration(snapshot.getMax)), TIMER)
+ send(fullName(name, "mean"), format(convertDuration(snapshot.getMean)), TIMER)
+ send(fullName(name, "min"), format(convertDuration(snapshot.getMin)), TIMER)
+ send(fullName(name, "stddev"), format(convertDuration(snapshot.getStdDev)), TIMER)
+ send(fullName(name, "p50"), format(convertDuration(snapshot.getMedian)), TIMER)
+ send(fullName(name, "p75"), format(convertDuration(snapshot.get75thPercentile)), TIMER)
+ send(fullName(name, "p95"), format(convertDuration(snapshot.get95thPercentile)), TIMER)
+ send(fullName(name, "p98"), format(convertDuration(snapshot.get98thPercentile)), TIMER)
+ send(fullName(name, "p99"), format(convertDuration(snapshot.get99thPercentile)), TIMER)
+ send(fullName(name, "p999"), format(convertDuration(snapshot.get999thPercentile)), TIMER)
+
+ reportMetered(name, timer)
+ }
+
+ private def send(name: String, value: String, metricType: String)
+ (implicit socket: DatagramSocket): Unit = {
+ val bytes = sanitize(s"$name:$value|$metricType").getBytes(UTF_8)
+ val packet = new DatagramPacket(bytes, bytes.length, address)
+ socket.send(packet)
+ }
+
+ private def fullName(names: String*): String = MetricRegistry.name(prefix, names : _*)
+
+ private def sanitize(s: String): String = whitespace.replaceAllIn(s, "-")
+
+ private def format(v: Any): String = formatAny(v).getOrElse("")
+
+ private def formatAny(v: Any): Option[String] =
+ v match {
+ case f: Float => Some("%2.2f".format(f))
+ case d: Double => Some("%2.2f".format(d))
+ case b: BigDecimal => Some("%2.2f".format(b))
+ case n: Number => Some(v.toString)
+ case _ => None
+ }
+}
+
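For context, the payload StatsdReporter's send() builds is the plain StatsD line protocol: one UDP datagram per metric, formatted as "name:value|type". A minimal standalone sketch of that wire format follows; it is not part of the patch, and the host, port, and metric name are illustrative assumptions.

// Minimal sketch of the StatsD wire format produced by send() above.
// Assumptions: a StatsD agent listening on 127.0.0.1:8125; the metric
// name and value are made up for illustration.
import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress}
import java.nio.charset.StandardCharsets.UTF_8

object StatsdWireFormatSketch {
  def main(args: Array[String]): Unit = {
    val address = new InetSocketAddress("127.0.0.1", 8125)
    val socket = new DatagramSocket()
    try {
      // Same "<name>:<value>|<type>" shape StatsdReporter emits,
      // e.g. a counter ("c"), a gauge ("g"), or a timing ("ms").
      val payload = "spark.example.requests:12|c".getBytes(UTF_8)
      socket.send(new DatagramPacket(payload, payload.length, address))
    } finally {
      socket.close()
    }
  }
}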
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
new file mode 100644
index 0000000000..859a2f6bcd
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/StatsdSink.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.codahale.metrics.MetricRegistry
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.MetricsSystem
+
+private[spark] object StatsdSink {
+ val STATSD_KEY_HOST = "host"
+ val STATSD_KEY_PORT = "port"
+ val STATSD_KEY_PERIOD = "period"
+ val STATSD_KEY_UNIT = "unit"
+ val STATSD_KEY_PREFIX = "prefix"
+
+ val STATSD_DEFAULT_HOST = "127.0.0.1"
+ val STATSD_DEFAULT_PORT = "8125"
+ val STATSD_DEFAULT_PERIOD = "10"
+ val STATSD_DEFAULT_UNIT = "SECONDS"
+ val STATSD_DEFAULT_PREFIX = ""
+}
+
+private[spark] class StatsdSink(
+ val property: Properties,
+ val registry: MetricRegistry,
+ securityMgr: SecurityManager)
+ extends Sink with Logging {
+ import StatsdSink._
+
+ val host = property.getProperty(STATSD_KEY_HOST, STATSD_DEFAULT_HOST)
+ val port = property.getProperty(STATSD_KEY_PORT, STATSD_DEFAULT_PORT).toInt
+
+ val pollPeriod = property.getProperty(STATSD_KEY_PERIOD, STATSD_DEFAULT_PERIOD).toInt
+ val pollUnit =
+ TimeUnit.valueOf(property.getProperty(STATSD_KEY_UNIT, STATSD_DEFAULT_UNIT).toUpperCase)
+
+ val prefix = property.getProperty(STATSD_KEY_PREFIX, STATSD_DEFAULT_PREFIX)
+
+ MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
+
+ val reporter = new StatsdReporter(registry, host, port, prefix)
+
+ override def start(): Unit = {
+ reporter.start(pollPeriod, pollUnit)
+ logInfo(s"StatsdSink started with prefix: '$prefix'")
+ }
+
+ override def stop(): Unit = {
+ reporter.stop()
+ logInfo("StatsdSink stopped.")
+ }
+
+ override def report(): Unit = reporter.report()
+}
+
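To make the property-to-reporter flow concrete, here is a hedged wiring sketch (not part of the patch) that builds a StatsdSink by hand, roughly mirroring what Spark's metrics system does when it instantiates a sink declared in metrics.properties. The object name, property values, and sample counter are assumptions for illustration, and the sketch sits in the same package so the private[spark] constructor is visible; it only compiles inside the Spark source tree.

package org.apache.spark.metrics.sink

import java.util.Properties

import com.codahale.metrics.MetricRegistry

import org.apache.spark.{SecurityManager, SparkConf}

// Hypothetical wiring sketch: construct the sink directly from hand-built properties.
object StatsdSinkWiringSketch {
  def main(args: Array[String]): Unit = {
    // Keys and defaults come from the StatsdSink companion object above.
    val props = new Properties()
    props.put(StatsdSink.STATSD_KEY_HOST, "127.0.0.1")
    props.put(StatsdSink.STATSD_KEY_PORT, "8125")
    props.put(StatsdSink.STATSD_KEY_PERIOD, "5")
    props.put(StatsdSink.STATSD_KEY_UNIT, "seconds")
    props.put(StatsdSink.STATSD_KEY_PREFIX, "spark")

    val registry = new MetricRegistry
    registry.counter("requests").inc(12)

    val sink = new StatsdSink(props, registry, new SecurityManager(new SparkConf(false)))
    sink.report()   // one-shot flush: sends "spark.requests:12|c" over UDP
    sink.start()    // or run the scheduled reporter (every 5 seconds here)
    sink.stop()
  }
}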
diff --git a/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
new file mode 100644
index 0000000000..0e21a36071
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/metrics/sink/StatsdSinkSuite.scala
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.metrics.sink
+
+import java.net.{DatagramPacket, DatagramSocket}
+import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Properties
+import java.util.concurrent.TimeUnit._
+
+import com.codahale.metrics._
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.metrics.sink.StatsdSink._
+
+class StatsdSinkSuite extends SparkFunSuite {
+ private val securityMgr = new SecurityManager(new SparkConf(false))
+ private val defaultProps = Map(
+ STATSD_KEY_PREFIX -> "spark",
+ STATSD_KEY_PERIOD -> "1",
+ STATSD_KEY_UNIT -> "seconds",
+ STATSD_KEY_HOST -> "127.0.0.1"
+ )
+ private val socketTimeout = 30000 // milliseconds
+ private val socketBufferSize = 8192
+
+ private def withSocketAndSink(testCode: (DatagramSocket, StatsdSink) => Any): Unit = {
+ val socket = new DatagramSocket
+ socket.setReceiveBufferSize(socketBufferSize)
+ socket.setSoTimeout(socketTimeout)
+ val props = new Properties
+ defaultProps.foreach(e => props.put(e._1, e._2))
+ props.put(STATSD_KEY_PORT, socket.getLocalPort.toString)
+ val registry = new MetricRegistry
+ val sink = new StatsdSink(props, registry, securityMgr)
+ try {
+ testCode(socket, sink)
+ } finally {
+ socket.close()
+ }
+ }
+
+ test("metrics StatsD sink with Counter") {
+ withSocketAndSink { (socket, sink) =>
+ val counter = new Counter
+ counter.inc(12)
+ sink.registry.register("counter", counter)
+ sink.report()
+
+ val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+ socket.receive(p)
+
+ val result = new String(p.getData, 0, p.getLength, UTF_8)
+ assert(result === "spark.counter:12|c", "Counter metric received should match data sent")
+ }
+ }
+
+ test("metrics StatsD sink with Gauge") {
+ withSocketAndSink { (socket, sink) =>
+ val gauge = new Gauge[Double] {
+ override def getValue: Double = 1.23
+ }
+ sink.registry.register("gauge", gauge)
+ sink.report()
+
+ val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+ socket.receive(p)
+
+ val result = new String(p.getData, 0, p.getLength, UTF_8)
+ assert(result === "spark.gauge:1.23|g", "Gauge metric received should match data sent")
+ }
+ }
+
+ test("metrics StatsD sink with Histogram") {
+ withSocketAndSink { (socket, sink) =>
+ val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+ val histogram = new Histogram(new UniformReservoir)
+ histogram.update(10)
+ histogram.update(20)
+ histogram.update(30)
+ sink.registry.register("histogram", histogram)
+ sink.report()
+
+ val expectedResults = Set(
+ "spark.histogram.count:3|g",
+ "spark.histogram.max:30|ms",
+ "spark.histogram.mean:20.00|ms",
+ "spark.histogram.min:10|ms",
+ "spark.histogram.stddev:10.00|ms",
+ "spark.histogram.p50:20.00|ms",
+ "spark.histogram.p75:30.00|ms",
+ "spark.histogram.p95:30.00|ms",
+ "spark.histogram.p98:30.00|ms",
+ "spark.histogram.p99:30.00|ms",
+ "spark.histogram.p999:30.00|ms"
+ )
+
+ (1 to expectedResults.size).foreach { i =>
+ socket.receive(p)
+ val result = new String(p.getData, 0, p.getLength, UTF_8)
+ logInfo(s"Received histogram result $i: '$result'")
+ assert(expectedResults.contains(result),
+ "Histogram metric received should match data sent")
+ }
+ }
+ }
+
+ test("metrics StatsD sink with Timer") {
+ withSocketAndSink { (socket, sink) =>
+ val p = new DatagramPacket(new Array[Byte](socketBufferSize), socketBufferSize)
+ val timer = new Timer()
+ timer.update(1, SECONDS)
+ timer.update(2, SECONDS)
+ timer.update(3, SECONDS)
+ sink.registry.register("timer", timer)
+ sink.report()
+
+ val expectedResults = Set(
+ "spark.timer.max:3000.00|ms",
+ "spark.timer.mean:2000.00|ms",
+ "spark.timer.min:1000.00|ms",
+ "spark.timer.stddev:816.50|ms",
+ "spark.timer.p50:2000.00|ms",
+ "spark.timer.p75:3000.00|ms",
+ "spark.timer.p95:3000.00|ms",
+ "spark.timer.p98:3000.00|ms",
+ "spark.timer.p99:3000.00|ms",
+ "spark.timer.p999:3000.00|ms",
+ "spark.timer.count:3|g",
+ "spark.timer.m1_rate:0.00|ms",
+ "spark.timer.m5_rate:0.00|ms",
+ "spark.timer.m15_rate:0.00|ms"
+ )
+ // mean rate varies on each test run
+ val oneMoreResult = """spark.timer.mean_rate:\d+\.\d\d\|ms"""
+
+ (1 to (expectedResults.size + 1)).foreach { i =>
+ socket.receive(p)
+ val result = new String(p.getData, 0, p.getLength, UTF_8)
+ logInfo(s"Received timer result $i: '$result'")
+ assert(expectedResults.contains(result) || result.matches(oneMoreResult),
+ "Timer metric received should match data sent")
+ }
+ }
+ }
+}
+
diff --git a/docs/monitoring.md b/docs/monitoring.md
index d22cd945ea..51084a2598 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -455,6 +455,7 @@ Each instance can report to zero or more _sinks_. Sinks are contained in the
* `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
* `GraphiteSink`: Sends metrics to a Graphite node.
* `Slf4jSink`: Sends metrics to slf4j as log entries.
+* `StatsdSink`: Sends metrics to a StatsD node.
Spark also supports a Ganglia sink which is not included in the default build due to
licensing restrictions:
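For completeness, a minimal metrics.properties sketch for enabling the new sink, assuming a StatsD agent listening on localhost UDP port 8125; the values simply restate the defaults documented in the template hunk above.

# Minimal example configuration (assumes a local StatsD agent on UDP 8125)
*.sink.statsd.class=org.apache.spark.metrics.sink.StatsdSink
*.sink.statsd.host=127.0.0.1
*.sink.statsd.port=8125
*.sink.statsd.period=10
*.sink.statsd.unit=seconds
*.sink.statsd.prefix=spark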