import base64
import datetime
import fnmatch
import json
import os
import re
import string
import subprocess
import time
import xmlrpclib

from util import *
from subunitresults import SubunitResults

SUBUNIT_RESULTS_FILE = "results.subunit"
ALL_TESTS_FILE = "all-tests.txt"
ENABLED_SERVICES_FILE = "enabled-services.txt"
DISABLED_SERVICES_FILE = "disabled-services.txt"

# additional status to set on a skipped test that we don't care about
IGNORE = "ignore"


def _read_service_list(path):
    """Return the non-blank, stripped lines of `path`, or None if it is absent.

    Blank lines are dropped on purpose: an empty service name would later
    become an empty alternative in the skip-reason regular expression and
    make it match far more reasons than intended.
    """
    if not os.path.exists(path):
        return None
    with open(path, "r") as f:
        stripped = (line.strip() for line in f)
        return [name for name in stripped if name]


class BundleStore(object):
    """Directory-backed record of the bundles that have been seen.

    Every bundle is represented by a subdirectory of `root_path` named after
    the bundle's SHA1.
    """

    def __init__(self, root_path):
        # may contain "~"; it is expanded on every use
        self.root_path = root_path

    def _expanded_root(self):
        """Return `root_path` with any leading "~" expanded."""
        return os.path.expanduser(self.root_path)

    def bundle_list(self):
        """Return the names of all subdirectories directly under the root."""
        root = self._expanded_root()
        return [entry for entry in os.listdir(root)
                if os.path.isdir(os.path.join(root, entry))]

    def is_bundle_present(self, bundle):
        """Return True if a directory named after `bundle.sha1` exists.

        Raises TypeError (an Exception subclass) if `bundle` is not a Bundle.
        """
        if not isinstance(bundle, Bundle):
            raise TypeError("argument is not a Bundle")
        return bundle.sha1 in self.bundle_list()

    def write_bundle_receipt(self, bundle, include_data=False):
        """Write the bundle's entry data (and optionally its data) as JSON.

        Creates `<root>/<sha1>/metadata`, plus `<root>/<sha1>/data` when
        `include_data` is true.

        Raises TypeError (an Exception subclass) if `bundle` is not a Bundle.
        """
        if not isinstance(bundle, Bundle):
            raise TypeError("argument is not a Bundle")

        bundle_root = os.path.join(self._expanded_root(), bundle.sha1)
        create_dir(bundle_root)

        with open(os.path.join(bundle_root, "metadata"), "w") as f:
            f.write(json.dumps(bundle.entry_data, default=json_serial))

        if include_data:
            with open(os.path.join(bundle_root, "data"), "w") as f:
                f.write(json.dumps(bundle.data, default=json_serial))


