xref: /OK3568_Linux_fs/yocto/poky/meta/lib/oeqa/oetest.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1#
2# Copyright (C) 2013 Intel Corporation
3#
4# SPDX-License-Identifier: MIT
5#
6
7# Main unittest module used by testimage.bbclass
8# This provides the oeRuntimeTest base class which is inherited by all tests in meta/lib/oeqa/runtime.
9
10# It also has some helper functions and it's responsible for actually starting the tests
11
12import os, re, sys
13import unittest
14import inspect
15import subprocess
16import signal
17import shutil
18import functools
19try:
20    import bb
21except ImportError:
22    pass
23import logging
24
25import oeqa.runtime
26# Exported test doesn't require sdkext
27try:
28    import oeqa.sdkext
29except ImportError:
30    pass
31from oeqa.utils.decorators import LogResults, gettag, getResults
32
33logger = logging.getLogger("BitBake")
34
def getVar(obj):
    """
    Build the name-lookup mapping used when a tag expression is evaluated.

    The returned object is a dict subclass that never stores anything
    itself: every subscript access is forwarded to gettag(obj, <key>), so
    each name is looked up on the given test case.
    """
    class VarDict(dict):
        def __getitem__(self, name):
            # The dict storage stays empty; the answer always comes from obj.
            return gettag(obj, name)

    tag_lookup = VarDict()
    return tag_lookup
41
def checkTags(tc, tagexp):
    """
    Evaluate the tag expression 'tagexp' for the test case 'tc'.

    Names appearing in the expression are resolved through the mapping
    returned by getVar(tc). The result of the expression is returned as is.
    """
    tag_namespace = getVar(tc)
    # NOTE(review): tagexp is handed to eval() unchanged, so it must only
    # ever come from trusted configuration.
    return eval(tagexp, None, tag_namespace)
44
def filterByTagExp(testsuite, tagexp):
    """
    Return a copy of 'testsuite' restricted to the tests matching 'tagexp'.

    An empty or missing expression returns the input object untouched.
    Nested suites are filtered recursively and are always kept (possibly
    empty); individual tests are kept only when checkTags() is true for
    them. The result is built with the same class as the input container.
    """
    if not tagexp:
        return testsuite

    kept = []
    for item in testsuite:
        if isinstance(item, unittest.BaseTestSuite):
            # A sub-suite: descend into it and keep the filtered version
            kept.append(filterByTagExp(item, tagexp))
        elif checkTags(item, tagexp):
            kept.append(item)

    container_class = testsuite.__class__
    return container_class(kept)
56
@LogResults
class oeTest(unittest.TestCase):
    """
    Common base class of the oeqa test cases.

    The helper classmethods below read the shared test context stored in
    the class attribute 'oeTest.tc'.
    """

    # Default process listing command
    pscmd = "ps"
    # unittest: append custom failure messages to the standard ones
    longMessage = True

    @classmethod
    def hasPackage(cls, pkg):
        """
        True if the full package name exists in the manifest, False otherwise.
        """
        manifest = oeTest.tc.pkgmanifest
        return pkg in manifest

    @classmethod
    def hasPackageMatch(cls, match):
        """
        True if at least one package name in the manifest matches the
        regular expression 'match', False otherwise.

        The check uses re.match(), so the expression is anchored at the
        beginning of each package name.
        """
        return any(re.match(match, name) for name in oeTest.tc.pkgmanifest)

    @classmethod
    def hasFeature(cls, feature):
        """
        True if 'feature' is listed in the image features or in the distro
        features of the test context, False otherwise.
        """
        context = oeTest.tc
        return feature in context.imagefeatures or \
                feature in context.distrofeatures
