xref: /OK3568_Linux_fs/yocto/poky/meta/lib/oeqa/core/utils/concurrencytest.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1#!/usr/bin/env python3
2#
3# SPDX-License-Identifier: GPL-2.0-or-later
4#
5# Modified for use in OE by Richard Purdie, 2018
6#
7# Modified by: Corey Goldberg, 2013
8#   License: GPLv2+
9#
10# Original code from:
11#   Bazaar (bzrlib.tests.__init__.py, v2.6, copied Jun 01 2013)
12#   Copyright (C) 2005-2011 Canonical Ltd
13#   License: GPLv2+
14
15import os
16import sys
17import traceback
18import unittest
19import subprocess
20import testtools
21import threading
22import time
23import io
24import json
25import subunit
26
27from queue import Queue
28from itertools import cycle
29from subunit import ProtocolTestCase, TestProtocolClient
30from subunit.test_results import AutoTimingTestResultDecorator
31from testtools import ThreadsafeForwardingResult, iterate_tests
32from testtools.content import Content
33from testtools.content_type import ContentType
34from oeqa.utils.commands import get_test_layer
35
36import bb.utils
37import oe.path
38
39_all__ = [
40    'ConcurrentTestSuite',
41    'fork_for_tests',
42    'partition_tests',
43]
44
45#
46# Patch the version from testtools to allow access to _test_start and allow
47# computation of timing information and threading progress
48#
49class BBThreadsafeForwardingResult(ThreadsafeForwardingResult):
50
51    def __init__(self, target, semaphore, threadnum, totalinprocess, totaltests, output, finalresult):
52        super(BBThreadsafeForwardingResult, self).__init__(target, semaphore)
53        self.threadnum = threadnum
54        self.totalinprocess = totalinprocess
55        self.totaltests = totaltests
56        self.buffer = True
57        self.outputbuf = output
58        self.finalresult = finalresult
59        self.finalresult.buffer = True
60        self.target = target
61
62    def _add_result_with_semaphore(self, method, test, *args, **kwargs):
63        self.semaphore.acquire()
64        try:
65            if self._test_start:
66                self.result.starttime[test.id()] = self._test_start.timestamp()
67                self.result.threadprogress[self.threadnum].append(test.id())
68                totalprogress = sum(len(x) for x in self.result.threadprogress.values())
69                self.result.progressinfo[test.id()] = "%s: %s/%s %s/%s (%ss) (%s failed) (%s)" % (
70                    self.threadnum,
71                    len(self.result.threadprogress[self.threadnum]),
72                    self.totalinprocess,
73                    totalprogress,
74                    self.totaltests,
75                    "{0:.2f}".format(time.time()-self._test_start.timestamp()),
76                    self.target.failed_tests,
77                    test.id())
78        finally:
79            self.semaphore.release()
80        self.finalresult._stderr_buffer = io.StringIO(initial_value=self.outputbuf.getvalue().decode("utf-8"))
81        self.finalresult._stdout_buffer = io.StringIO()
82        super(BBThreadsafeForwardingResult, self)._add_result_with_semaphore(method, test, *args, **kwargs)
83
84class ProxyTestResult:
85    # a very basic TestResult proxy, in order to modify add* calls
86    def __init__(self, target):
87        self.result = target
88        self.failed_tests = 0
89
90    def _addResult(self, method, test, *args, exception = False, **kwargs):
91        return method(test, *args, **kwargs)
92
93    def addError(self, test, err = None, **kwargs):
94        self.failed_tests += 1
95        self._addResult(self.result.addError, test, err, exception = True, **kwargs)
96
97    def addFailure(self, test, err = None, **kwargs):
98        self.failed_tests += 1
99        self._addResult(self.result.addFailure, test, err, exception = True, **kwargs)
100
101    def addSuccess(self, test, **kwargs):
102        self._addResult(self.result.addSuccess, test, **kwargs)
103
104    def addExpectedFailure(self, test, err = None, **kwargs):
105        self._addResult(self.result.addExpectedFailure, test, err, exception = True, **kwargs)
106
107    def addUnexpectedSuccess(self, test, **kwargs):
108        self._addResult(self.result.addUnexpectedSuccess, test, **kwargs)
109
110    def wasSuccessful(self):
111        return self.failed_tests == 0
112
113    def __getattr__(self, attr):
114        return getattr(self.result, attr)
115
116class ExtraResultsDecoderTestResult(ProxyTestResult):
117    def _addResult(self, method, test, *args, exception = False, **kwargs):
118        if "details" in kwargs and "extraresults" in kwargs["details"]:
119            if isinstance(kwargs["details"]["extraresults"], Content):
120                kwargs = kwargs.copy()
121                kwargs["details"] = kwargs["details"].copy()
122                extraresults = kwargs["details"]["extraresults"]
123                data = bytearray()
124                for b in extraresults.iter_bytes():
125                    data += b
126                extraresults = json.loads(data.decode())
127                kwargs["details"]["extraresults"] = extraresults
128        return method(test, *args, **kwargs)
129
130class ExtraResultsEncoderTestResult(ProxyTestResult):
131    def _addResult(self, method, test, *args, exception = False, **kwargs):
132        if hasattr(test, "extraresults"):
133            extras = lambda : [json.dumps(test.extraresults).encode()]
134            kwargs = kwargs.copy()
135            if "details" not in kwargs:
136                kwargs["details"] = {}
137            else:
138                kwargs["details"] = kwargs["details"].copy()
139            kwargs["details"]["extraresults"] = Content(ContentType("application", "json", {'charset': 'utf8'}), extras)
140        # if using details, need to encode any exceptions into the details obj,
141        # testtools does not handle "err" and "details" together.
142        if "details" in kwargs and exception and (len(args) >= 1 and args[0] is not None):
143            kwargs["details"]["traceback"] = testtools.content.TracebackContent(args[0], test)
144            args = []
145        return method(test, *args, **kwargs)
146
147#
148# We have to patch subunit since it doesn't understand how to handle addError
149# outside of a running test case. This can happen if classSetUp() fails
150# for a class of tests. This unfortunately has horrible internal knowledge.
151#
152def outSideTestaddError(self, offset, line):
153    """An 'error:' directive has been read."""
154    test_name = line[offset:-1].decode('utf8')
155    self.parser._current_test = subunit.RemotedTestCase(test_name)
156    self.parser.current_test_description = test_name
157    self.parser._state = self.parser._reading_error_details
158    self.parser._reading_error_details.set_simple()
159    self.parser.subunitLineReceived(line)
160
161subunit._OutSideTest.addError = outSideTestaddError
162
163# Like outSideTestaddError above, we need an equivalent for skips
164# happening at the setUpClass() level, otherwise we will see "UNKNOWN"
165# as a result for concurrent tests
166#
167def outSideTestaddSkip(self, offset, line):
168    """A 'skip:' directive has been read."""
169    test_name = line[offset:-1].decode('utf8')
170    self.parser._current_test = subunit.RemotedTestCase(test_name)
171    self.parser.current_test_description = test_name
172    self.parser._state = self.parser._reading_skip_details
173    self.parser._reading_skip_details.set_simple()
174    self.parser.subunitLineReceived(line)
175
176subunit._OutSideTest.addSkip = outSideTestaddSkip
177
178#
179# A dummy structure to add to io.StringIO so that the .buffer object
180# is available and accepts writes. This allows unittest with buffer=True
181# to interact ok with subunit which wants to access sys.stdout.buffer.
182#
183class dummybuf(object):
184   def __init__(self, parent):
185       self.p = parent
186   def write(self, data):
187       self.p.write(data.decode("utf-8"))
188
189#
190# Taken from testtools.ConncurrencyTestSuite but modified for OE use
191#
192class ConcurrentTestSuite(unittest.TestSuite):
193
194    def __init__(self, suite, processes, setupfunc, removefunc):
195        super(ConcurrentTestSuite, self).__init__([suite])
196        self.processes = processes
197        self.setupfunc = setupfunc
198        self.removefunc = removefunc
199
200    def run(self, result):
201        testservers, totaltests = fork_for_tests(self.processes, self)
202        try:
203            threads = {}
204            queue = Queue()
205            semaphore = threading.Semaphore(1)
206            result.threadprogress = {}
207            for i, (testserver, testnum, output) in enumerate(testservers):
208                result.threadprogress[i] = []
209                process_result = BBThreadsafeForwardingResult(
210                        ExtraResultsDecoderTestResult(result),
211                        semaphore, i, testnum, totaltests, output, result)
212                reader_thread = threading.Thread(
213                    target=self._run_test, args=(testserver, process_result, queue))
214                threads[testserver] = reader_thread, process_result
215                reader_thread.start()
216            while threads:
217                finished_test = queue.get()
218                threads[finished_test][0].join()
219                del threads[finished_test]
220        except:
221            for thread, process_result in threads.values():
222                process_result.stop()
223            raise
224        finally:
225            for testserver in testservers:
226                testserver[0]._stream.close()
227
228    def _run_test(self, testserver, process_result, queue):
229        try:
230            try:
231                testserver.run(process_result)
232            except Exception:
233                # The run logic itself failed
234                case = testtools.ErrorHolder(
235                    "broken-runner",
236                    error=sys.exc_info())
237                case.run(process_result)
238        finally:
239            queue.put(testserver)
240
241def fork_for_tests(concurrency_num, suite):
242    testservers = []
243    if 'BUILDDIR' in os.environ:
244        selftestdir = get_test_layer()
245
246    test_blocks = partition_tests(suite, concurrency_num)
247    # Clear the tests from the original suite so it doesn't keep them alive
248    suite._tests[:] = []
249    totaltests = sum(len(x) for x in test_blocks)
250    for process_tests in test_blocks:
251        numtests = len(process_tests)
252        process_suite = unittest.TestSuite(process_tests)
253        # Also clear each split list so new suite has only reference
254        process_tests[:] = []
255        c2pread, c2pwrite = os.pipe()
256        # Clear buffers before fork to avoid duplicate output
257        sys.stdout.flush()
258        sys.stderr.flush()
259        pid = os.fork()
260        if pid == 0:
261            ourpid = os.getpid()
262            try:
263                newbuilddir = None
264                stream = os.fdopen(c2pwrite, 'wb', 1)
265                os.close(c2pread)
266
267                (builddir, newbuilddir) = suite.setupfunc("-st-" + str(ourpid), selftestdir, process_suite)
268
269                # Leave stderr and stdout open so we can see test noise
270                # Close stdin so that the child goes away if it decides to
271                # read from stdin (otherwise its a roulette to see what
272                # child actually gets keystrokes for pdb etc).
273                newsi = os.open(os.devnull, os.O_RDWR)
274                os.dup2(newsi, sys.stdin.fileno())
275
276                # Send stdout/stderr over the stream
277                os.dup2(c2pwrite, sys.stdout.fileno())
278                os.dup2(c2pwrite, sys.stderr.fileno())
279
280                subunit_client = TestProtocolClient(stream)
281                subunit_result = AutoTimingTestResultDecorator(subunit_client)
282                unittest_result = process_suite.run(ExtraResultsEncoderTestResult(subunit_result))
283                if ourpid != os.getpid():
284                    os._exit(0)
285                if newbuilddir and unittest_result.wasSuccessful():
286                    suite.removefunc(newbuilddir)
287            except:
288                # Don't do anything with process children
289                if ourpid != os.getpid():
290                    os._exit(1)
291                # Try and report traceback on stream, but exit with error
292                # even if stream couldn't be created or something else
293                # goes wrong.  The traceback is formatted to a string and
294                # written in one go to avoid interleaving lines from
295                # multiple failing children.
296                try:
297                    stream.write(traceback.format_exc().encode('utf-8'))
298                except:
299                    sys.stderr.write(traceback.format_exc())
300                finally:
301                    if newbuilddir:
302                        suite.removefunc(newbuilddir)
303                    stream.flush()
304                    os._exit(1)
305            stream.flush()
306            os._exit(0)
307        else:
308            os.close(c2pwrite)
309            stream = os.fdopen(c2pread, 'rb', 1)
310            # Collect stdout/stderr into an io buffer
311            output = io.BytesIO()
312            testserver = ProtocolTestCase(stream, passthrough=output)
313            testservers.append((testserver, numtests, output))
314    return testservers, totaltests
315
316def partition_tests(suite, count):
317    # Keep tests from the same class together but allow tests from modules
318    # to go to different processes to aid parallelisation.
319    modules = {}
320    for test in iterate_tests(suite):
321        m = test.__module__ + "." + test.__class__.__name__
322        if m not in modules:
323            modules[m] = []
324        modules[m].append(test)
325
326    # Simply divide the test blocks between the available processes
327    partitions = [list() for _ in range(count)]
328    for partition, m in zip(cycle(partitions), modules):
329        partition.extend(modules[m])
330
331    # No point in empty threads so drop them
332    return [p for p in partitions if p]
333
334