#
# Copyright (C) 2013 Intel Corporation
#
# SPDX-License-Identifier: MIT
#

# Main unittest module used by testimage.bbclass
# This provides the oeRuntimeTest base class which is inherited by all tests in meta/lib/oeqa/runtime.

# It also has some helper functions and it's responsible for actually starting the tests

import os
import re
import sys
import unittest
import inspect
import subprocess
import signal
import shutil
import functools
try:
    import bb
except ImportError:
    pass
import logging

import oeqa.runtime
# Exported test doesn't require sdkext
try:
    import oeqa.sdkext
except ImportError:
    pass
from oeqa.utils.decorators import LogResults, gettag, getResults

logger = logging.getLogger("BitBake")


def getVar(obj):
    """
    Return a dict subclass instance whose item lookups are answered by
    gettag(obj, key) rather than by stored entries.
    """
    # Extends dict: every key lookup is delegated to gettag() on the
    # wrapped object, so names that are not stored are looked up there.
    class VarDict(dict):
        def __getitem__(self, key):
            return gettag(obj, key)
    return VarDict()


def checkTags(tc, tagexp):
    """
    Evaluate the tag expression tagexp with names resolved through
    getVar(tc) and return the result.
    """
    # NOTE(review): eval() executes arbitrary Python. tagexp comes from
    # TEST_SUITES_TAGS / the "tag" argument (see the test contexts below);
    # it must only ever be fed trusted configuration.
    return eval(tagexp, None, getVar(tc))


def filterByTagExp(testsuite, tagexp):
    """
    Return testsuite restricted to the test cases for which tagexp
    evaluates true. Nested suites are filtered recursively. When tagexp
    is empty the suite is returned unchanged.
    """
    if not tagexp:
        return testsuite
    caseList = []
    for each in testsuite:
        if isinstance(each, unittest.BaseTestSuite):
            caseList.append(filterByTagExp(each, tagexp))
        elif checkTags(each, tagexp):
            caseList.append(each)
    # Rebuild a container of the same type as the one passed in.
    return testsuite.__class__(caseList)


@LogResults
class oeTest(unittest.TestCase):
    """
    Base class for oeqa tests. The active test context is stored on the
    class as oeTest.tc by TestContext.loadTests().
    """

    pscmd = "ps"
    longMessage = True

    @classmethod
    def hasPackage(cls, pkg):
        """
        True if the full package name exists in the manifest, False otherwise.
        """
        return pkg in oeTest.tc.pkgmanifest

    @classmethod
    def hasPackageMatch(cls, match):
        """
        True if the regular expression match matches the beginning of any
        package name in the manifest (re.match semantics), False otherwise.
        """
        return any(re.match(match, s) for s in oeTest.tc.pkgmanifest)

    @classmethod
    def hasFeature(cls, feature):
        """
        True if feature is listed in the image features or in the distro
        features of the test context, False otherwise.
        """
        return (feature in oeTest.tc.imagefeatures or
                feature in oeTest.tc.distrofeatures)


class oeRuntimeTest(oeTest):
    """
    Base class for tests that run against a target (self.target).
    """

    def __init__(self, methodName='runTest'):
        self.target = oeRuntimeTest.tc.target
        super(oeRuntimeTest, self).__init__(methodName)

    def setUp(self):
        # Install packages in the DUT
        self.tc.install_uninstall_packages(self.id())

        # Check if test needs to run
        if self.tc.sigterm:
            self.fail("Got SIGTERM")
        elif (type(self.target).__name__ == "QemuTarget"):
            self.assertTrue(self.target.check(), msg = "Qemu not running?")

        self.setUpLocal()

    # a setup method before tests but after the class instantiation
    def setUpLocal(self):
        pass

    def tearDown(self):
        # Uninstall packages in the DUT
        self.tc.install_uninstall_packages(self.id(), False)

        res = getResults()
        # If a test fails or there is an exception dump
        # for QemuTarget only
        if (type(self.target).__name__ == "QemuTarget" and
                (self.id() in res.getErrorList() or
                 self.id() in res.getFailList())):
            self.tc.host_dumper.create_dir(self._testMethodName)
            self.tc.host_dumper.dump_host()
            self.target.target_dumper.dump_target(
                    self.tc.host_dumper.dump_dir)
            print ("%s dump data stored in %s" % (self._testMethodName,
                     self.tc.host_dumper.dump_dir))

        self.tearDownLocal()

    # Method to be run after tearDown and implemented by child classes
    def tearDownLocal(self):
        pass


