diff options
author | Karsten Tausche <karsten@fairphone.com> | 2018-06-19 14:44:31 +0200 |
---|---|---|
committer | Karsten Tausche <karsten@fairphone.com> | 2018-12-13 20:05:27 +0100 |
commit | f5baf68f5a2a5c610d5b0038fd40b327c1c1bc60 (patch) | |
tree | a103c044b54e2710316bdf005b6da17fb72a8714 | |
parent | 8ab173973ba792dee5482362b28fdaccc4d2714a (diff) |
Extract result_parser.py from tradefed-runner.py
A first step to make the runner code more reusable by other
implementations.
Code from the previous result_parser() function and related constants
were moved to result_parser.py to provide the parser code in a
self-contained file. Additionally, the two printing approaches
(aggregated, atomic) are now in dedicated functions to make the code
more readable.
Change-Id: I638d6a2cf44a5569569703308172e3056030783f
Signed-off-by: Karsten Tausche <karsten@fairphone.com>
-rw-r--r-- | automated/android/tradefed/result_parser.py | 173 | ||||
-rwxr-xr-x | automated/android/tradefed/tradefed-runner.py | 139 |
2 files changed, 187 insertions, 125 deletions
import collections
import logging
import os
import re
import sys
import xml.etree.ElementTree as ET

# NOTE(review): this path is relative to the current working directory, not to
# this file -- the runner is expected to be launched from this directory.
sys.path.insert(0, '../../lib/')
import py_test_lib  # nopep8


# Per-module outcome of print_aggregated(): how many failed test cases were
# logged, and whether further failures were skipped because the limit was hit.
# Hoisted to module level; the original re-created the type on every call.
Result = collections.namedtuple(
    'Result', ['num_printed_failures', 'failures_skipped']
)


