1# 2# Copyright (C) 2013 Intel Corporation 3# 4# SPDX-License-Identifier: MIT 5# 6 7# Some custom decorators that can be used by unittests 8# Most useful is skipUnlessPassed which can be used for 9# creating dependecies between two test methods. 10 11import os 12import logging 13import sys 14import unittest 15import threading 16import signal 17from functools import wraps 18 19#get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame 20class getResults(object): 21 def __init__(self): 22 #dynamically determine the unittest.case frame and use it to get the name of the test method 23 ident = threading.current_thread().ident 24 upperf = sys._current_frames()[ident] 25 while (upperf.f_globals['__name__'] != 'unittest.case'): 26 upperf = upperf.f_back 27 28 def handleList(items): 29 ret = [] 30 # items is a list of tuples, (test, failure) or (_ErrorHandler(), Exception()) 31 for i in items: 32 s = i[0].id() 33 #Handle the _ErrorHolder objects from skipModule failures 34 if "setUpModule (" in s: 35 ret.append(s.replace("setUpModule (", "").replace(")","")) 36 else: 37 ret.append(s) 38 # Append also the test without the full path 39 testname = s.split('.')[-1] 40 if testname: 41 ret.append(testname) 42 return ret 43 self.faillist = handleList(upperf.f_locals['result'].failures) 44 self.errorlist = handleList(upperf.f_locals['result'].errors) 45 self.skiplist = handleList(upperf.f_locals['result'].skipped) 46 47 def getFailList(self): 48 return self.faillist 49 50 def getErrorList(self): 51 return self.errorlist 52 53 def getSkipList(self): 54 return self.skiplist 55 56class skipIfFailure(object): 57 58 def __init__(self,testcase): 59 self.testcase = testcase 60 61 def __call__(self,f): 62 @wraps(f) 63 def wrapped_f(*args, **kwargs): 64 res = getResults() 65 if self.testcase in (res.getFailList() or res.getErrorList()): 66 raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) 67 return f(*args, **kwargs) 68 wrapped_f.__name__ = f.__name__ 69 return wrapped_f 70 71class skipIfSkipped(object): 72 73 def __init__(self,testcase): 74 self.testcase = testcase 75 76 def __call__(self,f): 77 @wraps(f) 78 def wrapped_f(*args, **kwargs): 79 res = getResults() 80 if self.testcase in res.getSkipList(): 81 raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) 82 return f(*args, **kwargs) 83 wrapped_f.__name__ = f.__name__ 84 return wrapped_f 85 86class skipUnlessPassed(object): 87 88 def __init__(self,testcase): 89 self.testcase = testcase 90 91 def __call__(self,f): 92 @wraps(f) 93 def wrapped_f(*args, **kwargs): 94 res = getResults() 95 if self.testcase in res.getSkipList() or \ 96 self.testcase in res.getFailList() or \ 97 self.testcase in res.getErrorList(): 98 raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase) 99 return f(*args, **kwargs) 100 wrapped_f.__name__ = f.__name__ 101 wrapped_f._depends_on = self.testcase 102 return wrapped_f 103 104class testcase(object): 105 def __init__(self, test_case): 106 self.test_case = test_case 107 108 def __call__(self, func): 109 @wraps(func) 110 def wrapped_f(*args, **kwargs): 111 return func(*args, **kwargs) 112 wrapped_f.test_case = self.test_case 113 wrapped_f.__name__ = func.__name__ 114 return wrapped_f 115 116class NoParsingFilter(logging.Filter): 117 def filter(self, record): 118 return record.levelno == 100 119 120import inspect 121 122def LogResults(original_class): 123 orig_method = original_class.run 124 125 from time import strftime, gmtime 126 caller = os.path.basename(sys.argv[0]) 127 timestamp = strftime('%Y%m%d%H%M%S',gmtime()) 128 logfile = os.path.join(os.getcwd(),'results-'+caller+'.'+timestamp+'.log') 129 linkfile = os.path.join(os.getcwd(),'results-'+caller+'.log') 130 131 def get_class_that_defined_method(meth): 132 if inspect.ismethod(meth): 133 for cls in inspect.getmro(meth.__self__.__class__): 134 if cls.__dict__.get(meth.__name__) is meth: 135 return cls 136 meth = meth.__func__ # fallback to __qualname__ parsing 137 if inspect.isfunction(meth): 138 cls = getattr(inspect.getmodule(meth), 139 meth.__qualname__.split('.<locals>', 1)[0].rsplit('.', 1)[0]) 140 if isinstance(cls, type): 141 return cls 142 return None 143 144 #rewrite the run method of unittest.TestCase to add testcase logging 145 def run(self, result, *args, **kws): 146 orig_method(self, result, *args, **kws) 147 passed = True 148 testMethod = getattr(self, self._testMethodName) 149 #if test case is decorated then use it's number, else use it's name 150 try: 151 test_case = testMethod.test_case 152 except AttributeError: 153 test_case = self._testMethodName 154 155 class_name = str(get_class_that_defined_method(testMethod)).split("'")[1] 156 157 #create custom logging level for filtering. 158 custom_log_level = 100 159 logging.addLevelName(custom_log_level, 'RESULTS') 160 161 def results(self, message, *args, **kws): 162 if self.isEnabledFor(custom_log_level): 163 self.log(custom_log_level, message, *args, **kws) 164 logging.Logger.results = results 165 166 logging.basicConfig(filename=logfile, 167 filemode='w', 168 format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', 169 datefmt='%H:%M:%S', 170 level=custom_log_level) 171 for handler in logging.root.handlers: 172 handler.addFilter(NoParsingFilter()) 173 local_log = logging.getLogger(caller) 174 175 #check status of tests and record it 176 177 tcid = self.id() 178 for (name, msg) in result.errors: 179 if tcid == name.id(): 180 local_log.results("Testcase "+str(test_case)+": ERROR") 181 local_log.results("Testcase "+str(test_case)+":\n"+msg) 182 passed = False 183 for (name, msg) in result.failures: 184 if tcid == name.id(): 185 local_log.results("Testcase "+str(test_case)+": FAILED") 186 local_log.results("Testcase "+str(test_case)+":\n"+msg) 187 passed = False 188 for (name, msg) in result.skipped: 189 if tcid == name.id(): 190 local_log.results("Testcase "+str(test_case)+": SKIPPED") 191 passed = False 192 if passed: 193 local_log.results("Testcase "+str(test_case)+": PASSED") 194 195 # XXX: In order to avoid race condition when test if exists the linkfile 196 # use bb.utils.lock, the best solution is to create a unique name for the 197 # link file. 198 try: 199 import bb 200 has_bb = True 201 lockfilename = linkfile + '.lock' 202 except ImportError: 203 has_bb = False 204 205 if has_bb: 206 lf = bb.utils.lockfile(lockfilename, block=True) 207 # Create symlink to the current log 208 if os.path.lexists(linkfile): 209 os.remove(linkfile) 210 os.symlink(logfile, linkfile) 211 if has_bb: 212 bb.utils.unlockfile(lf) 213 214 original_class.run = run 215 216 return original_class 217 218class TimeOut(BaseException): 219 pass 220 221def timeout(seconds): 222 def decorator(fn): 223 if hasattr(signal, 'alarm'): 224 @wraps(fn) 225 def wrapped_f(*args, **kw): 226 current_frame = sys._getframe() 227 def raiseTimeOut(signal, frame): 228 if frame is not current_frame: 229 raise TimeOut('%s seconds' % seconds) 230 prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut) 231 try: 232 signal.alarm(seconds) 233 return fn(*args, **kw) 234 finally: 235 signal.alarm(0) 236 signal.signal(signal.SIGALRM, prev_handler) 237 return wrapped_f 238 else: 239 return fn 240 return decorator 241 242__tag_prefix = "tag__" 243def tag(*args, **kwargs): 244 """Decorator that adds attributes to classes or functions 245 for use with the Attribute (-a) plugin. 246 """ 247 def wrap_ob(ob): 248 for name in args: 249 setattr(ob, __tag_prefix + name, True) 250 for name, value in kwargs.items(): 251 setattr(ob, __tag_prefix + name, value) 252 return ob 253 return wrap_ob 254 255def gettag(obj, key, default=None): 256 key = __tag_prefix + key 257 if not isinstance(obj, unittest.TestCase): 258 return getattr(obj, key, default) 259 tc_method = getattr(obj, obj._testMethodName) 260 ret = getattr(tc_method, key, getattr(obj, key, default)) 261 return ret 262 263def getAllTags(obj): 264 def __gettags(o): 265 r = {k[len(__tag_prefix):]:getattr(o,k) for k in dir(o) if k.startswith(__tag_prefix)} 266 return r 267 if not isinstance(obj, unittest.TestCase): 268 return __gettags(obj) 269 tc_method = getattr(obj, obj._testMethodName) 270 ret = __gettags(obj) 271 ret.update(__gettags(tc_method)) 272 return ret 273 274def timeout_handler(seconds): 275 def decorator(fn): 276 if hasattr(signal, 'alarm'): 277 @wraps(fn) 278 def wrapped_f(self, *args, **kw): 279 current_frame = sys._getframe() 280 def raiseTimeOut(signal, frame): 281 if frame is not current_frame: 282 try: 283 self.target.restart() 284 raise TimeOut('%s seconds' % seconds) 285 except: 286 raise TimeOut('%s seconds' % seconds) 287 prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut) 288 try: 289 signal.alarm(seconds) 290 return fn(self, *args, **kw) 291 finally: 292 signal.alarm(0) 293 signal.signal(signal.SIGALRM, prev_handler) 294 return wrapped_f 295 else: 296 return fn 297 return decorator 298