def getmodule(pos=2):
    """
    Return the module name (derived from the file name) of the frame
    found pos entries up the call stack.
    """
    # stack returns a list of tuples containing frame information
    # First element of the list is the current frame, caller is 1
    frameinfo = inspect.stack()[pos]
    modname = inspect.getmodulename(frameinfo[1])
    #modname = inspect.getmodule(frameinfo[0]).__name__
    return modname


def skipModule(reason, pos=2):
    """
    Skip the calling test module by raising unittest.SkipTest, unless the
    module is listed in the context's required tests, in which case a
    plain Exception is raised instead.
    """
    modname = getmodule(pos)
    if modname not in oeTest.tc.testsrequired:
        raise unittest.SkipTest("%s: %s" % (modname, reason))
    else:
        raise Exception("\nTest %s wants to be skipped.\nReason is: %s" \
                "\nTest was required in TEST_SUITES, so either the condition for skipping is wrong" \
                "\nor the image really doesn't have the required feature/package when it should." % (modname, reason))


def skipModuleIf(cond, reason):
    """Call skipModule(reason) for the calling module when cond is true."""

    if cond:
        # pos=3: one extra stack frame because of this wrapper.
        skipModule(reason, 3)


def skipModuleUnless(cond, reason):
    """Call skipModule(reason) for the calling module when cond is false."""

    if not cond:
        # pos=3: one extra stack frame because of this wrapper.
        skipModule(reason, 3)


_buffer_logger = ""
def custom_verbose(msg, *args, **kwargs):
    """
    Replacement for the test runner's stream.write: accumulate partial
    writes and emit the buffered text once a write ends with a newline.
    The text goes to bb.plain() when bb is available, otherwise to the
    module logger.
    """
    global _buffer_logger
    _buffer_logger += msg
    # endswith() instead of msg[-1] so an empty write does not raise.
    if not msg.endswith("\n"):
        return
    try:
        bb.plain(_buffer_logger.rstrip("\n"), *args, **kwargs)
    except NameError:
        # bb could not be imported (see the guarded import above).
        logger.info(_buffer_logger.rstrip("\n"), *args, **kwargs)
    _buffer_logger = ""