class TradefedResultParser:
    """Parse tradefed ``test_result.xml`` files and record test results.

    Results are appended through ``py_test_lib.add_result()`` to
    ``result_output_file``, either as per-module summaries
    (``results_format == 'aggregated'``) or as one entry per individual
    test (``results_format == 'atomic'``).
    """

    # Matches XML character references, decimal (&#10;) or hex (&#x1F;).
    _CHAR_REF_RE = re.compile("&#([0-9]+);|&#x([0-9a-fA-F]+);")

    def __init__(self, result_output_file):
        self.result_output_file = result_output_file
        self.logger = logging.getLogger()
        # Number of failed test cases to log in detail; 0 disables that output.
        self.failures_to_print = 0
        # Either 'aggregated' or 'atomic'; see class docstring.
        self.results_format = 'aggregated'
        # Name of the result files searched for by parse_recursively().
        self.test_result_file_name = 'test_result.xml'

    def parse_recursively(self, result_dir):
        """Parse every result file found under ``result_dir``.

        Returns True when ``result_dir`` is a directory and every result
        file in it parsed successfully, False otherwise.
        """
        # os.path.isdir() already implies existence; the original also
        # checked os.path.exists() redundantly.
        if not os.path.isdir(result_dir):
            return False
        for root, dirs, files in os.walk(result_dir):
            for name in files:
                if name != self.test_result_file_name:
                    continue
                if not self.parse(os.path.join(root, name)):
                    return False
        return True

    @classmethod
    def _remove_invalid_char_refs(cls, content):
        """Return ``content`` with character references illegal in XML removed.

        tradefed occasionally emits references to control characters that
        xml.etree refuses to parse.  Valid code points per the XML 1.0
        ``Char`` production:
        #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF]
        """
        def keep_if_valid(match):
            decimal, hexadecimal = match.groups()
            num = int(decimal) if decimal else int(hexadecimal, 16)
            valid = (
                num in (0x9, 0xA, 0xD) or
                0x20 <= num <= 0xD7FF or
                0xE000 <= num <= 0xFFFD or
                0x10000 <= num <= 0x10FFFF
            )
            return match.group(0) if valid else ''

        # re.sub scans left to right without rescanning removed text, which
        # also fixes the original loop's bug: after deleting a match it
        # resumed searching at the old end offset, skipping any invalid
        # reference that immediately followed the removed one.
        return cls._CHAR_REF_RE.sub(keep_if_valid, content)

    def parse(self, xml_file):
        """Parse one result XML file and record its results.

        Returns False when the file is not well-formed XML (after invalid
        character references have been stripped), True otherwise.
        """
        # Fix: the original opened the file without ever closing it.
        with open(xml_file) as etree_file:
            etree_content = self._remove_invalid_char_refs(etree_file.read())

        try:
            root = ET.fromstring(etree_content)
        except ET.ParseError as e:
            self.logger.error('xml.etree.ElementTree.ParseError: %s' % e)
            self.logger.info('Please Check %s manually' % xml_file)
            return False
        self.logger.info(
            'Test modules in %s: %s'
            % (xml_file, str(len(root.findall('Module'))))
        )

        # Budget of detailed failure printouts shared across all modules.
        remaining_failures_to_print = self.failures_to_print
        for elem in root.findall('Module'):
            # Naming: Module Name + Test Case Name + Test Name
            if 'abi' in elem.attrib.keys():
                module_name = '.'.join(
                    [elem.attrib['abi'], elem.attrib['name']]
                )
            else:
                module_name = elem.attrib['name']

            if self.results_format == 'aggregated':
                r = self.print_aggregated(
                    module_name, elem, remaining_failures_to_print
                )
                remaining_failures_to_print -= r.num_printed_failures
                if r.failures_skipped:
                    self.logger.info(
                        'There are more than %d test cases '
                        'failed, the output for the rest '
                        'failed test cases will be '
                        'skipped.' % (self.failures_to_print)
                    )
            elif self.results_format == 'atomic':
                self.print_atomic(module_name, elem)
        return True

    def print_aggregated(self, module_name, elem, failures_to_print):
        """Record per-module pass/fail/done summary counts for ``elem``.

        Additionally logs up to ``failures_to_print`` individual failed
        test cases for debugging.  Returns a ``Result`` naming how many
        failures were logged and whether any were skipped.
        """
        tests_executed = len(elem.findall('.//Test'))
        tests_passed = len(elem.findall('.//Test[@result="pass"]'))
        tests_failed = len(elem.findall('.//Test[@result="fail"]'))

        result = '%s_executed pass %s' % (module_name, str(tests_executed))
        py_test_lib.add_result(self.result_output_file, result)

        result = '%s_passed pass %s' % (module_name, str(tests_passed))
        py_test_lib.add_result(self.result_output_file, result)

        failed_result = 'pass'
        if tests_failed > 0:
            failed_result = 'fail'
        result = '%s_failed %s %s' % (
            module_name,
            failed_result,
            str(tests_failed),
        )
        py_test_lib.add_result(self.result_output_file, result)

        # output result to show if the module is done or not
        tests_done = elem.get('done', 'false')
        if tests_done == 'false':
            result = '%s_done fail' % module_name
        else:
            result = '%s_done pass' % module_name
        py_test_lib.add_result(self.result_output_file, result)

        if failures_to_print == 0:
            return Result(0, False)

        # print failed test cases for debug
        num_printed_failures = 0
        test_cases = elem.findall('.//TestCase')
        for test_case in test_cases:
            failed_tests = test_case.findall('.//Test[@result="fail"]')
            for failed_test in failed_tests:
                if num_printed_failures == failures_to_print:
                    return Result(num_printed_failures, True)
                test_name = '%s/%s.%s' % (
                    module_name,
                    test_case.get("name"),
                    failed_test.get("name"),
                )
                failures = failed_test.findall('.//Failure')
                failure_msg = ''
                for failure in failures:
                    failure_msg = '%s \n %s' % (
                        failure_msg,
                        failure.get('message'),
                    )

                self.logger.info('%s %s' % (test_name, failure_msg.strip()))
                num_printed_failures += 1

        return Result(num_printed_failures, False)

    def print_atomic(self, module_name, elem):
        """Record one result entry per individual test found in ``elem``."""
        test_cases = elem.findall('.//TestCase')
        for test_case in test_cases:
            tests = test_case.findall('.//Test')
            for atomic_test in tests:
                atomic_test_result = atomic_test.get("result")
                atomic_test_name = "%s/%s.%s" % (
                    module_name,
                    test_case.get("name"),
                    atomic_test.get("name"),
                )
                py_test_lib.add_result(
                    self.result_output_file,
                    "%s %s" % (atomic_test_name, atomic_test_result),
                )
