xref: /OK3568_Linux_fs/yocto/poky/meta/lib/oeqa/utils/decorators.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1#
2# Copyright (C) 2013 Intel Corporation
3#
4# SPDX-License-Identifier: MIT
5#
6
# Some custom decorators that can be used by unittests.
# The most useful one is skipUnlessPassed, which can be used for
# creating dependencies between two test methods.
10
11import os
12import logging
13import sys
14import unittest
15import threading
16import signal
17from functools import wraps
18
19#get the "result" object from one of the upper frames provided that one of these upper frames is a unittest.case frame
class getResults(object):
    """Snapshot of the failed, errored and skipped tests of the running unittest.

    On construction the call stack of the current thread is walked upwards
    until a frame is found that belongs to the ``unittest.case`` module and
    holds a local variable named ``result``.  The ``failures``, ``errors``
    and ``skipped`` lists of that object are flattened into plain lists of
    test names, available through getFailList(), getErrorList() and
    getSkipList().

    Raises RuntimeError when no such frame exists on the stack, i.e. when
    the object is created outside of a running unittest test.
    """

    def __init__(self):
        # Start from the innermost frame of the current thread.
        ident = threading.current_thread().ident
        upperf = sys._current_frames()[ident]
        # Not every unittest.case frame holds ``result``: helper frames such
        # as TestCase._callTestMethod (Python >= 3.8) sit between
        # TestCase.run() and the test method, so both conditions are checked.
        while upperf is not None:
            if upperf.f_globals.get('__name__') == 'unittest.case' and \
                    'result' in upperf.f_locals:
                break
            upperf = upperf.f_back
        if upperf is None:
            raise RuntimeError("getResults() must be used from within a "
                               "running unittest test: no unittest.case "
                               "frame holding 'result' was found")

        result = upperf.f_locals['result']
        self.faillist = self._handleList(result.failures)
        self.errorlist = self._handleList(result.errors)
        self.skiplist = self._handleList(result.skipped)

    @staticmethod
    def _handleList(items):
        """Flatten (test, info) tuples into a list of test names.

        Each entry contributes its id() (with the ``setUpModule (...)``
        wrapping removed for module-level setup failures) followed by the
        last dot-separated component of that id, when it is non-empty.
        """
        ret = []
        # items is a list of tuples, (test, failure) or (_ErrorHolder(), Exception())
        for item in items:
            s = item[0].id()
            # Handle the _ErrorHolder objects from setUpModule failures
            if "setUpModule (" in s:
                ret.append(s.replace("setUpModule (", "").replace(")", ""))
            else:
                ret.append(s)
            # Append also the test without the full path
            testname = s.split('.')[-1]
            if testname:
                ret.append(testname)
        return ret

    def getFailList(self):
        """Return the names of the tests that failed so far."""
        return self.faillist

    def getErrorList(self):
        """Return the names of the tests that raised an error so far."""
        return self.errorlist

    def getSkipList(self):
        """Return the names of the tests that were skipped so far."""
        return self.skiplist
55
class skipIfFailure(object):
    """Decorator that skips a test when the named test failed or errored.

    ``testcase`` is the name of the test the decorated one depends on.  When
    the decorated test is called, the results collected so far are looked up
    through getResults(); if ``testcase`` is listed among the failures or
    the errors, unittest.SkipTest is raised instead of running the test.
    """

    def __init__(self, testcase):
        self.testcase = testcase

    def __call__(self, f):
        @wraps(f)
        def wrapped_f(*args, **kwargs):
            res = getResults()
            # Both lists have to be tested on their own: "fails or errors"
            # would ignore the errors as soon as a single failure exists.
            if self.testcase in res.getFailList() or \
                    self.testcase in res.getErrorList():
                raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
            return f(*args, **kwargs)
        wrapped_f.__name__ = f.__name__
        return wrapped_f
70
class skipIfSkipped(object):
    """Decorator that skips a test when the named test was skipped.

    ``testcase`` is the name of the test the decorated one depends on; it is
    looked up in the skip list reported by getResults() each time the
    decorated test is called.
    """

    def __init__(self, testcase):
        self.testcase = testcase

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            skipped = getResults().getSkipList()
            if self.testcase not in skipped:
                return f(*args, **kwargs)
            raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)

        wrapped_f = wraps(f)(wrapped_f)
        wrapped_f.__name__ = f.__name__
        return wrapped_f
