1#!/usr/bin/env python3 2# 3# SPDX-License-Identifier: GPL-2.0-or-later 4# 5# Modified for use in OE by Richard Purdie, 2018 6# 7# Modified by: Corey Goldberg, 2013 8# License: GPLv2+ 9# 10# Original code from: 11# Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013) 12# Copyright (C) 2005-2011 Canonical Ltd 13# License: GPLv2+ 14 15import os 16import sys 17import traceback 18import unittest 19import subprocess 20import testtools 21import threading 22import time 23import io 24import json 25import subunit 26 27from queue import Queue 28from itertools import cycle 29from subunit import ProtocolTestCase, TestProtocolClient 30from subunit.test_results import AutoTimingTestResultDecorator 31from testtools import ThreadsafeForwardingResult, iterate_tests 32from testtools.content import Content 33from testtools.content_type import ContentType 34from oeqa.utils.commands import get_test_layer 35 36import bb.utils 37import oe.path 38 39_all__ = [ 40 'ConcurrentTestSuite', 41 'fork_for_tests', 42 'partition_tests', 43] 44 45# 46# Patch the version from testtools to allow access to _test_start and allow 47# computation of timing information and threading progress 48# 49class BBThreadsafeForwardingResult(ThreadsafeForwardingResult): 50 51 def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests, output, finalresult): 52 super(BBThreadsafeForwardingResult, self).__init__(target, semaphore) 53 self.threadnum = threadnum 54 self.totalinprocess = totalinprocess 55 self.totaltests = totaltests 56 self.buffer = True 57 self.outputbuf = output 58 self.finalresult = finalresult 59 self.finalresult.buffer = True 60 self.target = target 61 62 def _add_result_with_semaphore(self, method, test, *args, **kwargs): 63 self.semaphore.acquire() 64 try: 65 if self._test_start: 66 self.result.starttime[test.id()] = self._test_start.timestamp() 67 self.result.threadprogress[self.threadnum].append(test.id()) 68 totalprogress = sum(len(x) for x in self.result.threadprogress.values()) 69 self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s failed) (%s)" % ( 70 self.threadnum, 71 len(self.result.threadprogress[self.threadnum]), 72 self.totalinprocess, 73 totalprogress, 74 self.totaltests, 75 "{0:.2f}".format(time.time()-self._test_start.timestamp()), 76 self.target.failed_tests, 77 test.id()) 78 finally: 79 self.semaphore.release() 80 self.finalresult._stderr_buffer = io.StringIO(initial_value=self.outputbuf.getvalue().decode("utf-8")) 81 self.finalresult._stdout_buffer = io.StringIO() 82 super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs) 83 84class ProxyTestResult: 85 # a very basic TestResult proxy, in order to modify add* calls 86 def __init__(self, target): 87 self.result = target 88 self.failed_tests = 0 89 90 def _addResult(self, method, test, *args, exception = False, **kwargs): 91 return method(test, *args, **kwargs) 92 93 def addError(self, test, err = None, **kwargs): 94 self.failed_tests += 1 95 self._addResult(self.result.addError, test, err, exception = True, **kwargs) 96 97 def addFailure(self, test, err = None, **kwargs): 98 self.failed_tests += 1 99 self._addResult(self.result.addFailure, test, err, exception = True, **kwargs) 100 101 def addSuccess(self, test, **kwargs): 102 self._addResult(self.result.addSuccess, test, **kwargs) 103 104 def addExpectedFailure(self, test, err = None, **kwargs): 105 self._addResult(self.result.addExpectedFailure, test, err, exception = True, **kwargs) 106 107 def addUnexpectedSuccess(self, test, **kwargs): 108 self._addResult(self.result.addUnexpectedSuccess, test, **kwargs) 109 110 def wasSuccessful(self): 111 return self.failed_tests == 0 112 113 def __getattr__(self, attr): 114 return getattr(self.result, attr) 115 116class ExtraResultsDecoderTestResult(ProxyTestResult): 117 def _addResult(self, method, test, *args, exception = False, **kwargs): 118 if "details" in kwargs and "extraresults" in kwargs["details"]: 119 if isinstance(kwargs["details"]["extraresults"], Content): 120 kwargs = kwargs.copy() 121 kwargs["details"] = kwargs["details"].copy() 122 extraresults = kwargs["details"]["extraresults"] 123 data = bytearray() 124 for b in extraresults.iter_bytes(): 125 data += b 126 extraresults = json.loads(data.decode()) 127 kwargs["details"]["extraresults"] = extraresults 128 return method(test, *args, **kwargs) 129 130class ExtraResultsEncoderTestResult(ProxyTestResult): 131 def _addResult(self, method, test, *args, exception = False, **kwargs): 132 if hasattr(test, "extraresults"): 133 extras = lambda : [json.dumps(test.extraresults).encode()] 134 kwargs = kwargs.copy() 135 if "details" not in kwargs: 136 kwargs["details"] = {} 137 else: 138 kwargs["details"] = kwargs["details"].copy() 139 kwargs["details"]["extraresults"] = Content(ContentType("application", "json", {'charset': 'utf8'}), extras) 140 # if using details, need to encode any exceptions into the details obj, 141 # testtools does not handle "err" and "details" together. 142 if "details" in kwargs and exception and (len(args) >= 1 and args[0] is not None): 143 kwargs["details"]["traceback"] = testtools.content.TracebackContent(args[0], test) 144 args = [] 145 return method(test, *args, **kwargs) 146 147# 148# We have to patch subunit since it doesn't understand how to handle addError 149# outside of a running test case. This can happen if classSetUp() fails 150# for a class of tests. This unfortunately has horrible internal knowledge. 151# 152def outSideTestaddError(self, offset, line): 153 """An 'error:' directive has been read.""" 154 test_name = line[offset:-1].decode('utf8') 155 self.parser._current_test = subunit.RemotedTestCase(test_name) 156 self.parser.current_test_description = test_name 157 self.parser._state = self.parser._reading_error_details 158 self.parser._reading_error_details.set_simple() 159 self.parser.subunitLineReceived(line) 160 161subunit._OutSideTest.addError = outSideTestaddError 162 163# Like outSideTestaddError above, we need an equivalent for skips 164# happening at the setUpClass() level, otherwise we will see "UNKNOWN" 165# as a result for concurrent tests 166# 167def outSideTestaddSkip(self, offset, line): 168 """A 'skip:' directive has been read.""" 169 test_name = line[offset:-1].decode('utf8') 170 self.parser._current_test = subunit.RemotedTestCase(test_name) 171 self.parser.current_test_description = test_name 172 self.parser._state = self.parser._reading_skip_details 173 self.parser._reading_skip_details.set_simple() 174 self.parser.subunitLineReceived(line) 175 176subunit._OutSideTest.addSkip = outSideTestaddSkip 177 178# 179# A dummy structure to add to io.StringIO so that the .buffer object 180# is available and accepts writes. This allows unittest with buffer=True 181# to interact ok with subunit which wants to access sys.stdout.buffer. 182# 183class dummybuf(object): 184 def __init__(self, parent): 185 self.p = parent 186 def write(self, data): 187 self.p.write(data.decode("utf-8")) 188 189# 190# Taken from testtools.ConncurrencyTestSuite but modified for OE use 191# 192class ConcurrentTestSuite(unittest.TestSuite): 193 194 def __init__(self, suite, processes, setupfunc, removefunc): 195 super(ConcurrentTestSuite, self).__init__([suite]) 196 self.processes = processes 197 self.setupfunc = setupfunc 198 self.removefunc = removefunc 199 200 def run(self, result): 201 testservers, totaltests = fork_for_tests(self.processes, self) 202 try: 203 threads = {} 204 queue = Queue() 205 semaphore = threading.Semaphore(1) 206 result.threadprogress = {} 207 for i, (testserver, testnum, output) in enumerate(testservers): 208 result.threadprogress[i] = [] 209 process_result = BBThreadsafeForwardingResult( 210 ExtraResultsDecoderTestResult(result), 211 semaphore, i, testnum, totaltests, output, result) 212 reader_thread = threading.Thread( 213 target=self._run_test, args=(testserver, process_result, queue)) 214 threads[testserver] = reader_thread, process_result 215 reader_thread.start() 216 while threads: 217 finished_test = queue.get() 218 threads[finished_test][0].join() 219 del threads[finished_test] 220 except: 221 for thread, process_result in threads.values(): 222 process_result.stop() 223 raise 224 finally: 225 for testserver in testservers: 226 testserver[0]._stream.close() 227 228 def _run_test(self, testserver, process_result, queue): 229 try: 230 try: 231 testserver.run(process_result) 232 except Exception: 233 # The run logic itself failed 234 case = testtools.ErrorHolder( 235 "broken-runner", 236 error=sys.exc_info()) 237 case.run(process_result) 238 finally: 239 queue.put(testserver) 240 241def fork_for_tests(concurrency_num, suite): 242 testservers = [] 243 if 'BUILDDIR' in os.environ: 244 selftestdir = get_test_layer() 245 246 test_blocks = partition_tests(suite, concurrency_num) 247 # Clear the tests from the original suite so it doesn't keep them alive 248 suite._tests[:] = [] 249 totaltests = sum(len(x) for x in test_blocks) 250 for process_tests in test_blocks: 251 numtests = len(process_tests) 252 process_suite = unittest.TestSuite(process_tests) 253 # Also clear each split list so new suite has only reference 254 process_tests[:] = [] 255 c2pread, c2pwrite = os.pipe() 256 # Clear buffers before fork to avoid duplicate output 257 sys.stdout.flush() 258 sys.stderr.flush() 259 pid = os.fork() 260 if pid == 0: 261 ourpid = os.getpid() 262 try: 263 newbuilddir = None 264 stream = os.fdopen(c2pwrite, 'wb', 1) 265 os.close(c2pread) 266 267 (builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite) 268 269 # Leave stderr and stdout open so we can see test noise 270 # Close stdin so that the child goes away if it decides to 271 # read from stdin (otherwise its a roulette to see what 272 # child actually gets keystrokes for pdb etc). 273 newsi = os.open(os.devnull, os.O_RDWR) 274 os.dup2(newsi, sys.stdin.fileno()) 275 276 # Send stdout/stderr over the stream 277 os.dup2(c2pwrite, sys.stdout.fileno()) 278 os.dup2(c2pwrite, sys.stderr.fileno()) 279 280 subunit_client = TestProtocolClient(stream) 281 subunit_result = AutoTimingTestResultDecorator(subunit_client) 282 unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result)) 283 if ourpid != os.getpid(): 284 os._exit(0) 285 if newbuilddir and unittest_result.wasSuccessful(): 286 suite.removefunc(newbuilddir) 287 except: 288 # Don't do anything with process children 289 if ourpid != os.getpid(): 290 os._exit(1) 291 # Try and report traceback on stream, but exit with error 292 # even if stream couldn't be created or something else 293 # goes wrong. The traceback is formatted to a string and 294 # written in one go to avoid interleaving lines from 295 # multiple failing children. 296 try: 297 stream.write(traceback.format_exc().encode('utf-8')) 298 except: 299 sys.stderr.write(traceback.format_exc()) 300 finally: 301 if newbuilddir: 302 suite.removefunc(newbuilddir) 303 stream.flush() 304 os._exit(1) 305 stream.flush() 306 os._exit(0) 307 else: 308 os.close(c2pwrite) 309 stream = os.fdopen(c2pread, 'rb', 1) 310 # Collect stdout/stderr into an io buffer 311 output = io.BytesIO() 312 testserver = ProtocolTestCase(stream, passthrough=output) 313 testservers.append((testserver, numtests, output)) 314 return testservers, totaltests 315 316def partition_tests(suite, count): 317 # Keep tests from the same class together but allow tests from modules 318 # to go to different processes to aid parallelisation. 319 modules = {} 320 for test in iterate_tests(suite): 321 m = test.__module__ + "." + test.__class__.__name__ 322 if m not in modules: 323 modules[m] = [] 324 modules[m].append(test) 325 326 # Simply divide the test blocks between the available processes 327 partitions = [list() for _ in range(count)] 328 for partition, m in zip(cycle(partitions), modules): 329 partition.extend(modules[m]) 330 331 # No point in empty threads so drop them 332 return [p for p in partitions if p] 333 334