class TestContext(object):
    """
    Holds the configuration of a test run and loads/runs the tests.

    Subclasses provide _get_test_suites(), _get_test_namespace() and
    _get_test_suites_required(), which are called from here.
    """

    def __init__(self, d, exported=False):
        self.d = d

        self.testsuites = self._get_test_suites()

        if exported:
            path = [os.path.dirname(os.path.abspath(__file__))]
            extrapath = ""
        else:
            path = d.getVar("BBPATH").split(':')
            extrapath = "lib/oeqa"

        self.testslist = self._get_tests_list(path, extrapath)
        self.testsrequired = self._get_test_suites_required()

        self.filesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "runtime/files")
        self.corefilesdir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "files")
        self.imagefeatures = d.getVar("IMAGE_FEATURES").split()
        self.distrofeatures = d.getVar("DISTRO_FEATURES").split()

    # get testcase list from specified file
    # if path is a relative path, then relative to build/conf/
    def _read_testlist(self, fpath, builddir):
        """
        Return the entries of the manifest file fpath joined by spaces.
        Blank lines and lines starting with '#' are ignored.
        """
        if not os.path.isabs(fpath):
            fpath = os.path.join(builddir, "conf", fpath)
        if not os.path.exists(fpath):
            bb.fatal("No such manifest file: ", fpath)
        # Context manager so the file handle is always closed.
        with open(fpath) as f:
            stripped = [line.strip() for line in f]
        return " ".join(tc for tc in stripped if tc and not tc.startswith("#"))

    # return test list by type also filter if TEST_SUITES is specified
    def _get_tests_list(self, bbpath, extrapath):
        """
        Return the list of dotted test names ("oeqa.<namespace>.<name>")
        for the configured test suites, searching the directories in
        bbpath. The special suite name "auto" adds every module found.
        """
        testslist = []

        namespace = self._get_test_namespace()

        # This relies on lib/ under each directory in BBPATH being added to sys.path
        # (as done by default in base.bbclass)
        for testname in self.testsuites:
            if testname == "auto":
                continue
            if testname.startswith("oeqa."):
                testslist.append(testname)
                continue
            # Accept either "<testname>.py" or, for names of the form
            # "module.Class.test", the leading module component.
            candidates = (testname + ".py", testname.split(".")[0] + ".py")
            found = any(
                os.path.exists(os.path.join(p, extrapath, namespace, fname))
                for p in bbpath for fname in candidates)
            if found:
                testslist.append("oeqa." + namespace + "." + testname)
            else:
                bb.fatal('Test %s specified in TEST_SUITES could not be found in lib/oeqa/runtime under BBPATH' % testname)

        if "auto" in self.testsuites:
            def add_auto_list(path):
                files = sorted([f for f in os.listdir(path) if f.endswith('.py') and not f.startswith('_')])
                for f in files:
                    module = 'oeqa.' + namespace + '.' + f[:-3]
                    if module not in testslist:
                        testslist.append(module)

            for p in bbpath:
                testpath = os.path.join(p, 'lib', 'oeqa', namespace)
                bb.debug(2, 'Searching for tests in %s' % testpath)
                if os.path.exists(testpath):
                    add_auto_list(testpath)

        return testslist

    def getTestModules(self):
        """
        Returns all the test modules in the testlist.
        """

        import pkgutil

        modules = []
        for test in self.testslist:
            # An entry may name a single test (module.Class.test_x);
            # reduce it to its first three dotted components.
            if re.search(r"\w+\.\w+\.test_\S+", test):
                test = '.'.join(test.split('.')[:3])
            # NOTE(review): pkgutil.get_loader() is deprecated in recent
            # Python releases - confirm against the supported versions.
            module = pkgutil.get_loader(test)
            modules.append(module)

        return modules

    def getModulefromID(self, test_id):
        """
        Returns the test module based on a test id, or None when no
        module in the testlist matches.
        """

        module_name = ".".join(test_id.split(".")[:3])
        modules = self.getTestModules()
        for module in modules:
            if module.name == module_name:
                return module

        return None

    def getTests(self, test):
        '''Return all individual tests executed when running the suite.'''
        # Unfortunately unittest does not have an API for this, so we have
        # to rely on implementation details. This only needs to work
        # for TestSuite containing TestCase.
        method = getattr(test, '_testMethodName', None)
        if method:
            # leaf case: a TestCase
            yield test
        else:
            # Look into TestSuite.
            tests = getattr(test, '_tests', [])
            for t1 in tests:
                yield from self.getTests(t1)

    def loadTests(self):
        """
        Load every test in self.testslist, apply the tag filter, order the
        suites so that dependencies run first, store the result in
        self.suite and return it.
        """
        setattr(oeTest, "tc", self)

        testloader = unittest.TestLoader()
        testloader.sortTestMethodsUsing = None
        suites = [testloader.loadTestsFromName(name) for name in self.testslist]
        suites = filterByTagExp(suites, getattr(self, "tagexp", None))

        # Determine dependencies between suites by looking for @skipUnlessPassed
        # method annotations. Suite A depends on suite B if any method in A
        # depends on a method on B.
        for suite in suites:
            suite.dependencies = []
            suite.depth = 0
            for test in self.getTests(suite):
                methodname = getattr(test, '_testMethodName', None)
                if methodname:
                    method = getattr(test, methodname)
                    depends_on = getattr(method, '_depends_on', None)
                    if depends_on:
                        for dep_suite in suites:
                            if depends_on in [getattr(t, '_testMethodName', None) for t in self.getTests(dep_suite)]:
                                if dep_suite not in suite.dependencies and \
                                   dep_suite is not suite:
                                    suite.dependencies.append(dep_suite)
                                break
                        else:
                            # No suite provides the method that was named.
                            logger.warning("Test %s was declared as @skipUnlessPassed('%s') but that test is either not defined or not active. Will run the test anyway." %
                                    (test, depends_on))

        # Use brute-force topological sort to determine ordering. Sort by
        # depth (higher depth = must run later), with original ordering to
        # break ties.
        def set_suite_depth(suite):
            for dep in suite.dependencies:
                new_depth = set_suite_depth(dep) + 1
                if new_depth > suite.depth:
                    suite.depth = new_depth
            return suite.depth

        for index, suite in enumerate(suites):
            set_suite_depth(suite)
            suite.index = index

        suites.sort(key=lambda s: (s.depth, s.index))

        self.suite = testloader.suiteClass(suites)

        return self.suite

    def runTests(self):
        """
        Run self.suite with a verbose text runner and return the runner's
        result. loadTests() must have been called first.
        """
        logger.info("Test modules  %s" % self.testslist)
        if hasattr(self, "tagexp") and self.tagexp:
            logger.info("Filter test cases by tags: %s" % self.tagexp)
        logger.info("Found %s tests" % self.suite.countTestCases())
        runner = unittest.TextTestRunner(verbosity=2)
        if 'bb' in sys.modules:
            # Route the runner output through custom_verbose.
            runner.stream.write = custom_verbose

        return runner.run(self.suite)


