Diffstat (limited to 'bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py')
-rw-r--r--[-rwxr-xr-x]  bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py | 192
1 file changed, 119 insertions(+), 73 deletions(-)
diff --git a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
index dc2e3730..91fcbf79 100755..100644
--- a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
@@ -13,11 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import os
+import time
 
 from jujubigdata import utils
 from path import Path
 from charms.layer.apache_bigtop_base import Bigtop
-from charms.reactive import is_state
 from charms import layer
 from charmhelpers.core import hookenv, host, unitdata
 from charmhelpers.fetch.archiveurl import ArchiveUrlFetchHandler
@@ -36,7 +36,7 @@ class Spark(object):
         mode = hookenv.config()['spark_execution_mode']
         zk_units = unitdata.kv().get('zookeeper.units', [])
         master = None
-        if mode.startswith('local') or mode == 'yarn-cluster':
+        if mode.startswith('local') or mode.startswith('yarn'):
             master = mode
         elif mode == 'standalone' and not zk_units:
             master = 'spark://{}:7077'.format(spark_master_host)
@@ -47,18 +47,8 @@ class Spark(object):
                 nodes.append('{}:7077'.format(ip))
             nodes_str = ','.join(nodes)
             master = 'spark://{}'.format(nodes_str)
-        elif mode.startswith('yarn'):
-            master = 'yarn-client'
         return master
 
-    def get_roles(self):
-        roles = ['spark-worker', 'spark-client']
-        zk_units = unitdata.kv().get('zookeeper.units', [])
-        if is_state('leadership.is_leader') or zk_units:
-            roles.append('spark-master')
-            roles.append('spark-history-server')
-        return roles
-
     def install_benchmark(self):
         """
         Install and configure SparkBench.
@@ -155,10 +145,10 @@ class Spark(object):
         self.dist_config.add_users()
         self.dist_config.add_dirs()
         self.install_demo()
-        self.open_ports()
 
     def setup_hdfs_logs(self):
-        # create hdfs storage space for history server
+        # Create hdfs storage space for history server and return the name
+        # of the created directory.
         dc = self.dist_config
         events_dir = dc.path('spark_events')
         events_dir = 'hdfs://{}'.format(events_dir)
@@ -168,49 +158,94 @@ class Spark(object):
             events_dir)
         return events_dir
 
-    def configure(self, available_hosts, zk_units, peers):
+    def configure(self, available_hosts, zk_units, peers, extra_libs):
         """
         This is the core logic of setting up spark.
 
-        Two flags are needed:
-
-        * Namenode exists aka HDFS is there
-        * Resource manager exists aka YARN is ready
-
-        both flags are infered from the available hosts.
-
         :param dict available_hosts: Hosts that Spark should know about.
+        :param list zk_units: List of Zookeeper dicts with host/port info.
+        :param list peers: List of Spark peer tuples (unit name, IP).
+        :param list extra_libs: List of extra lib paths for driver/executors.
         """
-        unitdata.kv().set('zookeeper.units', zk_units)
-        unitdata.kv().set('sparkpeer.units', peers)
-        unitdata.kv().flush(True)
-
+        # Bootstrap spark
         if not unitdata.kv().get('spark.bootstrapped', False):
             self.setup()
             unitdata.kv().set('spark.bootstrapped', True)
 
+        # Set KV based on connected applications
+        unitdata.kv().set('zookeeper.units', zk_units)
+        unitdata.kv().set('sparkpeer.units', peers)
+        unitdata.kv().flush(True)
+
+        # Get our config ready
+        dc = self.dist_config
+        events_log_dir = 'file://{}'.format(dc.path('spark_events'))
+        mode = hookenv.config()['spark_execution_mode']
         master_ip = utils.resolve_private_address(available_hosts['spark-master'])
+        master_url = self.get_master_url(master_ip)
+        req_driver_mem = hookenv.config()['driver_memory']
+        req_executor_mem = hookenv.config()['executor_memory']
+
+        # handle tuning options that may be set as percentages
+        driver_mem = '1g'
+        executor_mem = '1g'
+        if req_driver_mem.endswith('%'):
+            if mode == 'standalone' or mode.startswith('local'):
+                mem_mb = host.get_total_ram() / 1024 / 1024
+                req_percentage = float(req_driver_mem.strip('%')) / 100
+                driver_mem = str(int(mem_mb * req_percentage)) + 'm'
+            else:
+                hookenv.log("driver_memory percentage in non-local mode. Using 1g default.",
+                            level=None)
+        else:
+            driver_mem = req_driver_mem
+
+        if req_executor_mem.endswith('%'):
+            if mode == 'standalone' or mode.startswith('local'):
+                mem_mb = host.get_total_ram() / 1024 / 1024
+                req_percentage = float(req_executor_mem.strip('%')) / 100
+                executor_mem = str(int(mem_mb * req_percentage)) + 'm'
+            else:
+                hookenv.log("executor_memory percentage in non-local mode. Using 1g default.",
+                            level=None)
+        else:
+            executor_mem = req_executor_mem
+
+        # Setup hosts dict
         hosts = {
             'spark': master_ip,
        }
-
-        dc = self.dist_config
-        events_log_dir = 'file://{}'.format(dc.path('spark_events'))
         if 'namenode' in available_hosts:
             hosts['namenode'] = available_hosts['namenode']
             events_log_dir = self.setup_hdfs_logs()
-
+        else:
+            # Bigtop includes a default hadoop_head_node if we do not specify
+            # any namenode info. To ensure spark standalone doesn't get
+            # invalid hadoop config, set our NN to an empty string.
+            hosts['namenode'] = ''
         if 'resourcemanager' in available_hosts:
             hosts['resourcemanager'] = available_hosts['resourcemanager']
 
-        roles = self.get_roles()
+        # Setup roles dict. We always include the history server and client.
+        # Determine other roles based on our execution mode.
+        roles = ['spark-history-server', 'spark-client']
+        if mode == 'standalone':
+            roles.append('spark-master')
+            roles.append('spark-worker')
+        elif mode.startswith('yarn'):
+            roles.append('spark-on-yarn')
+            roles.append('spark-yarn-slave')
 
+        # Setup overrides dict
         override = {
-            'spark::common::master_url': self.get_master_url(master_ip),
+            'spark::common::master_url': master_url,
             'spark::common::event_log_dir': events_log_dir,
             'spark::common::history_log_dir': events_log_dir,
+            'spark::common::extra_lib_dirs':
+                ':'.join(extra_libs) if extra_libs else None,
+            'spark::common::driver_mem': driver_mem,
+            'spark::common::executor_mem': executor_mem,
         }
-
         if zk_units:
             zks = []
             for unit in zk_units:
@@ -220,18 +255,14 @@ class Spark(object):
             zk_connect = ",".join(zks)
             override['spark::common::zookeeper_connection_string'] = zk_connect
         else:
-            override['spark::common::zookeeper_connection_string'] = ""
+            override['spark::common::zookeeper_connection_string'] = None
 
+        # Create our site.yaml and trigger puppet
         bigtop = Bigtop()
         bigtop.render_site_yaml(hosts, roles, override)
         bigtop.trigger_puppet()
-
-        # There is a race condition here.
-        # The work role will not start the first time we trigger puppet apply.
-        # The exception in /var/logs/spark:
-        # Exception in thread "main" org.apache.spark.SparkException: Invalid master URL: spark://:7077
-        # The master url is not set at the time the worker start the first time.
-        # TODO(kjackal): ...do the needed... (investiate,debug,submit patch)
-        bigtop.trigger_puppet()
+
+        # Do this after our puppet bits in case puppet overrides needed perms
         if 'namenode' not in available_hosts:
             # Local event dir (not in HDFS) needs to be 777 so non-spark
             # users can write job history there. It needs to be g+s so
@@ -239,22 +270,21 @@ class Spark(object):
             # It needs to be +t so users cannot remove files they don't own.
             dc.path('spark_events').chmod(0o3777)
-        self.patch_worker_master_url(master_ip)
+        self.patch_worker_master_url(master_ip, master_url)
+
+        # Install SB (subsequent calls will reconfigure existing install)
         # SparkBench looks for the spark master in /etc/environment
         with utils.environment_edit_in_place('/etc/environment') as env:
-            env['MASTER'] = self.get_master_url(master_ip)
-
-        # Install SB (subsequent calls will reconfigure existing install)
+            env['MASTER'] = master_url
         self.install_benchmark()
 
-    def patch_worker_master_url(self, master_ip):
+    def patch_worker_master_url(self, master_ip, master_url):
         '''
         Patch the worker startup script to use the full master url istead of
         contracting it. The master url is placed in the spark-env.sh so that
         the startup script will use it.
         In HA mode the master_ip is set to be the local_ip instead of the one
         the leader elects. This requires a restart of the master service.
         '''
-        master_url = self.get_master_url(master_ip)
         zk_units = unitdata.kv().get('zookeeper.units', [])
         if master_url.startswith('spark://'):
             if zk_units:
@@ -268,8 +298,6 @@ class Spark(object):
             self.inplace_change('/etc/init.d/spark-worker',
                                 'spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT',
                                 '$SPARK_MASTER_URL')
-            host.service_restart('spark-master')
-            host.service_restart('spark-worker')
 
     def inplace_change(self, filename, old_string, new_string):
         # Safely read the input filename using 'with'
@@ -294,32 +322,50 @@ class Spark(object):
         Path(demo_target).chown('ubuntu', 'hadoop')
 
     def start(self):
-        if unitdata.kv().get('spark.uprading', False):
-            return
-
-        # stop services (if they're running) to pick up any config change
-        self.stop()
-        # always start the history server, start master/worker if we're standalone
+        '''
+        Always start the Spark History Server. Start other services as
+        required by our execution mode. Open related ports as appropriate.
+        '''
         host.service_start('spark-history-server')
+        hookenv.open_port(self.dist_config.port('spark-history-ui'))
+
+        # Spark master/worker is only started in standalone mode
         if hookenv.config()['spark_execution_mode'] == 'standalone':
-            host.service_start('spark-master')
-            host.service_start('spark-worker')
+            if host.service_start('spark-master'):
+                hookenv.log("Spark Master started")
+                hookenv.open_port(self.dist_config.port('spark-master-ui'))
+                # If the master started and we have peers, wait 2m for recovery
+                # before starting the worker. This ensures the worker binds
+                # to the correct master.
+                if unitdata.kv().get('sparkpeer.units'):
+                    hookenv.status_set('maintenance',
+                                       'waiting for spark master recovery')
+                    hookenv.log("Waiting 2m to ensure spark master is ALIVE")
+                    time.sleep(120)
+            else:
+                hookenv.log("Spark Master did not start; this is normal "
+                            "for non-leader units in standalone mode")
+
+            # NB: Start the worker even if the master process on this unit
+            # fails to start. In non-HA mode, spark master only runs on the
+            # leader. On non-leader units, we still want a worker bound to
+            # the leader.
+            if host.service_start('spark-worker'):
+                hookenv.log("Spark Worker started")
+                hookenv.open_port(self.dist_config.port('spark-worker-ui'))
+            else:
+                hookenv.log("Spark Worker did not start")
 
     def stop(self):
-        if not unitdata.kv().get('spark.installed', False):
-            return
-        # Only stop services if they're running
-        if utils.jps("HistoryServer"):
-            host.service_stop('spark-history-server')
-        if utils.jps("Master"):
-            host.service_stop('spark-master')
-        if utils.jps("Worker"):
-            host.service_stop('spark-worker')
-
-    def open_ports(self):
-        for port in self.dist_config.exposed_ports('spark'):
-            hookenv.open_port(port)
-
-    def close_ports(self):
-        for port in self.dist_config.exposed_ports('spark'):
-            hookenv.close_port(port)
+        '''
+        Stop all services (and close associated ports). Stopping a service
+        that is not currently running does no harm.
+        '''
+        host.service_stop('spark-history-server')
+        hookenv.close_port(self.dist_config.port('spark-history-ui'))
+
+        # Stop the worker before the master
+        host.service_stop('spark-worker')
+        hookenv.close_port(self.dist_config.port('spark-worker-ui'))
+        host.service_stop('spark-master')
+        hookenv.close_port(self.dist_config.port('spark-master-ui'))
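The get_master_url() change above folds the old yarn-client special case into a single startswith('yarn') pass-through. The logic reduces to a pure function of the execution mode, the master host, and the HA peer list; a minimal sketch, assuming a plain peer_ips list stands in for the charm's zookeeper.units/sparkpeer.units bookkeeping (all names below are illustrative, not charm API):

    def master_url(mode, master_host, peer_ips):
        """Derive the Spark master URL from the execution mode."""
        # local[*] and every yarn-* mode are passed through verbatim
        if mode.startswith('local') or mode.startswith('yarn'):
            return mode
        if mode == 'standalone' and not peer_ips:
            # Single (non-HA) standalone master
            return 'spark://{}:7077'.format(master_host)
        if mode == 'standalone' and peer_ips:
            # HA: advertise every possible master so workers can fail over
            nodes = ['{}:7077'.format(ip) for ip in peer_ips]
            return 'spark://{}'.format(','.join(nodes))
        return None

    # master_url('yarn-cluster', '10.0.0.1', [])  -> 'yarn-cluster'
    # master_url('standalone', '10.0.0.1', [])    -> 'spark://10.0.0.1:7077'
    # master_url('standalone', '10.0.0.1', ['10.0.0.1', '10.0.0.2'])
    #     -> 'spark://10.0.0.1:7077,10.0.0.2:7077'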
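The percentage handling added to configure() turns a driver_memory or executor_memory value such as '50%' into a megabyte string based on total system RAM; charmhelpers' host.get_total_ram() reports bytes, hence the two divisions by 1024. A self-contained sketch of the same conversion (the helper name and total_ram_bytes parameter are illustrative):

    def mem_setting(requested, total_ram_bytes, local_mode, default='1g'):
        """Resolve a memory setting that may be absolute ('2g') or a
        percentage ('50%') of total system RAM."""
        if not requested.endswith('%'):
            return requested
        if not local_mode:
            # Percentages only make sense when Spark owns the whole box
            return default
        mem_mb = total_ram_bytes / 1024 / 1024
        fraction = float(requested.rstrip('%')) / 100
        return '{}m'.format(int(mem_mb * fraction))

    # On a 16 GiB machine:
    # mem_setting('50%', 16 * 1024**3, local_mode=True)   -> '8192m'
    # mem_setting('2g', 16 * 1024**3, local_mode=True)    -> '2g'
    # mem_setting('50%', 16 * 1024**3, local_mode=False)  -> '1g'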
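The zookeeper_connection_string override is assembled from the zk_units dicts, but the loop body is split across a hunk boundary above. A sketch of the whole construction, assuming each unit dict carries 'host' and 'port' keys as the new docstring implies:

    def zk_connection_string(zk_units):
        # Render host:port pairs as 'h1:p1,h2:p2' for the Spark HA config
        return ','.join('{}:{}'.format(u['host'], u['port'])
                        for u in zk_units)

    # zk_connection_string([{'host': '10.0.0.5', 'port': 2181},
    #                       {'host': '10.0.0.6', 'port': 2181}])
    #     -> '10.0.0.5:2181,10.0.0.6:2181'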
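patch_worker_master_url() relies on the class's inplace_change() helper, whose body sits outside these hunks; only its first comment line is visible. A plausible sketch of such a search-and-replace helper, assuming the conventional read-then-rewrite pattern (a guess at the elided body, not the charm's verbatim code):

    def inplace_change(filename, old_string, new_string):
        # Read the whole file up front so we never write a partial result
        with open(filename) as f:
            content = f.read()
        # Only rewrite the file if there is actually something to replace
        if old_string not in content:
            return
        with open(filename, 'w') as f:
            f.write(content.replace(old_string, new_string))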