From ab48dcadc4eb74a126a8d76bece720ab000b1452 Mon Sep 17 00:00:00 2001 From: Milosz Wasilewski Date: Thu, 30 Mar 2017 18:54:46 +0100 Subject: automated: android: add refactored apk-automation Change-Id: I5a81b3c4317dc56f90df37c5a8f83d2547542fab Signed-off-by: Milosz Wasilewski --- .../android/apk-automation/common/__init__.py | 256 +++++++++++++++++++++ 1 file changed, 256 insertions(+) create mode 100755 automated/android/apk-automation/common/__init__.py (limited to 'automated/android/apk-automation/common/__init__.py') diff --git a/automated/android/apk-automation/common/__init__.py b/automated/android/apk-automation/common/__init__.py new file mode 100755 index 0000000..963463e --- /dev/null +++ b/automated/android/apk-automation/common/__init__.py @@ -0,0 +1,256 @@ +import csv +import datetime +import json +import logging +import math +import os +import requests +import shutil +import subprocess +import sys +import time +import urlparse +from com.dtmilano.android.viewclient import ViewClient + + +class ApkTestRunner(object): + def __init__(self, config): + self.config = config + + self.logger = logging.getLogger(self.config['name']) + self.logger.setLevel(logging.INFO) + if self.config['verbose']: + self.logger.setLevel(logging.DEBUG) + ch = logging.StreamHandler() + ch.setLevel(logging.DEBUG) + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + ch.setFormatter(formatter) + self.logger.addHandler(ch) + + self.config['output'] = os.getenv("OUTPUT", "./output/%s" % config['name']) + if os.path.exists(self.config['output']): + suffix = datetime.datetime.now().strftime('%Y%m%d%H%M%S') + shutil.move(self.config['output'], '%s-%s' % (self.config['output'], suffix)) + os.makedirs(self.config['output']) + self.results = [] + + serialno = os.getenv('ANDROID_SERIAL') + if serialno is None: + serialno = '.*' + kwargs1 = { + 'serialno': serialno, + 'verbose': True, + 'ignoresecuredevice': False} + self.logger.debug('VC kwargs1: %s' % kwargs1) + self.device, self.serialno = ViewClient.connectToDeviceOrExit(**kwargs1) + kwargs2 = { + 'startviewserver': True, + 'forceviewserveruse': False, + 'autodump': False, + 'ignoreuiautomatorkilled': True, + 'compresseddump': False} + self.logger.debug('VC kwargs2: %s' % kwargs2) + self.vc = ViewClient(self.device, self.serialno, **kwargs2) + + def run(self): + self.validate() + + for i in range(1, self.config['loops'] + 1): + try: + self.logger.info('Running iteration [%s/%s]' % (i, self.config['loops'])) + self.config['itr'] = i + self.logger.info('Test config: %s' % self.config) + self.setUp() + self.execute() + self.parseResult() + self.take_screencap() + self.tearDown() + except Exception as e: + self.take_screencap() + self.report_result(self.config['name'], 'fail') + self.logger.error(e) + sys.exit(1) + + self.collect_log() + self.result_post_processing() + + def report_result(self, name, result, score=None, units=None): + if score is not None: + score = float(score) + if units is not None: + units = str(units) + + tc_name = str(name) + if self.config['loops'] > 1 and self.config['itr'] != 'stats': + tc_name = '%s-itr%s' % (name, self.config['itr']) + + result_string = '%s %s %s %s' % (tc_name, result, score, units) + if score is None: + result_string = '%s %s' % (tc_name, result) + if score is not None and units is None: + result_string = '%s %s %s' % (tc_name, result, score) + + self.logger.info('TestResult: %s' % result_string) + with open('%s/result.txt' % self.config['output'], 'a') as f: + f.write('%s\n' % result_string) + + # Save result to results for post processing. + result = {'itr': self.config['itr'], + 'test_case_id': str(name), + 'result': str(result), + 'measurement': score, + 'units': units} + self.results.append(result) + + def statistics_result(self): + if self.config['loops'] == 1: + return + + self.config['itr'] = 'stats' + + tc_list = [] + for result in self.results: + if result['measurement'] is not None: + tc_list.append(result['test_case_id']) + tc_list = set(tc_list) + + for tc in tc_list: + ms_list = [] + for result in self.results: + if result['test_case_id'] == tc: + ms_list.append(result['measurement']) + + units = '' + for result in self.results: + if result['test_case_id'] == tc: + units = result['units'] + break + + # Calculate and report population standard deviation and standard error. + mean = sum(ms_list) / len(ms_list) + variance = sum([(e - mean) ** 2 for e in ms_list]) / len(ms_list) + pstdev = math.sqrt(variance) + pstderr = pstdev / math.sqrt(len(ms_list)) + self.report_result('%s-min' % tc, 'pass', min(ms_list), units) + self.report_result('%s-max' % tc, 'pass', max(ms_list), units) + self.report_result('%s-mean' % tc, 'pass', mean, units) + self.report_result('%s-sigma' % tc, 'pass', pstdev, units) + self.report_result('%s-stderr' % tc, 'pass', pstderr, units) + + def result_post_processing(self): + self.statistics_result() + + # Save results to output/name/name-result.csv. + fieldnames = ['itr', 'test_case_id', 'result', 'measurement', 'units'] + with open('%s/result.csv' % self.config['output'], 'w') as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + writer.writeheader() + for result in self.results: + writer.writerow(result) + self.logger.info('Result saved to %s/result.csv' % self.config['output']) + + # Save results to output/name/name-result.json + with open('%s/result.json' % self.config['output'], 'w') as f: + json.dump([self.results], f, indent=4) + self.logger.info('Result saved to %s/result.json' % self.config['output']) + + def dump_always(self): + success = False + while not success: + try: + self.vc.dump() + success = True + except RuntimeError: + print("Got RuntimeError when call vc.dump()") + time.sleep(5) + except ValueError: + print("Got ValueError when call vc.dump()") + time.sleep(5) + + def call_adb(self, args): + self.logger.debug("calling") + self.logger.debug("adb %s" % args) + try: + # Need to set shell=True to save output to host directly. + subprocess.check_call("adb %s" % args, shell=True) + except (OSError, subprocess.CalledProcessError) as e: + print(e) + sys.exit(1) + + def validate(self): + if self.config['apk_file_name'] is None: + self.logger.error("APK file name not set") + sys.exit(1) + + if self.config['apk_package'] is None: + self.logger.error("APK package name not set") + sys.exit(1) + + if self.config['activity'] is None: + self.logger.error("Activity name not set") + sys.exit(1) + + def download_apk(self, apk_name): + # create directory for downloaded files + if not os.path.isdir(os.path.abspath(self.config['apk_dir'])): + os.makedirs(os.path.abspath(self.config['apk_dir'])) + + # download APK if not already downloaded + apk_path = os.path.join(os.path.abspath(self.config['apk_dir']), apk_name) + if not os.path.isfile(apk_path): + apk_url = urlparse.urljoin(self.config['base_url'], apk_name) + r = requests.get(apk_url, stream=True) + if r.status_code == 200: + with open(apk_path, 'wb') as f: + r.raw.decode_content = True + shutil.copyfileobj(r.raw, f) + + def install_apk(self, apk_name): + apk_path = os.path.join(os.path.abspath(self.config['apk_dir']), apk_name) + self.logger.info('Installing %s' % os.path.basename(apk_path)) + self.call_adb("install %s" % apk_path) + + def uninstall_apk(self, package): + install_packages = subprocess.check_output(['adb', 'shell', 'pm', 'list', 'packages']) + if package in install_packages: + self.logger.info('Stopping %s' % package) + self.call_adb("shell am force-stop %s" % package) + + self.logger.info('Uninstalling %s' % package) + self.call_adb("shell pm uninstall %s" % package) + + def take_screencap(self): + screencap_file = '/data/local/tmp/%s-itr%s.png' % (self.config['name'], self.config['itr']) + self.call_adb('shell screencap %s' % screencap_file) + self.logger.info('Pulling %s to output directory...' % screencap_file) + self.call_adb('pull %s %s' % (screencap_file, self.config['output'])) + + def collect_log(self): + self.logger.info("Saving logcat.log, logcat-events.log and dmesg.log to output directory...") + self.call_adb('logcat -d -v time > %s/logcat.log' % self.config['output']) + self.call_adb('logcat -d -b events -v time > %s/logcat-events.log' % self.config['output']) + self.call_adb('shell dmesg > %s/dmesg.log' % self.config['output']) + + def setUp(self): + # Install APK. + self.download_apk(self.config['apk_file_name']) + self.uninstall_apk(self.config['apk_package']) + self.install_apk(self.config['apk_file_name']) + + # Clear logcat buffer. + self.call_adb("logcat -c") + self.call_adb("logcat -b events -c") + time.sleep(3) + + # Start intent. + self.logger.info('Starting %s' % self.config['apk_package']) + self.call_adb("shell am start -W -S %s" % self.config['activity']) + + def execute(self): + raise NotImplementedError + + def parseResult(self): + raise NotImplementedError + + def tearDown(self): + self.uninstall_apk(self.config['apk_package']) -- cgit v1.2.3