aboutsummaryrefslogtreecommitdiff
path: root/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
blob: 2650e74ad276fb9a6365b70c77a08eb315b7b585 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import subprocess
import sys

from path import Path
from time import time

from charmhelpers.core import hookenv
from charms.benchmark import Benchmark
from charms.reactive import is_state
from jujubigdata import utils


def fail(msg):
    hookenv.action_set({'outcome': 'failure'})
    hookenv.action_fail(msg)
    sys.exit()


def main():
    bench = Benchmark()

    if not is_state('spark.started'):
        msg = 'Spark is not started yet'
        fail(msg)

    # gather params and create dir to store results
    num_iter = hookenv.action_get('iterations')
    run = int(time())
    result_dir = Path('/opt/sparkpagerank-results')
    result_log = result_dir / '{}.log'.format(run)
    if not result_dir.exists():
        result_dir.mkdir()
    result_dir.chown('ubuntu', 'ubuntu')
    hookenv.log("values: {} {}".format(num_iter, result_log))

    sample = "/home/ubuntu/SparkBench/PageRank/web-Google.txt"
    if not os.path.isfile(sample):
        msg = 'Could not find pagerank sample data'
        fail('{}: {}'.format(msg, sample))

    # Benchmark input data is packed into our sparkbench.tgz, which makes
    # it available on all spark units. In yarn mode, however, the nodemanagers
    # act as the spark workers and will not have access to this local data.
    # In yarn mode, copy our input data to hdfs so nodemanagers can access it.
    mode = hookenv.config()['spark_execution_mode']
    if mode.startswith('yarn'):
        if is_state('hadoop.hdfs.ready'):
            try:
                utils.run_as('ubuntu',
                             'hdfs', 'dfs', '-put', '-f', sample, '/user/ubuntu',
                             capture_output=True)
            except subprocess.CalledProcessError as e:
                msg = 'Unable to copy pagerank sample data to hdfs'
                fail('{}: {}'.format(msg, e))
            else:
                sample = "/user/ubuntu/web-Google.txt"
        else:
            msg = 'Spark is configured for yarn mode, but HDFS is not ready yet'
            fail(msg)

    # find jar location
    spark_home = "/usr/lib/spark"
    example_jar_name = "spark-examples.jar"
    example_jar_path = None
    for root, dirs, files in os.walk(spark_home):
        if example_jar_name in files:
            example_jar_path = os.path.join(root, example_jar_name)

    if not example_jar_path:
        msg = 'Could not find {}'.format(example_jar_name)
        fail(msg)

    print('Calculating PageRank')
    bench.start()
    start = int(time())

    with open(result_log, 'w') as log_file:
        arg_list = [
            'spark-submit',
            '--class',
            'org.apache.spark.examples.SparkPageRank',
            example_jar_path,
            sample,
            num_iter,
        ]

        try:
            subprocess.check_call(arg_list, stdout=log_file,
                                  stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as e:
            msg = 'SparkPageRank command failed: {}'.format(' '.join(arg_list))
            fail('{}: {}'.format(msg, e))

    stop = int(time())
    bench.finish()

    duration = stop - start
    bench.set_composite_score(duration, units='secs', direction='asc')

    # Tell the user how they can get the full result log
    scp_log_msg = "juju scp {}:{} .".format(hookenv.local_unit(), result_log)
    hookenv.action_set({'meta.fetchcmd': scp_log_msg})

    with open(result_log) as log:
        success = False
        for line in log.readlines():
            if 'rank' in line:
                success = True
                break

    if not success:
        msg = 'Spark-submit failed to calculate pagerank'
        fail(msg)

    hookenv.action_set({'outcome': 'success'})


if __name__ == '__main__':
    main()