88
class oeRuntimeTest(oeTest):
    """
    Base class for tests that run against a target.

    Child classes customise per-test preparation and cleanup through
    setUpLocal() and tearDownLocal() instead of overriding setUp() and
    tearDown().
    """

    def __init__(self, methodName='runTest'):
        # The target is taken from the shared test context
        self.target = oeRuntimeTest.tc.target
        super(oeRuntimeTest, self).__init__(methodName)

    def setUp(self):
        context = self.tc

        # Install packages in the DUT
        context.install_uninstall_packages(self.id())

        # Check if test needs to run
        if context.sigterm:
            self.fail("Got SIGTERM")
        if type(self.target).__name__ == "QemuTarget":
            self.assertTrue(self.target.check(), msg = "Qemu not running?")

        self.setUpLocal()

    # Hook executed at the end of setUp(); meant to be overridden
    def setUpLocal(self):
        pass

    def tearDown(self):
        test_id = self.id()

        # Uninstall packages in the DUT
        self.tc.install_uninstall_packages(test_id, False)

        res = getResults()
        # Dump host and target data when the test failed or raised an
        # exception; this is done for QemuTarget only
        if type(self.target).__name__ == "QemuTarget":
            if test_id in res.getErrorList() or test_id in res.getFailList():
                dumper = self.tc.host_dumper
                dumper.create_dir(self._testMethodName)
                dumper.dump_host()
                self.target.target_dumper.dump_target(dumper.dump_dir)
                print ("%s dump data stored in %s" % (self._testMethodName,
                         dumper.dump_dir))

        self.tearDownLocal()

    # Hook executed at the end of tearDown(); meant to be overridden
    def tearDownLocal(self):
        pass
132
def getmodule(pos=2):
    """
    Return the module name of the source file owning a frame of the call stack.

    'pos' indexes the list returned by inspect.stack(): 0 is this function
    itself, 1 is its caller, 2 (the default) is the caller's caller.
    """
    # Element 1 of a frame record is the file name of that frame
    source_file = inspect.stack()[pos][1]
    return inspect.getmodulename(source_file)
140
def skipModule(reason, pos=2):
    """
    Skip the calling test module, unless it is listed as required.

    The module is identified with getmodule(pos). When its name is part of
    oeTest.tc.testsrequired a plain Exception is raised instead of
    unittest.SkipTest, so a required test cannot be skipped silently.
    """
    modname = getmodule(pos)
    if modname in oeTest.tc.testsrequired:
        raise Exception("\nTest %s wants to be skipped.\nReason is: %s" \
                "\nTest was required in TEST_SUITES, so either the condition for skipping is wrong" \
                "\nor the image really doesn't have the required feature/package when it should." % (modname, reason))
    raise unittest.SkipTest("%s: %s" % (modname, reason))
149
def skipModuleIf(cond, reason):
    """
    Skip the calling test module (see skipModule) when 'cond' is true.
    """
    if not cond:
        return
    # pos=3: one stack level more than the skipModule default, because this
    # wrapper sits between the test module and skipModule
    skipModule(reason, 3)
154
def skipModuleUnless(cond, reason):
    """
    Skip the calling test module (see skipModule) when 'cond' is false.
    """
    if cond:
        return
    # pos=3: one stack level more than the skipModule default, because this
    # wrapper sits between the test module and skipModule
    skipModule(reason, 3)
159
# Text received by custom_verbose() that has not been emitted yet
_buffer_logger = ""
def custom_verbose(msg, *args, **kwargs):
    """
    Line-buffering replacement for a stream's write() method.

    'msg' is appended to the module-level buffer. Once a chunk ending in a
    newline arrives, the buffered text (trailing newlines stripped) is
    emitted with bb.plain(), or with logger.info() when that raises
    NameError (e.g. 'bb' could not be imported), and the buffer is cleared.
    Extra positional and keyword arguments are passed on to the function
    that emits the text.

    An empty 'msg' is accepted and leaves the buffer unchanged.
    """
    global _buffer_logger
    _buffer_logger += msg

    # endswith() instead of msg[-1]: indexing fails on an empty string
    if not msg.endswith("\n"):
        return

    text = _buffer_logger.rstrip("\n")
    try:
        bb.plain(text, *args, **kwargs)
    except NameError:
        logger.info(text, *args, **kwargs)
    _buffer_logger = ""
