Diffstat (limited to 'bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py')
-rw-r--r--  bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py  154
1 file changed, 95 insertions(+), 59 deletions(-)
diff --git a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
index 91fcbf79..f2142d4e 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
@@ -21,6 +21,7 @@ from charms.layer.apache_bigtop_base import Bigtop
 from charms import layer
 from charmhelpers.core import hookenv, host, unitdata
 from charmhelpers.fetch.archiveurl import ArchiveUrlFetchHandler
+from charmhelpers.payload import archive
 
 
 class Spark(object):
@@ -49,13 +50,13 @@ class Spark(object):
             master = 'spark://{}'.format(nodes_str)
         return master
 
-    def install_benchmark(self):
+    def configure_sparkbench(self):
         """
-        Install and configure SparkBench.
+        Install/configure/remove Spark-Bench based on user config.
 
         If config[spark_bench_enabled], fetch, install, and configure
-        SparkBench on initial invocation. Subsequent invocations will skip the
-        fetch/install, but will reconfigure SparkBench since we may need to
+        Spark-Bench on initial invocation. Subsequent invocations will skip the
+        fetch/install, but will reconfigure Spark-Bench since we may need to
         adjust the data dir (eg: benchmark data is stored in hdfs when spark
         is in yarn mode; locally in all other execution modes).
         """
@@ -65,11 +66,7 @@ class Spark(object):
             # Fetch/install on our first go-round, then set unit data so we
             # don't reinstall every time this function is called.
             if not unitdata.kv().get('spark_bench.installed', False):
-                if utils.cpu_arch() == 'ppc64le':
-                    sb_url = hookenv.config()['spark_bench_ppc64le']
-                else:
-                    # TODO: may need more arch cases (go with x86 sb for now)
-                    sb_url = hookenv.config()['spark_bench_x86_64']
+                sb_url = hookenv.config()['spark_bench_url']
 
                 Path(sb_dir).rmtree_p()
                 au = ArchiveUrlFetchHandler()
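
Note on the hunk above: collapsing the per-arch config options into a single
spark_bench_url moves architecture selection out of the charm and into user
config. A minimal sketch of the resulting fetch path, assuming path.py's Path
and charmhelpers' ArchiveUrlFetchHandler.install(source, dest) as used
elsewhere in this file; the sb_dir default here is hypothetical:

    from charmhelpers.core import hookenv, unitdata
    from charmhelpers.fetch.archiveurl import ArchiveUrlFetchHandler
    from path import Path

    def fetch_spark_bench(sb_dir='/home/ubuntu/spark-bench'):
        """Sketch: fetch Spark-Bench once; later calls only reconfigure."""
        if unitdata.kv().get('spark_bench.installed', False):
            return  # already fetched; nothing to download
        sb_url = hookenv.config()['spark_bench_url']
        Path(sb_dir).rmtree_p()  # clear any partial install first
        ArchiveUrlFetchHandler().install(sb_url, '/home/ubuntu')
        unitdata.kv().set('spark_bench.installed', True)
        unitdata.kv().flush(True)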
@@ -141,22 +138,81 @@ class Spark(object):
             unitdata.kv().set('spark_bench.installed', False)
             unitdata.kv().flush(True)
 
-    def setup(self):
-        self.dist_config.add_users()
-        self.dist_config.add_dirs()
-        self.install_demo()
+    def configure_examples(self):
+        """
+        Install sparkpi.sh and sample data to /home/ubuntu.
+
+        The sparkpi.sh script demonstrates spark-submit with the SparkPi class
+        included with Spark. This small script is packed into the spark charm
+        source in the ./scripts subdirectory.
+
+        The sample data is used for benchmarks (only PageRank for now). This
+        may grow quite large in the future, so we utilize Juju Resources for
+        getting this data onto the unit. Sample data originated as follows:
+
+        - PageRank: https://snap.stanford.edu/data/web-Google.html
+        """
+        # Handle sparkpi.sh
+        script_source = 'scripts/sparkpi.sh'
+        script_path = Path(script_source)
+        if script_path.exists():
+            script_target = '/home/ubuntu/sparkpi.sh'
+            new_hash = host.file_hash(script_source)
+            old_hash = unitdata.kv().get('sparkpi.hash')
+            if new_hash != old_hash:
+                hookenv.log('Installing SparkPi script')
+                script_path.copy(script_target)
+                Path(script_target).chmod(0o755)
+                Path(script_target).chown('ubuntu', 'hadoop')
+                unitdata.kv().set('sparkpi.hash', new_hash)
+                hookenv.log('SparkPi script was installed successfully')
+
+        # Handle sample data
+        sample_source = hookenv.resource_get('sample-data')
+        sample_path = sample_source and Path(sample_source)
+        if sample_path and sample_path.exists() and sample_path.stat().st_size:
+            sample_target = '/home/ubuntu'
+            new_hash = host.file_hash(sample_source)
+            old_hash = unitdata.kv().get('sample-data.hash')
+            if new_hash != old_hash:
+                hookenv.log('Extracting Spark sample data')
+                # Extract the sample data; since sample data does not impact
+                # functionality, log any extraction error but don't fail.
+                try:
+                    archive.extract(sample_path, destpath=sample_target)
+                except Exception:
+                    hookenv.log('Unable to extract Spark sample data: {}'
+                                .format(sample_path))
+                else:
+                    unitdata.kv().set('sample-data.hash', new_hash)
+                    hookenv.log('Spark sample data was extracted successfully')
+
+    def configure_events_dir(self, mode):
+        """
+        Create directory for spark event data.
+
+        This directory is used by workers to store event data. It is also read
+        by the history server when displaying event information.
 
-    def setup_hdfs_logs(self):
-        # Create hdfs storage space for history server and return the name
-        # of the created directory.
+        :param string mode: Spark execution mode to determine the dir location.
+        """
         dc = self.dist_config
-        events_dir = dc.path('spark_events')
-        events_dir = 'hdfs://{}'.format(events_dir)
-        utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir)
-        utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '1777', events_dir)
-        utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:spark',
-                     events_dir)
-        return events_dir
+
+        # Directory needs to be 777 so non-spark users can write job history
+        # there. It needs to be g+s (HDFS is g+s by default) so all entries
+        # are readable by spark (in the spark group). It needs to be +t so
+        # users cannot remove files they don't own.
+        if mode.startswith('yarn'):
+            events_dir = 'hdfs://{}'.format(dc.path('spark_events'))
+            utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir)
+            utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '1777', events_dir)
+            utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:spark',
+                         events_dir)
+        else:
+            events_dir = dc.path('spark_events')
+            events_dir.makedirs_p()
+            events_dir.chmod(0o3777)
+            host.chownr(events_dir, 'ubuntu', 'spark', chowntopdir=True)
 
     def configure(self, available_hosts, zk_units, peers, extra_libs):
         """
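
The permission comment in configure_events_dir is worth unpacking: the local
branch uses a single mode, 0o3777, while the HDFS branch only needs '1777'
because HDFS directories are g+s by default. The octal decomposes with nothing
more than the stdlib (illustration only, not charm code):

    import stat

    # g+s: new entries inherit the 'spark' group, so the history server
    #      (running in that group) can read them
    # +t:  users cannot remove event files they don't own
    # 777: any job submitter may write history here
    mode = stat.S_ISGID | stat.S_ISVTX | 0o777
    assert oct(mode) == '0o3777'  # the value passed to events_dir.chmod() above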
""" - # Bootstrap spark - if not unitdata.kv().get('spark.bootstrapped', False): - self.setup() - unitdata.kv().set('spark.bootstrapped', True) - # Set KV based on connected applications unitdata.kv().set('zookeeper.units', zk_units) unitdata.kv().set('sparkpeer.units', peers) @@ -179,12 +230,15 @@ class Spark(object): # Get our config ready dc = self.dist_config - events_log_dir = 'file://{}'.format(dc.path('spark_events')) mode = hookenv.config()['spark_execution_mode'] master_ip = utils.resolve_private_address(available_hosts['spark-master']) master_url = self.get_master_url(master_ip) req_driver_mem = hookenv.config()['driver_memory'] req_executor_mem = hookenv.config()['executor_memory'] + if mode.startswith('yarn'): + spark_events = 'hdfs://{}'.format(dc.path('spark_events')) + else: + spark_events = 'file://{}'.format(dc.path('spark_events')) # handle tuning options that may be set as percentages driver_mem = '1g' @@ -217,12 +271,6 @@ class Spark(object): } if 'namenode' in available_hosts: hosts['namenode'] = available_hosts['namenode'] - events_log_dir = self.setup_hdfs_logs() - else: - # Bigtop includes a default hadoop_head_node if we do not specify - # any namenode info. To ensure spark standalone doesn't get - # invalid hadoop config, set our NN to an empty string. - hosts['namenode'] = '' if 'resourcemanager' in available_hosts: hosts['resourcemanager'] = available_hosts['resourcemanager'] @@ -239,8 +287,8 @@ class Spark(object): # Setup overrides dict override = { 'spark::common::master_url': master_url, - 'spark::common::event_log_dir': events_log_dir, - 'spark::common::history_log_dir': events_log_dir, + 'spark::common::event_log_dir': spark_events, + 'spark::common::history_log_dir': spark_events, 'spark::common::extra_lib_dirs': ':'.join(extra_libs) if extra_libs else None, 'spark::common::driver_mem': driver_mem, @@ -261,22 +309,20 @@ class Spark(object): bigtop = Bigtop() bigtop.render_site_yaml(hosts, roles, override) bigtop.trigger_puppet() - - # Do this after our puppet bits in case puppet overrides needed perms - if 'namenode' not in available_hosts: - # Local event dir (not in HDFS) needs to be 777 so non-spark - # users can write job history there. It needs to be g+s so - # all entries will be readable by spark (in the spark group). - # It needs to be +t so users cannot remove files they don't own. - dc.path('spark_events').chmod(0o3777) - self.patch_worker_master_url(master_ip, master_url) - # Install SB (subsequent calls will reconfigure existing install) - # SparkBench looks for the spark master in /etc/environment + # Packages don't create the event dir out of the box. Do it now. + self.configure_events_dir(mode) + + # Some spark applications look for envars in /etc/environment with utils.environment_edit_in_place('/etc/environment') as env: env['MASTER'] = master_url - self.install_benchmark() + env['SPARK_HOME'] = dc.path('spark_home') + + # Handle examples and Spark-Bench. Do this each time this method is + # called in case we need to act on a new resource or user config. 
@@ -311,16 +357,6 @@ class Spark(object):
                 s = s.replace(old_string, new_string)
                 f.write(s)
 
-    def install_demo(self):
-        '''
-        Install sparkpi.sh to /home/ubuntu (executes SparkPI example app)
-        '''
-        demo_source = 'scripts/sparkpi.sh'
-        demo_target = '/home/ubuntu/sparkpi.sh'
-        Path(demo_source).copy(demo_target)
-        Path(demo_target).chmod(0o755)
-        Path(demo_target).chown('ubuntu', 'hadoop')
-
     def start(self):
         '''
         Always start the Spark History Server. Start other services as
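
The deleted install_demo() copied sparkpi.sh unconditionally on every run; its
replacement, configure_examples(), gates the copy on a content hash kept in
the unit KV store, so repeated hook invocations skip the copy when the script
is unchanged. The pattern, reduced to the stdlib (helper names are
hypothetical; the charm uses host.file_hash and unitdata.kv() in these roles):

    import hashlib
    import shutil

    def file_md5(path):
        """Hash a file's contents so we can detect changes between runs."""
        with open(path, 'rb') as f:
            return hashlib.md5(f.read()).hexdigest()

    def install_if_changed(kv, key, source, target):
        """Copy source to target only when its content hash has changed."""
        new_hash = file_md5(source)
        if kv.get(key) != new_hash:
            shutil.copy(source, target)
            kv[key] = new_hash  # remember what we installed
            return True
        return False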