path: root/bigtop-packages/src/charm/zeppelin/layer-zeppelin/actions/smoke-test
#!/usr/bin/env python3

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import json
from time import sleep
from urllib.parse import urljoin
from operator import itemgetter

import requests

from charmhelpers.core import hookenv
from charms.reactive import is_state


def fail(msg):
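    """Mark the action as failed with the given message and stop the script."""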
    hookenv.action_fail(msg)
    sys.exit()


if not is_state('zeppelin.installed'):
    fail('Zeppelin not yet ready')


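# The test exercises the sample notebook bundled with Zeppelin (note ID
# 2A94M5J1Z, the "Zeppelin Tutorial") through the REST API served on port 9080.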
notebook_id = '2A94M5J1Z'
zep_addr = 'localhost'
base_url = 'http://{}:9080/api/notebook/'.format(zep_addr)
interp_url = urljoin(base_url, 'interpreter/bind/%s' % notebook_id)
job_url = urljoin(base_url, 'job/%s' % notebook_id)
para_url = urljoin(base_url, '%s/paragraph/' % notebook_id)

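# Overall flow: bind every available interpreter to the notebook, run all of
# its paragraphs, poll until none are pending or running, then surface the
# first paragraph error (if any).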
try:
    # bind interpreters
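    # (GET lists the interpreter settings available to this note; PUT with the
    # list of their IDs binds them all so every paragraph can run.)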
    resp = requests.get(interp_url, timeout=60)
    resp.raise_for_status()
    interpreters = resp.json()
    interp_ids = list(map(itemgetter('id'), interpreters['body']))
    resp = requests.put(interp_url, data=json.dumps(interp_ids), timeout=60)
    resp.raise_for_status()

    # run notebook
    resp = requests.post(job_url, timeout=60)
    resp.raise_for_status()
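    # poll for completion, giving the job up to ~5 x 10s before checking results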
    for i in range(5):
        sleep(10)  # sleep first to give the job some time to run
        try:
            resp = requests.get(job_url, timeout=60)
        except requests.exceptions.Timeout:
            # sometimes a long-running paragraph will cause the notebook
            # job endpoint to timeout, but it may eventually recover
            continue
        if resp.status_code == 500:
            # sometimes a long-running paragraph will cause the notebook
            # job endpoint to return 500, but it may eventually recover
            continue
        statuses = list(map(itemgetter('status'), resp.json()['body']))
        in_progress = {'PENDING', 'RUNNING'} & set(statuses)
        if not in_progress:
            break

    # check for errors: report the first paragraph that ended in ERROR
    body = resp.json()['body']
    for result in body:
        if result['status'] == 'ERROR':
            para_id = result['id']
            resp = requests.get(urljoin(para_url, para_id), timeout=60)
            resp.raise_for_status()
            para = resp.json()['body']
            if 'errorMessage' in para:
                errmsg = para['errorMessage'].splitlines()[0]
            elif 'result' in para:
                errmsg = para['result']['msg'].splitlines()[0]
            else:
                errmsg = 'Unable to find error message'
                hookenv.action_set({'paragraph': resp.text})
            fail('Notebook failed: {}'.format(errmsg))
except requests.exceptions.RequestException as e:
    fail('Request failed: {}: {}'.format(e.request.url, e))