172
class TestContext(object):
    """
    Base class of the test contexts: resolves the list of test modules,
    loads them into a unittest suite and runs that suite.

    The methods _get_test_suites(), _get_test_namespace() and
    _get_test_suites_required() are called here but not defined in this
    class; they have to be provided by subclasses.
    """

    def __init__(self, d, exported=False):
        """
        d:        object queried for configuration through d.getVar()
        exported: when True the tests are searched next to this file
                  instead of under every directory listed in BBPATH
        """
        self.d = d

        self.testsuites = self._get_test_suites()

        if exported:
            path = [os.path.dirname(os.path.abspath(__file__))]
            extrapath = ""
        else:
            path = d.getVar("BBPATH").split(':')
            extrapath = "lib/oeqa"

        self.testslist = self._get_tests_list(path, extrapath)
        self.testsrequired = self._get_test_suites_required()

        self.filesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "runtime/files")
        self.corefilesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "files")
        self.imagefeatures = d.getVar("IMAGE_FEATURES").split()
        self.distrofeatures = d.getVar("DISTRO_FEATURES").split()

    # get testcase list from specified file
    # if path is a relative path, then relative to build/conf/
    def _read_testlist(self, fpath, builddir):
        """
        Return the entries of the manifest file 'fpath' joined by spaces.

        Blank lines and lines starting with '#' are ignored. A relative
        'fpath' is resolved against <builddir>/conf. bb.fatal() is called
        when the file does not exist.
        """
        if not os.path.isabs(fpath):
            fpath = os.path.join(builddir, "conf", fpath)
        if not os.path.exists(fpath):
            bb.fatal("No such manifest file: ", fpath)
        tcs = []
        # 'with' makes sure the file is closed again
        with open(fpath) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#"):
                    tcs.append(line)
        return " ".join(tcs)

    # return test list by type also filter if TEST_SUITES is specified
    def _get_tests_list(self, bbpath, extrapath):
        """
        Translate self.testsuites into a list of dotted test names.

        bbpath:    list of directories to search
        extrapath: sub directory of each bbpath entry holding the namespace
                   directories

        Names starting with "oeqa." are taken as they are. Any other name
        except "auto" must have a matching <name>.py file, or a file named
        after the part before its first dot, inside the namespace
        directory; otherwise bb.fatal() is called. "auto" adds every .py
        file not starting with '_' found in lib/oeqa/<namespace> under
        each bbpath entry.
        """
        testslist = []

        namespace = self._get_test_namespace()

        # This relies on lib/ under each directory in BBPATH being added to sys.path
        # (as done by default in base.bbclass)
        for testname in self.testsuites:
            if testname == "auto":
                continue
            if testname.startswith("oeqa."):
                testslist.append(testname)
                continue
            # The name may be a plain module or a module followed by more
            # dotted components; in the second case only the first
            # component names the file.
            candidates = (testname + ".py", testname.split(".")[0] + ".py")
            for p in bbpath:
                basedir = os.path.join(p, extrapath, namespace)
                if any(os.path.exists(os.path.join(basedir, c)) for c in candidates):
                    testslist.append("oeqa." + namespace + "." + testname)
                    break
            else:
                bb.fatal('Test %s specified in TEST_SUITES could not be found in lib/oeqa/runtime under BBPATH' % testname)

        if "auto" in self.testsuites:
            def add_auto_list(path):
                files = sorted([f for f in os.listdir(path) if f.endswith('.py') and not f.startswith('_')])
                for f in files:
                    module = 'oeqa.' + namespace + '.' + f[:-3]
                    if module not in testslist:
                        testslist.append(module)

            for p in bbpath:
                testpath = os.path.join(p, 'lib', 'oeqa', namespace)
                bb.debug(2, 'Searching for tests in %s' % testpath)
                if os.path.exists(testpath):
                    add_auto_list(testpath)

        return testslist

    def getTestModules(self):
        """
        Returns the import loaders of all the test modules in the testlist.

        An entry of the form <a>.<b>.test_<...> is reduced to its first
        three dotted components before the lookup. The list holds None for
        a module that cannot be found.
        """

        import importlib.util

        modules = []
        for test in self.testslist:
            if re.search(r"\w+\.\w+\.test_\S+", test):
                test = '.'.join(test.split('.')[:3])
            spec = importlib.util.find_spec(test)
            modules.append(spec.loader if spec is not None else None)

        return modules

    def getModulefromID(self, test_id):
        """
        Returns the test module based on a test id.

        The first three dotted components of 'test_id' are compared with
        the 'name' attribute of each entry of getTestModules(); None is
        returned when nothing matches.
        """

        module_name = ".".join(test_id.split(".")[:3])
        for module in self.getTestModules():
            # getattr: entries can be None or loaders without a name
            if getattr(module, "name", None) == module_name:
                return module

        return None

    def getTests(self, test):
        '''Return all individual tests executed when running the suite.'''
        # Unfortunately unittest does not have an API for this, so we have
        # to rely on implementation details. This only needs to work
        # for TestSuite containing TestCase.
        method = getattr(test, '_testMethodName', None)
        if method:
            # leaf case: a TestCase
            yield test
        else:
            # Look into TestSuite.
            tests = getattr(test, '_tests', [])
            for t1 in tests:
                for t2 in self.getTests(t1):
                    yield t2

    def loadTests(self):
        """
        Load every entry of self.testslist, order the resulting suites by
        their dependencies and store the combined suite in self.suite.

        This context is published as 'oeTest.tc' first. When the context
        has a 'tagexp' attribute the loaded tests are filtered with it.
        Returns the combined suite.
        """
        setattr(oeTest, "tc", self)

        testloader = unittest.TestLoader()
        testloader.sortTestMethodsUsing = None
        suites = [testloader.loadTestsFromName(name) for name in self.testslist]
        suites = filterByTagExp(suites, getattr(self, "tagexp", None))

        # Determine dependencies between suites by looking for @skipUnlessPassed
        # method annotations. Suite A depends on suite B if any method in A
        # depends on a method on B.
        for suite in suites:
            suite.dependencies = []
            suite.depth = 0
            for test in self.getTests(suite):
                methodname = getattr(test, '_testMethodName', None)
                if methodname:
                    method = getattr(test, methodname)
                    depends_on = getattr(method, '_depends_on', None)
                    if depends_on:
                        for dep_suite in suites:
                            if depends_on in [getattr(t, '_testMethodName', None) for t in self.getTests(dep_suite)]:
                                if dep_suite not in suite.dependencies and \
                                   dep_suite is not suite:
                                    suite.dependencies.append(dep_suite)
                                break
                        else:
                            logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." %
                                    (test, depends_on))

        # Use brute-force topological sort to determine ordering. Sort by
        # depth (higher depth = must run later), with original ordering to
        # break ties.
        def set_suite_depth(suite):
            for dep in suite.dependencies:
                new_depth = set_suite_depth(dep) + 1
                if new_depth > suite.depth:
                    suite.depth = new_depth
            return suite.depth

        for index, suite in enumerate(suites):
            set_suite_depth(suite)
            suite.index = index

        suites.sort(key=lambda s: (s.depth, s.index))

        self.suite = testloader.suiteClass(suites)

        return self.suite

    def runTests(self):
        """
        Run self.suite (created by loadTests) with a verbose text runner and
        return the runner's result.

        When the 'bb' module has been imported, the runner output is routed
        through custom_verbose().
        """
        logger.info("Test modules  %s" % self.testslist)
        if hasattr(self, "tagexp") and self.tagexp:
            logger.info("Filter test cases by tags: %s" % self.tagexp)
        logger.info("Found %s tests" % self.suite.countTestCases())
        runner = unittest.TextTestRunner(verbosity=2)
        if 'bb' in sys.modules:
            runner.stream.write = custom_verbose

        return runner.run(self.suite)