class Bundle(object):
    """A single result bundle and the state derived from expanding it.

    `entry_data` must provide the keys "uploaded_on" and "associated_job";
    `data` is a JSON document (string) that is parsed on construction.
    """

    def __init__(self, sha1, entry_data, data):
        self.metadata = None  # not populated until expand() completes
        self.entry_data = entry_data
        self.data = json.loads(data)
        self.sha1 = sha1
        # NOTE(review): expand() calls strftime()/timetuple() on this, so it
        # is presumably a datetime -- confirm against the caller
        self.upload_date = entry_data["uploaded_on"]
        self.lava_job_id = entry_data["associated_job"]
        self.subdir_whitelist = "logs"
        self.subdir_unimportant = "xtra"
        self.tests_run = None
        self.all_tests = None
        self.failing_tests = None
        self.passing_tests = None
        self.all_skipped_tests = None  # original unfiltered list of skipped tests
        self.skipped_tests = None  # unexpected skips
        self.ignored_tests = None  # expected skips
        self.enabled_services = None
        self.disabled_services = None

        # these are files that will be present in attachments for a test run that
        # are not to be extracted
        self.skip_files = [
            "testdef.yaml",
            "return_code",
            "run.sh",
            "lava-results.sh",
        ]

    def _attachment_destination(self, filename, test_whitelist,
                                test_root_path, whitelist_path):
        """Return the full output path for one attachment.

        The first whitelist rule whose "src" pattern matches `filename` sends
        the file to `whitelist_path`, renamed to the rule's "new-name" if one
        is given. Files matching no rule go under `test_root_path`.
        """
        for rule in test_whitelist or ():
            if fnmatch.fnmatch(filename, rule["src"]):
                target_name = filename
                if 'new-name' in rule:
                    target_name = rule['new-name']
                return os.path.join(whitelist_path, target_name)
        return os.path.join(test_root_path, filename)

    #
    # expand - process the bundle data, extracting and moving files to their
    # desired locations
    #
    def expand(self, whitelist, output_root):
        """Extract the bundle's attachments and write its metadata.

        `whitelist` maps a test id to a list of rules (dicts with a "src"
        glob pattern and an optional "new-name"). `output_root` is the
        directory under which a per-job directory is created.

        Returns the metadata dict (also stored on `self.metadata`), or None
        if the bundle is skipped because it has no valid job id, no test
        runs, no attributes or no testdef metadata.
        """
        # quit if there is not a valid LAVA job ID for it
        if self.lava_job_id == "NA":
            print("Invalid LAVA Job ID - skipping")
            return None

        # an empty list used to raise IndexError below; treat it like the
        # other invalid-bundle cases instead
        test_runs = self.data["test_runs"]
        if not test_runs:
            print("Invalid bundle data - no test runs present")
            return None

        # attempt to retrieve attributes from the first test in the bundle
        first_run = test_runs[0]
        if "attributes" not in first_run:
            print("Invalid bundle data - no test attributes present")
            return None
        if "testdef_metadata" not in first_run:
            print("Invalid bundle data - no testdef metadata present")
            return None
        attributes = first_run["attributes"]
        testdef_metadata = first_run["testdef_metadata"]

        # start building up the metadata to write out (and return) about the bundle
        bundle_metadata = {}
        bundle_metadata['bundle_sha1'] = self.sha1
        bundle_metadata['lava_job_id'] = self.lava_job_id
        bundle_metadata['date_uploaded'] = self.upload_date.strftime('%Y-%m-%dT%H:%M:%S')
        bundle_metadata['timestamp_uploaded'] = int(time.mktime(self.upload_date.timetuple()))
        bundle_metadata['lava_job_attributes'] = attributes
        bundle_metadata['lava_testdef_metadata'] = testdef_metadata

        # create the names for the root directories
        output_subdir = str(self.lava_job_id)
        #if "os-distro" in attributes and "os-version" in attributes and "devstack-branch" in attributes:
        #    output_subdir = "%s,os=%s,osver=%s,branch=%s" % \
        #        (self.lava_job_id, attributes["os-distro"], attributes["os-version"], attributes["devstack-branch"])
        full_root_path = os.path.join(os.path.expanduser(output_root), output_subdir)
        whitelist_path = os.path.join(full_root_path, self.subdir_whitelist)
        unimp_path = os.path.join(full_root_path, self.subdir_unimportant)
        print("storing output here: { root = '%s', whitelist = '%s', unimportant = '%s' }" %
              (full_root_path, whitelist_path, unimp_path))

        # create the root and top-level subdirectories
        for path in (full_root_path, whitelist_path, unimp_path):
            create_dir(path)

        # loop through all of the tests in the bundle
        for test_run in test_runs:
            test_id = test_run["test_id"]
            print("processing test [%s]" % test_id)

            # create directories if necessary
            test_root_path = os.path.join(unimp_path, test_id)
            create_dir(test_root_path)

            # see if there is a whitelist specified for the test
            test_whitelist = None
            if test_id in whitelist:
                test_whitelist = whitelist[test_id]

            # process attachments if there are any
            if "attachments" not in test_run:
                continue
            for attachment in test_run["attachments"]:
                filename = attachment["pathname"]
                if filename in self.skip_files:
                    continue

                # NOTE(review): the pathname is taken from the bundle and
                # joined unchecked; confirm bundles are trusted, or reject
                # absolute paths and ".." components here
                full_file_path = self._attachment_destination(
                    filename, test_whitelist, test_root_path, whitelist_path)

                # create the directory if necessary
                create_dir(os.path.dirname(full_file_path))

                # finally - write the attachment; the decoded payload is raw
                # bytes, so the file must be opened in binary mode
                with open(full_file_path, "wb") as f:
                    f.write(base64.b64decode(attachment["content"]))

        # check for [enabled|disabled]-services.txt and process
        self.process_enabled_disabled_services(whitelist_path)

        # check for results.subunit in whitelisted files and process
        self.process_subunit(whitelist_path)

        # use the list of enabled/disabled services and the skipped tests data and filter
        # to determine the skipped tests that need to be ignored
        self.process_skipped_tests()

        # get final list of whitelisted files and add to the metadata
        bundle_metadata["file_list"] = get_recursive_file_list(whitelist_path)

        # write the metadata
        with open(os.path.join(full_root_path, "metadata.json"), "w") as f:
            json.dump(bundle_metadata, f)

        # touch the directory with the original creation date
        subprocess.check_output(["touch", "--date=%s" % self.upload_date, full_root_path])

        self.metadata = bundle_metadata
        return bundle_metadata

    def process_skipped_tests(self):
        """Split `all_skipped_tests` into expected and unexpected skips.

        A skipped test whose "reason" matches one of the ignore patterns gets
        its "status" set to IGNORE and is appended to `ignored_tests`; every
        other test is appended to `skipped_tests`. Both lists are reset on
        each call.
        """
        self.skipped_tests = []
        self.ignored_tests = []

        if not self.all_skipped_tests:
            return

        # patterns keep the original ".*( ... ).*" shape and are used with
        # match(), so behaviour on multi-line reasons is unchanged
        ignore_patterns = [
            # tests that are skipped or disabled due to a bug
            re.compile(".*(Bug|tests are disabled).*", re.IGNORECASE),
            # other patterns that indicate a skip that can be ignored
            re.compile(".*(feature is disabled|not available|is not supported).*", re.IGNORECASE),
        ]

        # drop empty names: an empty alternative would match any reason text
        disabled_names = [name for name in (self.disabled_services or []) if name]
        if disabled_names:
            # this is unfortunate, but necessary due to inconsistent message text in tempest
            if "cinder" in disabled_names:
                disabled_names.append("volume service")

            # build up the pattern to check for tests that are skipped due to a service not being enabled
            reasons_string = "|".join([
                "is not available",
                "not available",
                "support is required",
                "does not support",
                "does not check",
                "feature is disabled",
                "feature disabled",
                "is not supported",
                "is not enabled",
            ])
            # service names are literal text, not regex syntax
            services_string = "|".join(re.escape(name) for name in disabled_names)
            ignore_patterns.append(re.compile(
                ".*(" + services_string + ") (" + reasons_string + ").*",
                re.IGNORECASE))

        if self.enabled_services and "neutron" in self.enabled_services:
            # additional regular expression needed for when neutron is enabled
            ignore_patterns.append(re.compile(
                ".*((Neutron) (allows|does not check|does not support|is available)|nova-network is not available).*",
                re.IGNORECASE))

        for test in self.all_skipped_tests:
            reason = test["reason"]
            if any(pattern.match(reason) for pattern in ignore_patterns):
                test["status"] = IGNORE
                self.ignored_tests.append(test)
            else:
                self.skipped_tests.append(test)

    def process_subunit(self, whitelist_path):
        """Load the subunit results and dump each result list as JSON.

        Does nothing unless both SUBUNIT_RESULTS_FILE and ALL_TESTS_FILE
        exist in `whitelist_path`. Otherwise each list obtained from
        SubunitResults is stored on the instance and written to a
        "tests-*.json" file in `whitelist_path`. Always returns None.
        """
        subunit_stream_path = os.path.join(whitelist_path, SUBUNIT_RESULTS_FILE)
        all_tests_path = os.path.join(whitelist_path, ALL_TESTS_FILE)

        if not (os.path.exists(subunit_stream_path) and os.path.exists(all_tests_path)):
            return None

        results = SubunitResults(subunit_stream_path, all_tests_path)

        # (attribute to set, getter, output file) -- processed in this order
        outputs = (
            ("tests_run", results.get_tests_run, "tests-run.json"),
            ("all_tests", results.get_all_tests, "tests-all.json"),
            ("failing_tests", results.get_failing_tests, "tests-failing.json"),
            ("passing_tests", results.get_passing_tests, "tests-passing.json"),
            ("all_skipped_tests", results.get_skipped_tests, "tests-skipped.json"),
        )
        for attr_name, getter, out_name in outputs:
            tests = getter()
            setattr(self, attr_name, tests)
            with open(os.path.join(whitelist_path, out_name), "w") as f:
                json.dump(tests, f)
        return None

    def process_enabled_disabled_services(self, whitelist_path):
        """Populate `enabled_services` and `disabled_services`.

        Each list is read from its file in `whitelist_path` (one name per
        line, blank lines ignored). When a file is missing or yields no
        names, a hard-coded default list is used instead.
        """
        self.enabled_services = _read_service_list(
            os.path.join(whitelist_path, ENABLED_SERVICES_FILE))
        self.disabled_services = _read_service_list(
            os.path.join(whitelist_path, DISABLED_SERVICES_FILE))

        # this is to prevent our averages from being skewed because the data isn't available for past runs. this
        # is approximately what the list should have been for old test runs
        if not self.enabled_services:
            self.enabled_services = [
                "glance",
                "heat",
                "horizon",
                "key",
                "nova-network",
                "nova",
                "postgresql",
                "sahara",
            ]
        if not self.disabled_services:
            self.disabled_services = [
                "neutron",
                "ceilometer",
                "cinder",
                "dstat",
                "ironic",
                "mysql",
                "rabbit",
                "trove",
                "zaqar",
                "zaqar-server",
            ]