xref: /OK3568_Linux_fs/yocto/poky/bitbake/bin/bitbake-worker (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1#!/usr/bin/env python3
2#
3# Copyright BitBake Contributors
4#
5# SPDX-License-Identifier: GPL-2.0-only
6#
7
8import os
9import sys
10import warnings
11warnings.simplefilter("default")
12sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
13from bb import fetch2
14import logging
15import bb
16import select
17import errno
18import signal
19import pickle
20import traceback
21import queue
22import shlex
23import subprocess
24from multiprocessing import Lock
25from threading import Thread
26
# BitBake requires a UTF-8 filesystem encoding. Python fixes this at
# interpreter startup, so all we can do here is detect and bail out early.
if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

# The longer magic token ("decafbadbad") is how the cooker asks this
# worker to profile itself and its forked task children.
profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except:
    pass
55
logger = logging.getLogger("BitBake")

# Events are streamed back to the cooker over our stdout; make the fd
# non-blocking so the flush thread below can select() and retry on it.
worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

# Forward all log records as events back to the cooker
handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

# Pre-pickled event byte strings awaiting transmission to the cooker
worker_queue = queue.Queue()
77
def worker_fire(event, d):
    """Serialise *event* and queue the framed bytes for the cooker."""
    payload = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(payload)
81
def worker_fire_prepickled(event):
    """Queue already-serialised event bytes for the writer thread.

    Queue.put is thread-safe, so no extra locking is required here.
    """
    worker_queue.put(event)
86
#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
worker_thread_exit = False

def worker_flush(worker_queue):
    """Writer-thread main loop.

    Drains worker_queue and streams the bytes to the cooker over the
    non-blocking worker_pipe, retrying partial writes. Exits only once
    worker_thread_exit is set and both the queue and the local buffer
    are fully drained.
    """
    worker_queue_int = b""
    global worker_pipe, worker_thread_exit

    while True:
        try:
            # Block up to one second for new data so the exit flag is
            # still checked regularly even when the queue is idle.
            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (with timeout) for the pipe to become writable
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int = worker_queue_int + worker_queue.get()
                # The pipe is non-blocking, so short writes are normal;
                # keep the unwritten remainder for the next iteration.
                written = os.write(worker_pipe, worker_queue_int)
                worker_queue_int = worker_queue_int[written:]
            except (IOError, OSError) as e:
                # EAGAIN: pipe currently full; EPIPE: cooker went away.
                # Both are tolerated; anything else is a real error.
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()
118
def worker_child_fire(event, d):
    """bb.event.worker_fire implementation used inside forked task children.

    Serialises *event* and writes the framed bytes up the pipe to the
    worker parent. worker_pipe_lock guards against interleaved writes
    when the child itself spawns multiprocessing workers.

    On IOError (parent pipe gone) the child tears itself down via
    sigterm_handler() and re-raises.
    """
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        # Use the lock as a context manager so it is released even when
        # a write raises; previously an exception mid-write leaked the
        # lock, which could deadlock sibling writers before exit.
        with worker_pipe_lock:
            while(len(data)):
                written = worker_pipe.write(data)
                data = data[written:]
    except IOError:
        sigterm_handler(None, None)
        raise
133
# Route all bb.event fires in this process through the queued pipe writer
bb.event.worker_fire = worker_fire
135
lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """Append *msg* to the debug command log, when one is enabled above."""
    if lf is None:
        return
    lf.write(msg)
    lf.flush()
142
def sigterm_handler(signum, frame):
    """SIGTERM/SIGHUP handler: terminate the entire process group, then exit."""
    # Restore the default disposition first so the killpg() below does
    # not re-enter this handler in this process.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()
147
def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
    """Fork a child process to execute a single task.

    Returns (pid, pipein, pipeout) to the parent; pipein carries events
    from the child back to the worker. The forked child never returns
    from this function - it runs the task and leaves via os._exit().
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}          # parent environment values to restore after fork
    fakeroot = False
    fakeenv = {}            # env vars the child must keep after env reset
    umask = None

    uid = os.getuid()
    gid = os.getgid()


    taskdep = workerdata["taskdeps"][fn]
    # A per-task umask overrides the global one supplied by the cooker
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask:
        # umask might come in as a number or text string..
        try:
             umask = int(umask, 8)
        except TypeError:
             pass

    dry_run = cfg.dry_run or dry_run_exec

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        envvars = (workerdata["fakerootenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        # Non-fakeroot tasks still get the "noenv" variable set applied
        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
        for key, value in (var.split('=') for var in envvars):
            envbackup[key] = os.environ.get(key)
            os.environ[key] = value
            fakeenv[key] = value

    # Avoid duplicated buffered output appearing in both processes
    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            # Runs only in the forked child; sets up signals/environment,
            # executes the task and returns its status for os._exit().
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            if umask:
                os.umask(umask)

            try:
                # Parse the recipe and prime the datastore for this task
                bb_cache = bb.cache.NoCache(databuilder)
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                # Sandbox networking away unless the task flags that it needs it
                if not the_data.getVarFlag(taskname, 'network', False):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                # Re-apply the fakeroot variables on top of the clean environment
                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                # Honour any extra exports requested via the task's 'exports' flag
                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)
            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    if fakeroot:
                        # Ask the pseudo server started for this task to shut down
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made before the fork
        for key, value in iter(envbackup.items()):
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout
332
class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        # Buffer of bytes received but not yet framed into events
        self.queue = b""

    def read(self):
        """Pull available bytes from the child, forward any complete
        <event>...</event> messages, and return whether data arrived."""
        start = len(self.queue)
        try:
            chunk = self.input.read(102400) or b""
            self.queue += chunk
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        while True:
            head, endtag, remainder = self.queue.partition(b"</event>")
            if not endtag:
                break
            msg = head + endtag
            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
            worker_fire_prepickled(msg)
            self.queue = remainder
        return (end > start)

    def close(self):
        """Drain any remaining data, warn about trailing junk, and close."""
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()
368
# Set True only via a clean "quit" command from the cooker; drives the
# process exit code at the bottom of this file.
normalexit = False
370
class BitbakeWorker(object):
    """Worker server: parses pickled <item>...</item> commands from the
    cooker on stdin, forks task children, and relays their events and
    exit codes back to the cooker."""
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = b""             # buffered, not-yet-dispatched command bytes
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        self.build_pids = {}         # pid -> task identifier of running child
        self.build_pipes = {}        # pid -> runQueueWorkerPipe for that child

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        # The magic argv token contains "beef" for the fakeroot worker
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """Terminate all running task children, then re-deliver the
        signal to ourselves with the default handler installed."""
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """Main loop: multiplex cooker commands and child event pipes."""
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                # Dispatch every complete command currently buffered
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            # Forward any events the task children have produced
            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            # Reap any children that have exited
            if len(self.build_pids):
                while self.process_waitpid():
                    continue


    def handle_item(self, item, func):
        """If the buffer starts with <item>, call func() on the payload of
        each complete <item>...</item> message and consume it."""
        if self.queue.startswith(b"<" + item + b">"):
            index = self.queue.find(b"</" + item + b">")
            while index != -1:
                try:
                    func(self.queue[(len(item) + 2):index])
                except pickle.UnpicklingError:
                    workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                    raise
                self.queue = self.queue[(index + len(item) + 3):]
                index = self.queue.find(b"</" + item + b">")

    def handle_cookercfg(self, data):
        """Receive the cooker configuration and build the base datastore."""
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        """Receive per-build settings (verbosity, log levels, PR/hash
        server addresses) and apply them to each multiconfig datastore."""
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])

    def handle_newtaskhashes(self, data):
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        """Clean shutdown requested by the cooker."""
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """Fork off a child to run the requested task and track its pid/pipe."""
        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """Reap one exited task child, forward its (task, status) to the
        cooker as an <exitcode> event and return True; return False when
        no child has exited (or none is merely stopped)."""
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        # Drain and close the child's event pipe before reporting
        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """Terminate all running children (whole process groups) and
        drain any remaining event data from their pipes."""
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k, v in iter(self.build_pids.items()):
                try:
                    # Children are process-group leaders (setsid), so
                    # signal the whole group via the negative pid
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except:
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()
527
# Run the worker server on stdin, optionally under the profiler.
try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    # On a clean quit, serve() leaves via sys.exit(); only report
    # genuinely unexpected failures.
    if not normalexit:
        import traceback
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    # Let the writer thread flush any queued events before exiting
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
if not normalexit:
    sys.exit(1)
sys.exit(0)
553