diff options
-rw-r--r-- | meta/lib/oeqa/utils/decorators.py | 297 |
1 files changed, 0 insertions, 297 deletions
diff --git a/meta/lib/oeqa/utils/decorators.py b/meta/lib/oeqa/utils/decorators.py deleted file mode 100644 index aabf4110cbe..00000000000 --- a/meta/lib/oeqa/utils/decorators.py +++ /dev/null @@ -1,297 +0,0 @@ -# -# Copyright (C) 2013 Intel Corporation -# -# SPDX-License-Identifier: MIT -# - -# Some custom decorators that can be used by unittests -# Most useful is skipUnlessPassed which can be used for -# creating dependecies between two test methods. - -import os -import logging -import sys -import unittest -import threading -import signal -from functools import wraps - -#get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame -class getResults(object): - def __init__(self): - #dynamically determine the unittest.case frame and use it to get the name of the test method - ident = threading.current_thread().ident - upperf = sys._current_frames()[ident] - while (upperf.f_globals['__name__'] != 'unittest.case'): - upperf = upperf.f_back - - def handleList(items): - ret = [] - # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception()) - for i in items: - s = i[0].id() - #Handle the _ErrorHolder objects from skipModule failures - if "setUpModule (" in s: - ret.append(s.replace("setUpModule (", "").replace(")","")) - else: - ret.append(s) - # Append also the test without the full path - testname = s.split('.')[-1] - if testname: - ret.append(testname) - return ret - self.faillist = handleList(upperf.f_locals['result'].failures) - self.errorlist = handleList(upperf.f_locals['result'].errors) - self.skiplist = handleList(upperf.f_locals['result'].skipped) - - def getFailList(self): - return self.faillist - - def getErrorList(self): - return self.errorlist - - def getSkipList(self): - return self.skiplist - -class skipIfFailure(object): - - def __init__(self,testcase): - self.testcase = testcase - - def __call__(self,f): - @wraps(f) - def wrapped_f(*args, **kwargs): - res = getResults() - if self.testcase in (res.getFailList() or res.getErrorList()): - raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) - return f(*args, **kwargs) - wrapped_f.__name__ = f.__name__ - return wrapped_f - -class skipIfSkipped(object): - - def __init__(self,testcase): - self.testcase = testcase - - def __call__(self,f): - @wraps(f) - def wrapped_f(*args, **kwargs): - res = getResults() - if self.testcase in res.getSkipList(): - raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) - return f(*args, **kwargs) - wrapped_f.__name__ = f.__name__ - return wrapped_f - -class skipUnlessPassed(object): - - def __init__(self,testcase): - self.testcase = testcase - - def __call__(self,f): - @wraps(f) - def wrapped_f(*args, **kwargs): - res = getResults() - if self.testcase in res.getSkipList() or \ - self.testcase in res.getFailList() or \ - self.testcase in res.getErrorList(): - raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) - return f(*args, **kwargs) - wrapped_f.__name__ = f.__name__ - wrapped_f._depends_on = self.testcase - return wrapped_f - -class testcase(object): - def __init__(self, test_case): - self.test_case = test_case - - def __call__(self, func): - @wraps(func) - def wrapped_f(*args, **kwargs): - return func(*args, **kwargs) - wrapped_f.test_case = self.test_case - wrapped_f.__name__ = func.__name__ - return wrapped_f - -class NoParsingFilter(logging.Filter): - def filter(self, record): - return record.levelno == 100 - -import inspect - -def LogResults(original_class): - orig_method = original_class.run - - from time import strftime, gmtime - caller = os.path.basename(sys.argv[0]) - timestamp = strftime('%Y%m%d%H%M%S',gmtime()) - logfile = os.path.join(os.getcwd(),'results-'+caller+'.'+timestamp+'.log') - linkfile = os.path.join(os.getcwd(),'results-'+caller+'.log') - - def get_class_that_defined_method(meth): - if inspect.ismethod(meth): - for cls in inspect.getmro(meth.__self__.__class__): - if cls.__dict__.get(meth.__name__) is meth: - return cls - meth = meth.__func__ # fallback to __qualname__ parsing - if inspect.isfunction(meth): - cls = getattr(inspect.getmodule(meth), - meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0]) - if isinstance(cls, type): - return cls - return None - - #rewrite the run method of unittest.TestCase to add testcase logging - def run(self, result, *args, **kws): - orig_method(self, result, *args, **kws) - passed = True - testMethod = getattr(self, self._testMethodName) - #if test case is decorated then use it's number, else use it's name - try: - test_case = testMethod.test_case - except AttributeError: - test_case = self._testMethodName - - class_name = str(get_class_that_defined_method(testMethod)).split("'")[1] - - #create custom logging level for filtering. - custom_log_level = 100 - logging.addLevelName(custom_log_level, 'RESULTS') - - def results(self, message, *args, **kws): - if self.isEnabledFor(custom_log_level): - self.log(custom_log_level, message, *args, **kws) - logging.Logger.results = results - - logging.basicConfig(filename=logfile, - filemode='w', - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - datefmt='%H:%M:%S', - level=custom_log_level) - for handler in logging.root.handlers: - handler.addFilter(NoParsingFilter()) - local_log = logging.getLogger(caller) - - #check status of tests and record it - - tcid = self.id() - for (name, msg) in result.errors: - if tcid == name.id(): - local_log.results("Testcase "+str(test_case)+": ERROR") - local_log.results("Testcase "+str(test_case)+":\n"+msg) - passed = False - for (name, msg) in result.failures: - if tcid == name.id(): - local_log.results("Testcase "+str(test_case)+": FAILED") - local_log.results("Testcase "+str(test_case)+":\n"+msg) - passed = False - for (name, msg) in result.skipped: - if tcid == name.id(): - local_log.results("Testcase "+str(test_case)+": SKIPPED") - passed = False - if passed: - local_log.results("Testcase "+str(test_case)+": PASSED") - - # XXX: In order to avoid race condition when test if exists the linkfile - # use bb.utils.lock, the best solution is to create a unique name for the - # link file. - try: - import bb - has_bb = True - lockfilename = linkfile + '.lock' - except ImportError: - has_bb = False - - if has_bb: - lf = bb.utils.lockfile(lockfilename, block=True) - # Create symlink to the current log - if os.path.lexists(linkfile): - os.remove(linkfile) - os.symlink(logfile, linkfile) - if has_bb: - bb.utils.unlockfile(lf) - - original_class.run = run - - return original_class - -class TimeOut(BaseException): - pass - -def timeout(seconds): - def decorator(fn): - if hasattr(signal, 'alarm'): - @wraps(fn) - def wrapped_f(*args, **kw): - current_frame = sys._getframe() - def raiseTimeOut(signal, frame): - if frame is not current_frame: - raise TimeOut('%s seconds' % seconds) - prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut) - try: - signal.alarm(seconds) - return fn(*args, **kw) - finally: - signal.alarm(0) - signal.signal(signal.SIGALRM, prev_handler) - return wrapped_f - else: - return fn - return decorator - -__tag_prefix = "tag__" -def tag(*args, **kwargs): - """Decorator that adds attributes to classes or functions - for use with the Attribute (-a) plugin. - """ - def wrap_ob(ob): - for name in args: - setattr(ob, __tag_prefix + name, True) - for name, value in kwargs.items(): - setattr(ob, __tag_prefix + name, value) - return ob - return wrap_ob - -def gettag(obj, key, default=None): - key = __tag_prefix + key - if not isinstance(obj, unittest.TestCase): - return getattr(obj, key, default) - tc_method = getattr(obj, obj._testMethodName) - ret = getattr(tc_method, key, getattr(obj, key, default)) - return ret - -def getAllTags(obj): - def __gettags(o): - r = {k[len(__tag_prefix):]:getattr(o,k) for k in dir(o) if k.startswith(__tag_prefix)} - return r - if not isinstance(obj, unittest.TestCase): - return __gettags(obj) - tc_method = getattr(obj, obj._testMethodName) - ret = __gettags(obj) - ret.update(__gettags(tc_method)) - return ret - -def timeout_handler(seconds): - def decorator(fn): - if hasattr(signal, 'alarm'): - @wraps(fn) - def wrapped_f(self, *args, **kw): - current_frame = sys._getframe() - def raiseTimeOut(signal, frame): - if frame is not current_frame: - try: - self.target.restart() - raise TimeOut('%s seconds' % seconds) - except: - raise TimeOut('%s seconds' % seconds) - prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut) - try: - signal.alarm(seconds) - return fn(self, *args, **kw) - finally: - signal.alarm(0) - signal.signal(signal.SIGALRM, prev_handler) - return wrapped_f - else: - return fn - return decorator |