#!/usr/bin/env python import argparse import csv import cmd import copy import json import logging import os import re import shlex import shutil import subprocess import sys import textwrap import time from uuid import uuid4 from distutils.spawn import find_executable try: import pexpect import yaml except ImportError as e: print(e) print('Please run the below command to install modules required') print('Python2: pip install -r ${REPO_PATH}/automated/utils/requirements.txt') print('Python3: pip3 install -r ${REPO_PATH}/automated/utils/requirements.txt') sys.exit(1) class StoreDictKeyPair(argparse.Action): def __init__(self, option_strings, dest, nargs=None, **kwargs): self._nargs = nargs super(StoreDictKeyPair, self).__init__(option_strings, dest, nargs=nargs, **kwargs) def __call__(self, parser, namespace, values, option_string=None): my_dict = {} for kv in values: if "=" in kv: k, v = kv.split("=", 1) my_dict[k] = v else: print("Invalid parameter: %s" % kv) setattr(namespace, self.dest, my_dict) # quit gracefully if the connection is closed by remote host SSH_PARAMS = "-o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null -o ServerAliveInterval=5" def run_command(command, target=None): """ Run a shell command. If target is specified, ssh to the given target first. """ run = command if target: run = 'ssh {} {} "{}"'.format(SSH_PARAMS, target, command) logger = logging.getLogger('RUNNER.run_command') logger.debug(run) if sys.version_info[0] < 3: return subprocess.check_output(shlex.split(run)).strip() else: return subprocess.check_output(shlex.split(run)).strip().decode('utf-8') class TestPlan(object): """ Analysis args specified, then generate test plan. """ def __init__(self, args): self.test_def = args.test_def self.test_plan = args.test_plan self.timeout = args.timeout self.skip_install = args.skip_install self.logger = logging.getLogger('RUNNER.TestPlan') self.overlay = args.overlay def apply_overlay(self, test_list): fixed_test_list = copy.deepcopy(test_list) logger = logging.getLogger('RUNNER.TestPlan.Overlay') with open(self.overlay) as f: data = yaml.load(f) if data.get('skip'): skip_tests = data['skip'] for test in test_list: for skip_test in skip_tests: if test['path'] == skip_test['path'] and test['repository'] == skip_test['repository']: fixed_test_list.remove(test) logger.info("Skipped: {}".format(test)) else: continue if data.get('amend'): amend_tests = data['amend'] for test in fixed_test_list: for amend_test in amend_tests: if test['path'] == amend_test['path'] and test['repository'] == skip_test['repository']: if amend_test.get('parameters'): if test.get('parameters'): test['parameters'].update(amend_test['parameters']) else: test['parameters'] = amend_test['parameters'] logger.info('Updated: {}'.format(test)) else: logger.warning("'parameters' not found in {}, nothing to amend.".format(amend_test)) if data.get('add'): add_tests = data['add'] unique_add_tests = [] for test in add_tests: if test not in unique_add_tests: unique_add_tests.append(test) else: logger.warning("Skipping duplicate test {}".format(test)) for test in test_list: del test['uuid'] for add_test in unique_add_tests: if add_test in test_list: logger.warning("{} already included in test plan, do nothing.".format(add_test)) else: add_test['uuid'] = str(uuid4()) fixed_test_list.append(add_test) logger.info("Added: {}".format(add_test)) return fixed_test_list def test_list(self, kind="automated"): if self.test_def: if not os.path.exists(self.test_def): self.logger.error(' %s NOT found, exiting...' % self.test_def) sys.exit(1) test_list = [{'path': self.test_def}] test_list[0]['uuid'] = str(uuid4()) test_list[0]['timeout'] = self.timeout test_list[0]['skip_install'] = self.skip_install elif self.test_plan: if not os.path.exists(self.test_plan): self.logger.error(' %s NOT found, exiting...' % self.test_plan) sys.exit(1) with open(self.test_plan, 'r') as f: test_plan = yaml.safe_load(f) try: plan_version = test_plan['metadata'].get('format') self.logger.info('Test plan version: {}'.format(plan_version)) if plan_version == "Linaro Test Plan v2": tests = test_plan['tests'][kind] elif plan_version == "Linaro Test Plan v1" or plan_version is None: tests = [] for requirement in test_plan['requirements']: if 'tests' in requirement.keys(): if requirement['tests'] and \ kind in requirement['tests'].keys() and \ requirement['tests'][kind]: for test in requirement['tests'][kind]: tests.append(test) test_list = [] unique_tests = [] # List of test hashes for test in tests: test_hash = hash(json.dumps(test, sort_keys=True)) if test_hash in unique_tests: # Test is already in the test_list; don't add it again. self.logger.warning("Skipping duplicate test {}".format(test)) continue unique_tests.append(test_hash) test_list.append(test) for test in test_list: test['uuid'] = str(uuid4()) except KeyError as e: self.logger.error("%s is missing from test plan" % str(e)) sys.exit(1) else: self.logger.error('Plese specify a test or test plan.') sys.exit(1) if self.overlay is None: return test_list else: return self.apply_overlay(test_list) class TestSetup(object): """ Create directories required, then copy files needed to these directories. """ def __init__(self, test, args): self.test = test self.args = args self.logger = logging.getLogger('RUNNER.TestSetup') self.test_kind = args.kind self.test_version = test.get('version', None) def validate_env(self): # Inspect if environment set properly. try: self.repo_path = os.environ['REPO_PATH'] except KeyError: self.logger.error('KeyError: REPO_PATH') self.logger.error("Please run '. ./bin/setenv.sh' to setup test environment") sys.exit(1) def create_dir(self): if not os.path.exists(self.test['output']): os.makedirs(self.test['output']) self.logger.info('Output directory created: %s' % self.test['output']) def copy_test_repo(self): self.validate_env() shutil.rmtree(self.test['test_path'], ignore_errors=True) if self.repo_path in self.test['test_path']: self.logger.error("Cannot copy repository into itself. Please choose output directory outside repository path") sys.exit(1) shutil.copytree(self.repo_path, self.test['test_path'], symlinks=True) self.logger.info('Test repo copied to: %s' % self.test['test_path']) def checkout_version(self): if self.test_version: path = os.getcwd() os.chdir(self.test['test_path']) subprocess.call("git checkout %s" % self.test_version, shell=True) os.chdir(path) def create_uuid_file(self): with open('%s/uuid' % self.test['test_path'], 'w') as f: f.write(self.test['uuid']) class TestDefinition(object): """ Convert test definition to testdef.yaml, testdef_metadata and run.sh. """ def __init__(self, test, args): self.test = test self.args = args self.logger = logging.getLogger('RUNNER.TestDef') self.skip_install = args.skip_install self.is_manual = False if 'skip_install' in test: self.skip_install = test['skip_install'] self.custom_params = None if 'parameters' in test: self.custom_params = test['parameters'] if 'params' in test: self.custom_params = test['params'] self.exists = False if os.path.isfile(self.test['path']): self.exists = True with open(self.test['path'], 'r') as f: self.testdef = yaml.safe_load(f) if self.testdef['metadata']['format'].startswith("Manual Test Definition"): self.is_manual = True if self.is_manual: self.runner = ManualTestRun(test, args) elif self.args.target is not None: self.runner = RemoteTestRun(test, args) else: self.runner = AutomatedTestRun(test, args) def definition(self): with open('%s/testdef.yaml' % self.test['test_path'], 'wb') as f: f.write(yaml.dump(self.testdef, encoding='utf-8', allow_unicode=True)) def metadata(self): with open('%s/testdef_metadata' % self.test['test_path'], 'wb') as f: f.write(yaml.dump(self.testdef['metadata'], encoding='utf-8', allow_unicode=True)) def mkrun(self): if not self.is_manual: with open('%s/run.sh' % self.test['test_path'], 'a') as f: f.write('#!/bin/sh\n') self.parameters = self.handle_parameters() if self.parameters: for line in self.parameters: f.write(line) f.write('set -e\n') f.write('set -x\n') f.write('export TESTRUN_ID=%s\n' % self.testdef['metadata']['name']) if self.args.target is None: f.write('cd %s\n' % (self.test['test_path'])) else: f.write('cd %s\n' % (self.test['target_test_path'])) f.write('UUID=`cat uuid`\n') f.write('echo ""\n') f.write('export PATH=/usr/local/bin:/usr/bin:/bin:/usr/local/sbin:/usr/sbin:/sbin\n') steps = self.testdef['run'].get('steps', []) if steps: for step in steps: command = step if '--cmd' in step or '--shell' in step: command = re.sub(r'\$(\d+)\b', r'\\$\1', step) f.write('%s\n' % command) f.write('echo ""\n') os.chmod('%s/run.sh' % self.test['test_path'], 0o755) def run(self): self.runner.run() def handle_parameters(self): ret_val = ['###default parameters from test definition###\n'] if 'params' in self.testdef: for def_param_name, def_param_value in list(self.testdef['params'].items()): # ?'yaml_line' if def_param_name is 'yaml_line': continue ret_val.append('%s=\'%s\'\n' % (def_param_name, def_param_value)) elif 'parameters' in self.testdef: for def_param_name, def_param_value in list(self.testdef['parameters'].items()): if def_param_name is 'yaml_line': continue ret_val.append('%s=\'%s\'\n' % (def_param_name, def_param_value)) else: return None ret_val.append('######\n') ret_val.append('###custom parameters from test plan###\n') if self.custom_params: for param_name, param_value in list(self.custom_params.items()): if param_name is 'yaml_line': continue ret_val.append('%s=\'%s\'\n' % (param_name, param_value)) if self.skip_install: ret_val.append('SKIP_INSTALL="True"\n') ret_val.append('######\n') ret_val.append('###custom parameters from command line###\n') if self.args.test_def_params: for param_name, param_value in self.args.test_def_params.items(): ret_val.append('%s=\'%s\'\n' % (param_name, param_value)) ret_val.append('######\n') return ret_val class TestRun(object): def __init__(self, test, args): self.test = test self.args = args self.logger = logging.getLogger('RUNNER.TestRun') self.test_timeout = self.args.timeout if 'timeout' in test: self.test_timeout = test['timeout'] def run(self): raise NotImplementedError def check_result(self): raise NotImplementedError class AutomatedTestRun(TestRun): def run(self): self.logger.info('Executing %s/run.sh' % self.test['test_path']) shell_cmd = '%s/run.sh 2>&1 | tee %s/stdout.log' % (self.test['test_path'], self.test['test_path']) self.child = pexpect.spawn('/bin/sh', ['-c', shell_cmd]) self.check_result() def check_result(self): if self.test_timeout: self.logger.info('Test timeout: %s' % self.test_timeout) test_end = time.time() + self.test_timeout while self.child.isalive(): if self.test_timeout and time.time() > test_end: self.logger.warning('%s test timed out, killing test process...' % self.test['test_uuid']) self.child.terminate(force=True) break try: self.child.expect('\r\n') if sys.version_info[0] < 3: print(self.child.before) else: print(self.child.before.decode('utf-8')) except pexpect.TIMEOUT: continue except pexpect.EOF: self.logger.info('%s test finished.\n' % self.test['test_uuid']) break class RemoteTestRun(AutomatedTestRun): def copy_to_target(self): os.chdir(self.test['test_path']) tarball_name = "target-test-files.tar" self.logger.info("Archiving test files") run_command( 'tar -caf %s run.sh uuid automated/lib automated/bin automated/utils %s' % (tarball_name, self.test['tc_relative_dir'])) self.logger.info("Creating test path") run_command("mkdir -p %s" % (self.test['target_test_path']), self.args.target) self.logger.info("Copying test archive to target host") run_command('scp %s ./%s %s:%s' % (SSH_PARAMS, tarball_name, self.args.target, self.test['target_test_path'])) self.logger.info("Unarchiving test files on target") run_command("cd %s && tar -xf %s" % (self.test['target_test_path'], tarball_name), self.args.target) self.logger.info("Removing test file archive from target") run_command("rm %s/%s" % (self.test['target_test_path'], tarball_name), self.args.target) def run(self): self.copy_to_target() self.logger.info('Executing %s/run.sh remotely on %s' % (self.test['target_test_path'], self.args.target)) shell_cmd = 'ssh %s %s "%s/run.sh 2>&1"' % (SSH_PARAMS, self.args.target, self.test['target_test_path']) self.logger.debug('shell_cmd: %s' % shell_cmd) output = open("%s/stdout.log" % self.test['test_path'], "w") self.child = pexpect.spawn(shell_cmd) self.child.logfile = output self.check_result() class ManualTestShell(cmd.Cmd): def __init__(self, test_dict, result_path): cmd.Cmd.__init__(self) self.test_dict = test_dict self.result_path = result_path self.current_step_index = 0 self.steps = self.test_dict['run']['steps'] self.expected = self.test_dict['run']['expected'] self.prompt = "%s > " % self.test_dict['metadata']['name'] self.result = None self.intro = """ Welcome to manual test executor. Type 'help' for available commands. This shell is meant to be executed on your computer, not on the system under test. Please execute the steps from the test case, compare to expected result and record the test result as 'pass' or 'fail'. If there is an issue that prevents from executing the step, please record the result as 'skip'. """ def do_quit(self, line): """ Exit test execution """ if self.result is not None: return True if line.find("-f") >= 0: self._record_result("skip") return True print("Test result not recorded. Use -f to force. Forced quit records result as 'skip'") do_EOF = do_quit def do_description(self, line): """ Prints current test overall description """ print(self.test_dict['metadata']['description']) def do_steps(self, line): """ Prints all steps of the current test case """ for index, step in enumerate(self.steps): print("%s. %s" % (index, step)) def do_expected(self, line): """ Prints all expected results of the current test case """ for index, expected in enumerate(self.expected): print("%s. %s" % (index, expected)) def do_current(self, line): """ Prints current test step """ self._print_step() do_start = do_current def do_next(self, line): """ Prints next test step """ if len(self.steps) > self.current_step_index + 1: self.current_step_index += 1 self._print_step() def _print_step(self): print("%s. %s" % (self.current_step_index, self.steps[self.current_step_index])) def _record_result(self, result): print("Recording %s in %s/stdout.log" % (result, self.result_path)) with open("%s/stdout.log" % self.result_path, "a") as f: f.write("" % (self.test_dict['metadata']['name'], result)) def do_pass(self, line): """ Records PASS as test result """ self.result = "pass" self._record_result(self.result) return True def do_fail(self, line): """ Records FAIL as test result """ self.result = "fail" self._record_result(self.result) return True def do_skip(self, line): """ Records SKIP as test result """ self.result = "skip" self._record_result(self.result) return True class ManualTestRun(TestRun, cmd.Cmd): def run(self): print(self.test['test_name']) with open('%s/testdef.yaml' % self.test['test_path'], 'r') as f: self.testdef = yaml.safe_load(f) ManualTestShell(self.testdef, self.test['test_path']).cmdloop() def check_result(self): pass def get_packages(linux_distribution, target=None): """ Return a list of installed packages with versions linux_distribution is a string that may be 'debian', 'ubuntu', 'centos', or 'fedora'. For example (ubuntu): 'packages': ['acl-2.2.52-2', 'adduser-3.113+nmu3', ... 'zlib1g:amd64-1:1.2.8.dfsg-2+b1', 'zlib1g-dev:amd64-1:1.2.8.dfsg-2+b1'] (centos): "packages": ["acl-2.2.51-12.el7", "apr-1.4.8-3.el7", ... "zlib-1.2.7-17.el7", "zlib-devel-1.2.7-17.el7" ] """ logger = logging.getLogger('RUNNER.get_packages') packages = [] if linux_distribution in ['debian', 'ubuntu']: # Debian (apt) based system packages = run_command("dpkg-query -W -f '${package}-${version}\n'", target).splitlines() elif linux_distribution in ['centos', 'fedora']: # RedHat (rpm) based system packages = run_command("rpm -qa --qf '%{NAME}-%{VERSION}-%{RELEASE}\n'", target).splitlines() else: logger.warning("Unknown linux distribution '{}'; package list not populated.".format(linux_distribution)) packages.sort() return packages def get_environment(target=None, skip_collection=False): """ Return a dictionary with environmental information target: optional ssh host string to gather environment remotely. skip_collection: Skip data collection and return an empty dictionary. For example (on a HiSilicon D03): { "bios_version": "Hisilicon D03 UEFI 16.12 Release", "board_name": "D03", "board_vendor": "Huawei", "kernel": "4.9.0-20.gitedc2a1c.linaro.aarch64", "linux_distribution": "centos", "packages": [ "GeoIP-1.5.0-11.el7", "NetworkManager-1.4.0-20.el7_3", ... "yum-plugin-fastestmirror-1.1.31-40.el7", "zlib-1.2.7-17.el7" ], "uname": "Linux localhost.localdomain 4.9.0-20.gitedc2a1c.linaro.aarch64 #1 SMP Wed Dec 14 17:50:15 UTC 2016 aarch64 aarch64 aarch64 GNU/Linux" } """ environment = {} if skip_collection: return environment try: environment['linux_distribution'] = run_command( "grep ^ID= /etc/os-release", target).split('=')[-1].strip('"').lower() except subprocess.CalledProcessError: environment['linux_distribution'] = "" try: environment['kernel'] = run_command("uname -r", target) except subprocess.CalledProcessError: environment['kernel'] = "" try: environment['uname'] = run_command("uname -a", target) except subprocess.CalledProcessError: environment['uname'] = "" try: environment['bios_version'] = run_command( "cat /sys/devices/virtual/dmi/id/bios_version", target) except subprocess.CalledProcessError: environment['bios_version'] = "" try: environment['board_vendor'] = run_command( "cat /sys/devices/virtual/dmi/id/board_vendor", target) except subprocess.CalledProcessError: environment['board_vendor'] = "" try: environment['board_name'] = run_command( "cat /sys/devices/virtual/dmi/id/board_name", target) except subprocess.CalledProcessError: environment['board_name'] = "" try: environment['packages'] = get_packages(environment['linux_distribution'], target) except subprocess.CalledProcessError: environment['packages'] = [] return environment class ResultParser(object): def __init__(self, test, args): self.test = test self.args = args self.metrics = [] self.results = {} self.results['test'] = test['test_name'] self.results['id'] = test['test_uuid'] self.results['test_plan'] = args.test_plan self.results['environment'] = get_environment( target=self.args.target, skip_collection=self.args.skip_environment) self.logger = logging.getLogger('RUNNER.ResultParser') self.results['params'] = {} self.pattern = None self.fixup = None with open(os.path.join(self.test['test_path'], "testdef.yaml"), "r") as f: self.testdef = yaml.safe_load(f) self.results['name'] = "" if 'metadata' in self.testdef.keys() and \ 'name' in self.testdef['metadata'].keys(): self.results['name'] = self.testdef['metadata']['name'] if 'params' in self.testdef.keys(): self.results['params'] = self.testdef['params'] if self.args.test_def_params: for param_name, param_value in self.args.test_def_params.items(): self.results['params'][param_name] = param_value if 'parse' in self.testdef.keys() and 'pattern' in self.testdef['parse'].keys(): self.pattern = self.testdef['parse']['pattern'] self.logger.info("Enabling log parse pattern: %s" % self.pattern) if 'fixupdict' in self.testdef['parse'].keys(): self.fixup = self.testdef['parse']['fixupdict'] self.logger.info("Enabling log parse pattern fixup: %s" % self.fixup) if 'parameters' in test.keys(): self.results['params'].update(test['parameters']) if 'params' in test.keys(): self.results['params'].update(test['params']) if 'version' in test.keys(): self.results['version'] = test['version'] else: path = os.getcwd() os.chdir(self.test['test_path']) if sys.version_info[0] < 3: test_version = subprocess.check_output("git rev-parse HEAD", shell=True) else: test_version = subprocess.check_output("git rev-parse HEAD", shell=True).decode('utf-8') self.results['version'] = test_version.rstrip() os.chdir(path) self.lava_run = args.lava_run if self.lava_run and not find_executable('lava-test-case'): self.logger.info("lava-test-case not found, '-l' or '--lava_run' option ignored'") self.lava_run = False def run(self): self.parse_stdout() if self.pattern: self.parse_pattern() # If 'metrics' is empty, add 'no-result-found fail'. if not self.metrics: self.metrics = [{'test_case_id': 'no-result-found', 'result': 'fail', 'measurement': '', 'units': ''}] self.results['metrics'] = self.metrics self.dict_to_json() self.dict_to_csv() self.logger.info('Result files saved to: %s' % self.test['test_path']) print('--- Printing result.csv ---') with open('%s/result.csv' % self.test['test_path']) as f: print(f.read()) def parse_stdout(self): with open('%s/stdout.log' % self.test['test_path'], 'r') as f: test_case_re = re.compile("TEST_CASE_ID=(.*)") result_re = re.compile("RESULT=(.*)") measurement_re = re.compile("MEASUREMENT=(.*)") units_re = re.compile("UNITS=(.*)") for line in f: if re.match(r'\<(|LAVA_SIGNAL_TESTCASE )TEST_CASE_ID=.*', line): line = line.strip('\n').strip('\r').strip('<>').split(' ') data = {'test_case_id': '', 'result': '', 'measurement': '', 'units': ''} for string in line: test_case_match = test_case_re.match(string) result_match = result_re.match(string) measurement_match = measurement_re.match(string) units_match = units_re.match(string) if test_case_match: data['test_case_id'] = test_case_match.group(1) if result_match: data['result'] = result_match.group(1) if measurement_match: data['measurement'] = measurement_match.group(1) if units_match: data['units'] = units_match.group(1) self.metrics.append(data.copy()) if self.lava_run: self.send_to_lava(data) def parse_pattern(self): with open('%s/stdout.log' % self.test['test_path'], 'r') as f: for line in f: data = {} m = re.search(r'%s' % self.pattern, line) if m: data = m.groupdict() for x in ['measurement', 'units']: if x not in data: data[x] = '' if self.fixup and data['result'] in self.fixup: data['result'] = self.fixup[data['result']] self.metrics.append(data.copy()) if self.lava_run: self.send_to_lava(data) def send_to_lava(self, data): cmd = 'lava-test-case {} --result {}'.format(data['test_case_id'], data['result']) if data['measurement']: cmd = '{} --measurement {} --units {}'.format(cmd, data['measurement'], data['units']) self.logger.debug('lava-run: cmd: {}'.format(cmd)) subprocess.call(shlex.split(cmd)) def dict_to_json(self): # Save test results to output/test_id/result.json with open('%s/result.json' % self.test['test_path'], 'w') as f: json.dump([self.results], f, indent=4) # Collect test results of all tests in output/result.json feeds = [] if os.path.isfile('%s/result.json' % self.test['output']): with open('%s/result.json' % self.test['output'], 'r') as f: feeds = json.load(f) feeds.append(self.results) with open('%s/result.json' % self.test['output'], 'w') as f: json.dump(feeds, f, indent=4) def dict_to_csv(self): # Convert dict self.results['params'] to a string. test_params = '' if self.results['params']: params_dict = self.results['params'] test_params = ';'.join(['%s=%s' % (k, v) for k, v in params_dict.items()]) for metric in self.results['metrics']: metric['name'] = self.results['name'] metric['test_params'] = test_params # Save test results to output/test_id/result.csv fieldnames = ['name', 'test_case_id', 'result', 'measurement', 'units', 'test_params'] with open('%s/result.csv' % self.test['test_path'], 'w') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() for metric in self.results['metrics']: writer.writerow(metric) # Collect test results of all tests in output/result.csv if not os.path.isfile('%s/result.csv' % self.test['output']): with open('%s/result.csv' % self.test['output'], 'w') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) writer.writeheader() with open('%s/result.csv' % self.test['output'], 'a') as f: writer = csv.DictWriter(f, fieldnames=fieldnames) for metric in self.results['metrics']: writer.writerow(metric) def get_args(): parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) parser.add_argument('-o', '--output', default=os.getenv("HOME", "") + '/output', dest='output', help=textwrap.dedent('''\ specify a directory to store test and result files. Default: $HOME/output ''')) parser.add_argument('-p', '--test_plan', default=None, dest='test_plan', help=textwrap.dedent('''\ specify an test plan file which has tests and related params listed in yaml format. ''')) parser.add_argument('-d', '--test_def', default=None, dest='test_def', help=textwrap.dedent('''\ base on test definition repo location, specify relative path to the test definition to run. Format example: "ubuntu/smoke-tests-basic.yaml" ''')) parser.add_argument('-r', '--test_def_params', default={}, dest='test_def_params', action=StoreDictKeyPair, nargs="+", metavar="KEY=VALUE", help=textwrap.dedent('''\ Set additional parameters when using test definition without a test plan. The name values are set similarily to environment variables: --test_def_params KEY1=VALUE1 KEY2=VALUE2 ... ''')) parser.add_argument('-k', '--kind', default="automated", dest='kind', choices=['automated', 'manual'], help=textwrap.dedent('''\ Selects type of tests to be executed from the test plan. Possible options: automated, manual ''')) parser.add_argument('-t', '--timeout', type=int, default=None, dest='timeout', help='Specify test timeout') parser.add_argument('-g', '--target', default=None, dest='target', help=textwrap.dedent('''\ Specify SSH target to execute tests. Format: user@host Note: ssh authentication must be paswordless ''')) parser.add_argument('-s', '--skip_install', dest='skip_install', default=False, action='store_true', help='skip install section defined in test definition.') parser.add_argument('-e', '--skip_environment', dest='skip_environment', default=False, action='store_true', help='skip environmental data collection (board name, distro, etc)') parser.add_argument('-l', '--lava_run', dest='lava_run', default=False, action='store_true', help='send test result to LAVA with lava-test-case.') parser.add_argument('-O', '--overlay', default=None, dest='overlay', help=textwrap.dedent('''\ Specify test plan ovelay file to: * skip tests * amend test parameters * add new tests ''')) args = parser.parse_args() return args def main(): # Setup logger. logger = logging.getLogger('RUNNER') logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(name)s: %(levelname)s: %(message)s') ch.setFormatter(formatter) logger.addHandler(ch) args = get_args() logger.debug('Test job arguments: %s' % args) if args.kind != "manual" and args.target is None: if os.geteuid() != 0: logger.error("Sorry, you need to run this as root") sys.exit(1) # Validate target argument format and connectivity. if args.target: rex = re.compile('.+@.+') if not rex.match(args.target): logger.error('Usage: -g username@host') sys.exit(1) if pexpect.which('ssh') is None: logger.error('openssh client must be installed on the host.') sys.exit(1) try: run_command("exit", args.target) except subprocess.CalledProcessError as e: logger.error('ssh login failed.') print(e) sys.exit(1) # Generate test plan. test_plan = TestPlan(args) test_list = test_plan.test_list(args.kind) logger.info('Tests to run:') for test in test_list: print(test) # Run tests. for test in test_list: # Set and save test params to test dictionary. test['test_name'] = os.path.splitext(test['path'].split('/')[-1])[0] test['test_uuid'] = '%s_%s' % (test['test_name'], test['uuid']) test['output'] = os.path.realpath(args.output) if args.target is not None and '-o' not in sys.argv: test['output'] = os.path.join(test['output'], args.target) test['test_path'] = os.path.join(test['output'], test['test_uuid']) if args.target is not None: # Get relative directory path of yaml file for partial file copy. # '-d' takes any relative paths to the yaml file, so get the realpath first. tc_realpath = os.path.realpath(test['path']) tc_dirname = os.path.dirname(tc_realpath) test['tc_relative_dir'] = '%s%s' % (args.kind, tc_dirname.split(args.kind)[1]) target_user_home = run_command("echo $HOME", args.target) test['target_test_path'] = '%s/output/%s' % (target_user_home, test['test_uuid']) logger.debug('Test parameters: %s' % test) # Create directories and copy files needed. setup = TestSetup(test, args) setup.create_dir() setup.copy_test_repo() setup.checkout_version() setup.create_uuid_file() # Convert test definition. test_def = TestDefinition(test, args) if test_def.exists: test_def.definition() test_def.metadata() test_def.mkrun() # Run test. test_def.run() # Parse test output, save results in json and csv format. result_parser = ResultParser(test, args) result_parser.run() else: logger.warning("Requested test definition %s doesn't exist" % test['path']) if __name__ == "__main__": main()