class RuntimeTestContext(TestContext):
    """
    Test context for the "runtime" namespace: keeps the target and the
    package manifest of the image and handles test package
    extraction/installation.
    """

    def __init__(self, d, target, exported=False):
        super(RuntimeTestContext, self).__init__(d, exported)

        self.target = target

        # Maps package name -> (version, arch), read from the image manifest.
        self.pkgmanifest = {}
        manifest = os.path.join(d.getVar("DEPLOY_DIR_IMAGE"),
                d.getVar("IMAGE_LINK_NAME") + ".manifest")
        nomanifest = d.getVar("IMAGE_NO_MANIFEST")
        if nomanifest != "1":
            try:
                with open(manifest) as f:
                    for line in f:
                        fields = line.split()
                        if not fields:
                            # Tolerate blank lines in the manifest.
                            continue
                        (pkg, arch, version) = fields
                        self.pkgmanifest[pkg] = (version, arch)
            except IOError as e:
                bb.fatal("No package manifest file found. Did you build the image?\n%s" % e)

    def _get_test_namespace(self):
        return "runtime"

    def _get_test_suites(self):
        """
        Return the list of test suite names, taken from the files listed
        in TEST_SUITES_MANIFEST when set, otherwise from TEST_SUITES.
        """
        testsuites = []

        manifests = (self.d.getVar("TEST_SUITES_MANIFEST") or '').split()
        if manifests:
            for manifest in manifests:
                testsuites.extend(self._read_testlist(manifest,
                                  self.d.getVar("TOPDIR")).split())

        else:
            testsuites = self.d.getVar("TEST_SUITES").split()

        return testsuites

    def _get_test_suites_required(self):
        """Return the entries of TEST_SUITES other than "auto"."""
        return [t for t in self.d.getVar("TEST_SUITES").split() if t != "auto"]

    def loadTests(self):
        """
        Load the tests as the base class does and switch the ps command to
        "ps -ef" when the procps package is in the manifest. Returns the
        loaded suite.
        """
        suite = super(RuntimeTestContext, self).loadTests()
        if oeTest.hasPackage("procps"):
            oeRuntimeTest.pscmd = "ps -ef"
        return suite

    def extract_packages(self):
        """
        Find packages that will be needed during runtime.
        """

        modules = self.getTestModules()

        shutil.rmtree(self.d.getVar("TEST_EXTRACTED_DIR"))
        shutil.rmtree(self.d.getVar("TEST_PACKAGED_DIR"))
        for module in modules:
            json_file = self._getJsonFile(module)
            if json_file:
                needed_packages = self._getNeededPackages(json_file)
                self._perform_package_extraction(needed_packages)

    def _perform_package_extraction(self, needed_packages):
        """
        Extract packages that will be needed during runtime.

        needed_packages maps a key to either one package description
        (dict) or a list of them. A description with "extract" true (the
        default) is extracted into TEST_EXTRACTED_DIR/<pkg>; otherwise the
        package file is copied to TEST_PACKAGED_DIR.
        """

        import oe.path

        extracted_path = self.d.getVar("TEST_EXTRACTED_DIR")

        for key,value in needed_packages.items():
            packages = ()
            if isinstance(value, dict):
                packages = (value, )
            elif isinstance(value, list):
                packages = value
            else:
                bb.fatal("Failed to process needed packages for %s; "
                         "Value must be a dict or list" % key)

            for package in packages:
                pkg = package["pkg"]
                extract = package.get("extract", True)

                if not extract:
                    # Copy package to TEST_PACKAGED_DIR
                    self._copy_package(pkg)
                    continue

                dst_dir = os.path.join(extracted_path, pkg)

                # Same package used for more than one test,
                # don't need to extract again. Checked before extracting
                # so no temporary directory is created and left behind.
                if os.path.exists(dst_dir):
                    continue

                # Extract package and copy it to TEST_EXTRACTED_DIR
                pkg_dir = self._extract_in_tmpdir(pkg)
                oe.path.copytree(pkg_dir, dst_dir)
                shutil.rmtree(pkg_dir)

    def _getJsonFile(self, module):
        """
        Returns the path of the JSON file for a module, empty if it doesn't exist.
        """

        module_file = module.path
        json_file = "%s.json" % module_file.rsplit(".", 1)[0]
        if os.path.isfile(module_file) and os.path.isfile(json_file):
            return json_file
        else:
            return ""

    def _getNeededPackages(self, json_file, test=None):
        """
        Returns a dict with needed packages based on a JSON file.


        If a test is specified it will return the entry just for that
        test, or an empty dict when the test has no entry.
        """

        import json

        with open(json_file) as f:
            test_packages = json.load(f)
        needed_packages = dict(test_packages.items())

        if test:
            needed_packages = needed_packages.get(test, {})

        return needed_packages

    def _extract_in_tmpdir(self, pkg):
        """
        Returns path to a temp directory where the package was
        extracted without dependencies.
        """

        from oeqa.utils.package_manager import get_package_manager

        pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg)
        pm = get_package_manager(self.d, pkg_path)
        extract_dir = pm.extract(pkg)
        shutil.rmtree(pkg_path)

        return extract_dir

    def _copy_package(self, pkg):
        """
        Copy the package file of pkg (RPM, DEB or IPK) to TEST_PACKAGED_DIR
        """

        from oeqa.utils.package_manager import get_package_manager

        pkg_path = os.path.join(self.d.getVar("TEST_INSTALL_TMP_DIR"), pkg)
        dst_dir = self.d.getVar("TEST_PACKAGED_DIR")
        pm = get_package_manager(self.d, pkg_path)
        pkg_info = pm.package_info(pkg)
        file_path = pkg_info[pkg]["filepath"]
        shutil.copy2(file_path, dst_dir)
        shutil.rmtree(pkg_path)

    def install_uninstall_packages(self, test_id, pkg_dir, install):
        """
        Check if the test requires a package and Install/Uninstall it in the DUT
        """

        # The fifth dotted component of the test id is used as the key
        # into the module's JSON file.
        test = test_id.split(".")[4]
        module = self.getModulefromID(test_id)
        if module is None:
            # No module matches this id, so there is no JSON file to read.
            return
        json_file = self._getJsonFile(module)
        if json_file:
            needed_packages = self._getNeededPackages(json_file, test)
            if needed_packages:
                self._install_uninstall_packages(needed_packages, pkg_dir, install)

    def _install_uninstall_packages(self, needed_packages, pkg_dir, install=True):
        """
        Install/Uninstall packages in the DUT without using a package manager

        needed_packages is one package description (dict) or a list of
        them; any other type raises TypeError.
        """

        if isinstance(needed_packages, dict):
            packages = [needed_packages]
        elif isinstance(needed_packages, list):
            packages = needed_packages
        else:
            raise TypeError("needed_packages must be a dict or list, got %s"
                            % type(needed_packages).__name__)

        for package in packages:
            pkg = package["pkg"]
            rm = package.get("rm", False)
            extract = package.get("extract", True)
            src_dir = os.path.join(pkg_dir, pkg)

            # Install package
            if install and extract:
                self.target.connection.copy_dir_to(src_dir, "/")

            # Uninstall package
            elif not install and rm:
                self.target.connection.delete_dir_structure(src_dir, "/")


