-rwxr-xr-x  dev/create-release/release-build.sh                                  |  4
-rwxr-xr-x  dev/mima                                                             |  2
-rwxr-xr-x  dev/scalastyle                                                       |  1
-rw-r--r--  dev/sparktestsupport/modules.py                                      | 20
-rwxr-xr-x  dev/test-dependencies.sh                                             |  2
-rw-r--r--  docs/building-spark.md                                               |  7
-rw-r--r--  docs/streaming-flume-integration.md                                  | 13
-rw-r--r--  examples/pom.xml                                                     |  7
-rw-r--r--  external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java (renamed from examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java) |  2
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala (renamed from examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala) |  2
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala (renamed from examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala) |  2
-rw-r--r--  external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala |  1
-rw-r--r--  pom.xml                                                              | 13
-rw-r--r--  project/SparkBuild.scala                                             | 17
-rw-r--r--  python/pyspark/streaming/flume.py                                    |  4
-rw-r--r--  python/pyspark/streaming/tests.py                                    | 16
16 files changed, 73 insertions(+), 40 deletions(-)
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index 5390f5916f..7e8d5c7075 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -84,9 +84,9 @@ MVN="build/mvn --force"
# Hive-specific profiles for some builds
HIVE_PROFILES="-Phive -Phive-thriftserver"
# Profiles for publishing snapshots and release to Maven Central
-PUBLISH_PROFILES="-Pmesos -Pyarn $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
# Profiles for building binary releases
-BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Psparkr"
+BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pflume -Psparkr"
# Scala 2.11 only profiles for some builds
SCALA_2_11_PROFILES="-Pkafka-0-8"
# Scala 2.12 only profiles for some builds
diff --git a/dev/mima b/dev/mima
index fdb21f5007..1e3ca9700b 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,7 +24,7 @@ set -e
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"
-SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"
diff --git a/dev/scalastyle b/dev/scalastyle
index e5aa589869..89ecc8abd6 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -25,6 +25,7 @@ ERRORS=$(echo -e "q\n" \
-Pmesos \
-Pkafka-0-8 \
-Pyarn \
+ -Pflume \
-Phive \
-Phive-thriftserver \
scalastyle test:scalastyle \
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 50e14b6054..91d5667ed1 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -279,6 +279,12 @@ streaming_flume_sink = Module(
source_file_regexes=[
"external/flume-sink",
],
+ build_profile_flags=[
+ "-Pflume",
+ ],
+ environ={
+ "ENABLE_FLUME_TESTS": "1"
+ },
sbt_test_goals=[
"streaming-flume-sink/test",
]
@@ -291,6 +297,12 @@ streaming_flume = Module(
source_file_regexes=[
"external/flume",
],
+ build_profile_flags=[
+ "-Pflume",
+ ],
+ environ={
+ "ENABLE_FLUME_TESTS": "1"
+ },
sbt_test_goals=[
"streaming-flume/test",
]
@@ -302,7 +314,13 @@ streaming_flume_assembly = Module(
dependencies=[streaming_flume, streaming_flume_sink],
source_file_regexes=[
"external/flume-assembly",
- ]
+ ],
+ build_profile_flags=[
+ "-Pflume",
+ ],
+ environ={
+ "ENABLE_FLUME_TESTS": "1"
+ }
)
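
For context, the new `build_profile_flags` and `environ` entries are what keep the Flume modules testable now that they sit behind an optional profile: the flags add `-Pflume` to the build command, and the environment variable opts the Python suite in. Below is a minimal, hypothetical sketch of how a test harness could consume such a module definition; the real plumbing lives in dev/run-tests.py and is not shown in this diff.

    # Hypothetical consumer of a modules.py-style Module definition.
    import os
    import subprocess

    class Module(object):
        def __init__(self, name, build_profile_flags=None, environ=None, sbt_test_goals=None):
            self.name = name
            self.build_profile_flags = build_profile_flags or []  # e.g. ["-Pflume"]
            self.environ = environ or {}                           # e.g. {"ENABLE_FLUME_TESTS": "1"}
            self.sbt_test_goals = sbt_test_goals or []             # e.g. ["streaming-flume/test"]

    def run_module_tests(module):
        # Export the module's environment variables, then build and test with
        # the module's extra profile flags.
        env = dict(os.environ)
        env.update(module.environ)
        cmd = ["build/sbt"] + module.build_profile_flags + module.sbt_test_goals
        subprocess.check_call(cmd, env=env)

    run_module_tests(Module(
        name="streaming-flume",
        build_profile_flags=["-Pflume"],
        environ={"ENABLE_FLUME_TESTS": "1"},
        sbt_test_goals=["streaming-flume/test"],
    ))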
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index c7714578bd..58b295d4f6 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -29,7 +29,7 @@ export LC_ALL=C
# TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.
# NOTE: These should match those in the release publishing script
-HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Phive"
+HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Pflume -Phive"
MVN="build/mvn"
HADOOP_PROFILES=(
hadoop-2.6
diff --git a/docs/building-spark.md b/docs/building-spark.md
index 57baa50325..98f7df1554 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -100,6 +100,13 @@ Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.
Kafka 0.10 support is still automatically built.
+## Building with Flume support
+
+Apache Flume support must be explicitly enabled with the `flume` profile.
+Note: Flume support is deprecated as of Spark 2.3.0.
+
+    ./build/mvn -Pflume -DskipTests clean package
+
## Building submodules individually
It's possible to build Spark sub-modules using the `mvn -pl` option.
diff --git a/docs/streaming-flume-integration.md b/docs/streaming-flume-integration.md
index a5d36da5b6..257a4f7d4f 100644
--- a/docs/streaming-flume-integration.md
+++ b/docs/streaming-flume-integration.md
@@ -5,6 +5,8 @@ title: Spark Streaming + Flume Integration Guide
[Apache Flume](https://flume.apache.org/) is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. Here we explain how to configure Flume and Spark Streaming to receive data from Flume. There are two approaches to this.
+**Note: Flume support is deprecated as of Spark 2.3.0.**
+
## Approach 1: Flume-style Push-based Approach
Flume is designed to push data between Flume agents. In this approach, Spark Streaming essentially sets up a receiver that acts as an Avro agent for Flume, to which Flume can push the data. Here are the configuration steps.
@@ -44,8 +46,7 @@ configuring Flume agents.
val flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
- See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$)
- and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala).
+ See the [API docs](api/scala/index.html#org.apache.spark.streaming.flume.FlumeUtils$).
</div>
<div data-lang="java" markdown="1">
import org.apache.spark.streaming.flume.*;
@@ -53,8 +54,7 @@ configuring Flume agents.
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port]);
- See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html)
- and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java).
+ See the [API docs](api/java/index.html?org/apache/spark/streaming/flume/FlumeUtils.html).
</div>
<div data-lang="python" markdown="1">
from pyspark.streaming.flume import FlumeUtils
@@ -62,8 +62,7 @@ configuring Flume agents.
flumeStream = FlumeUtils.createStream(streamingContext, [chosen machine's hostname], [chosen port])
By default, the Python API will decode Flume event body as UTF8 encoded strings. You can specify your custom decoding function to decode the body byte arrays in Flume events to any arbitrary data type.
- See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils)
- and the [example]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/flume_wordcount.py).
+ See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.flume.FlumeUtils).
</div>
</div>
@@ -162,8 +161,6 @@ configuring Flume agents.
</div>
</div>
- See the Scala example [FlumePollingEventCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala).
-
Note that each input DStream can be configured to receive data from multiple sinks.
3. **Deploying:** This is the same as the first approach.
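
Since the example links are dropped from the guide above (the example sources now live under external/flume), a minimal PySpark sketch of the push-based approach may help. It assumes a Flume agent with an Avro sink pointed at localhost:9999 (hypothetical host and port) and the spark-streaming-flume assembly on the classpath.

    # Push-based (Approach 1) sketch; host and port are placeholders.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    sc = SparkContext(appName="FlumeEventCountSketch")
    ssc = StreamingContext(sc, batchDuration=2)

    # Each element is a (headers, body) pair; bodies are UTF-8 decoded by default.
    flume_stream = FlumeUtils.createStream(ssc, "localhost", 9999)
    flume_stream.count().map(lambda n: "Received %d flume events." % n).pprint()

    ssc.start()
    ssc.awaitTermination()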
diff --git a/examples/pom.xml b/examples/pom.xml
index 52a6764ae2..1791dbaad7 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -34,7 +34,6 @@
<sbt.project.name>examples</sbt.project.name>
<build.testJarPhase>none</build.testJarPhase>
<build.copyDependenciesPhase>package</build.copyDependenciesPhase>
- <flume.deps.scope>provided</flume.deps.scope>
<hadoop.deps.scope>provided</hadoop.deps.scope>
<hive.deps.scope>provided</hive.deps.scope>
<parquet.deps.scope>provided</parquet.deps.scope>
@@ -80,12 +79,6 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java
index 0c651049d0..4e3420d9c3 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ b/external/flume/src/main/java/org/apache/spark/examples/JavaFlumeEventCount.java
@@ -48,8 +48,6 @@ public final class JavaFlumeEventCount {
System.exit(1);
}
- StreamingExamples.setStreamingLogLevels();
-
String host = args[0];
int port = Integer.parseInt(args[1]);
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala
index 91e52e4eff..f877f79391 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala
+++ b/external/flume/src/main/scala/org/apache/spark/examples/FlumeEventCount.scala
@@ -47,8 +47,6 @@ object FlumeEventCount {
System.exit(1)
}
- StreamingExamples.setStreamingLogLevels()
-
val Array(host, IntParam(port)) = args
val batchInterval = Milliseconds(2000)
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala
index dd725d72c2..79a4027ca5 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala
+++ b/external/flume/src/main/scala/org/apache/spark/examples/FlumePollingEventCount.scala
@@ -44,8 +44,6 @@ object FlumePollingEventCount {
System.exit(1)
}
- StreamingExamples.setStreamingLogLevels()
-
val Array(host, IntParam(port)) = args
val batchInterval = Milliseconds(2000)
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 3e3ed712f0..707193a957 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -30,6 +30,7 @@ import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaReceiverInputDStream, JavaStreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream
+@deprecated("Deprecated without replacement", "2.3.0")
object FlumeUtils {
private val DEFAULT_POLLING_PARALLELISM = 5
private val DEFAULT_POLLING_BATCH_SIZE = 1000
diff --git a/pom.xml b/pom.xml
index 87a468c3a6..9fac8b1e53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -98,15 +98,13 @@
<module>sql/core</module>
<module>sql/hive</module>
<module>assembly</module>
- <module>external/flume</module>
- <module>external/flume-sink</module>
- <module>external/flume-assembly</module>
<module>examples</module>
<module>repl</module>
<module>launcher</module>
<module>external/kafka-0-10</module>
<module>external/kafka-0-10-assembly</module>
<module>external/kafka-0-10-sql</module>
+ <!-- See additional modules enabled by profiles below -->
</modules>
<properties>
@@ -2583,6 +2581,15 @@
</dependencies>
</profile>
+ <profile>
+ <id>flume</id>
+ <modules>
+ <module>external/flume</module>
+ <module>external/flume-sink</module>
+ <module>external/flume-assembly</module>
+ </modules>
+ </profile>
+
<!-- Ganglia integration is not included by default due to LGPL-licensed code -->
<profile>
<id>spark-ganglia-lgpl</id>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index a568d264cb..9501eed1e9 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -43,11 +43,8 @@ object BuildCommons {
"catalyst", "sql", "hive", "hive-thriftserver", "sql-kafka-0-10"
).map(ProjectRef(buildLocation, _))
- val streamingProjects@Seq(
- streaming, streamingFlumeSink, streamingFlume, streamingKafka010
- ) = Seq(
- "streaming", "streaming-flume-sink", "streaming-flume", "streaming-kafka-0-10"
- ).map(ProjectRef(buildLocation, _))
+ val streamingProjects@Seq(streaming, streamingKafka010) =
+ Seq("streaming", "streaming-kafka-0-10").map(ProjectRef(buildLocation, _))
val allProjects@Seq(
core, graphx, mllib, mllibLocal, repl, networkCommon, networkShuffle, launcher, unsafe, tags, sketch, kvstore, _*
@@ -56,9 +53,13 @@ object BuildCommons {
"tags", "sketch", "kvstore"
).map(ProjectRef(buildLocation, _)) ++ sqlProjects ++ streamingProjects
- val optionallyEnabledProjects@Seq(mesos, yarn, streamingKafka, sparkGangliaLgpl,
- streamingKinesisAsl, dockerIntegrationTests, hadoopCloud) =
- Seq("mesos", "yarn", "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
+ val optionallyEnabledProjects@Seq(mesos, yarn,
+ streamingFlumeSink, streamingFlume,
+ streamingKafka, sparkGangliaLgpl, streamingKinesisAsl,
+ dockerIntegrationTests, hadoopCloud) =
+ Seq("mesos", "yarn",
+ "streaming-flume-sink", "streaming-flume",
+ "streaming-kafka-0-8", "ganglia-lgpl", "streaming-kinesis-asl",
"docker-integration-tests", "hadoop-cloud").map(ProjectRef(buildLocation, _))
val assemblyProjects@Seq(networkYarn, streamingFlumeAssembly, streamingKafkaAssembly, streamingKafka010Assembly, streamingKinesisAslAssembly) =
diff --git a/python/pyspark/streaming/flume.py b/python/pyspark/streaming/flume.py
index cd30483fc6..2fed5940b3 100644
--- a/python/pyspark/streaming/flume.py
+++ b/python/pyspark/streaming/flume.py
@@ -53,6 +53,8 @@ class FlumeUtils(object):
:param enableDecompression: Should netty server decompress input stream
:param bodyDecoder: A function used to decode body (default is utf8_decoder)
:return: A DStream object
+
+ .. note:: Deprecated in 2.3.0
"""
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
helper = FlumeUtils._get_helper(ssc._sc)
@@ -79,6 +81,8 @@ class FlumeUtils(object):
will result in this stream using more threads
:param bodyDecoder: A function used to decode body (default is utf8_decoder)
:return: A DStream object
+
+ .. note:: Deprecated in 2.3.0
"""
jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
hosts = []
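
The second annotated docstring belongs to the pull-based (polling) API. A minimal sketch of its use, assuming a Flume agent running the custom Spark sink at sink-host:9988 (hypothetical address):

    # Pull-based (polling) sketch; addresses is a list of (hostname, port) pairs.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils

    sc = SparkContext(appName="FlumePollingSketch")
    ssc = StreamingContext(sc, batchDuration=2)

    polling_stream = FlumeUtils.createPollingStream(ssc, [("sink-host", 9988)])
    polling_stream.count().pprint()

    ssc.start()
    ssc.awaitTermination()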
diff --git a/python/pyspark/streaming/tests.py b/python/pyspark/streaming/tests.py
index 229cf53e47..5b86c1cb2c 100644
--- a/python/pyspark/streaming/tests.py
+++ b/python/pyspark/streaming/tests.py
@@ -1478,7 +1478,7 @@ def search_kafka_assembly_jar():
("Failed to find Spark Streaming kafka assembly jar in %s. " % kafka_assembly_dir) +
"You need to build Spark with "
"'build/sbt assembly/package streaming-kafka-0-8-assembly/assembly' or "
- "'build/mvn package' before running this test.")
+ "'build/mvn -Pkafka-0-8 package' before running this test.")
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Kafka assembly JARs: %s; please "
"remove all but one") % (", ".join(jars)))
@@ -1495,7 +1495,7 @@ def search_flume_assembly_jar():
("Failed to find Spark Streaming Flume assembly jar in %s. " % flume_assembly_dir) +
"You need to build Spark with "
"'build/sbt assembly/assembly streaming-flume-assembly/assembly' or "
- "'build/mvn package' before running this test.")
+ "'build/mvn -Pflume package' before running this test.")
elif len(jars) > 1:
raise Exception(("Found multiple Spark Streaming Flume assembly JARs: %s; please "
"remove all but one") % (", ".join(jars)))
@@ -1517,6 +1517,9 @@ def search_kinesis_asl_assembly_jar():
# Must be same as the variable and condition defined in modules.py
+flume_test_environ_var = "ENABLE_FLUME_TESTS"
+are_flume_tests_enabled = os.environ.get(flume_test_environ_var) == '1'
+# Must be same as the variable and condition defined in modules.py
kafka_test_environ_var = "ENABLE_KAFKA_0_8_TESTS"
are_kafka_tests_enabled = os.environ.get(kafka_test_environ_var) == '1'
# Must be same as the variable and condition defined in KinesisTestUtils.scala and modules.py
@@ -1538,9 +1541,16 @@ if __name__ == "__main__":
os.environ["PYSPARK_SUBMIT_ARGS"] = "--jars %s pyspark-shell" % jars
testcases = [BasicOperationTests, WindowFunctionTests, StreamingContextTests, CheckpointTests,
- FlumeStreamTests, FlumePollingStreamTests,
StreamingListenerTests]
+ if are_flume_tests_enabled:
+ testcases.append(FlumeStreamTests)
+ testcases.append(FlumePollingStreamTests)
+ else:
+ sys.stderr.write(
+ "Skipped test_flume_stream (enable by setting environment variable %s=1"
+ % flume_test_environ_var)
+
if are_kafka_tests_enabled:
testcases.append(KafkaStreamTests)
else:
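
The gating pattern introduced above, shown in isolation: suites are only registered when the opt-in environment variable is set, so the Flume tests stay off unless Spark was built with the flume profile. The test class below is a placeholder, not the real FlumeStreamTests.

    import os
    import sys
    import unittest

    class FlumeStreamTests(unittest.TestCase):  # placeholder suite
        def test_placeholder(self):
            self.assertTrue(True)

    if __name__ == "__main__":
        testcases = []
        if os.environ.get("ENABLE_FLUME_TESTS") == "1":
            testcases.append(FlumeStreamTests)
        else:
            sys.stderr.write(
                "Skipped Flume tests (enable by setting ENABLE_FLUME_TESTS=1)\n")

        loader = unittest.TestLoader()
        suite = unittest.TestSuite(loader.loadTestsFromTestCase(tc) for tc in testcases)
        unittest.TextTestRunner(verbosity=2).run(suite)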