'aggregated' -ATOMIC = 'atomic' - - -def result_parser(xml_file, result_format): - etree_file = open(xml_file, 'rb') - etree_content = etree_file.read() - rx = re.compile("&#([0-9]+);|&#x([0-9a-fA-F]+);") - endpos = len(etree_content) - pos = 0 - while pos < endpos: - # remove characters that don't conform to XML spec - m = rx.search(etree_content, pos) - if not m: - break - mstart, mend = m.span() - target = m.group(1) - if target: - num = int(target) - else: - num = int(m.group(2), 16) - # #x9 | #xA | #xD | [#x20-#xD7FF] | [#xE000-#xFFFD] | [#x10000-#x10FFFF] - if not(num in (0x9, 0xA, 0xD) or - 0x20 <= num <= 0xD7FF or - 0xE000 <= num <= 0xFFFD or - 0x10000 <= num <= 0x10FFFF): - etree_content = etree_content[:mstart] + etree_content[mend:] - endpos = len(etree_content) - pos = mend - - try: - root = ET.fromstring(etree_content) - except ET.ParseError as e: - logger.error('xml.etree.ElementTree.ParseError: %s' % e) - logger.info('Please Check %s manually' % xml_file) - sys.exit(1) - logger.info('Test modules in %s: %s' - % (xml_file, str(len(root.findall('Module'))))) - failures_count = 0 - for elem in root.findall('Module'): - # Naming: Module Name + Test Case Name + Test Name - if 'abi' in elem.attrib.keys(): - module_name = '.'.join([elem.attrib['abi'], elem.attrib['name']]) - else: - module_name = elem.attrib['name'] - - if result_format == AGGREGATED: - tests_executed = len(elem.findall('.//Test')) - tests_passed = len(elem.findall('.//Test[@result="pass"]')) - tests_failed = len(elem.findall('.//Test[@result="fail"]')) - - result = '%s_executed pass %s' % (module_name, str(tests_executed)) - py_test_lib.add_result(RESULT_FILE, result) - - result = '%s_passed pass %s' % (module_name, str(tests_passed)) - py_test_lib.add_result(RESULT_FILE, result) - - failed_result = 'pass' - if tests_failed > 0: - failed_result = 'fail' - result = '%s_failed %s %s' % (module_name, failed_result, - str(tests_failed)) - py_test_lib.add_result(RESULT_FILE, result) - - # 
output result to show if the module is done or not - tests_done = elem.get('done', 'false') - if tests_done == 'false': - result = '%s_done fail' % module_name - else: - result = '%s_done pass' % module_name - py_test_lib.add_result(RESULT_FILE, result) - - if args.FAILURES_PRINTED > 0 and failures_count < args.FAILURES_PRINTED: - # print failed test cases for debug - test_cases = elem.findall('.//TestCase') - for test_case in test_cases: - failed_tests = test_case.findall('.//Test[@result="fail"]') - for failed_test in failed_tests: - test_name = '%s/%s.%s' % (module_name, - test_case.get("name"), - failed_test.get("name")) - failures = failed_test.findall('.//Failure') - failure_msg = '' - for failure in failures: - failure_msg = '%s \n %s' % (failure_msg, - failure.get('message')) - - logger.info('%s %s' % (test_name, failure_msg.strip())) - failures_count = failures_count + 1 - if failures_count > args.FAILURES_PRINTED: - logger.info('There are more than %d test cases ' - 'failed, the output for the rest ' - 'failed test cases will be ' - 'skipped.' 
% (args.FAILURES_PRINTED)) - #break the for loop of failed_tests - break - if failures_count > args.FAILURES_PRINTED: - #break the for loop of test_cases - break - - if result_format == ATOMIC: - test_cases = elem.findall('.//TestCase') - for test_case in test_cases: - tests = test_case.findall('.//Test') - for atomic_test in tests: - atomic_test_result = atomic_test.get("result") - atomic_test_name = "%s/%s.%s" % (module_name, - test_case.get("name"), - atomic_test.get("name")) - py_test_lib.add_result( - RESULT_FILE, "%s %s" % (atomic_test_name, - atomic_test_result)) parser = argparse.ArgumentParser() @@ -142,7 +30,8 @@ parser.add_argument('-t', dest='TEST_PARAMS', required=True, parser.add_argument('-p', dest='TEST_PATH', required=True, help="path to tradefed package top directory") parser.add_argument('-r', dest='RESULTS_FORMAT', required=False, - default=AGGREGATED, choices=[AGGREGATED, ATOMIC], + default='aggregated', + choices=['aggregated', 'atomic'], help="The format of the saved results. 'aggregated' means number of \ passed and failed tests are recorded for each module. 'atomic' means \ each test result is recorded separately") @@ -284,9 +173,9 @@ if vts_monitor_enabled: # Locate and parse test result. result_dir = '%s/results' % args.TEST_PATH -test_result = 'test_result.xml' -if os.path.exists(result_dir) and os.path.isdir(result_dir): - for root, dirs, files in os.walk(result_dir): - for name in files: - if name == test_result: - result_parser(os.path.join(root, name), args.RESULTS_FORMAT) +parser = result_parser.TradefedResultParser(RESULT_FILE) +parser.logger = logger +parser.results_format = args.RESULTS_FORMAT +parser.failures_to_print = args.FAILURES_PRINTED +success = parser.parse_recursively(result_dir) +sys.exit(0 if success else 1) |