class ImageTestContext(RuntimeTestContext):
    """
    Runtime test context that also keeps a host dumper and installs a
    SIGTERM handler which stops the target.
    """

    def __init__(self, d, target, host_dumper):
        super(ImageTestContext, self).__init__(d, target)

        self.tagexp = d.getVar("TEST_SUITES_TAGS")

        self.host_dumper = host_dumper

        self.sigterm = False
        # Remember the previous handler before replacing it.
        self.origsigtermhandler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self._sigterm_exception)

    def _sigterm_exception(self, signum, stackframe):
        """SIGTERM handler: record the signal and stop the target."""
        bb.warn("TestImage received SIGTERM, shutting down...")
        self.sigterm = True
        self.target.stop()

    def install_uninstall_packages(self, test_id, install=True):
        """
        Check if the test requires a package and Install/Uninstall it in the DUT
        """

        pkg_dir = self.d.getVar("TEST_EXTRACTED_DIR")
        super(ImageTestContext, self).install_uninstall_packages(test_id, pkg_dir, install)


class ExportTestContext(RuntimeTestContext):
    def __init__(self, d, target, exported=False, parsedArgs=None):
        """
        This class is used when exporting tests and when are executed outside OE environment.

        parsedArgs can contain the following:
            - tag:      Filter test by tag.
        """
        super(ExportTestContext, self).__init__(d, target, exported)

        # None instead of a shared mutable {} default.
        if parsedArgs is None:
            parsedArgs = {}
        tag = parsedArgs.get("tag", None)
        self.tagexp = tag if tag is not None else d.getVar("TEST_SUITES_TAGS")

        self.sigterm = None

    def install_uninstall_packages(self, test_id, install=True):
        """
        Check if the test requires a package and Install/Uninstall it in the DUT
        """

        # The packages live relative to the parent of this file's directory.
        export_dir = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
        extracted_dir = self.d.getVar("TEST_EXPORT_EXTRACTED_DIR")
        pkg_dir = os.path.join(export_dir, extracted_dir)
        super(ExportTestContext, self).install_uninstall_packages(test_id, pkg_dir, install)