author     Kevin W Monroe <kevin.monroe@canonical.com>    2017-07-05 20:30:06 +0000
committer  Kevin W Monroe <kevin.monroe@canonical.com>    2017-07-07 16:39:38 -0500
commit     63c75d4f1c2e7973150ee067ecebcf008a669c50 (patch)
tree       4fc867f1a50301535acb2ee5659dd24cd39d103c
parent     6f7e97e93393d7c078d8928cc666b2bb03147f32 (diff)
BIGTOP-2834: spark charm: refactor for restricted networks; lib cleanup
Closes #246
-rw-r--r--  bigtop-packages/src/charm/spark/layer-spark/README.md                         |  16
-rwxr-xr-x  bigtop-packages/src/charm/spark/layer-spark/actions/pagerank                  |  20
-rw-r--r--  bigtop-packages/src/charm/spark/layer-spark/config.yaml                       |  27
-rw-r--r--  bigtop-packages/src/charm/spark/layer-spark/layer.yaml                        |   7
-rw-r--r--  bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py  | 154
-rw-r--r--  bigtop-packages/src/charm/spark/layer-spark/metadata.yaml                     |   5
-rw-r--r--  bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py                 |   9
7 files changed, 145 insertions(+), 93 deletions(-)
diff --git a/bigtop-packages/src/charm/spark/layer-spark/README.md b/bigtop-packages/src/charm/spark/layer-spark/README.md
index f7feaab7..3f65bfc6 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/README.md
+++ b/bigtop-packages/src/charm/spark/layer-spark/README.md
@@ -238,9 +238,19 @@ http://developer.download.nvidia.com/. Ensure appropriate proxies are
configured if needed.
## spark_bench_enabled
-Install the SparkBench benchmarking suite. If `true` (the default), this charm
-will download spark bench from the URL specified by `spark_bench_ppc64le`
-or `spark_bench_x86_64`, depending on the unit's architecture.
+Controls the installation of the [Spark-Bench][] benchmarking suite. When set
+to `true`, this charm will download and install Spark-Bench from the URL
+specified by the `spark_bench_url` config option. When set to `false`
+(the default), Spark-Bench will not be installed on the unit, though any data
+stored in `hdfs:///user/ubuntu/spark-bench` from previous installations will
+be preserved.
+
+> **Note**: Spark-Bench has not been verified to work with Spark 2.1.x.
+
+> **Note**: This option requires external network access to the configured
+Spark-Bench URL. Ensure appropriate proxies are configured if needed.
+
+[Spark-Bench]: https://github.com/SparkTC/spark-bench
## spark_execution_mode
Spark has four modes of execution: local, standalone, yarn-client, and
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
index 2650e74a..30131e2b 100755
--- a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
+++ b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
@@ -48,15 +48,16 @@ def main():
if not result_dir.exists():
result_dir.mkdir()
result_dir.chown('ubuntu', 'ubuntu')
- hookenv.log("values: {} {}".format(num_iter, result_log))
+ hookenv.log("pagerank ({} iteration) log: {}".format(num_iter, result_log))
- sample = "/home/ubuntu/SparkBench/PageRank/web-Google.txt"
- if not os.path.isfile(sample):
+ sample_dir = "/home/ubuntu/sample-data"
+ sample_pr = "/home/ubuntu/sample-data/pagerank/web-Google.txt"
+ if not os.path.isfile(sample_pr):
msg = 'Could not find pagerank sample data'
- fail('{}: {}'.format(msg, sample))
+ fail('{}: {}'.format(msg, sample_pr))
- # Benchmark input data is packed into our sparkbench.tgz, which makes
- # it available on all spark units. In yarn mode, however, the nodemanagers
+ # Benchmark input data is packed into a sample-data resource, which makes
+ # it available on all spark units. In yarn mode, the nodemanagers
# act as the spark workers and will not have access to this local data.
# In yarn mode, copy our input data to hdfs so nodemanagers can access it.
mode = hookenv.config()['spark_execution_mode']
@@ -64,13 +65,14 @@ def main():
if is_state('hadoop.hdfs.ready'):
try:
utils.run_as('ubuntu',
- 'hdfs', 'dfs', '-put', '-f', sample, '/user/ubuntu',
+ 'hdfs', 'dfs', '-copyFromLocal', '-f',
+ sample_dir, '/user/ubuntu',
capture_output=True)
except subprocess.CalledProcessError as e:
msg = 'Unable to copy pagerank sample data to hdfs'
fail('{}: {}'.format(msg, e))
else:
- sample = "/user/ubuntu/web-Google.txt"
+ sample_pr = "/user/ubuntu/sample-data/pagerank/web-Google.txt"
else:
msg = 'Spark is configured for yarn mode, but HDFS is not ready yet'
fail(msg)
@@ -97,7 +99,7 @@ def main():
'--class',
'org.apache.spark.examples.SparkPageRank',
example_jar_path,
- sample,
+ sample_pr,
num_iter,
]
diff --git a/bigtop-packages/src/charm/spark/layer-spark/config.yaml b/bigtop-packages/src/charm/spark/layer-spark/config.yaml
index b923687e..dee9f007 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/config.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/config.yaml
@@ -15,27 +15,20 @@ options:
of total system memory (e.g. 50%).
spark_bench_enabled:
type: boolean
- default: true
+ default: false
description: |
- When set to 'true' (the default), this charm will download and
- install the SparkBench benchmark suite from the configured URLs.
- When set to 'false', SparkBench will be removed from the unit,
- though any data stored in hdfs:///user/ubuntu/spark-bench will be
- preserved.
- spark_bench_ppc64le:
+ When set to 'true', this charm will download and install the
+ Spark-Bench benchmark suite from the configured URLs. When set to
+ 'false' (the default), Spark-Bench will not be installed on the
+ unit, though any data stored in hdfs:///user/ubuntu/spark-bench
+ from previous installations will be preserved. Note that
+ Spark-Bench has not been verified to work with Spark 2.1.x.
+ spark_bench_url:
type: string
default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8'
description: |
- URL (including hash) of a ppc64le tarball of SparkBench. By
- default, this points to a pre-built SparkBench binary based on
- sources in the upstream repository. This option is only valid when
- 'spark_bench_enabled' is 'true'.
- spark_bench_x86_64:
- type: string
- default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8'
- description: |
- URL (including hash) of an x86_64 tarball of SparkBench. By
- default, this points to a pre-built SparkBench binary based on
+ URL (including hash) of a Spark-Bench tarball. By
+ default, this points to a pre-built Spark-Bench binary based on
sources in the upstream repository. This option is only valid when
'spark_bench_enabled' is 'true'.
spark_execution_mode:
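
The default `spark_bench_url` carries a `#sha256=<digest>` fragment, which charmhelpers' ArchiveUrlFetchHandler can use to verify the download. A minimal sketch of splitting such a URL (the helper is illustrative and not part of the charm):

```python
from urllib.parse import urlparse


def split_bench_url(sb_url):
    """Split 'https://host/SparkBench.tgz#sha256=<digest>' into its parts."""
    base_url = sb_url.split('#', 1)[0]
    fragment = urlparse(sb_url).fragment
    if '=' in fragment:
        hash_type, hash_value = fragment.split('=', 1)
        return base_url, hash_type, hash_value
    return base_url, None, None
```
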
diff --git a/bigtop-packages/src/charm/spark/layer-spark/layer.yaml b/bigtop-packages/src/charm/spark/layer-spark/layer.yaml
index 4ee763d3..c5cc6f46 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/layer.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/layer.yaml
@@ -14,11 +14,14 @@ options:
- 'bc'
- 'libgfortran3'
hadoop-client:
- groups:
- - 'hadoop'
dirs:
+ # Dirs are not configurable by Bigtop puppet, but we still set config
+ # based on them to avoid hard coding paths; specify dirs that align with
+ # Bigtop defaults here.
spark_events:
path: '/var/log/spark/apps'
+ spark_home:
+ path: '/usr/lib/spark'
ports:
# Ports that need to be exposed, overridden, or manually specified.
# Only expose ports serving a UI or external API (e.g. spark history
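
The comment above explains why these dirs are declared even though Bigtop puppet does not consume them: charm code resolves paths by name through its dist_config instead of hard-coding them. A stand-alone stand-in for that lookup, assuming the names shown in this layer.yaml (the real object comes from jujubigdata):

```python
from pathlib import Path

# Mirrors the dirs declared in layer.yaml above.
LAYER_DIRS = {
    'spark_events': '/var/log/spark/apps',
    'spark_home': '/usr/lib/spark',
}


def dist_path(name):
    """Mimic dist_config.path(name) for the dirs declared above."""
    return Path(LAYER_DIRS[name])

# dist_path('spark_home') -> PosixPath('/usr/lib/spark')
```
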
diff --git a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
index 91fcbf79..f2142d4e 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
@@ -21,6 +21,7 @@ from charms.layer.apache_bigtop_base import Bigtop
from charms import layer
from charmhelpers.core import hookenv, host, unitdata
from charmhelpers.fetch.archiveurl import ArchiveUrlFetchHandler
+from charmhelpers.payload import archive
class Spark(object):
@@ -49,13 +50,13 @@ class Spark(object):
master = 'spark://{}'.format(nodes_str)
return master
- def install_benchmark(self):
+ def configure_sparkbench(self):
"""
- Install and configure SparkBench.
+ Install/configure/remove Spark-Bench based on user config.
If config[spark_bench_enabled], fetch, install, and configure
- SparkBench on initial invocation. Subsequent invocations will skip the
- fetch/install, but will reconfigure SparkBench since we may need to
+ Spark-Bench on initial invocation. Subsequent invocations will skip the
+ fetch/install, but will reconfigure Spark-Bench since we may need to
adjust the data dir (eg: benchmark data is stored in hdfs when spark
is in yarn mode; locally in all other execution modes).
"""
@@ -65,11 +66,7 @@ class Spark(object):
# Fetch/install on our first go-round, then set unit data so we
# don't reinstall every time this function is called.
if not unitdata.kv().get('spark_bench.installed', False):
- if utils.cpu_arch() == 'ppc64le':
- sb_url = hookenv.config()['spark_bench_ppc64le']
- else:
- # TODO: may need more arch cases (go with x86 sb for now)
- sb_url = hookenv.config()['spark_bench_x86_64']
+ sb_url = hookenv.config()['spark_bench_url']
Path(sb_dir).rmtree_p()
au = ArchiveUrlFetchHandler()
@@ -141,22 +138,81 @@ class Spark(object):
unitdata.kv().set('spark_bench.installed', False)
unitdata.kv().flush(True)
- def setup(self):
- self.dist_config.add_users()
- self.dist_config.add_dirs()
- self.install_demo()
+ def configure_examples(self):
+ """
+ Install sparkpi.sh and sample data to /home/ubuntu.
+
+ The sparkpi.sh script demonstrates spark-submit with the SparkPi class
+ included with Spark. This small script is packed into the spark charm
+ source in the ./scripts subdirectory.
+
+ The sample data is used for benchmarks (only PageRank for now). This
+ may grow quite large in the future, so we utilize Juju Resources for
+ getting this data onto the unit. Sample data originated as follows:
+
+ - PageRank: https://snap.stanford.edu/data/web-Google.html
+ """
+ # Handle sparkpi.sh
+ script_source = 'scripts/sparkpi.sh'
+ script_path = Path(script_source)
+ if script_path.exists():
+ script_target = '/home/ubuntu/sparkpi.sh'
+ new_hash = host.file_hash(script_source)
+ old_hash = unitdata.kv().get('sparkpi.hash')
+ if new_hash != old_hash:
+ hookenv.log('Installing SparkPi script')
+ script_path.copy(script_target)
+ Path(script_target).chmod(0o755)
+ Path(script_target).chown('ubuntu', 'hadoop')
+ unitdata.kv().set('sparkpi.hash', new_hash)
+ hookenv.log('SparkPi script was installed successfully')
+
+ # Handle sample data
+ sample_source = hookenv.resource_get('sample-data')
+ sample_path = sample_source and Path(sample_source)
+ if sample_path and sample_path.exists() and sample_path.stat().st_size:
+ sample_target = '/home/ubuntu'
+ new_hash = host.file_hash(sample_source)
+ old_hash = unitdata.kv().get('sample-data.hash')
+ if new_hash != old_hash:
+ hookenv.log('Extracting Spark sample data')
+ # Extract the sample data; since sample data does not impact
+ # functionality, log any extraction error but don't fail.
+ try:
+ archive.extract(sample_path, destpath=sample_target)
+ except Exception:
+ hookenv.log('Unable to extract Spark sample data: {}'
+ .format(sample_path))
+ else:
+ unitdata.kv().set('sample-data.hash', new_hash)
+ hookenv.log('Spark sample data was extracted successfully')
+
+ def configure_events_dir(self, mode):
+ """
+ Create directory for spark event data.
+
+ This directory is used by workers to store event data. It is also read
+ by the history server when displaying event information.
- def setup_hdfs_logs(self):
- # Create hdfs storage space for history server and return the name
- # of the created directory.
+ :param string mode: Spark execution mode to determine the dir location.
+ """
dc = self.dist_config
- events_dir = dc.path('spark_events')
- events_dir = 'hdfs://{}'.format(events_dir)
- utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir)
- utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '1777', events_dir)
- utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:spark',
- events_dir)
- return events_dir
+
+ # Directory needs to be 777 so non-spark users can write job history
+ # there. It needs to be g+s (HDFS is g+s by default) so all entries
+ # are readable by spark (in the spark group). It needs to be +t so
+ # users cannot remove files they don't own.
+ if mode.startswith('yarn'):
+ events_dir = 'hdfs://{}'.format(dc.path('spark_events'))
+ utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir)
+ utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '1777', events_dir)
+ utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:spark',
+ events_dir)
+ else:
+ events_dir = dc.path('spark_events')
+ events_dir.makedirs_p()
+ events_dir.chmod(0o3777)
+ host.chownr(events_dir, 'ubuntu', 'spark', chowntopdir=True)
def configure(self, available_hosts, zk_units, peers, extra_libs):
"""
@@ -167,11 +223,6 @@ class Spark(object):
:param list peers: List of Spark peer tuples (unit name, IP).
:param list extra_libs: List of extra lib paths for driver/executors.
"""
- # Bootstrap spark
- if not unitdata.kv().get('spark.bootstrapped', False):
- self.setup()
- unitdata.kv().set('spark.bootstrapped', True)
-
# Set KV based on connected applications
unitdata.kv().set('zookeeper.units', zk_units)
unitdata.kv().set('sparkpeer.units', peers)
@@ -179,12 +230,15 @@ class Spark(object):
# Get our config ready
dc = self.dist_config
- events_log_dir = 'file://{}'.format(dc.path('spark_events'))
mode = hookenv.config()['spark_execution_mode']
master_ip = utils.resolve_private_address(available_hosts['spark-master'])
master_url = self.get_master_url(master_ip)
req_driver_mem = hookenv.config()['driver_memory']
req_executor_mem = hookenv.config()['executor_memory']
+ if mode.startswith('yarn'):
+ spark_events = 'hdfs://{}'.format(dc.path('spark_events'))
+ else:
+ spark_events = 'file://{}'.format(dc.path('spark_events'))
# handle tuning options that may be set as percentages
driver_mem = '1g'
@@ -217,12 +271,6 @@ class Spark(object):
}
if 'namenode' in available_hosts:
hosts['namenode'] = available_hosts['namenode']
- events_log_dir = self.setup_hdfs_logs()
- else:
- # Bigtop includes a default hadoop_head_node if we do not specify
- # any namenode info. To ensure spark standalone doesn't get
- # invalid hadoop config, set our NN to an empty string.
- hosts['namenode'] = ''
if 'resourcemanager' in available_hosts:
hosts['resourcemanager'] = available_hosts['resourcemanager']
@@ -239,8 +287,8 @@ class Spark(object):
# Setup overrides dict
override = {
'spark::common::master_url': master_url,
- 'spark::common::event_log_dir': events_log_dir,
- 'spark::common::history_log_dir': events_log_dir,
+ 'spark::common::event_log_dir': spark_events,
+ 'spark::common::history_log_dir': spark_events,
'spark::common::extra_lib_dirs':
':'.join(extra_libs) if extra_libs else None,
'spark::common::driver_mem': driver_mem,
@@ -261,22 +309,20 @@ class Spark(object):
bigtop = Bigtop()
bigtop.render_site_yaml(hosts, roles, override)
bigtop.trigger_puppet()
-
- # Do this after our puppet bits in case puppet overrides needed perms
- if 'namenode' not in available_hosts:
- # Local event dir (not in HDFS) needs to be 777 so non-spark
- # users can write job history there. It needs to be g+s so
- # all entries will be readable by spark (in the spark group).
- # It needs to be +t so users cannot remove files they don't own.
- dc.path('spark_events').chmod(0o3777)
-
self.patch_worker_master_url(master_ip, master_url)
- # Install SB (subsequent calls will reconfigure existing install)
- # SparkBench looks for the spark master in /etc/environment
+ # Packages don't create the event dir out of the box. Do it now.
+ self.configure_events_dir(mode)
+
+ # Some spark applications look for envars in /etc/environment
with utils.environment_edit_in_place('/etc/environment') as env:
env['MASTER'] = master_url
- self.install_benchmark()
+ env['SPARK_HOME'] = dc.path('spark_home')
+
+ # Handle examples and Spark-Bench. Do this each time this method is
+ # called in case we need to act on a new resource or user config.
+ self.configure_examples()
+ self.configure_sparkbench()
def patch_worker_master_url(self, master_ip, master_url):
'''
@@ -311,16 +357,6 @@ class Spark(object):
s = s.replace(old_string, new_string)
f.write(s)
- def install_demo(self):
- '''
- Install sparkpi.sh to /home/ubuntu (executes SparkPI example app)
- '''
- demo_source = 'scripts/sparkpi.sh'
- demo_target = '/home/ubuntu/sparkpi.sh'
- Path(demo_source).copy(demo_target)
- Path(demo_target).chmod(0o755)
- Path(demo_target).chown('ubuntu', 'hadoop')
-
def start(self):
'''
Always start the Spark History Server. Start other services as
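
The permission scheme in `configure_events_dir` (0o3777) combines world-writable, setgid, and sticky bits. A stand-alone sketch of the local (non-yarn) branch using only the standard library; it assumes it runs as root on a unit where the `ubuntu` user and `spark` group already exist:

```python
import os
import shutil


def make_local_events_dir(path='/var/log/spark/apps'):
    """Create the spark events dir with the same perms as configure_events_dir."""
    os.makedirs(path, exist_ok=True)
    # 777 so non-spark users can write job history, setgid so new entries
    # inherit the 'spark' group, sticky so users cannot remove files they
    # don't own.
    os.chmod(path, 0o3777)
    shutil.chown(path, user='ubuntu', group='spark')
```
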
diff --git a/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml b/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml
index 06664290..de6b217e 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml
@@ -6,6 +6,11 @@ description: >
This charm provides version 2.1.0 of the Spark application from Apache Bigtop.
tags: ["analytics"]
+resources:
+ sample-data:
+ description: A zip archive of sample data required by Spark benchmarks.
+ type: file
+ filename: sample-data.zip
provides:
benchmark:
interface: benchmark
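
A minimal sketch of consuming the new `sample-data` resource from charm code, following the `resource_get()` pattern used in `configure_examples` above (the function name is illustrative; `pathlib` stands in for the `path.py` Path class the charm uses):

```python
from pathlib import Path

from charmhelpers.core import hookenv


def sample_data_path():
    """Return a Path to the attached sample-data resource, or None."""
    resource = hookenv.resource_get('sample-data')  # falsy if not attached
    if not resource:
        return None
    path = Path(resource)
    if path.exists() and path.stat().st_size:
        return path
    return None
```
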
diff --git a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
index fc74fa10..c9328ab5 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
@@ -17,7 +17,7 @@ import time
from charms.reactive import RelationBase, when, when_not, is_state, set_state, remove_state, when_any
from charms.layer.apache_bigtop_base import get_fqdn, get_package_version
from charms.layer.bigtop_spark import Spark
-from charmhelpers.core import hookenv
+from charmhelpers.core import hookenv, host
from charms import leadership
from charms.reactive.helpers import data_changed
from jujubigdata import utils
@@ -164,12 +164,15 @@ def reinstall_spark():
# peers are only used to set our MASTER_URL in standalone HA mode
peers = get_spark_peers()
+ # Construct a deployment matrix
+ sample_data = hookenv.resource_get('sample-data')
deployment_matrix = {
+ 'hdfs_ready': is_state('hadoop.hdfs.ready'),
+ 'peers': peers,
+ 'sample_data': host.file_hash(sample_data) if sample_data else None,
'spark_master': spark_master_host,
'yarn_ready': is_state('hadoop.yarn.ready'),
- 'hdfs_ready': is_state('hadoop.hdfs.ready'),
'zookeepers': zks,
- 'peers': peers,
}
# If neither config nor our matrix is changing, there is nothing to do.
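
The deployment matrix feeds the change gate mentioned in that final comment: `data_changed()` from charms.reactive persists a hash of its value and only returns True when the value differs from the previous hook invocation. A minimal sketch of that gate (the wrapper function is illustrative):

```python
from charmhelpers.core import hookenv
from charms.reactive.helpers import data_changed


def needs_reconfigure(deployment_matrix):
    """True if charm config or the deployment matrix changed since last run."""
    config_changed = data_changed('spark.config', dict(hookenv.config()))
    matrix_changed = data_changed('deployment.matrix', deployment_matrix)
    return config_changed or matrix_changed
```
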