diff options
author | Kevin W Monroe <kevin.monroe@canonical.com> | 2017-07-05 20:30:06 +0000 |
---|---|---|
committer | Kevin W Monroe <kevin.monroe@canonical.com> | 2017-07-07 16:39:38 -0500 |
commit | 63c75d4f1c2e7973150ee067ecebcf008a669c50 (patch) | |
tree | 4fc867f1a50301535acb2ee5659dd24cd39d103c | |
parent | 6f7e97e93393d7c078d8928cc666b2bb03147f32 (diff) |
BIGTOP-2834: spark charm: refactor for restricted networks; lib cleanup
Closes #246
7 files changed, 145 insertions, 93 deletions
diff --git a/bigtop-packages/src/charm/spark/layer-spark/README.md b/bigtop-packages/src/charm/spark/layer-spark/README.md index f7feaab7..3f65bfc6 100644 --- a/bigtop-packages/src/charm/spark/layer-spark/README.md +++ b/bigtop-packages/src/charm/spark/layer-spark/README.md @@ -238,9 +238,19 @@ http://developer.download.nvidia.com/. Ensure appropriate proxies are configured if needed. ## spark_bench_enabled -Install the SparkBench benchmarking suite. If `true` (the default), this charm -will download spark bench from the URL specified by `spark_bench_ppc64le` -or `spark_bench_x86_64`, depending on the unit's architecture. +Controls the installation of the [Spark-Bench][] benchmarking suite. When set +to `true`, this charm will download and install Spark-Bench from the URL +specified by the `spark_bench_url` config option. When set to `false` +(the default), Spark-Bench will not be installed on the unit, though any data +stored in `hdfs:///user/ubuntu/spark-bench` from previous installations will +be preserved. + +> **Note**: Spark-Bench has not been verified to work with Spark 2.1.x. + +> **Note**: This option requires external network access to the configured +Spark-Bench URL. Ensure appropriate proxies are configured if needed. + +[Spark-Bench]: https://github.com/SparkTC/spark-bench ## spark_execution_mode Spark has four modes of execution: local, standalone, yarn-client, and diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank index 2650e74a..30131e2b 100755 --- a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank +++ b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank @@ -48,15 +48,16 @@ def main(): if not result_dir.exists(): result_dir.mkdir() result_dir.chown('ubuntu', 'ubuntu') - hookenv.log("values: {} {}".format(num_iter, result_log)) + hookenv.log("pagerank ({} iteration) log: {}".format(num_iter, result_log)) - sample = "/home/ubuntu/SparkBench/PageRank/web-Google.txt" - if not os.path.isfile(sample): + sample_dir = "/home/ubuntu/sample-data" + sample_pr = "/home/ubuntu/sample-data/pagerank/web-Google.txt" + if not os.path.isfile(sample_pr): msg = 'Could not find pagerank sample data' - fail('{}: {}'.format(msg, sample)) + fail('{}: {}'.format(msg, sample_pr)) - # Benchmark input data is packed into our sparkbench.tgz, which makes - # it available on all spark units. In yarn mode, however, the nodemanagers + # Benchmark input data is packed into a sample-data resource, which makes + # it available on all spark units. In yarn mode, the nodemanagers # act as the spark workers and will not have access to this local data. # In yarn mode, copy our input data to hdfs so nodemanagers can access it. mode = hookenv.config()['spark_execution_mode'] @@ -64,13 +65,14 @@ def main(): if is_state('hadoop.hdfs.ready'): try: utils.run_as('ubuntu', - 'hdfs', 'dfs', '-put', '-f', sample, '/user/ubuntu', + 'hdfs', 'dfs', '-copyFromLocal', '-f', + sample_dir, '/user/ubuntu', capture_output=True) except subprocess.CalledProcessError as e: msg = 'Unable to copy pagerank sample data to hdfs' fail('{}: {}'.format(msg, e)) else: - sample = "/user/ubuntu/web-Google.txt" + sample_pr = "/user/ubuntu/sample-data/pagerank/web-Google.txt" else: msg = 'Spark is configured for yarn mode, but HDFS is not ready yet' fail(msg) @@ -97,7 +99,7 @@ def main(): '--class', 'org.apache.spark.examples.SparkPageRank', example_jar_path, - sample, + sample_pr, num_iter, ] diff --git a/bigtop-packages/src/charm/spark/layer-spark/config.yaml b/bigtop-packages/src/charm/spark/layer-spark/config.yaml index b923687e..dee9f007 100644 --- a/bigtop-packages/src/charm/spark/layer-spark/config.yaml +++ b/bigtop-packages/src/charm/spark/layer-spark/config.yaml @@ -15,27 +15,20 @@ options: of total system memory (e.g. 50%). spark_bench_enabled: type: boolean - default: true + default: false description: | - When set to 'true' (the default), this charm will download and - install the SparkBench benchmark suite from the configured URLs. - When set to 'false', SparkBench will be removed from the unit, - though any data stored in hdfs:///user/ubuntu/spark-bench will be - preserved. - spark_bench_ppc64le: + When set to 'true', this charm will download and install the + Spark-Bench benchmark suite from the configured URLs. When set to + 'false' (the default), Spark-Bench will not be installed on the + unit, though any data stored in hdfs:///user/ubuntu/spark-bench + from previous installations will be preserved. Note that + Spark-Bench has not been verified to work with Spark 2.1.x. + spark_bench_url: type: string default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8' description: | - URL (including hash) of a ppc64le tarball of SparkBench. By - default, this points to a pre-built SparkBench binary based on - sources in the upstream repository. This option is only valid when - 'spark_bench_enabled' is 'true'. - spark_bench_x86_64: - type: string - default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8' - description: | - URL (including hash) of an x86_64 tarball of SparkBench. By - default, this points to a pre-built SparkBench binary based on + URL (including hash) of a Spark-Bench tarball. By + default, this points to a pre-built Spark-Bench binary based on sources in the upstream repository. This option is only valid when 'spark_bench_enabled' is 'true'. spark_execution_mode: diff --git a/bigtop-packages/src/charm/spark/layer-spark/layer.yaml b/bigtop-packages/src/charm/spark/layer-spark/layer.yaml index 4ee763d3..c5cc6f46 100644 --- a/bigtop-packages/src/charm/spark/layer-spark/layer.yaml +++ b/bigtop-packages/src/charm/spark/layer-spark/layer.yaml @@ -14,11 +14,14 @@ options: - 'bc' - 'libgfortran3' hadoop-client: - groups: - - 'hadoop' dirs: + # Dirs are not configurable by Bigtop puppet, but we still set config + # based on them to avoid hard coding paths; specify dirs that align with + # Bigtop defaults here. spark_events: path: '/var/log/spark/apps' + spark_home: + path: '/usr/lib/spark' ports: # Ports that need to be exposed, overridden, or manually specified. # Only expose ports serving a UI or external API (e.g. spark history diff --git a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py index 91fcbf79..f2142d4e 100644 --- a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py +++ b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py @@ -21,6 +21,7 @@ from charms.layer.apache_bigtop_base import Bigtop from charms import layer from charmhelpers.core import hookenv, host, unitdata from charmhelpers.fetch.archiveurl import ArchiveUrlFetchHandler +from charmhelpers.payload import archive class Spark(object): @@ -49,13 +50,13 @@ class Spark(object): master = 'spark://{}'.format(nodes_str) return master - def install_benchmark(self): + def configure_sparkbench(self): """ - Install and configure SparkBench. + Install/configure/remove Spark-Bench based on user config. If config[spark_bench_enabled], fetch, install, and configure - SparkBench on initial invocation. Subsequent invocations will skip the - fetch/install, but will reconfigure SparkBench since we may need to + Spark-Bench on initial invocation. Subsequent invocations will skip the + fetch/install, but will reconfigure Spark-Bench since we may need to adjust the data dir (eg: benchmark data is stored in hdfs when spark is in yarn mode; locally in all other execution modes). """ @@ -65,11 +66,7 @@ class Spark(object): # Fetch/install on our first go-round, then set unit data so we # don't reinstall every time this function is called. if not unitdata.kv().get('spark_bench.installed', False): - if utils.cpu_arch() == 'ppc64le': - sb_url = hookenv.config()['spark_bench_ppc64le'] - else: - # TODO: may need more arch cases (go with x86 sb for now) - sb_url = hookenv.config()['spark_bench_x86_64'] + sb_url = hookenv.config()['spark_bench_url'] Path(sb_dir).rmtree_p() au = ArchiveUrlFetchHandler() @@ -141,22 +138,81 @@ class Spark(object): unitdata.kv().set('spark_bench.installed', False) unitdata.kv().flush(True) - def setup(self): - self.dist_config.add_users() - self.dist_config.add_dirs() - self.install_demo() + def configure_examples(self): + """ + Install sparkpi.sh and sample data to /home/ubuntu. + + The sparkpi.sh script demonstrates spark-submit with the SparkPi class + included with Spark. This small script is packed into the spark charm + source in the ./scripts subdirectory. + + The sample data is used for benchmarks (only PageRank for now). This + may grow quite large in the future, so we utilize Juju Resources for + getting this data onto the unit. Sample data originated as follows: + + - PageRank: https://snap.stanford.edu/data/web-Google.html + """ + # Handle sparkpi.sh + script_source = 'scripts/sparkpi.sh' + script_path = Path(script_source) + if script_path.exists(): + script_target = '/home/ubuntu/sparkpi.sh' + new_hash = host.file_hash(script_source) + old_hash = unitdata.kv().get('sparkpi.hash') + if new_hash != old_hash: + hookenv.log('Installing SparkPi script') + script_path.copy(script_target) + Path(script_target).chmod(0o755) + Path(script_target).chown('ubuntu', 'hadoop') + unitdata.kv().set('sparkpi.hash', new_hash) + hookenv.log('SparkPi script was installed successfully') + + # Handle sample data + sample_source = hookenv.resource_get('sample-data') + sample_path = sample_source and Path(sample_source) + if sample_path and sample_path.exists() and sample_path.stat().st_size: + sample_target = '/home/ubuntu' + new_hash = host.file_hash(sample_source) + old_hash = unitdata.kv().get('sample-data.hash') + if new_hash != old_hash: + hookenv.log('Extracting Spark sample data') + # Extract the sample data; since sample data does not impact + # functionality, log any extraction error but don't fail. + try: + archive.extract(sample_path, destpath=sample_target) + except Exception: + hookenv.log('Unable to extract Spark sample data: {}' + .format(sample_path)) + else: + unitdata.kv().set('sample-data.hash', new_hash) + hookenv.log('Spark sample data was extracted successfully') + + def configure_events_dir(self, mode): + """ + Create directory for spark event data. + + This directory is used by workers to store event data. It is also read + by the history server when displaying event information. - def setup_hdfs_logs(self): - # Create hdfs storage space for history server and return the name - # of the created directory. + :param string mode: Spark execution mode to determine the dir location. + """ dc = self.dist_config - events_dir = dc.path('spark_events') - events_dir = 'hdfs://{}'.format(events_dir) - utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir) - utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '1777', events_dir) - utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:spark', - events_dir) - return events_dir + + # Directory needs to be 777 so non-spark users can write job history + # there. It needs to be g+s (HDFS is g+s by default) so all entries + # are readable by spark (in the spark group). It needs to be +t so + # users cannot remove files they don't own. + if mode.startswith('yarn'): + events_dir = 'hdfs://{}'.format(dc.path('spark_events')) + utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir) + utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '1777', events_dir) + utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:spark', + events_dir) + else: + events_dir = dc.path('spark_events') + events_dir.makedirs_p() + events_dir.chmod(0o3777) + host.chownr(events_dir, 'ubuntu', 'spark', chowntopdir=True) def configure(self, available_hosts, zk_units, peers, extra_libs): """ @@ -167,11 +223,6 @@ class Spark(object): :param list peers: List of Spark peer tuples (unit name, IP). :param list extra_libs: List of extra lib paths for driver/executors. """ - # Bootstrap spark - if not unitdata.kv().get('spark.bootstrapped', False): - self.setup() - unitdata.kv().set('spark.bootstrapped', True) - # Set KV based on connected applications unitdata.kv().set('zookeeper.units', zk_units) unitdata.kv().set('sparkpeer.units', peers) @@ -179,12 +230,15 @@ class Spark(object): # Get our config ready dc = self.dist_config - events_log_dir = 'file://{}'.format(dc.path('spark_events')) mode = hookenv.config()['spark_execution_mode'] master_ip = utils.resolve_private_address(available_hosts['spark-master']) master_url = self.get_master_url(master_ip) req_driver_mem = hookenv.config()['driver_memory'] req_executor_mem = hookenv.config()['executor_memory'] + if mode.startswith('yarn'): + spark_events = 'hdfs://{}'.format(dc.path('spark_events')) + else: + spark_events = 'file://{}'.format(dc.path('spark_events')) # handle tuning options that may be set as percentages driver_mem = '1g' @@ -217,12 +271,6 @@ class Spark(object): } if 'namenode' in available_hosts: hosts['namenode'] = available_hosts['namenode'] - events_log_dir = self.setup_hdfs_logs() - else: - # Bigtop includes a default hadoop_head_node if we do not specify - # any namenode info. To ensure spark standalone doesn't get - # invalid hadoop config, set our NN to an empty string. - hosts['namenode'] = '' if 'resourcemanager' in available_hosts: hosts['resourcemanager'] = available_hosts['resourcemanager'] @@ -239,8 +287,8 @@ class Spark(object): # Setup overrides dict override = { 'spark::common::master_url': master_url, - 'spark::common::event_log_dir': events_log_dir, - 'spark::common::history_log_dir': events_log_dir, + 'spark::common::event_log_dir': spark_events, + 'spark::common::history_log_dir': spark_events, 'spark::common::extra_lib_dirs': ':'.join(extra_libs) if extra_libs else None, 'spark::common::driver_mem': driver_mem, @@ -261,22 +309,20 @@ class Spark(object): bigtop = Bigtop() bigtop.render_site_yaml(hosts, roles, override) bigtop.trigger_puppet() - - # Do this after our puppet bits in case puppet overrides needed perms - if 'namenode' not in available_hosts: - # Local event dir (not in HDFS) needs to be 777 so non-spark - # users can write job history there. It needs to be g+s so - # all entries will be readable by spark (in the spark group). - # It needs to be +t so users cannot remove files they don't own. - dc.path('spark_events').chmod(0o3777) - self.patch_worker_master_url(master_ip, master_url) - # Install SB (subsequent calls will reconfigure existing install) - # SparkBench looks for the spark master in /etc/environment + # Packages don't create the event dir out of the box. Do it now. + self.configure_events_dir(mode) + + # Some spark applications look for envars in /etc/environment with utils.environment_edit_in_place('/etc/environment') as env: env['MASTER'] = master_url - self.install_benchmark() + env['SPARK_HOME'] = dc.path('spark_home') + + # Handle examples and Spark-Bench. Do this each time this method is + # called in case we need to act on a new resource or user config. + self.configure_examples() + self.configure_sparkbench() def patch_worker_master_url(self, master_ip, master_url): ''' @@ -311,16 +357,6 @@ class Spark(object): s = s.replace(old_string, new_string) f.write(s) - def install_demo(self): - ''' - Install sparkpi.sh to /home/ubuntu (executes SparkPI example app) - ''' - demo_source = 'scripts/sparkpi.sh' - demo_target = '/home/ubuntu/sparkpi.sh' - Path(demo_source).copy(demo_target) - Path(demo_target).chmod(0o755) - Path(demo_target).chown('ubuntu', 'hadoop') - def start(self): ''' Always start the Spark History Server. Start other services as diff --git a/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml b/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml index 06664290..de6b217e 100644 --- a/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml +++ b/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml @@ -6,6 +6,11 @@ description: > This charm provides version 2.1.0 of the Spark application from Apache Bigtop. tags: ["analytics"] +resources: + sample-data: + description: A zip archive of sample data required by Spark benchmarks. + type: file + filename: sample-data.zip provides: benchmark: interface: benchmark diff --git a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py index fc74fa10..c9328ab5 100644 --- a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py +++ b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py @@ -17,7 +17,7 @@ import time from charms.reactive import RelationBase, when, when_not, is_state, set_state, remove_state, when_any from charms.layer.apache_bigtop_base import get_fqdn, get_package_version from charms.layer.bigtop_spark import Spark -from charmhelpers.core import hookenv +from charmhelpers.core import hookenv, host from charms import leadership from charms.reactive.helpers import data_changed from jujubigdata import utils @@ -164,12 +164,15 @@ def reinstall_spark(): # peers are only used to set our MASTER_URL in standalone HA mode peers = get_spark_peers() + # Construct a deployment matrix + sample_data = hookenv.resource_get('sample-data') deployment_matrix = { + 'hdfs_ready': is_state('hadoop.hdfs.ready'), + 'peers': peers, + 'sample_data': host.file_hash(sample_data) if sample_data else None, 'spark_master': spark_master_host, 'yarn_ready': is_state('hadoop.yarn.ready'), - 'hdfs_ready': is_state('hadoop.hdfs.ready'), 'zookeepers': zks, - 'peers': peers, } # If neither config nor our matrix is changing, there is nothing to do. |