Diffstat (limited to 'bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py')
-rw-r--r--  bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py | 154
1 file changed, 95 insertions(+), 59 deletions(-)
diff --git a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
index 91fcbf79..f2142d4e 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
@@ -21,6 +21,7 @@ from charms.layer.apache_bigtop_base import Bigtop
from charms import layer
from charmhelpers.core import hookenv, host, unitdata
from charmhelpers.fetch.archiveurl import ArchiveUrlFetchHandler
+from charmhelpers.payload import archive
class Spark(object):
@@ -49,13 +50,13 @@ class Spark(object):
master = 'spark://{}'.format(nodes_str)
return master
- def install_benchmark(self):
+ def configure_sparkbench(self):
"""
- Install and configure SparkBench.
+ Install/configure/remove Spark-Bench based on user config.
If config[spark_bench_enabled], fetch, install, and configure
- SparkBench on initial invocation. Subsequent invocations will skip the
- fetch/install, but will reconfigure SparkBench since we may need to
+ Spark-Bench on initial invocation. Subsequent invocations will skip the
+ fetch/install, but will reconfigure Spark-Bench since we may need to
adjust the data dir (eg: benchmark data is stored in hdfs when spark
is in yarn mode; locally in all other execution modes).
"""
@@ -65,11 +66,7 @@ class Spark(object):
# Fetch/install on our first go-round, then set unit data so we
# don't reinstall every time this function is called.
if not unitdata.kv().get('spark_bench.installed', False):
- if utils.cpu_arch() == 'ppc64le':
- sb_url = hookenv.config()['spark_bench_ppc64le']
- else:
- # TODO: may need more arch cases (go with x86 sb for now)
- sb_url = hookenv.config()['spark_bench_x86_64']
+ sb_url = hookenv.config()['spark_bench_url']
Path(sb_dir).rmtree_p()
au = ArchiveUrlFetchHandler()
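The hunk above replaces the per-architecture options (spark_bench_ppc64le / spark_bench_x86_64) with a single spark_bench_url. Fetching and unpacking such a URL with ArchiveUrlFetchHandler typically looks like the following sketch (the install directory shown is illustrative, not taken from this charm):

    from path import Path
    from charmhelpers.core import hookenv
    from charmhelpers.fetch.archiveurl import ArchiveUrlFetchHandler

    sb_url = hookenv.config()['spark_bench_url']
    sb_dir = '/home/ubuntu/spark-bench'      # illustrative location
    Path(sb_dir).rmtree_p()                  # start clean before unpacking
    au = ArchiveUrlFetchHandler()
    au.install(sb_url, '/home/ubuntu')       # download and extract the archive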
@@ -141,22 +138,81 @@ class Spark(object):
unitdata.kv().set('spark_bench.installed', False)
unitdata.kv().flush(True)
- def setup(self):
- self.dist_config.add_users()
- self.dist_config.add_dirs()
- self.install_demo()
+ def configure_examples(self):
+ """
+ Install sparkpi.sh and sample data to /home/ubuntu.
+
+ The sparkpi.sh script demonstrates spark-submit with the SparkPi class
+ included with Spark. This small script is packed into the spark charm
+ source in the ./scripts subdirectory.
+
+ The sample data is used for benchmarks (only PageRank for now). This
+ may grow quite large in the future, so we utilize Juju Resources for
+ getting this data onto the unit. Sample data originated as follows:
+
+ - PageRank: https://snap.stanford.edu/data/web-Google.html
+ """
+ # Handle sparkpi.sh
+ script_source = 'scripts/sparkpi.sh'
+ script_path = Path(script_source)
+ if script_path.exists():
+ script_target = '/home/ubuntu/sparkpi.sh'
+ new_hash = host.file_hash(script_source)
+ old_hash = unitdata.kv().get('sparkpi.hash')
+ if new_hash != old_hash:
+ hookenv.log('Installing SparkPi script')
+ script_path.copy(script_target)
+ Path(script_target).chmod(0o755)
+ Path(script_target).chown('ubuntu', 'hadoop')
+ unitdata.kv().set('sparkpi.hash', new_hash)
+ hookenv.log('SparkPi script was installed successfully')
+
+ # Handle sample data
+ sample_source = hookenv.resource_get('sample-data')
+ sample_path = sample_source and Path(sample_source)
+ if sample_path and sample_path.exists() and sample_path.stat().st_size:
+ sample_target = '/home/ubuntu'
+ new_hash = host.file_hash(sample_source)
+ old_hash = unitdata.kv().get('sample-data.hash')
+ if new_hash != old_hash:
+ hookenv.log('Extracting Spark sample data')
+ # Extract the sample data; since sample data does not impact
+ # functionality, log any extraction error but don't fail.
+ try:
+ archive.extract(sample_path, destpath=sample_target)
+ except Exception:
+ hookenv.log('Unable to extract Spark sample data: {}'
+ .format(sample_path))
+ else:
+ unitdata.kv().set('sample-data.hash', new_hash)
+ hookenv.log('Spark sample data was extracted successfully')
+
+ def configure_events_dir(self, mode):
+ """
+ Create directory for spark event data.
+
+ This directory is used by workers to store event data. It is also read
+ by the history server when displaying event information.
- def setup_hdfs_logs(self):
- # Create hdfs storage space for history server and return the name
- # of the created directory.
+ :param string mode: Spark execution mode to determine the dir location.
+ """
dc = self.dist_config
- events_dir = dc.path('spark_events')
- events_dir = 'hdfs://{}'.format(events_dir)
- utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir)
- utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '1777', events_dir)
- utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:spark',
- events_dir)
- return events_dir
+
+ # Directory needs to be 777 so non-spark users can write job history
+ # there. It needs to be g+s (HDFS is g+s by default) so all entries
+ # are readable by spark (in the spark group). It needs to be +t so
+ # users cannot remove files they don't own.
+ if mode.startswith('yarn'):
+ events_dir = 'hdfs://{}'.format(dc.path('spark_events'))
+ utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir)
+ utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '1777', events_dir)
+ utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:spark',
+ events_dir)
+ else:
+ events_dir = dc.path('spark_events')
+ events_dir.makedirs_p()
+ events_dir.chmod(0o3777)
+ host.chownr(events_dir, 'ubuntu', 'spark', chowntopdir=True)
def configure(self, available_hosts, zk_units, peers, extra_libs):
"""
@@ -167,11 +223,6 @@ class Spark(object):
:param list peers: List of Spark peer tuples (unit name, IP).
:param list extra_libs: List of extra lib paths for driver/executors.
"""
- # Bootstrap spark
- if not unitdata.kv().get('spark.bootstrapped', False):
- self.setup()
- unitdata.kv().set('spark.bootstrapped', True)
-
# Set KV based on connected applications
unitdata.kv().set('zookeeper.units', zk_units)
unitdata.kv().set('sparkpeer.units', peers)
@@ -179,12 +230,15 @@ class Spark(object):
# Get our config ready
dc = self.dist_config
- events_log_dir = 'file://{}'.format(dc.path('spark_events'))
mode = hookenv.config()['spark_execution_mode']
master_ip = utils.resolve_private_address(available_hosts['spark-master'])
master_url = self.get_master_url(master_ip)
req_driver_mem = hookenv.config()['driver_memory']
req_executor_mem = hookenv.config()['executor_memory']
+ if mode.startswith('yarn'):
+ spark_events = 'hdfs://{}'.format(dc.path('spark_events'))
+ else:
+ spark_events = 'file://{}'.format(dc.path('spark_events'))
# handle tuning options that may be set as percentages
driver_mem = '1g'
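The driver_memory and executor_memory options referenced here accept either absolute JVM sizes (e.g. '1g') or percentages of total RAM; the conversion logic itself is elided between hunks. A hedged sketch of one way such a percentage could be resolved (host.get_total_ram comes from charmhelpers; the MiB rounding is an assumption, not the charm's exact policy):

    from charmhelpers.core import host

    def resolve_memory(requested):
        """Return an absolute JVM size from a value like '1g' or '50%'.

        Sketch only; the real charm may round or bound differently.
        """
        if isinstance(requested, str) and requested.endswith('%'):
            fraction = float(requested.rstrip('%')) / 100.0
            total_mib = host.get_total_ram() // (1024 * 1024)  # bytes -> MiB
            return '{}m'.format(int(total_mib * fraction))
        return requested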
@@ -217,12 +271,6 @@ class Spark(object):
}
if 'namenode' in available_hosts:
hosts['namenode'] = available_hosts['namenode']
- events_log_dir = self.setup_hdfs_logs()
- else:
- # Bigtop includes a default hadoop_head_node if we do not specify
- # any namenode info. To ensure spark standalone doesn't get
- # invalid hadoop config, set our NN to an empty string.
- hosts['namenode'] = ''
if 'resourcemanager' in available_hosts:
hosts['resourcemanager'] = available_hosts['resourcemanager']
@@ -239,8 +287,8 @@ class Spark(object):
# Setup overrides dict
override = {
'spark::common::master_url': master_url,
- 'spark::common::event_log_dir': events_log_dir,
- 'spark::common::history_log_dir': events_log_dir,
+ 'spark::common::event_log_dir': spark_events,
+ 'spark::common::history_log_dir': spark_events,
'spark::common::extra_lib_dirs':
':'.join(extra_libs) if extra_libs else None,
'spark::common::driver_mem': driver_mem,
@@ -261,22 +309,20 @@ class Spark(object):
bigtop = Bigtop()
bigtop.render_site_yaml(hosts, roles, override)
bigtop.trigger_puppet()
-
- # Do this after our puppet bits in case puppet overrides needed perms
- if 'namenode' not in available_hosts:
- # Local event dir (not in HDFS) needs to be 777 so non-spark
- # users can write job history there. It needs to be g+s so
- # all entries will be readable by spark (in the spark group).
- # It needs to be +t so users cannot remove files they don't own.
- dc.path('spark_events').chmod(0o3777)
-
self.patch_worker_master_url(master_ip, master_url)
- # Install SB (subsequent calls will reconfigure existing install)
- # SparkBench looks for the spark master in /etc/environment
+ # Packages don't create the event dir out of the box. Do it now.
+ self.configure_events_dir(mode)
+
+ # Some spark applications look for envars in /etc/environment
with utils.environment_edit_in_place('/etc/environment') as env:
env['MASTER'] = master_url
- self.install_benchmark()
+ env['SPARK_HOME'] = dc.path('spark_home')
+
+ # Handle examples and Spark-Bench. Do this each time this method is
+ # called in case we need to act on a new resource or user config.
+ self.configure_examples()
+ self.configure_sparkbench()
def patch_worker_master_url(self, master_ip, master_url):
'''
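configure_examples() and configure_sparkbench() are invoked on every configure() pass so that a newly attached resource or changed config option takes effect, and both stay cheap because they share the change-detection guard seen earlier in this diff: hash the artifact, compare with the hash stored in unitdata, and act only on a difference. A generic sketch of that guard (names are illustrative):

    from charmhelpers.core import host, unitdata

    def act_if_changed(path, kv_key, action):
        """Run action(path) only when the file at path has changed."""
        new_hash = host.file_hash(path)
        if new_hash != unitdata.kv().get(kv_key):
            action(path)
            unitdata.kv().set(kv_key, new_hash)
            unitdata.kv().flush(True)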
@@ -311,16 +357,6 @@ class Spark(object):
s = s.replace(old_string, new_string)
f.write(s)
- def install_demo(self):
- '''
- Install sparkpi.sh to /home/ubuntu (executes SparkPI example app)
- '''
- demo_source = 'scripts/sparkpi.sh'
- demo_target = '/home/ubuntu/sparkpi.sh'
- Path(demo_source).copy(demo_target)
- Path(demo_target).chmod(0o755)
- Path(demo_target).chown('ubuntu', 'hadoop')
-
def start(self):
'''
Always start the Spark History Server. Start other services as