diff options
Diffstat (limited to 'meta/lib/oeqa/core/runner.py')
-rw-r--r-- | meta/lib/oeqa/core/runner.py | 247 |
1 files changed, 174 insertions, 73 deletions
diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py index 13cdf5ba523..54efdd0df5c 100644 --- a/meta/lib/oeqa/core/runner.py +++ b/meta/lib/oeqa/core/runner.py @@ -6,17 +6,11 @@ import time import unittest import logging import re +import json +import pathlib -xmlEnabled = False -try: - import xmlrunner - from xmlrunner.result import _XMLTestResult as _TestResult - from xmlrunner.runner import XMLTestRunner as _TestRunner - xmlEnabled = True -except ImportError: - # use the base runner instead - from unittest import TextTestResult as _TestResult - from unittest import TextTestRunner as _TestRunner +from unittest import TextTestResult as _TestResult +from unittest import TextTestRunner as _TestRunner class OEStreamLogger(object): def __init__(self, logger): @@ -42,22 +36,38 @@ class OETestResult(_TestResult): def __init__(self, tc, *args, **kwargs): super(OETestResult, self).__init__(*args, **kwargs) + self.successes = [] + self.starttime = {} + self.endtime = {} + self.progressinfo = {} + + # Inject into tc so that TestDepends decorator can see results + tc.results = self + self.tc = tc - self._tc_map_results() + + self.result_types = ['failures', 'errors', 'skipped', 'expectedFailures', 'successes'] + self.result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL', 'PASSED'] def startTest(self, test): - # Allow us to trigger the testcase buffer mode on a per test basis - # so stdout/stderr are only printed upon failure. Enables debugging - # but clean output - if hasattr(test, "buffer"): - self.buffer = test.buffer + # May have been set by concurrencytest + if test.id() not in self.starttime: + self.starttime[test.id()] = time.time() super(OETestResult, self).startTest(test) - def _tc_map_results(self): - self.tc._results['failures'] = self.failures - self.tc._results['errors'] = self.errors - self.tc._results['skipped'] = self.skipped - self.tc._results['expectedFailures'] = self.expectedFailures + def stopTest(self, test): + self.endtime[test.id()] = time.time() + super(OETestResult, self).stopTest(test) + if test.id() in self.progressinfo: + self.tc.logger.info(self.progressinfo[test.id()]) + + # Print the errors/failures early to aid/speed debugging, its a pain + # to wait until selftest finishes to see them. + for t in ['failures', 'errors', 'skipped', 'expectedFailures']: + for (scase, msg) in getattr(self, t): + if test.id() == scase.id(): + self.tc.logger.info(str(msg)) + break def logSummary(self, component, context_msg=''): elapsed_time = self.tc._run_end_time - self.tc._run_start_time @@ -70,28 +80,19 @@ class OETestResult(_TestResult): msg = "%s - OK - All required tests passed" % component else: msg = "%s - FAIL - Required tests failed" % component - skipped = len(self.tc._results['skipped']) + skipped = len(self.skipped) if skipped: msg += " (skipped=%d)" % skipped self.tc.logger.info(msg) - def _getDetailsNotPassed(self, case, type, desc): + def _isTestResultContainTestCaseWithResultTypeProvided(self, case, type): found = False - for (scase, msg) in self.tc._results[type]: - # XXX: When XML reporting is enabled scase is - # xmlrunner.result._TestInfo instance instead of - # string. - if xmlEnabled: - if case.id() == scase.test_id: - found = True - break - scase_str = scase.test_id - else: - if case == scase: - found = True - break - scase_str = str(scase) + for (scase, msg) in getattr(self, type): + if case.id() == scase.id(): + found = True + break + scase_str = str(scase.id()) # When fails at module or class level the class name is passed as string # so figure out to see if match @@ -115,21 +116,22 @@ class OETestResult(_TestResult): return (found, None) + def addSuccess(self, test): + #Added so we can keep track of successes too + self.successes.append((test, None)) + super(OETestResult, self).addSuccess(test) + def logDetails(self): self.tc.logger.info("RESULTS:") for case_name in self.tc._registry['cases']: case = self.tc._registry['cases'][case_name] - result_types = ['failures', 'errors', 'skipped', 'expectedFailures'] - result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL'] - - fail = False + found = False desc = None - for idx, name in enumerate(result_types): - (fail, msg) = self._getDetailsNotPassed(case, result_types[idx], - result_desc[idx]) - if fail: - desc = result_desc[idx] + for idx, name in enumerate(self.result_types): + (found, msg) = self._isTestResultContainTestCaseWithResultTypeProvided(case, self.result_types[idx]) + if found: + desc = self.result_desc[idx] break oeid = -1 @@ -138,12 +140,46 @@ class OETestResult(_TestResult): if hasattr(d, 'oeid'): oeid = d.oeid - if fail: - self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), - oeid, desc)) + t = "" + if case.id() in self.starttime and case.id() in self.endtime: + t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)" + + if found: + self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(), + oeid, desc, t)) + else: + self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(), + oeid, 'UNKNOWN', t)) + + def _get_testcase_result_and_testmessage_dict(self): + testcase_result_dict = {} + testcase_testmessage_dict = {} + for case_name in self.tc._registry['cases']: + case = self.tc._registry['cases'][case_name] + + found = False + desc = None + test_msg = '' + for idx, name in enumerate(self.result_types): + (found, msg) = self._isTestResultContainTestCaseWithResultTypeProvided(case, self.result_types[idx]) + if found: + desc = self.result_desc[idx] + test_msg = msg + break + + if found: + testcase_result_dict[case.id()] = desc + testcase_testmessage_dict[case.id()] = test_msg else: - self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(), - oeid, 'PASSED')) + testcase_result_dict[case.id()] = "UNKNOWN" + return testcase_result_dict, testcase_testmessage_dict + + def logDetailsInJson(self, file_dir): + (testcase_result_dict, testcase_testmessage_dict) = self._get_testcase_result_and_testmessage_dict() + if len(testcase_result_dict) > 0 and len(testcase_testmessage_dict) > 0: + jsontresulthelper = OEJSONTestResultHelper(testcase_result_dict, testcase_testmessage_dict) + jsontresulthelper.write_json_testresult_files(file_dir) + jsontresulthelper.write_testcase_log_files(os.path.join(file_dir, 'logs')) class OEListTestsResult(object): def wasSuccessful(self): @@ -153,33 +189,14 @@ class OETestRunner(_TestRunner): streamLoggerClass = OEStreamLogger def __init__(self, tc, *args, **kwargs): - if xmlEnabled: - if not kwargs.get('output'): - kwargs['output'] = os.path.join(os.getcwd(), - 'TestResults_%s_%s' % (time.strftime("%Y%m%d%H%M%S"), os.getpid())) - kwargs['stream'] = self.streamLoggerClass(tc.logger) super(OETestRunner, self).__init__(*args, **kwargs) self.tc = tc self.resultclass = OETestResult - # XXX: The unittest-xml-reporting package defines _make_result method instead - # of _makeResult standard on unittest. - if xmlEnabled: - def _make_result(self): - """ - Creates a TestResult object which will be used to store - information about the executed tests. - """ - # override in subclasses if necessary. - return self.resultclass(self.tc, - self.stream, self.descriptions, self.verbosity, self.elapsed_times - ) - else: - def _makeResult(self): - return self.resultclass(self.tc, self.stream, self.descriptions, - self.verbosity) - + def _makeResult(self): + return self.resultclass(self.tc, self.stream, self.descriptions, + self.verbosity) def _walk_suite(self, suite, func): for obj in suite: @@ -275,3 +292,87 @@ class OETestRunner(_TestRunner): self._list_tests_module(suite) return OEListTestsResult() + +class OEJSONTestResultHelper(object): + def __init__(self, testcase_result_dict, testcase_log_dict): + self.testcase_result_dict = testcase_result_dict + self.testcase_log_dict = testcase_log_dict + + def get_testcase_list(self): + return self.testcase_result_dict.keys() + + def get_testsuite_from_testcase(self, testcase): + testsuite = testcase[0:testcase.rfind(".")] + return testsuite + + def get_testmodule_from_testsuite(self, testsuite): + testmodule = testsuite[0:testsuite.find(".")] + return testmodule + + def get_testsuite_testcase_dictionary(self): + testsuite_testcase_dict = {} + for testcase in self.get_testcase_list(): + testsuite = self.get_testsuite_from_testcase(testcase) + if testsuite in testsuite_testcase_dict: + testsuite_testcase_dict[testsuite].append(testcase) + else: + testsuite_testcase_dict[testsuite] = [testcase] + return testsuite_testcase_dict + + def get_testmodule_testsuite_dictionary(self, testsuite_testcase_dict): + testsuite_list = testsuite_testcase_dict.keys() + testmodule_testsuite_dict = {} + for testsuite in testsuite_list: + testmodule = self.get_testmodule_from_testsuite(testsuite) + if testmodule in testmodule_testsuite_dict: + testmodule_testsuite_dict[testmodule].append(testsuite) + else: + testmodule_testsuite_dict[testmodule] = [testsuite] + return testmodule_testsuite_dict + + def _get_testcase_result(self, testcase, testcase_status_dict): + if testcase in testcase_status_dict: + return testcase_status_dict[testcase] + return "" + + def _create_testcase_testresult_object(self, testcase_list, testcase_result_dict): + testcase_dict = {} + for testcase in sorted(testcase_list): + result = self._get_testcase_result(testcase, testcase_result_dict) + testcase_dict[testcase] = {"testresult": result} + return testcase_dict + + def _create_json_testsuite_string(self, testsuite_list, testsuite_testcase_dict, testcase_result_dict): + testsuite_object = {'testsuite': {}} + testsuite_dict = testsuite_object['testsuite'] + for testsuite in sorted(testsuite_list): + testsuite_dict[testsuite] = {'testcase': {}} + testsuite_dict[testsuite]['testcase'] = self._create_testcase_testresult_object( + testsuite_testcase_dict[testsuite], + testcase_result_dict) + return json.dumps(testsuite_object, sort_keys=True, indent=4) + + def write_json_testresult_files(self, write_dir): + if not os.path.exists(write_dir): + pathlib.Path(write_dir).mkdir(parents=True, exist_ok=True) + testsuite_testcase_dict = self.get_testsuite_testcase_dictionary() + testmodule_testsuite_dict = self.get_testmodule_testsuite_dictionary(testsuite_testcase_dict) + for testmodule in testmodule_testsuite_dict.keys(): + testsuite_list = testmodule_testsuite_dict[testmodule] + json_testsuite = self._create_json_testsuite_string(testsuite_list, testsuite_testcase_dict, + self.testcase_result_dict) + file_name = '%s.json' % testmodule + file_path = os.path.join(write_dir, file_name) + with open(file_path, 'w') as the_file: + the_file.write(json_testsuite) + + def write_testcase_log_files(self, write_dir): + if not os.path.exists(write_dir): + pathlib.Path(write_dir).mkdir(parents=True, exist_ok=True) + for testcase in self.testcase_log_dict.keys(): + test_log = self.testcase_log_dict[testcase] + if test_log is not None: + file_name = '%s.log' % testcase + file_path = os.path.join(write_dir, file_name) + with open(file_path, 'w') as the_file: + the_file.write(test_log) |