361
class RuntimeTestContext(TestContext):
    """
    Test context for the "runtime" test namespace.

    On top of TestContext it keeps the target, the package manifest of the
    image and the helpers that extract packages on the host and copy or
    remove their content on the target.
    """

    def __init__(self, d, target, exported=False):
        """
        d:        object queried for configuration through d.getVar()
        target:   target object the tests run against
        exported: forwarded to TestContext.__init__

        Unless IMAGE_NO_MANIFEST is "1" the image manifest is read into
        self.pkgmanifest as {package: (version, arch)}; bb.fatal() is
        called when the manifest cannot be opened.
        """
        super(RuntimeTestContext, self).__init__(d, exported)

        self.target = target

        self.pkgmanifest = {}
        manifest = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"),
                d.getVar("IMAGE_LINK_NAME") + ".manifest")
        nomanifest = d.getVar("IMAGE_NO_MANIFEST")
        if nomanifest != "1":
            try:
                with open(manifest) as f:
                    for line in f:
                        fields = line.split()
                        # A blank line carries no package entry
                        if not fields:
                            continue
                        (pkg, arch, version) = fields
                        self.pkgmanifest[pkg] = (version, arch)
            except IOError as e:
                bb.fatal("No package manifest file found. Did you build the image?\n%s" % e)

    def _get_test_namespace(self):
        """Name of the directory/package holding the tests of this context."""
        return "runtime"

    def _get_test_suites(self):
        """
        Return the list of requested test suites.

        The entries come from the files listed in TEST_SUITES_MANIFEST when
        that variable is set, otherwise from TEST_SUITES.
        """
        testsuites = []

        manifests = (self.d.getVar("TEST_SUITES_MANIFEST") or '').split()
        if manifests:
            for manifest in manifests:
                testsuites.extend(self._read_testlist(manifest,
                                  self.d.getVar("TOPDIR")).split())

        else:
            testsuites = self.d.getVar("TEST_SUITES").split()

        return testsuites

    def _get_test_suites_required(self):
        """Return the entries of TEST_SUITES except "auto"."""
        return [t for t in self.d.getVar("TEST_SUITES").split() if t != "auto"]

    def loadTests(self):
        """
        Load the tests as TestContext.loadTests does and switch
        oeRuntimeTest.pscmd to "ps -ef" when the image contains procps.

        Returns the loaded suite, like the base class method.
        """
        suite = super(RuntimeTestContext, self).loadTests()
        if oeTest.hasPackage("procps"):
            oeRuntimeTest.pscmd = "ps -ef"
        return suite

    def extract_packages(self):
        """
        Find packages that will be needed during runtime.

        TEST_EXTRACTED_DIR and TEST_PACKAGED_DIR are removed first; then the
        packages listed in the JSON file of every test module (if it has
        one) are extracted or copied.
        """

        modules = self.getTestModules()

        shutil.rmtree(self.d.getVar("TEST_EXTRACTED_DIR"))
        shutil.rmtree(self.d.getVar("TEST_PACKAGED_DIR"))
        for module in modules:
            json_file = self._getJsonFile(module)
            if json_file:
                needed_packages = self._getNeededPackages(json_file)
                self._perform_package_extraction(needed_packages)

    def _perform_package_extraction(self, needed_packages):
        """
        Extract packages that will be needed during runtime.

        needed_packages maps a test name to one package description (dict)
        or to a list of them. A description must have the key "pkg"; the
        optional key "extract" (default True) selects between extracting
        the package into TEST_EXTRACTED_DIR/<pkg> and copying the package
        file into TEST_PACKAGED_DIR. bb.fatal() is called for any other
        value type.
        """

        import oe.path

        extracted_path = self.d.getVar("TEST_EXTRACTED_DIR")

        for key,value in needed_packages.items():
            packages = ()
            if isinstance(value, dict):
                packages = (value, )
            elif isinstance(value, list):
                packages = value
            else:
                bb.fatal("Failed to process needed packages for %s; "
                         "Value must be a dict or list" % key)

            for package in packages:
                pkg = package["pkg"]
                extract = package.get("extract", True)

                # Copy package to TEST_PACKAGED_DIR
                if not extract:
                    self._copy_package(pkg)
                    continue

                dst_dir = os.path.join(extracted_path, pkg)

                # Same package used for more than one test,
                # don't need to extract again. Checking before the
                # extraction avoids leaving an unused temporary
                # directory behind.
                if os.path.exists(dst_dir):
                    continue

                # Extract package and copy it to TEST_EXTRACTED_DIR
                pkg_dir = self._extract_in_tmpdir(pkg)
                oe.path.copytree(pkg_dir, dst_dir)
                shutil.rmtree(pkg_dir)

    def _getJsonFile(self, module):
        """
        Returns the path of the JSON file for a module, empty if doesn't exist.

        The JSON file is expected next to the module file ('module.path'),
        with the extension replaced by ".json".
        """

        module_file = module.path
        json_file = "%s.json" % module_file.rsplit(".", 1)[0]
        if os.path.isfile(module_file) and os.path.isfile(json_file):
            return json_file
        else:
            return ""

    def _getNeededPackages(self, json_file, test=None):
        """
        Returns a dict with needed packages based on a JSON file.

        If a test is specified it will return just the entry stored for
        that test, or an empty dict when the file has no such entry.
        """

        import json

        with open(json_file) as f:
            needed_packages = dict(json.load(f).items())

        if test:
            return needed_packages.get(test, {})

        return needed_packages

    def _extract_in_tmpdir(self, pkg):
        """
        Returns path to a temp directory where the package was
        extracted without dependencies.
        """

        from oeqa.utils.package_manager import get_package_manager

        pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg)
        pm = get_package_manager(self.d, pkg_path)
        extract_dir = pm.extract(pkg)
        shutil.rmtree(pkg_path)

        return extract_dir

    def _copy_package(self, pkg):
        """
        Copy the RPM, DEB or IPK package to TEST_PACKAGED_DIR
        """

        from oeqa.utils.package_manager import get_package_manager

        pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg)
        dst_dir = self.d.getVar("TEST_PACKAGED_DIR")
        pm = get_package_manager(self.d, pkg_path)
        pkg_info = pm.package_info(pkg)
        file_path = pkg_info[pkg]["filepath"]
        shutil.copy2(file_path, dst_dir)
        shutil.rmtree(pkg_path)

    def install_uninstall_packages(self, test_id, pkg_dir, install):
        """
        Check if the test requires a package and Install/Uninstall it in the DUT

        test_id: dotted test id; its fifth component is used as test name
        pkg_dir: directory holding the extracted packages
        install: True to install, False to uninstall

        Nothing is done when the module of the test cannot be determined
        or has no JSON file.
        """

        test = test_id.split(".")[4]
        module = self.getModulefromID(test_id)
        # getModulefromID returns None for an unknown module
        if module is None:
            return
        json = self._getJsonFile(module)
        if json:
            needed_packages = self._getNeededPackages(json, test)
            if needed_packages:
                self._install_uninstall_packages(needed_packages, pkg_dir, install)

    def _install_uninstall_packages(self, needed_packages, pkg_dir, install=True):
        """
        Install/Uninstall packages in the DUT without using a package manager

        needed_packages is one package description (dict) or a list of
        them; TypeError is raised for anything else. A package is copied to
        "/" on the target when installing and its "extract" flag (default
        True) is set; it is removed again when uninstalling and its "rm"
        flag (default False) is set.
        """

        if isinstance(needed_packages, dict):
            packages = [needed_packages]
        elif isinstance(needed_packages, list):
            packages = needed_packages
        else:
            raise TypeError("Needed packages must be a dict or list, got %s"
                            % type(needed_packages).__name__)

        for package in packages:
            pkg = package["pkg"]
            rm = package.get("rm", False)
            extract = package.get("extract", True)
            src_dir = os.path.join(pkg_dir, pkg)

            # Install package
            if install and extract:
                self.target.connection.copy_dir_to(src_dir, "/")

            # Uninstall package
            elif not install and rm:
                self.target.connection.delete_dir_structure(src_dir, "/")
