author     Marcelo Vanzin <vanzin@cloudera.com>    2018-06-05 08:29:29 +0700
committer  hyukjinkwon <gurwls223@apache.org>      2018-06-05 08:29:29 +0700
commit     b3417b731d4e323398a0d7ec6e86405f4464f4f9 (patch)
tree       df4cf3488893e78c29b9e1d1e15ff8d5505d6c89
parent     dbb4d83829ec4b51d6e6d3a96f7a4e611d8827bc (diff)
[SPARK-16451][REPL] Fail shell if SparkSession fails to start.
Currently, in spark-shell, if the session fails to start, the user sees a bunch of unrelated errors which are caused by code in the shell initialization that references the "spark" variable, which does not exist in that case. Things like:

```
<console>:14: error: not found: value spark
import spark.sql
```

The user is also left with a non-working shell (unless they want to just write non-Spark Scala or Python code, that is).

This change fails the whole shell session at the point where the failure occurs, so that the last error message is the one with the actual information about the failure.

For the python error handling, I moved the session initialization code to session.py, so that traceback.print_exc() only shows the last error. Otherwise, the printed exception would contain all previous exceptions with a message "During handling of the above exception, another exception occurred", making the actual error kinda hard to parse.

Tested with spark-shell, pyspark (with 2.7 and 3.5), by forcing an error during SparkContext initialization.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21368 from vanzin/SPARK-16451.
-rw-r--r--  python/pyspark/shell.py                                26
-rw-r--r--  python/pyspark/sql/session.py                          34
-rw-r--r--  repl/src/main/scala/org/apache/spark/repl/Main.scala   72
3 files changed, 81 insertions, 51 deletions
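
The chained traceback the commit message refers to is ordinary Python 3 behavior: an exception raised while another one is being handled picks up the first as implicit context, and traceback.print_exc() then prints the whole chain under the "During handling of the above exception, another exception occurred" banner. The sketch below is not part of the patch and uses made-up stand-in functions; it only contrasts the old shell.py shape (the retry ran inside the except block, so a second failure chained to the first) with the new _create_shell_session() shape (the handler only warns, and the retry runs after it has finished, so only the last error is printed).

```
import sys
import traceback


def probe_hive():
    # Stand-in for the HiveConf access that can fail during startup.
    raise RuntimeError("HiveConf not available")


def get_or_create():
    # Stand-in for the fallback SparkSession.builder.getOrCreate() call.
    raise ValueError("fallback initialization also failed")


def retry_inside_handler():
    # Old shell.py shape: the retry runs inside the except block, so a failure
    # there is implicitly chained to the first exception.
    try:
        probe_hive()
    except RuntimeError:
        get_or_create()


def retry_after_handler():
    # New _create_shell_session() shape: the handler only warns, and the retry
    # runs after the handler has finished, so only the final error is reported.
    try:
        probe_hive()
    except RuntimeError:
        pass  # would emit the "Fall back to non-hive support" warning here
    get_or_create()


for attempt in (retry_inside_handler, retry_after_handler):
    try:
        attempt()
    except Exception:
        traceback.print_exc(file=sys.stderr)
        print("-" * 40, file=sys.stderr)
```

Running this prints both tracebacks joined by the chaining banner for the first attempt, and only the ValueError for the second.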
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index b5fcf7092d..472c3cd445 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -38,25 +38,13 @@ if os.environ.get("SPARK_EXECUTOR_URI"):
SparkContext._ensure_initialized()
try:
-    # Try to access HiveConf, it will raise exception if Hive is not added
-    conf = SparkConf()
-    if conf.get('spark.sql.catalogImplementation', 'hive').lower() == 'hive':
-        SparkContext._jvm.org.apache.hadoop.hive.conf.HiveConf()
-        spark = SparkSession.builder\
-            .enableHiveSupport()\
-            .getOrCreate()
-    else:
-        spark = SparkSession.builder.getOrCreate()
-except py4j.protocol.Py4JError:
-    if conf.get('spark.sql.catalogImplementation', '').lower() == 'hive':
-        warnings.warn("Fall back to non-hive support because failing to access HiveConf, "
-                      "please make sure you build spark with hive")
-    spark = SparkSession.builder.getOrCreate()
-except TypeError:
-    if conf.get('spark.sql.catalogImplementation', '').lower() == 'hive':
-        warnings.warn("Fall back to non-hive support because failing to access HiveConf, "
-                      "please make sure you build spark with hive")
-    spark = SparkSession.builder.getOrCreate()
+    spark = SparkSession._create_shell_session()
+except Exception:
+    import sys
+    import traceback
+    warnings.warn("Failed to initialize Spark session.")
+    traceback.print_exc(file=sys.stderr)
+    sys.exit(1)
sc = spark.sparkContext
sql = spark.sql
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index d675a24017..e880dd1ca6 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -547,6 +547,40 @@ class SparkSession(object):
        df._schema = schema
        return df
+    @staticmethod
+    def _create_shell_session():
+        """
+        Initialize a SparkSession for a pyspark shell session. This is called from shell.py
+        to make error handling simpler without needing to declare local variables in that
+        script, which would expose those to users.
+        """
+        import py4j
+        from pyspark.conf import SparkConf
+        from pyspark.context import SparkContext
+        try:
+            # Try to access HiveConf, it will raise exception if Hive is not added
+            conf = SparkConf()
+            if conf.get('spark.sql.catalogImplementation', 'hive').lower() == 'hive':
+                SparkContext._jvm.org.apache.hadoop.hive.conf.HiveConf()
+                return SparkSession.builder\
+                    .enableHiveSupport()\
+                    .getOrCreate()
+            else:
+                return SparkSession.builder.getOrCreate()
+        except py4j.protocol.Py4JError:
+            if conf.get('spark.sql.catalogImplementation', '').lower() == 'hive':
+                warnings.warn("Fall back to non-hive support because failing to access HiveConf, "
+                              "please make sure you build spark with hive")
+
+        try:
+            return SparkSession.builder.getOrCreate()
+        except TypeError:
+            if conf.get('spark.sql.catalogImplementation', '').lower() == 'hive':
+                warnings.warn("Fall back to non-hive support because failing to access HiveConf, "
+                              "please make sure you build spark with hive")
+
+        return SparkSession.builder.getOrCreate()
+
    @since(2.0)
    @ignore_unicode_prefix
    def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True):
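
The docstring's point about local variables is that shell.py runs as the interpreter's startup script, so any name bound at its top level (such as the old `conf` temporary) becomes visible in the user's interactive namespace, while names created inside SparkSession._create_shell_session() stay local to that call. Below is a rough, self-contained illustration, not part of the patch; the startup snippet and namespace dict are made up and only stand in for the real shell startup.

```
# Simulate a startup script executed into the user's namespace, the way a
# PYTHONSTARTUP-style shell script is.
startup_script = """
temp_conf = {"spark.sql.catalogImplementation": "hive"}  # module-level temporary

def _make_session():
    helper_local = "not visible to the user"             # stays inside the function
    return "session"

spark = _make_session()
"""

user_namespace = {}
exec(startup_script, user_namespace)

# The module-level temporary leaks into the interactive namespace;
# the helper's local name does not.
print("temp_conf" in user_namespace)     # True  -> leaked
print("helper_local" in user_namespace)  # False -> hidden inside _make_session
print(user_namespace["spark"])           # 'session'
```

The first check prints True (the top-level temporary is exposed to the user), the second prints False (the helper's local is not), which is the effect the helper in session.py is after.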
diff --git a/repl/src/main/scala/org/apache/spark/repl/Main.scala b/repl/src/main/scala/org/apache/spark/repl/Main.scala
index cc76a703bd..e4ddcef977 100644
--- a/repl/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/Main.scala
@@ -44,6 +44,7 @@ object Main extends Logging {
  var interp: SparkILoop = _
  private var hasErrors = false
+  private var isShellSession = false
  private def scalaOptionError(msg: String): Unit = {
    hasErrors = true
@@ -53,6 +54,7 @@ object Main extends Logging {
  }
  def main(args: Array[String]) {
+    isShellSession = true
    doMain(args, new SparkILoop)
  }
@@ -79,44 +81,50 @@ object Main extends Logging {
  }
  def createSparkSession(): SparkSession = {
-    val execUri = System.getenv("SPARK_EXECUTOR_URI")
-    conf.setIfMissing("spark.app.name", "Spark shell")
-    // SparkContext will detect this configuration and register it with the RpcEnv's
-    // file server, setting spark.repl.class.uri to the actual URI for executors to
-    // use. This is sort of ugly but since executors are started as part of SparkContext
-    // initialization in certain cases, there's an initialization order issue that prevents
-    // this from being set after SparkContext is instantiated.
-    conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
-    if (execUri != null) {
-      conf.set("spark.executor.uri", execUri)
-    }
-    if (System.getenv("SPARK_HOME") != null) {
-      conf.setSparkHome(System.getenv("SPARK_HOME"))
-    }
+    try {
+      val execUri = System.getenv("SPARK_EXECUTOR_URI")
+      conf.setIfMissing("spark.app.name", "Spark shell")
+      // SparkContext will detect this configuration and register it with the RpcEnv's
+      // file server, setting spark.repl.class.uri to the actual URI for executors to
+      // use. This is sort of ugly but since executors are started as part of SparkContext
+      // initialization in certain cases, there's an initialization order issue that prevents
+      // this from being set after SparkContext is instantiated.
+      conf.set("spark.repl.class.outputDir", outputDir.getAbsolutePath())
+      if (execUri != null) {
+        conf.set("spark.executor.uri", execUri)
+      }
+      if (System.getenv("SPARK_HOME") != null) {
+        conf.setSparkHome(System.getenv("SPARK_HOME"))
+      }
-    val builder = SparkSession.builder.config(conf)
-    if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase(Locale.ROOT) == "hive") {
-      if (SparkSession.hiveClassesArePresent) {
-        // In the case that the property is not set at all, builder's config
-        // does not have this value set to 'hive' yet. The original default
-        // behavior is that when there are hive classes, we use hive catalog.
-        sparkSession = builder.enableHiveSupport().getOrCreate()
-        logInfo("Created Spark session with Hive support")
+      val builder = SparkSession.builder.config(conf)
+      if (conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase(Locale.ROOT) == "hive") {
+        if (SparkSession.hiveClassesArePresent) {
+          // In the case that the property is not set at all, builder's config
+          // does not have this value set to 'hive' yet. The original default
+          // behavior is that when there are hive classes, we use hive catalog.
+          sparkSession = builder.enableHiveSupport().getOrCreate()
+          logInfo("Created Spark session with Hive support")
+        } else {
+          // Need to change it back to 'in-memory' if no hive classes are found
+          // in the case that the property is set to hive in spark-defaults.conf
+          builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
+          sparkSession = builder.getOrCreate()
+          logInfo("Created Spark session")
+        }
      } else {
-        // Need to change it back to 'in-memory' if no hive classes are found
-        // in the case that the property is set to hive in spark-defaults.conf
-        builder.config(CATALOG_IMPLEMENTATION.key, "in-memory")
+        // In the case that the property is set but not to 'hive', the internal
+        // default is 'in-memory'. So the sparkSession will use in-memory catalog.
        sparkSession = builder.getOrCreate()
        logInfo("Created Spark session")
      }
-    } else {
-      // In the case that the property is set but not to 'hive', the internal
-      // default is 'in-memory'. So the sparkSession will use in-memory catalog.
-      sparkSession = builder.getOrCreate()
-      logInfo("Created Spark session")
+      sparkContext = sparkSession.sparkContext
+      sparkSession
+    } catch {
+      case e: Exception if isShellSession =>
+        logError("Failed to initialize Spark session.", e)
+        sys.exit(1)
    }
-    sparkContext = sparkSession.sparkContext
-    sparkSession
  }
}