Diffstat (limited to 'bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py')
 bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py (-rwxr-xr-x => -rw-r--r--) | 192
 1 file changed, 119 insertions(+), 73 deletions(-)
diff --git a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
index dc2e3730..91fcbf79 100755..100644
--- a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
@@ -13,11 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
+import time
from jujubigdata import utils
from path import Path
from charms.layer.apache_bigtop_base import Bigtop
-from charms.reactive import is_state
from charms import layer
from charmhelpers.core import hookenv, host, unitdata
from charmhelpers.fetch.archiveurl import ArchiveUrlFetchHandler
@@ -36,7 +36,7 @@ class Spark(object):
mode = hookenv.config()['spark_execution_mode']
zk_units = unitdata.kv().get('zookeeper.units', [])
master = None
- if mode.startswith('local') or mode == 'yarn-cluster':
+ if mode.startswith('local') or mode.startswith('yarn'):
master = mode
elif mode == 'standalone' and not zk_units:
master = 'spark://{}:7077'.format(spark_master_host)
@@ -47,18 +47,8 @@ class Spark(object):
nodes.append('{}:7077'.format(ip))
nodes_str = ','.join(nodes)
master = 'spark://{}'.format(nodes_str)
- elif mode.startswith('yarn'):
- master = 'yarn-client'
return master
- def get_roles(self):
- roles = ['spark-worker', 'spark-client']
- zk_units = unitdata.kv().get('zookeeper.units', [])
- if is_state('leadership.is_leader') or zk_units:
- roles.append('spark-master')
- roles.append('spark-history-server')
- return roles
-
def install_benchmark(self):
"""
Install and configure SparkBench.
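
Taken together, the get_master_url() changes in the hunk above resolve the master per execution mode roughly as in this condensed sketch (a standalone function for illustration; 'peer_ips' stands in for the addresses derived from the stored peer units and is an assumption, not part of the diff):

    # Condensed sketch of the post-patch resolution logic.
    def resolve_master(mode, spark_master_host, zk_units, peer_ips):
        if mode.startswith('local') or mode.startswith('yarn'):
            # 'local', 'local[*]', 'yarn-client', 'yarn-cluster' pass through
            return mode
        if mode == 'standalone' and not zk_units:
            return 'spark://{}:7077'.format(spark_master_host)
        if mode == 'standalone' and zk_units:
            # HA standalone: list every peer master in the URL
            return 'spark://{}'.format(
                ','.join('{}:7077'.format(ip) for ip in peer_ips))
        return None
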
@@ -155,10 +145,10 @@ class Spark(object):
self.dist_config.add_users()
self.dist_config.add_dirs()
self.install_demo()
- self.open_ports()
def setup_hdfs_logs(self):
- # create hdfs storage space for history server
+ # Create hdfs storage space for history server and return the name
+ # of the created directory.
dc = self.dist_config
events_dir = dc.path('spark_events')
events_dir = 'hdfs://{}'.format(events_dir)
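
Most of setup_hdfs_logs() falls outside these hunks. As a hypothetical illustration only (the run_as calls and the path below are assumptions, not shown in this diff), creating the history-server events directory in HDFS looks something like:

    from jujubigdata import utils

    events_dir = 'hdfs:///var/log/spark/apps'   # assumed events path
    # Run the HDFS client as the 'hdfs' superuser to create and open up the dir
    utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir)
    utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '-R', '1777', events_dir)
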
@@ -168,49 +158,94 @@ class Spark(object):
events_dir)
return events_dir
- def configure(self, available_hosts, zk_units, peers):
+ def configure(self, available_hosts, zk_units, peers, extra_libs):
"""
This is the core logic of setting up spark.
- Two flags are needed:
-
- * Namenode exists aka HDFS is there
- * Resource manager exists aka YARN is ready
-
- both flags are infered from the available hosts.
-
:param dict available_hosts: Hosts that Spark should know about.
+ :param list zk_units: List of Zookeeper dicts with host/port info.
+ :param list peers: List of Spark peer tuples (unit name, IP).
+ :param list extra_libs: List of extra lib paths for driver/executors.
"""
- unitdata.kv().set('zookeeper.units', zk_units)
- unitdata.kv().set('sparkpeer.units', peers)
- unitdata.kv().flush(True)
-
+ # Bootstrap spark
if not unitdata.kv().get('spark.bootstrapped', False):
self.setup()
unitdata.kv().set('spark.bootstrapped', True)
+ # Set KV based on connected applications
+ unitdata.kv().set('zookeeper.units', zk_units)
+ unitdata.kv().set('sparkpeer.units', peers)
+ unitdata.kv().flush(True)
+
+ # Get our config ready
+ dc = self.dist_config
+ events_log_dir = 'file://{}'.format(dc.path('spark_events'))
+ mode = hookenv.config()['spark_execution_mode']
master_ip = utils.resolve_private_address(available_hosts['spark-master'])
+ master_url = self.get_master_url(master_ip)
+ req_driver_mem = hookenv.config()['driver_memory']
+ req_executor_mem = hookenv.config()['executor_memory']
+
+ # handle tuning options that may be set as percentages
+ driver_mem = '1g'
+ executor_mem = '1g'
+ if req_driver_mem.endswith('%'):
+ if mode == 'standalone' or mode.startswith('local'):
+ mem_mb = host.get_total_ram() / 1024 / 1024
+ req_percentage = float(req_driver_mem.strip('%')) / 100
+ driver_mem = str(int(mem_mb * req_percentage)) + 'm'
+ else:
+ hookenv.log("driver_memory percentage in non-local mode. Using 1g default.",
+ level=None)
+ else:
+ driver_mem = req_driver_mem
+
+ if req_executor_mem.endswith('%'):
+ if mode == 'standalone' or mode.startswith('local'):
+ mem_mb = host.get_total_ram() / 1024 / 1024
+ req_percentage = float(req_executor_mem.strip('%')) / 100
+ executor_mem = str(int(mem_mb * req_percentage)) + 'm'
+ else:
+ hookenv.log("executor_memory percentage in non-local mode. Using 1g default.",
+ level=None)
+ else:
+ executor_mem = req_executor_mem
+
+ # Setup hosts dict
hosts = {
'spark': master_ip,
}
-
- dc = self.dist_config
- events_log_dir = 'file://{}'.format(dc.path('spark_events'))
if 'namenode' in available_hosts:
hosts['namenode'] = available_hosts['namenode']
events_log_dir = self.setup_hdfs_logs()
-
+ else:
+ # Bigtop includes a default hadoop_head_node if we do not specify
+ # any namenode info. To ensure spark standalone doesn't get
+ # invalid hadoop config, set our NN to an empty string.
+ hosts['namenode'] = ''
if 'resourcemanager' in available_hosts:
hosts['resourcemanager'] = available_hosts['resourcemanager']
- roles = self.get_roles()
+ # Setup roles dict. We always include the history server and client.
+ # Determine other roles based on our execution mode.
+ roles = ['spark-history-server', 'spark-client']
+ if mode == 'standalone':
+ roles.append('spark-master')
+ roles.append('spark-worker')
+ elif mode.startswith('yarn'):
+ roles.append('spark-on-yarn')
+ roles.append('spark-yarn-slave')
+ # Setup overrides dict
override = {
- 'spark::common::master_url': self.get_master_url(master_ip),
+ 'spark::common::master_url': master_url,
'spark::common::event_log_dir': events_log_dir,
'spark::common::history_log_dir': events_log_dir,
+ 'spark::common::extra_lib_dirs':
+ ':'.join(extra_libs) if extra_libs else None,
+ 'spark::common::driver_mem': driver_mem,
+ 'spark::common::executor_mem': executor_mem,
}
-
if zk_units:
zks = []
for unit in zk_units:
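
The driver/executor memory handling added earlier in this hunk boils down to the sketch below (the helper name is an assumption; charmhelpers' host.get_total_ram() reports bytes, hence the two divisions to reach megabytes):

    # Sketch of the percentage handling; 'to_mem_setting' is an assumed helper
    # name, not part of the diff.
    def to_mem_setting(requested, total_ram_bytes, local_or_standalone, default='1g'):
        if not requested.endswith('%'):
            return requested                     # e.g. '2g' or '512m' pass through
        if not local_or_standalone:
            return default                       # percentages ignored under YARN
        mem_mb = total_ram_bytes / 1024 / 1024   # bytes -> megabytes
        fraction = float(requested.strip('%')) / 100
        return str(int(mem_mb * fraction)) + 'm'

    # e.g. to_mem_setting('50%', 8 * 1024 ** 3, True) == '4096m'
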
@@ -220,18 +255,14 @@ class Spark(object):
zk_connect = ",".join(zks)
override['spark::common::zookeeper_connection_string'] = zk_connect
else:
- override['spark::common::zookeeper_connection_string'] = ""
+ override['spark::common::zookeeper_connection_string'] = None
+ # Create our site.yaml and trigger puppet
bigtop = Bigtop()
bigtop.render_site_yaml(hosts, roles, override)
bigtop.trigger_puppet()
- # There is a race condition here.
- # The work role will not start the first time we trigger puppet apply.
- # The exception in /var/logs/spark:
- # Exception in thread "main" org.apache.spark.SparkException: Invalid master URL: spark://:7077
- # The master url is not set at the time the worker start the first time.
- # TODO(kjackal): ...do the needed... (investiate,debug,submit patch)
- bigtop.trigger_puppet()
+
+ # Do this after our puppet bits in case puppet overrides needed perms
if 'namenode' not in available_hosts:
# Local event dir (not in HDFS) needs to be 777 so non-spark
# users can write job history there. It needs to be g+s so
@@ -239,22 +270,21 @@ class Spark(object):
# It needs to be +t so users cannot remove files they don't own.
dc.path('spark_events').chmod(0o3777)
- self.patch_worker_master_url(master_ip)
+ self.patch_worker_master_url(master_ip, master_url)
+ # Install SB (subsequent calls will reconfigure existing install)
# SparkBench looks for the spark master in /etc/environment
with utils.environment_edit_in_place('/etc/environment') as env:
- env['MASTER'] = self.get_master_url(master_ip)
- # Install SB (subsequent calls will reconfigure existing install)
+ env['MASTER'] = master_url
self.install_benchmark()
- def patch_worker_master_url(self, master_ip):
+ def patch_worker_master_url(self, master_ip, master_url):
'''
Patch the worker startup script to use the full master url instead of contracting it.
The master url is placed in the spark-env.sh so that the startup script will use it.
In HA mode the master_ip is set to be the local_ip instead of the one the leader
elects. This requires a restart of the master service.
'''
- master_url = self.get_master_url(master_ip)
zk_units = unitdata.kv().get('zookeeper.units', [])
if master_url.startswith('spark://'):
if zk_units:
@@ -268,8 +298,6 @@ class Spark(object):
self.inplace_change('/etc/init.d/spark-worker',
'spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT',
'$SPARK_MASTER_URL')
- host.service_restart('spark-master')
- host.service_restart('spark-worker')
def inplace_change(self, filename, old_string, new_string):
# Safely read the input filename using 'with'
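
The body of inplace_change() sits outside this hunk; a minimal sketch consistent with the "safely read" comment above would be (an assumption, not the committed implementation):

    def inplace_change(filename, old_string, new_string):
        # Safely read the input filename using 'with'
        with open(filename) as f:
            content = f.read()
        if old_string not in content:
            return
        # Rewrite the file only when a replacement is actually needed
        with open(filename, 'w') as f:
            f.write(content.replace(old_string, new_string))
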
@@ -294,32 +322,50 @@ class Spark(object):
Path(demo_target).chown('ubuntu', 'hadoop')
def start(self):
- if unitdata.kv().get('spark.uprading', False):
- return
-
- # stop services (if they're running) to pick up any config change
- self.stop()
- # always start the history server, start master/worker if we're standalone
+ '''
+ Always start the Spark History Server. Start other services as
+ required by our execution mode. Open related ports as appropriate.
+ '''
host.service_start('spark-history-server')
+ hookenv.open_port(self.dist_config.port('spark-history-ui'))
+
+ # Spark master/worker is only started in standalone mode
if hookenv.config()['spark_execution_mode'] == 'standalone':
- host.service_start('spark-master')
- host.service_start('spark-worker')
+ if host.service_start('spark-master'):
+ hookenv.log("Spark Master started")
+ hookenv.open_port(self.dist_config.port('spark-master-ui'))
+ # If the master started and we have peers, wait 2m for recovery
+ # before starting the worker. This ensures the worker binds
+ # to the correct master.
+ if unitdata.kv().get('sparkpeer.units'):
+ hookenv.status_set('maintenance',
+ 'waiting for spark master recovery')
+ hookenv.log("Waiting 2m to ensure spark master is ALIVE")
+ time.sleep(120)
+ else:
+ hookenv.log("Spark Master did not start; this is normal "
+ "for non-leader units in standalone mode")
+
+ # NB: Start the worker even if the master process on this unit
+ # fails to start. In non-HA mode, spark master only runs on the
+ # leader. On non-leader units, we still want a worker bound to
+ # the leader.
+ if host.service_start('spark-worker'):
+ hookenv.log("Spark Worker started")
+ hookenv.open_port(self.dist_config.port('spark-worker-ui'))
+ else:
+ hookenv.log("Spark Worker did not start")
def stop(self):
- if not unitdata.kv().get('spark.installed', False):
- return
- # Only stop services if they're running
- if utils.jps("HistoryServer"):
- host.service_stop('spark-history-server')
- if utils.jps("Master"):
- host.service_stop('spark-master')
- if utils.jps("Worker"):
- host.service_stop('spark-worker')
-
- def open_ports(self):
- for port in self.dist_config.exposed_ports('spark'):
- hookenv.open_port(port)
-
- def close_ports(self):
- for port in self.dist_config.exposed_ports('spark'):
- hookenv.close_port(port)
+ '''
+ Stop all services (and close associated ports). Stopping a service
+ that is not currently running does no harm.
+ '''
+ host.service_stop('spark-history-server')
+ hookenv.close_port(self.dist_config.port('spark-history-ui'))
+
+ # Stop the worker before the master
+ host.service_stop('spark-worker')
+ hookenv.close_port(self.dist_config.port('spark-worker-ui'))
+ host.service_stop('spark-master')
+ hookenv.close_port(self.dist_config.port('spark-master-ui'))
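
For orientation, a hypothetical call site in the charm's reactive layer might drive the new configure()/stop()/start() flow as follows (the constructor, host values, and extra lib path are assumptions, not part of this diff):

    # Hypothetical driver code; every concrete value below is an assumption.
    spark = Spark()                                   # assumed no-arg constructor
    spark.stop()                                      # harmless if nothing is running
    spark.configure(
        available_hosts={'spark-master': '10.0.0.5'}, # no namenode/resourcemanager
        zk_units=[],                                  # no Zookeeper -> non-HA
        peers=[],                                     # no peers -> single master
        extra_libs=['/usr/lib/hadoop/lib/native'],    # example extra lib dir
    )
    spark.start()   # history server always; master/worker only in standalone mode
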