85
class skipUnlessPassed(object):
    """Decorator that runs a test only if the named test passed.

    ``testcase`` is the name of the test the decorated one depends on.  The
    decorated test raises unittest.SkipTest when that name shows up in the
    skip, failure or error list reported by getResults().  The dependency
    name is also stored on the wrapper as ``_depends_on``.
    """

    def __init__(self, testcase):
        self.testcase = testcase

    def __call__(self, f):
        def wrapped_f(*args, **kwargs):
            res = getResults()
            not_passed = (res.getSkipList(), res.getFailList(), res.getErrorList())
            if any(self.testcase in names for names in not_passed):
                raise unittest.SkipTest("Testcase dependency not met: %s" % self.testcase)
            return f(*args, **kwargs)

        wrapped_f = wraps(f)(wrapped_f)
        wrapped_f.__name__ = f.__name__
        wrapped_f._depends_on = self.testcase
        return wrapped_f
103
class testcase(object):
    """Decorator that attaches a test case identifier to a test function.

    The identifier passed to the constructor is stored on the returned
    wrapper as the ``test_case`` attribute; calling the wrapper simply calls
    the decorated function with the same arguments.
    """

    def __init__(self, test_case):
        self.test_case = test_case

    def __call__(self, func):
        def wrapped_f(*args, **kwargs):
            outcome = func(*args, **kwargs)
            return outcome

        wrapped_f = wraps(func)(wrapped_f)
        wrapped_f.test_case = self.test_case
        wrapped_f.__name__ = func.__name__
        return wrapped_f
115
class NoParsingFilter(logging.Filter):
    """Logging filter that lets through only records whose level number is 100."""

    def filter(self, record):
        is_level_100 = record.levelno == 100
        return is_level_100
119
120import inspect
121
def LogResults(original_class):
    """Class decorator that logs the outcome of every test that is run.

    ``original_class.run`` is replaced by a wrapper that calls the original
    ``run`` and then emits, at the custom ``RESULTS`` level (100), one record
    per test: ``ERROR``, ``FAILED``, ``SKIPPED`` or ``PASSED``, followed by the
    recorded message for errors and failures.  The test is identified by the
    ``test_case`` attribute of the test method when present, otherwise by the
    method name.

    logging.basicConfig() is asked to write to
    ``results-<caller>.<timestamp>.log`` in the directory that was current
    when the class was decorated, and ``results-<caller>.log`` is (re)created
    as a symlink to that file after every test.  ``<caller>`` is the basename
    of sys.argv[0].

    Returns ``original_class`` itself, with its ``run`` attribute replaced.
    """
    orig_method = original_class.run

    from time import strftime, gmtime
    caller = os.path.basename(sys.argv[0])
    timestamp = strftime('%Y%m%d%H%M%S', gmtime())
    cwd = os.getcwd()
    logfile = os.path.join(cwd, 'results-' + caller + '.' + timestamp + '.log')
    linkfile = os.path.join(cwd, 'results-' + caller + '.log')

    # custom logging level used for filtering the result records
    custom_log_level = 100

    def results(self, message, *args, **kws):
        """Logger method that emits ``message`` at the RESULTS level."""
        if self.isEnabledFor(custom_log_level):
            self.log(custom_log_level, message, *args, **kws)

    def update_link():
        """Point ``linkfile`` at the current log file."""
        # XXX: To avoid a race condition between testing for the link file
        # and replacing it, bb.utils.lockfile is used when bitbake is
        # importable; the best solution would be a unique name for the link.
        try:
            import bb
        except ImportError:
            bb = None

        lf = None
        if bb is not None:
            lf = bb.utils.lockfile(linkfile + '.lock', block=True)
        try:
            if os.path.lexists(linkfile):
                os.remove(linkfile)
            os.symlink(logfile, linkfile)
        finally:
            # Release the lock even when updating the link failed.
            if bb is not None:
                bb.utils.unlockfile(lf)

    # rewrite the run method of unittest.TestCase to add testcase logging
    def run(self, result=None, *args, **kws):
        returned = orig_method(self, result, *args, **kws)
        # When no result object was supplied, fall back to whatever the
        # original run() handed back.
        if result is None:
            result = returned
        if result is None:
            # Nothing to inspect, hence nothing to log.
            return returned

        testMethod = getattr(self, self._testMethodName)
        # if the test case is decorated then use its number, else use its name
        test_case = getattr(testMethod, 'test_case', self._testMethodName)
        label = "Testcase " + str(test_case)

        logging.addLevelName(custom_log_level, 'RESULTS')
        logging.Logger.results = results

        logging.basicConfig(filename=logfile,
                            filemode='w',
                            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
                            datefmt='%H:%M:%S',
                            level=custom_log_level)
        for handler in logging.root.handlers:
            # run() is called once per test; install the filter only once
            # per handler instead of stacking a new instance every time.
            if not any(isinstance(f, NoParsingFilter) for f in handler.filters):
                handler.addFilter(NoParsingFilter())
        local_log = logging.getLogger(caller)

        # check status of tests and record it
        tcid = self.id()
        passed = True
        for (name, msg) in result.errors:
            if tcid == name.id():
                local_log.results(label + ": ERROR")
                local_log.results(label + ":\n" + msg)
                passed = False
        for (name, msg) in result.failures:
            if tcid == name.id():
                local_log.results(label + ": FAILED")
                local_log.results(label + ":\n" + msg)
                passed = False
        for (name, msg) in result.skipped:
            if tcid == name.id():
                local_log.results(label + ": SKIPPED")
                passed = False
        if passed:
            local_log.results(label + ": PASSED")

        # Create symlink to the current log
        update_link()
        return returned

    original_class.run = run

    return original_class
