Diffstat (limited to 'meta/lib/oeqa/core/runner.py')
-rw-r--r--  meta/lib/oeqa/core/runner.py | 247 ++++++++++++++++++++++++++++++++++---------
 1 file changed, 174 insertions(+), 73 deletions(-)
diff --git a/meta/lib/oeqa/core/runner.py b/meta/lib/oeqa/core/runner.py
index 13cdf5ba523..54efdd0df5c 100644
--- a/meta/lib/oeqa/core/runner.py
+++ b/meta/lib/oeqa/core/runner.py
@@ -6,17 +6,11 @@ import time
import unittest
import logging
import re
+import json
+import pathlib
-xmlEnabled = False
-try:
- import xmlrunner
- from xmlrunner.result import _XMLTestResult as _TestResult
- from xmlrunner.runner import XMLTestRunner as _TestRunner
- xmlEnabled = True
-except ImportError:
- # use the base runner instead
- from unittest import TextTestResult as _TestResult
- from unittest import TextTestRunner as _TestRunner
+from unittest import TextTestResult as _TestResult
+from unittest import TextTestRunner as _TestRunner
class OEStreamLogger(object):
def __init__(self, logger):
@@ -42,22 +36,38 @@ class OETestResult(_TestResult):
def __init__(self, tc, *args, **kwargs):
super(OETestResult, self).__init__(*args, **kwargs)
+ self.successes = []
+ self.starttime = {}
+ self.endtime = {}
+ self.progressinfo = {}
+
+ # Inject into tc so that TestDepends decorator can see results
+ tc.results = self
+
self.tc = tc
- self._tc_map_results()
+
+ self.result_types = ['failures', 'errors', 'skipped', 'expectedFailures', 'successes']
+ self.result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL', 'PASSED']
def startTest(self, test):
- # Allow us to trigger the testcase buffer mode on a per test basis
- # so stdout/stderr are only printed upon failure. Enables debugging
- # but clean output
- if hasattr(test, "buffer"):
- self.buffer = test.buffer
+ # May have been set by concurrencytest
+ if test.id() not in self.starttime:
+ self.starttime[test.id()] = time.time()
super(OETestResult, self).startTest(test)
- def _tc_map_results(self):
- self.tc._results['failures'] = self.failures
- self.tc._results['errors'] = self.errors
- self.tc._results['skipped'] = self.skipped
- self.tc._results['expectedFailures'] = self.expectedFailures
+ def stopTest(self, test):
+ self.endtime[test.id()] = time.time()
+ super(OETestResult, self).stopTest(test)
+ if test.id() in self.progressinfo:
+ self.tc.logger.info(self.progressinfo[test.id()])
+
+ # Print the errors/failures early to aid/speed debugging; it's a pain
+ # to wait until selftest finishes to see them.
+ for t in ['failures', 'errors', 'skipped', 'expectedFailures']:
+ for (scase, msg) in getattr(self, t):
+ if test.id() == scase.id():
+ self.tc.logger.info(str(msg))
+ break
def logSummary(self, component, context_msg=''):
elapsed_time = self.tc._run_end_time - self.tc._run_start_time
@@ -70,28 +80,19 @@ class OETestResult(_TestResult):
msg = "%s - OK - All required tests passed" % component
else:
msg = "%s - FAIL - Required tests failed" % component
- skipped = len(self.tc._results['skipped'])
+ skipped = len(self.skipped)
if skipped:
msg += " (skipped=%d)" % skipped
self.tc.logger.info(msg)
- def _getDetailsNotPassed(self, case, type, desc):
+ def _isTestResultContainTestCaseWithResultTypeProvided(self, case, type):
found = False
- for (scase, msg) in self.tc._results[type]:
- # XXX: When XML reporting is enabled scase is
- # xmlrunner.result._TestInfo instance instead of
- # string.
- if xmlEnabled:
- if case.id() == scase.test_id:
- found = True
- break
- scase_str = scase.test_id
- else:
- if case == scase:
- found = True
- break
- scase_str = str(scase)
+ for (scase, msg) in getattr(self, type):
+ if case.id() == scase.id():
+ found = True
+ break
+ scase_str = str(scase.id())
# When it fails at module or class level the class name is passed as a string,
# so check whether it matches
@@ -115,21 +116,22 @@ class OETestResult(_TestResult):
return (found, None)
+ def addSuccess(self, test):
+ # Added so we can keep track of successes too
+ self.successes.append((test, None))
+ super(OETestResult, self).addSuccess(test)
+
def logDetails(self):
self.tc.logger.info("RESULTS:")
for case_name in self.tc._registry['cases']:
case = self.tc._registry['cases'][case_name]
- result_types = ['failures', 'errors', 'skipped', 'expectedFailures']
- result_desc = ['FAILED', 'ERROR', 'SKIPPED', 'EXPECTEDFAIL']
-
- fail = False
+ found = False
desc = None
- for idx, name in enumerate(result_types):
- (fail, msg) = self._getDetailsNotPassed(case, result_types[idx],
- result_desc[idx])
- if fail:
- desc = result_desc[idx]
+ for idx, name in enumerate(self.result_types):
+ (found, msg) = self._isTestResultContainTestCaseWithResultTypeProvided(case, self.result_types[idx])
+ if found:
+ desc = self.result_desc[idx]
break
oeid = -1
@@ -138,12 +140,46 @@ class OETestResult(_TestResult):
if hasattr(d, 'oeid'):
oeid = d.oeid
- if fail:
- self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
- oeid, desc))
+ t = ""
+ if case.id() in self.starttime and case.id() in self.endtime:
+ t = " (" + "{0:.2f}".format(self.endtime[case.id()] - self.starttime[case.id()]) + "s)"
+
+ if found:
+ self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
+ oeid, desc, t))
+ else:
+ self.tc.logger.info("RESULTS - %s - Testcase %s: %s%s" % (case.id(),
+ oeid, 'UNKNOWN', t))
+
+ def _get_testcase_result_and_testmessage_dict(self):
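+ # Walk every registered testcase and record its final status
+ # ('FAILED'/'ERROR'/'SKIPPED'/'EXPECTEDFAIL'/'PASSED', or 'UNKNOWN' if it
+ # never ran) plus any associated message, keyed by testcase id.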
+ testcase_result_dict = {}
+ testcase_testmessage_dict = {}
+ for case_name in self.tc._registry['cases']:
+ case = self.tc._registry['cases'][case_name]
+
+ found = False
+ desc = None
+ test_msg = ''
+ for idx, name in enumerate(self.result_types):
+ (found, msg) = self._isTestResultContainTestCaseWithResultTypeProvided(case, self.result_types[idx])
+ if found:
+ desc = self.result_desc[idx]
+ test_msg = msg
+ break
+
+ if found:
+ testcase_result_dict[case.id()] = desc
+ testcase_testmessage_dict[case.id()] = test_msg
else:
- self.tc.logger.info("RESULTS - %s - Testcase %s: %s" % (case.id(),
- oeid, 'PASSED'))
+ testcase_result_dict[case.id()] = "UNKNOWN"
+ return testcase_result_dict, testcase_testmessage_dict
+
+ def logDetailsInJson(self, file_dir):
+ (testcase_result_dict, testcase_testmessage_dict) = self._get_testcase_result_and_testmessage_dict()
+ if len(testcase_result_dict) > 0 and len(testcase_testmessage_dict) > 0:
+ jsontresulthelper = OEJSONTestResultHelper(testcase_result_dict, testcase_testmessage_dict)
+ jsontresulthelper.write_json_testresult_files(file_dir)
+ jsontresulthelper.write_testcase_log_files(os.path.join(file_dir, 'logs'))
class OEListTestsResult(object):
def wasSuccessful(self):
@@ -153,33 +189,14 @@ class OETestRunner(_TestRunner):
streamLoggerClass = OEStreamLogger
def __init__(self, tc, *args, **kwargs):
- if xmlEnabled:
- if not kwargs.get('output'):
- kwargs['output'] = os.path.join(os.getcwd(),
- 'TestResults_%s_%s' % (time.strftime("%Y%m%d%H%M%S"), os.getpid()))
-
kwargs['stream'] = self.streamLoggerClass(tc.logger)
super(OETestRunner, self).__init__(*args, **kwargs)
self.tc = tc
self.resultclass = OETestResult
- # XXX: The unittest-xml-reporting package defines _make_result method instead
- # of _makeResult standard on unittest.
- if xmlEnabled:
- def _make_result(self):
- """
- Creates a TestResult object which will be used to store
- information about the executed tests.
- """
- # override in subclasses if necessary.
- return self.resultclass(self.tc,
- self.stream, self.descriptions, self.verbosity, self.elapsed_times
- )
- else:
- def _makeResult(self):
- return self.resultclass(self.tc, self.stream, self.descriptions,
- self.verbosity)
-
+ def _makeResult(self):
+ return self.resultclass(self.tc, self.stream, self.descriptions,
+ self.verbosity)
def _walk_suite(self, suite, func):
for obj in suite:
@@ -275,3 +292,87 @@ class OETestRunner(_TestRunner):
self._list_tests_module(suite)
return OEListTestsResult()
+
+class OEJSONTestResultHelper(object):
+ def __init__(self, testcase_result_dict, testcase_log_dict):
+ self.testcase_result_dict = testcase_result_dict
+ self.testcase_log_dict = testcase_log_dict
+
+ def get_testcase_list(self):
+ return self.testcase_result_dict.keys()
+
+ def get_testsuite_from_testcase(self, testcase):
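+ # The suite id is the testcase id minus its last dotted component,
+ # e.g. 'module.Class.test_name' -> 'module.Class'.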
+ testsuite = testcase[0:testcase.rfind(".")]
+ return testsuite
+
+ def get_testmodule_from_testsuite(self, testsuite):
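+ # The module is the first dotted component of the suite id.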
+ testmodule = testsuite[0:testsuite.find(".")]
+ return testmodule
+
+ def get_testsuite_testcase_dictionary(self):
+ testsuite_testcase_dict = {}
+ for testcase in self.get_testcase_list():
+ testsuite = self.get_testsuite_from_testcase(testcase)
+ if testsuite in testsuite_testcase_dict:
+ testsuite_testcase_dict[testsuite].append(testcase)
+ else:
+ testsuite_testcase_dict[testsuite] = [testcase]
+ return testsuite_testcase_dict
+
+ def get_testmodule_testsuite_dictionary(self, testsuite_testcase_dict):
+ testsuite_list = testsuite_testcase_dict.keys()
+ testmodule_testsuite_dict = {}
+ for testsuite in testsuite_list:
+ testmodule = self.get_testmodule_from_testsuite(testsuite)
+ if testmodule in testmodule_testsuite_dict:
+ testmodule_testsuite_dict[testmodule].append(testsuite)
+ else:
+ testmodule_testsuite_dict[testmodule] = [testsuite]
+ return testmodule_testsuite_dict
+
+ def _get_testcase_result(self, testcase, testcase_status_dict):
+ if testcase in testcase_status_dict:
+ return testcase_status_dict[testcase]
+ return ""
+
+ def _create_testcase_testresult_object(self, testcase_list, testcase_result_dict):
+ testcase_dict = {}
+ for testcase in sorted(testcase_list):
+ result = self._get_testcase_result(testcase, testcase_result_dict)
+ testcase_dict[testcase] = {"testresult": result}
+ return testcase_dict
+
+ def _create_json_testsuite_string(self, testsuite_list, testsuite_testcase_dict, testcase_result_dict):
+ testsuite_object = {'testsuite': {}}
+ testsuite_dict = testsuite_object['testsuite']
+ for testsuite in sorted(testsuite_list):
+ testsuite_dict[testsuite] = {'testcase': {}}
+ testsuite_dict[testsuite]['testcase'] = self._create_testcase_testresult_object(
+ testsuite_testcase_dict[testsuite],
+ testcase_result_dict)
+ return json.dumps(testsuite_object, sort_keys=True, indent=4)
+
+ def write_json_testresult_files(self, write_dir):
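+ # Write one <testmodule>.json file per module, each grouping that
+ # module's testsuites and their per-testcase results.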
+ if not os.path.exists(write_dir):
+ pathlib.Path(write_dir).mkdir(parents=True, exist_ok=True)
+ testsuite_testcase_dict = self.get_testsuite_testcase_dictionary()
+ testmodule_testsuite_dict = self.get_testmodule_testsuite_dictionary(testsuite_testcase_dict)
+ for testmodule in testmodule_testsuite_dict.keys():
+ testsuite_list = testmodule_testsuite_dict[testmodule]
+ json_testsuite = self._create_json_testsuite_string(testsuite_list, testsuite_testcase_dict,
+ self.testcase_result_dict)
+ file_name = '%s.json' % testmodule
+ file_path = os.path.join(write_dir, file_name)
+ with open(file_path, 'w') as the_file:
+ the_file.write(json_testsuite)
+
+ def write_testcase_log_files(self, write_dir):
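+ # Write one <testcase>.log file per testcase that has a captured
+ # message (failures, errors, skips); passing tests store None and are skipped.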
+ if not os.path.exists(write_dir):
+ pathlib.Path(write_dir).mkdir(parents=True, exist_ok=True)
+ for testcase in self.testcase_log_dict.keys():
+ test_log = self.testcase_log_dict[testcase]
+ if test_log is not None:
+ file_name = '%s.log' % testcase
+ file_path = os.path.join(write_dir, file_name)
+ with open(file_path, 'w') as the_file:
+ the_file.write(test_log)
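
A minimal standalone sketch of how the new OEJSONTestResultHelper can be exercised outside a full test run. The testcase ids, messages and output paths below are hypothetical, chosen only to match the dotted 'module.Class.testcase' naming the helper splits on; it assumes runner.py is importable as oeqa.core.runner, as it is within oe-core's meta/lib:

    from oeqa.core.runner import OEJSONTestResultHelper

    # Final status per testcase id, in the shape produced by
    # OETestResult._get_testcase_result_and_testmessage_dict()
    results = {
        'oelib.types.TestTypes.test_list': 'PASSED',
        'oelib.types.TestTypes.test_dict': 'FAILED',
    }
    # Captured messages; passing tests carry None and get no log file
    logs = {
        'oelib.types.TestTypes.test_list': None,
        'oelib.types.TestTypes.test_dict': 'Traceback (most recent call last): ...',
    }

    helper = OEJSONTestResultHelper(results, logs)
    # Writes /tmp/testresults/oelib.json (one JSON file per test module)
    helper.write_json_testresult_files('/tmp/testresults')
    # Writes /tmp/testresults/logs/oelib.types.TestTypes.test_dict.log
    helper.write_testcase_log_files('/tmp/testresults/logs')

The resulting oelib.json groups the module's suites and cases, e.g. {"testsuite": {"oelib.types.TestTypes": {"testcase": {"oelib.types.TestTypes.test_list": {"testresult": "PASSED"}, ...}}}}.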