569
class ImageTestContext(RuntimeTestContext):
    """
    Runtime test context that additionally keeps a host dumper and records
    a received SIGTERM in self.sigterm.
    """

    def __init__(self, d, target, host_dumper):
        super(ImageTestContext, self).__init__(d, target)

        self.host_dumper = host_dumper
        self.tagexp = d.getVar("TEST_SUITES_TAGS")

        # Remember the handler found at this point, then install our own
        self.sigterm = False
        previous_handler = signal.getsignal(signal.SIGTERM)
        self.origsigtermhandler = previous_handler
        signal.signal(signal.SIGTERM, self._sigterm_exception)

    def _sigterm_exception(self, signum, stackframe):
        """SIGTERM handler: warn, set self.sigterm and stop the target."""
        bb.warn("TestImage received SIGTERM, shutting down...")
        self.sigterm = True
        self.target.stop()

    def install_uninstall_packages(self, test_id, install=True):
        """
        Check if the test requires a package and Install/Uninstall it in the DUT

        The packages are taken from TEST_EXTRACTED_DIR.
        """

        super(ImageTestContext, self).install_uninstall_packages(
                test_id, self.d.getVar("TEST_EXTRACTED_DIR"), install)
594
class ExportTestContext(RuntimeTestContext):
    def __init__(self, d, target, exported=False, parsedArgs=None):
        """
        This class is used when exporting tests and when are executed outside OE environment.

        parsedArgs is an optional dict and can contain the following:
            - tag:      Filter test by tag.

        When no tag is given the filter is read from TEST_SUITES_TAGS.
        """
        super(ExportTestContext, self).__init__(d, target, exported)

        # None instead of a shared {} default argument
        if parsedArgs is None:
            parsedArgs = {}

        tag = parsedArgs.get("tag", None)
        self.tagexp = tag if tag is not None else d.getVar("TEST_SUITES_TAGS")

        self.sigterm = None

    def install_uninstall_packages(self, test_id, install=True):
        """
        Check if the test requires a package and Install/Uninstall it in the DUT

        The packages are taken from the directory TEST_EXPORT_EXTRACTED_DIR,
        resolved against the directory two levels above this file.
        """

        export_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
        extracted_dir = self.d.getVar("TEST_EXPORT_EXTRACTED_DIR")
        pkg_dir = os.path.join(export_dir, extracted_dir)
        super(ExportTestContext, self).install_uninstall_packages(test_id, pkg_dir, install)
619