217
class TimeOut(BaseException):
    """Raised by the timeout decorators of this module when the alarm fires."""
220
def timeout(seconds):
    """Decorator factory limiting the run time of a function via SIGALRM.

    The wrapper installs a SIGALRM handler, arms signal.alarm(seconds), calls
    the decorated function and finally cancels the alarm and restores the
    previous handler.  The handler raises TimeOut unless the signal arrives
    while the wrapper's own frame is the one being executed.  Where the
    signal module has no ``alarm`` the function is returned undecorated.
    """
    def decorator(fn):
        # No alarm support: hand the function back untouched.
        if not hasattr(signal, 'alarm'):
            return fn

        @wraps(fn)
        def wrapped_f(*args, **kw):
            current_frame = sys._getframe()

            def raiseTimeOut(signum, frame):
                # Stay silent when the wrapper itself was interrupted.
                if frame is current_frame:
                    return
                raise TimeOut('%s seconds' % seconds)

            prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut)
            try:
                signal.alarm(seconds)
                return fn(*args, **kw)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, prev_handler)

        return wrapped_f
    return decorator
241
__tag_prefix = "tag__"
def tag(*args, **kwargs):
    """Decorator that adds attributes to classes or functions
    for use with the Attribute (-a) plugin.

    Every positional name becomes an attribute set to True and every keyword
    becomes an attribute set to the given value; all attribute names are
    prefixed with "tag__".  The decorated object itself is returned.
    """
    def wrap_ob(ob):
        # Positional tags first, keyword tags afterwards.
        pairs = [(name, True) for name in args]
        pairs.extend(kwargs.items())
        for name, value in pairs:
            setattr(ob, __tag_prefix + name, value)
        return ob
    return wrap_ob
254
def gettag(obj, key, default=None):
    """Return the value of tag ``key`` on ``obj``, or ``default`` if unset.

    For a unittest.TestCase instance the tag is looked up on the test method
    named by ``_testMethodName`` first and on the instance second; for any
    other object it is looked up on the object itself.
    """
    attr = __tag_prefix + key
    if isinstance(obj, unittest.TestCase):
        tc_method = getattr(obj, obj._testMethodName)
        fallback = getattr(obj, attr, default)
        return getattr(tc_method, attr, fallback)
    return getattr(obj, attr, default)
262
def getAllTags(obj):
    """Return a dict with every tag found on ``obj``, keyed by tag name.

    For a unittest.TestCase instance the tags of the instance are merged with
    those of the test method named by ``_testMethodName``; on a clash the
    value found on the method wins.
    """
    def collect(o):
        found = {}
        for attr in dir(o):
            if attr.startswith(__tag_prefix):
                found[attr[len(__tag_prefix):]] = getattr(o, attr)
        return found

    if isinstance(obj, unittest.TestCase):
        tc_method = getattr(obj, obj._testMethodName)
        tags = collect(obj)
        tags.update(collect(tc_method))
        return tags
    return collect(obj)
273
def timeout_handler(seconds):
    """Decorator factory like timeout() that also restarts the target.

    Meant for methods: when the alarm fires while the wrapper's own frame is
    not the one being executed, ``self.target.restart()`` is attempted and
    TimeOut is raised afterwards.  An ordinary exception raised by the
    restart does not prevent the TimeOut; it is attached to it as its cause.
    Where the signal module has no ``alarm`` the function is returned
    undecorated.
    """
    def decorator(fn):
        if not hasattr(signal, 'alarm'):
            return fn

        @wraps(fn)
        def wrapped_f(self, *args, **kw):
            current_frame = sys._getframe()

            def raiseTimeOut(signum, frame):
                if frame is current_frame:
                    return
                try:
                    self.target.restart()
                except Exception as err:
                    # The restart is best effort: report the timeout anyway
                    # and keep the restart failure as the cause.
                    raise TimeOut('%s seconds' % seconds) from err
                raise TimeOut('%s seconds' % seconds)

            prev_handler = signal.signal(signal.SIGALRM, raiseTimeOut)
            try:
                signal.alarm(seconds)
                return fn(self, *args, **kw)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, prev_handler)

        return wrapped_f
    return decorator
298