xref: /OK3568_Linux_fs/yocto/bitbake/bin/bitbake-worker (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1*4882a593Smuzhiyun#!/usr/bin/env python3
2*4882a593Smuzhiyun#
3*4882a593Smuzhiyun# Copyright BitBake Contributors
4*4882a593Smuzhiyun#
5*4882a593Smuzhiyun# SPDX-License-Identifier: GPL-2.0-only
6*4882a593Smuzhiyun#
7*4882a593Smuzhiyun
8*4882a593Smuzhiyunimport os
9*4882a593Smuzhiyunimport sys
10*4882a593Smuzhiyunimport warnings
11*4882a593Smuzhiyunwarnings.simplefilter("default")
12*4882a593Smuzhiyunsys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
13*4882a593Smuzhiyunfrom bb import fetch2
14*4882a593Smuzhiyunimport logging
15*4882a593Smuzhiyunimport bb
16*4882a593Smuzhiyunimport select
17*4882a593Smuzhiyunimport errno
18*4882a593Smuzhiyunimport signal
19*4882a593Smuzhiyunimport pickle
20*4882a593Smuzhiyunimport traceback
21*4882a593Smuzhiyunimport queue
22*4882a593Smuzhiyunimport shlex
23*4882a593Smuzhiyunimport subprocess
24*4882a593Smuzhiyunfrom multiprocessing import Lock
25*4882a593Smuzhiyunfrom threading import Thread
26*4882a593Smuzhiyun
27*4882a593Smuzhiyunif sys.getfilesystemencoding() != "utf-8":
28*4882a593Smuzhiyun    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")
29*4882a593Smuzhiyun
30*4882a593Smuzhiyun# Users shouldn't be running this code directly
31*4882a593Smuzhiyunif len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
32*4882a593Smuzhiyun    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
33*4882a593Smuzhiyun    sys.exit(1)
34*4882a593Smuzhiyun
35*4882a593Smuzhiyunprofiling = False
36*4882a593Smuzhiyunif sys.argv[1].startswith("decafbadbad"):
37*4882a593Smuzhiyun    profiling = True
38*4882a593Smuzhiyun    try:
39*4882a593Smuzhiyun        import cProfile as profile
40*4882a593Smuzhiyun    except:
41*4882a593Smuzhiyun        import profile
42*4882a593Smuzhiyun
43*4882a593Smuzhiyun# Unbuffer stdout to avoid log truncation in the event
44*4882a593Smuzhiyun# of an unorderly exit as well as to provide timely
45*4882a593Smuzhiyun# updates to log files for use with tail
46*4882a593Smuzhiyuntry:
47*4882a593Smuzhiyun    if sys.stdout.name == '<stdout>':
48*4882a593Smuzhiyun        import fcntl
49*4882a593Smuzhiyun        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
50*4882a593Smuzhiyun        fl |= os.O_SYNC
51*4882a593Smuzhiyun        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
52*4882a593Smuzhiyun        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
53*4882a593Smuzhiyunexcept:
54*4882a593Smuzhiyun    pass
55*4882a593Smuzhiyun
56*4882a593Smuzhiyunlogger = logging.getLogger("BitBake")
57*4882a593Smuzhiyun
58*4882a593Smuzhiyunworker_pipe = sys.stdout.fileno()
59*4882a593Smuzhiyunbb.utils.nonblockingfd(worker_pipe)
60*4882a593Smuzhiyun# Need to guard against multiprocessing being used in child processes
61*4882a593Smuzhiyun# and multiple processes trying to write to the parent at the same time
62*4882a593Smuzhiyunworker_pipe_lock = None
63*4882a593Smuzhiyun
64*4882a593Smuzhiyunhandler = bb.event.LogHandler()
65*4882a593Smuzhiyunlogger.addHandler(handler)
66*4882a593Smuzhiyun
67*4882a593Smuzhiyunif 0:
68*4882a593Smuzhiyun    # Code to write out a log file of all events passing through the worker
69*4882a593Smuzhiyun    logfilename = "/tmp/workerlogfile"
70*4882a593Smuzhiyun    format_str = "%(levelname)s: %(message)s"
71*4882a593Smuzhiyun    conlogformat = bb.msg.BBLogFormatter(format_str)
72*4882a593Smuzhiyun    consolelog = logging.FileHandler(logfilename)
73*4882a593Smuzhiyun    consolelog.setFormatter(conlogformat)
74*4882a593Smuzhiyun    logger.addHandler(consolelog)
75*4882a593Smuzhiyun
76*4882a593Smuzhiyunworker_queue = queue.Queue()
77*4882a593Smuzhiyun
78*4882a593Smuzhiyundef worker_fire(event, d):
79*4882a593Smuzhiyun    data = b"<event>" + pickle.dumps(event) + b"</event>"
80*4882a593Smuzhiyun    worker_fire_prepickled(data)
81*4882a593Smuzhiyun
82*4882a593Smuzhiyundef worker_fire_prepickled(event):
83*4882a593Smuzhiyun    global worker_queue
84*4882a593Smuzhiyun
85*4882a593Smuzhiyun    worker_queue.put(event)
86*4882a593Smuzhiyun
87*4882a593Smuzhiyun#
88*4882a593Smuzhiyun# We can end up with write contention with the cooker, it can be trying to send commands
89*4882a593Smuzhiyun# and we can be trying to send event data back. Therefore use a separate thread for writing
90*4882a593Smuzhiyun# back data to cooker.
91*4882a593Smuzhiyun#
92*4882a593Smuzhiyunworker_thread_exit = False
93*4882a593Smuzhiyun
94*4882a593Smuzhiyundef worker_flush(worker_queue):
95*4882a593Smuzhiyun    worker_queue_int = b""
96*4882a593Smuzhiyun    global worker_pipe, worker_thread_exit
97*4882a593Smuzhiyun
98*4882a593Smuzhiyun    while True:
99*4882a593Smuzhiyun        try:
100*4882a593Smuzhiyun            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
101*4882a593Smuzhiyun        except queue.Empty:
102*4882a593Smuzhiyun            pass
103*4882a593Smuzhiyun        while (worker_queue_int or not worker_queue.empty()):
104*4882a593Smuzhiyun            try:
105*4882a593Smuzhiyun                (_, ready, _) = select.select([], [worker_pipe], [], 1)
106*4882a593Smuzhiyun                if not worker_queue.empty():
107*4882a593Smuzhiyun                    worker_queue_int = worker_queue_int + worker_queue.get()
108*4882a593Smuzhiyun                written = os.write(worker_pipe, worker_queue_int)
109*4882a593Smuzhiyun                worker_queue_int = worker_queue_int[written:]
110*4882a593Smuzhiyun            except (IOError, OSError) as e:
111*4882a593Smuzhiyun                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
112*4882a593Smuzhiyun                    raise
113*4882a593Smuzhiyun        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
114*4882a593Smuzhiyun            return
115*4882a593Smuzhiyun
116*4882a593Smuzhiyunworker_thread = Thread(target=worker_flush, args=(worker_queue,))
117*4882a593Smuzhiyunworker_thread.start()
118*4882a593Smuzhiyun
119*4882a593Smuzhiyundef worker_child_fire(event, d):
120*4882a593Smuzhiyun    global worker_pipe
121*4882a593Smuzhiyun    global worker_pipe_lock
122*4882a593Smuzhiyun
123*4882a593Smuzhiyun    data = b"<event>" + pickle.dumps(event) + b"</event>"
124*4882a593Smuzhiyun    try:
125*4882a593Smuzhiyun        worker_pipe_lock.acquire()
126*4882a593Smuzhiyun        while(len(data)):
127*4882a593Smuzhiyun            written = worker_pipe.write(data)
128*4882a593Smuzhiyun            data = data[written:]
129*4882a593Smuzhiyun        worker_pipe_lock.release()
130*4882a593Smuzhiyun    except IOError:
131*4882a593Smuzhiyun        sigterm_handler(None, None)
132*4882a593Smuzhiyun        raise
133*4882a593Smuzhiyun
134*4882a593Smuzhiyunbb.event.worker_fire = worker_fire
135*4882a593Smuzhiyun
136*4882a593Smuzhiyunlf = None
137*4882a593Smuzhiyun#lf = open("/tmp/workercommandlog", "w+")
138*4882a593Smuzhiyundef workerlog_write(msg):
139*4882a593Smuzhiyun    if lf:
140*4882a593Smuzhiyun        lf.write(msg)
141*4882a593Smuzhiyun        lf.flush()
142*4882a593Smuzhiyun
143*4882a593Smuzhiyundef sigterm_handler(signum, frame):
144*4882a593Smuzhiyun    signal.signal(signal.SIGTERM, signal.SIG_DFL)
145*4882a593Smuzhiyun    os.killpg(0, signal.SIGTERM)
146*4882a593Smuzhiyun    sys.exit()
147*4882a593Smuzhiyun
148*4882a593Smuzhiyundef fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
149*4882a593Smuzhiyun    # We need to setup the environment BEFORE the fork, since
150*4882a593Smuzhiyun    # a fork() or exec*() activates PSEUDO...
151*4882a593Smuzhiyun
152*4882a593Smuzhiyun    envbackup = {}
153*4882a593Smuzhiyun    fakeroot = False
154*4882a593Smuzhiyun    fakeenv = {}
155*4882a593Smuzhiyun    umask = None
156*4882a593Smuzhiyun
157*4882a593Smuzhiyun    uid = os.getuid()
158*4882a593Smuzhiyun    gid = os.getgid()
159*4882a593Smuzhiyun
160*4882a593Smuzhiyun
161*4882a593Smuzhiyun    taskdep = workerdata["taskdeps"][fn]
162*4882a593Smuzhiyun    if 'umask' in taskdep and taskname in taskdep['umask']:
163*4882a593Smuzhiyun        umask = taskdep['umask'][taskname]
164*4882a593Smuzhiyun    elif workerdata["umask"]:
165*4882a593Smuzhiyun        umask = workerdata["umask"]
166*4882a593Smuzhiyun    if umask:
167*4882a593Smuzhiyun        # umask might come in as a number or text string..
168*4882a593Smuzhiyun        try:
169*4882a593Smuzhiyun             umask = int(umask, 8)
170*4882a593Smuzhiyun        except TypeError:
171*4882a593Smuzhiyun             pass
172*4882a593Smuzhiyun
173*4882a593Smuzhiyun    dry_run = cfg.dry_run or dry_run_exec
174*4882a593Smuzhiyun
175*4882a593Smuzhiyun    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
176*4882a593Smuzhiyun    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
177*4882a593Smuzhiyun        fakeroot = True
178*4882a593Smuzhiyun        envvars = (workerdata["fakerootenv"][fn] or "").split()
179*4882a593Smuzhiyun        for key, value in (var.split('=') for var in envvars):
180*4882a593Smuzhiyun            envbackup[key] = os.environ.get(key)
181*4882a593Smuzhiyun            os.environ[key] = value
182*4882a593Smuzhiyun            fakeenv[key] = value
183*4882a593Smuzhiyun
184*4882a593Smuzhiyun        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
185*4882a593Smuzhiyun        for p in fakedirs:
186*4882a593Smuzhiyun            bb.utils.mkdirhier(p)
187*4882a593Smuzhiyun        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
188*4882a593Smuzhiyun                        (fn, taskname, ', '.join(fakedirs)))
189*4882a593Smuzhiyun    else:
190*4882a593Smuzhiyun        envvars = (workerdata["fakerootnoenv"][fn] or "").split()
191*4882a593Smuzhiyun        for key, value in (var.split('=') for var in envvars):
192*4882a593Smuzhiyun            envbackup[key] = os.environ.get(key)
193*4882a593Smuzhiyun            os.environ[key] = value
194*4882a593Smuzhiyun            fakeenv[key] = value
195*4882a593Smuzhiyun
196*4882a593Smuzhiyun    sys.stdout.flush()
197*4882a593Smuzhiyun    sys.stderr.flush()
198*4882a593Smuzhiyun
199*4882a593Smuzhiyun    try:
200*4882a593Smuzhiyun        pipein, pipeout = os.pipe()
201*4882a593Smuzhiyun        pipein = os.fdopen(pipein, 'rb', 4096)
202*4882a593Smuzhiyun        pipeout = os.fdopen(pipeout, 'wb', 0)
203*4882a593Smuzhiyun        pid = os.fork()
204*4882a593Smuzhiyun    except OSError as e:
205*4882a593Smuzhiyun        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
206*4882a593Smuzhiyun        sys.exit(1)
207*4882a593Smuzhiyun
208*4882a593Smuzhiyun    if pid == 0:
209*4882a593Smuzhiyun        def child():
210*4882a593Smuzhiyun            global worker_pipe
211*4882a593Smuzhiyun            global worker_pipe_lock
212*4882a593Smuzhiyun            pipein.close()
213*4882a593Smuzhiyun
214*4882a593Smuzhiyun            bb.utils.signal_on_parent_exit("SIGTERM")
215*4882a593Smuzhiyun
216*4882a593Smuzhiyun            # Save out the PID so that the event can include it the
217*4882a593Smuzhiyun            # events
218*4882a593Smuzhiyun            bb.event.worker_pid = os.getpid()
219*4882a593Smuzhiyun            bb.event.worker_fire = worker_child_fire
220*4882a593Smuzhiyun            worker_pipe = pipeout
221*4882a593Smuzhiyun            worker_pipe_lock = Lock()
222*4882a593Smuzhiyun
223*4882a593Smuzhiyun            # Make the child the process group leader and ensure no
224*4882a593Smuzhiyun            # child process will be controlled by the current terminal
225*4882a593Smuzhiyun            # This ensures signals sent to the controlling terminal like Ctrl+C
226*4882a593Smuzhiyun            # don't stop the child processes.
227*4882a593Smuzhiyun            os.setsid()
228*4882a593Smuzhiyun
229*4882a593Smuzhiyun            signal.signal(signal.SIGTERM, sigterm_handler)
230*4882a593Smuzhiyun            # Let SIGHUP exit as SIGTERM
231*4882a593Smuzhiyun            signal.signal(signal.SIGHUP, sigterm_handler)
232*4882a593Smuzhiyun
233*4882a593Smuzhiyun            # No stdin
234*4882a593Smuzhiyun            newsi = os.open(os.devnull, os.O_RDWR)
235*4882a593Smuzhiyun            os.dup2(newsi, sys.stdin.fileno())
236*4882a593Smuzhiyun
237*4882a593Smuzhiyun            if umask:
238*4882a593Smuzhiyun                os.umask(umask)
239*4882a593Smuzhiyun
240*4882a593Smuzhiyun            try:
241*4882a593Smuzhiyun                bb_cache = bb.cache.NoCache(databuilder)
242*4882a593Smuzhiyun                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
243*4882a593Smuzhiyun                the_data = databuilder.mcdata[mc]
244*4882a593Smuzhiyun                the_data.setVar("BB_WORKERCONTEXT", "1")
245*4882a593Smuzhiyun                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
246*4882a593Smuzhiyun                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
247*4882a593Smuzhiyun                if cfg.limited_deps:
248*4882a593Smuzhiyun                    the_data.setVar("BB_LIMITEDDEPS", "1")
249*4882a593Smuzhiyun                the_data.setVar("BUILDNAME", workerdata["buildname"])
250*4882a593Smuzhiyun                the_data.setVar("DATE", workerdata["date"])
251*4882a593Smuzhiyun                the_data.setVar("TIME", workerdata["time"])
252*4882a593Smuzhiyun                for varname, value in extraconfigdata.items():
253*4882a593Smuzhiyun                    the_data.setVar(varname, value)
254*4882a593Smuzhiyun
255*4882a593Smuzhiyun                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
256*4882a593Smuzhiyun                if "newhashes" in workerdata:
257*4882a593Smuzhiyun                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
258*4882a593Smuzhiyun                ret = 0
259*4882a593Smuzhiyun
260*4882a593Smuzhiyun                the_data = bb_cache.loadDataFull(fn, appends)
261*4882a593Smuzhiyun                the_data.setVar('BB_TASKHASH', taskhash)
262*4882a593Smuzhiyun                the_data.setVar('BB_UNIHASH', unihash)
263*4882a593Smuzhiyun
264*4882a593Smuzhiyun                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))
265*4882a593Smuzhiyun
266*4882a593Smuzhiyun                if not the_data.getVarFlag(taskname, 'network', False):
267*4882a593Smuzhiyun                    if bb.utils.is_local_uid(uid):
268*4882a593Smuzhiyun                        logger.debug("Attempting to disable network for %s" % taskname)
269*4882a593Smuzhiyun                        bb.utils.disable_network(uid, gid)
270*4882a593Smuzhiyun                    else:
271*4882a593Smuzhiyun                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))
272*4882a593Smuzhiyun
273*4882a593Smuzhiyun                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
274*4882a593Smuzhiyun                # successfully. We also need to unset anything from the environment which shouldn't be there
275*4882a593Smuzhiyun                exports = bb.data.exported_vars(the_data)
276*4882a593Smuzhiyun
277*4882a593Smuzhiyun                bb.utils.empty_environment()
278*4882a593Smuzhiyun                for e, v in exports:
279*4882a593Smuzhiyun                    os.environ[e] = v
280*4882a593Smuzhiyun
281*4882a593Smuzhiyun                for e in fakeenv:
282*4882a593Smuzhiyun                    os.environ[e] = fakeenv[e]
283*4882a593Smuzhiyun                    the_data.setVar(e, fakeenv[e])
284*4882a593Smuzhiyun                    the_data.setVarFlag(e, 'export', "1")
285*4882a593Smuzhiyun
286*4882a593Smuzhiyun                task_exports = the_data.getVarFlag(taskname, 'exports')
287*4882a593Smuzhiyun                if task_exports:
288*4882a593Smuzhiyun                    for e in task_exports.split():
289*4882a593Smuzhiyun                        the_data.setVarFlag(e, 'export', '1')
290*4882a593Smuzhiyun                        v = the_data.getVar(e)
291*4882a593Smuzhiyun                        if v is not None:
292*4882a593Smuzhiyun                            os.environ[e] = v
293*4882a593Smuzhiyun
294*4882a593Smuzhiyun                if quieterrors:
295*4882a593Smuzhiyun                    the_data.setVarFlag(taskname, "quieterrors", "1")
296*4882a593Smuzhiyun
297*4882a593Smuzhiyun            except Exception:
298*4882a593Smuzhiyun                if not quieterrors:
299*4882a593Smuzhiyun                    logger.critical(traceback.format_exc())
300*4882a593Smuzhiyun                os._exit(1)
301*4882a593Smuzhiyun            try:
302*4882a593Smuzhiyun                if dry_run:
303*4882a593Smuzhiyun                    return 0
304*4882a593Smuzhiyun                try:
305*4882a593Smuzhiyun                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
306*4882a593Smuzhiyun                finally:
307*4882a593Smuzhiyun                    if fakeroot:
308*4882a593Smuzhiyun                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
309*4882a593Smuzhiyun                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
310*4882a593Smuzhiyun                return ret
311*4882a593Smuzhiyun            except:
312*4882a593Smuzhiyun                os._exit(1)
313*4882a593Smuzhiyun        if not profiling:
314*4882a593Smuzhiyun            os._exit(child())
315*4882a593Smuzhiyun        else:
316*4882a593Smuzhiyun            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
317*4882a593Smuzhiyun            prof = profile.Profile()
318*4882a593Smuzhiyun            try:
319*4882a593Smuzhiyun                ret = profile.Profile.runcall(prof, child)
320*4882a593Smuzhiyun            finally:
321*4882a593Smuzhiyun                prof.dump_stats(profname)
322*4882a593Smuzhiyun                bb.utils.process_profilelog(profname)
323*4882a593Smuzhiyun                os._exit(ret)
324*4882a593Smuzhiyun    else:
325*4882a593Smuzhiyun        for key, value in iter(envbackup.items()):
326*4882a593Smuzhiyun            if value is None:
327*4882a593Smuzhiyun                del os.environ[key]
328*4882a593Smuzhiyun            else:
329*4882a593Smuzhiyun                os.environ[key] = value
330*4882a593Smuzhiyun
331*4882a593Smuzhiyun    return pid, pipein, pipeout
332*4882a593Smuzhiyun
333*4882a593Smuzhiyunclass runQueueWorkerPipe():
334*4882a593Smuzhiyun    """
335*4882a593Smuzhiyun    Abstraction for a pipe between a worker thread and the worker server
336*4882a593Smuzhiyun    """
337*4882a593Smuzhiyun    def __init__(self, pipein, pipeout):
338*4882a593Smuzhiyun        self.input = pipein
339*4882a593Smuzhiyun        if pipeout:
340*4882a593Smuzhiyun            pipeout.close()
341*4882a593Smuzhiyun        bb.utils.nonblockingfd(self.input)
342*4882a593Smuzhiyun        self.queue = b""
343*4882a593Smuzhiyun
344*4882a593Smuzhiyun    def read(self):
345*4882a593Smuzhiyun        start = len(self.queue)
346*4882a593Smuzhiyun        try:
347*4882a593Smuzhiyun            self.queue = self.queue + (self.input.read(102400) or b"")
348*4882a593Smuzhiyun        except (OSError, IOError) as e:
349*4882a593Smuzhiyun            if e.errno != errno.EAGAIN:
350*4882a593Smuzhiyun                raise
351*4882a593Smuzhiyun
352*4882a593Smuzhiyun        end = len(self.queue)
353*4882a593Smuzhiyun        index = self.queue.find(b"</event>")
354*4882a593Smuzhiyun        while index != -1:
355*4882a593Smuzhiyun            msg = self.queue[:index+8]
356*4882a593Smuzhiyun            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
357*4882a593Smuzhiyun            worker_fire_prepickled(msg)
358*4882a593Smuzhiyun            self.queue = self.queue[index+8:]
359*4882a593Smuzhiyun            index = self.queue.find(b"</event>")
360*4882a593Smuzhiyun        return (end > start)
361*4882a593Smuzhiyun
362*4882a593Smuzhiyun    def close(self):
363*4882a593Smuzhiyun        while self.read():
364*4882a593Smuzhiyun            continue
365*4882a593Smuzhiyun        if len(self.queue) > 0:
366*4882a593Smuzhiyun            print("Warning, worker child left partial message: %s" % self.queue)
367*4882a593Smuzhiyun        self.input.close()
368*4882a593Smuzhiyun
369*4882a593Smuzhiyunnormalexit = False
370*4882a593Smuzhiyun
371*4882a593Smuzhiyunclass BitbakeWorker(object):
372*4882a593Smuzhiyun    def __init__(self, din):
373*4882a593Smuzhiyun        self.input = din
374*4882a593Smuzhiyun        bb.utils.nonblockingfd(self.input)
375*4882a593Smuzhiyun        self.queue = b""
376*4882a593Smuzhiyun        self.cookercfg = None
377*4882a593Smuzhiyun        self.databuilder = None
378*4882a593Smuzhiyun        self.data = None
379*4882a593Smuzhiyun        self.extraconfigdata = None
380*4882a593Smuzhiyun        self.build_pids = {}
381*4882a593Smuzhiyun        self.build_pipes = {}
382*4882a593Smuzhiyun
383*4882a593Smuzhiyun        signal.signal(signal.SIGTERM, self.sigterm_exception)
384*4882a593Smuzhiyun        # Let SIGHUP exit as SIGTERM
385*4882a593Smuzhiyun        signal.signal(signal.SIGHUP, self.sigterm_exception)
386*4882a593Smuzhiyun        if "beef" in sys.argv[1]:
387*4882a593Smuzhiyun            bb.utils.set_process_name("Worker (Fakeroot)")
388*4882a593Smuzhiyun        else:
389*4882a593Smuzhiyun            bb.utils.set_process_name("Worker")
390*4882a593Smuzhiyun
391*4882a593Smuzhiyun    def sigterm_exception(self, signum, stackframe):
392*4882a593Smuzhiyun        if signum == signal.SIGTERM:
393*4882a593Smuzhiyun            bb.warn("Worker received SIGTERM, shutting down...")
394*4882a593Smuzhiyun        elif signum == signal.SIGHUP:
395*4882a593Smuzhiyun            bb.warn("Worker received SIGHUP, shutting down...")
396*4882a593Smuzhiyun        self.handle_finishnow(None)
397*4882a593Smuzhiyun        signal.signal(signal.SIGTERM, signal.SIG_DFL)
398*4882a593Smuzhiyun        os.kill(os.getpid(), signal.SIGTERM)
399*4882a593Smuzhiyun
400*4882a593Smuzhiyun    def serve(self):
401*4882a593Smuzhiyun        while True:
402*4882a593Smuzhiyun            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
403*4882a593Smuzhiyun            if self.input in ready:
404*4882a593Smuzhiyun                try:
405*4882a593Smuzhiyun                    r = self.input.read()
406*4882a593Smuzhiyun                    if len(r) == 0:
407*4882a593Smuzhiyun                        # EOF on pipe, server must have terminated
408*4882a593Smuzhiyun                        self.sigterm_exception(signal.SIGTERM, None)
409*4882a593Smuzhiyun                    self.queue = self.queue + r
410*4882a593Smuzhiyun                except (OSError, IOError):
411*4882a593Smuzhiyun                    pass
412*4882a593Smuzhiyun            if len(self.queue):
413*4882a593Smuzhiyun                self.handle_item(b"cookerconfig", self.handle_cookercfg)
414*4882a593Smuzhiyun                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
415*4882a593Smuzhiyun                self.handle_item(b"workerdata", self.handle_workerdata)
416*4882a593Smuzhiyun                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
417*4882a593Smuzhiyun                self.handle_item(b"runtask", self.handle_runtask)
418*4882a593Smuzhiyun                self.handle_item(b"finishnow", self.handle_finishnow)
419*4882a593Smuzhiyun                self.handle_item(b"ping", self.handle_ping)
420*4882a593Smuzhiyun                self.handle_item(b"quit", self.handle_quit)
421*4882a593Smuzhiyun
422*4882a593Smuzhiyun            for pipe in self.build_pipes:
423*4882a593Smuzhiyun                if self.build_pipes[pipe].input in ready:
424*4882a593Smuzhiyun                    self.build_pipes[pipe].read()
425*4882a593Smuzhiyun            if len(self.build_pids):
426*4882a593Smuzhiyun                while self.process_waitpid():
427*4882a593Smuzhiyun                    continue
428*4882a593Smuzhiyun
429*4882a593Smuzhiyun
430*4882a593Smuzhiyun    def handle_item(self, item, func):
431*4882a593Smuzhiyun        if self.queue.startswith(b"<" + item + b">"):
432*4882a593Smuzhiyun            index = self.queue.find(b"</" + item + b">")
433*4882a593Smuzhiyun            while index != -1:
434*4882a593Smuzhiyun                try:
435*4882a593Smuzhiyun                    func(self.queue[(len(item) + 2):index])
436*4882a593Smuzhiyun                except pickle.UnpicklingError:
437*4882a593Smuzhiyun                    workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
438*4882a593Smuzhiyun                    raise
439*4882a593Smuzhiyun                self.queue = self.queue[(index + len(item) + 3):]
440*4882a593Smuzhiyun                index = self.queue.find(b"</" + item + b">")
441*4882a593Smuzhiyun
442*4882a593Smuzhiyun    def handle_cookercfg(self, data):
443*4882a593Smuzhiyun        self.cookercfg = pickle.loads(data)
444*4882a593Smuzhiyun        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
445*4882a593Smuzhiyun        self.databuilder.parseBaseConfiguration(worker=True)
446*4882a593Smuzhiyun        self.data = self.databuilder.data
447*4882a593Smuzhiyun
448*4882a593Smuzhiyun    def handle_extraconfigdata(self, data):
449*4882a593Smuzhiyun        self.extraconfigdata = pickle.loads(data)
450*4882a593Smuzhiyun
451*4882a593Smuzhiyun    def handle_workerdata(self, data):
452*4882a593Smuzhiyun        self.workerdata = pickle.loads(data)
453*4882a593Smuzhiyun        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
454*4882a593Smuzhiyun        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
455*4882a593Smuzhiyun        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
456*4882a593Smuzhiyun        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
457*4882a593Smuzhiyun        for mc in self.databuilder.mcdata:
458*4882a593Smuzhiyun            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
459*4882a593Smuzhiyun            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])
460*4882a593Smuzhiyun
461*4882a593Smuzhiyun    def handle_newtaskhashes(self, data):
462*4882a593Smuzhiyun        self.workerdata["newhashes"] = pickle.loads(data)
463*4882a593Smuzhiyun
464*4882a593Smuzhiyun    def handle_ping(self, _):
465*4882a593Smuzhiyun        workerlog_write("Handling ping\n")
466*4882a593Smuzhiyun
467*4882a593Smuzhiyun        logger.warning("Pong from bitbake-worker!")
468*4882a593Smuzhiyun
469*4882a593Smuzhiyun    def handle_quit(self, data):
470*4882a593Smuzhiyun        workerlog_write("Handling quit\n")
471*4882a593Smuzhiyun
472*4882a593Smuzhiyun        global normalexit
473*4882a593Smuzhiyun        normalexit = True
474*4882a593Smuzhiyun        sys.exit(0)
475*4882a593Smuzhiyun
476*4882a593Smuzhiyun    def handle_runtask(self, data):
477*4882a593Smuzhiyun        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
478*4882a593Smuzhiyun        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))
479*4882a593Smuzhiyun
480*4882a593Smuzhiyun        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)
481*4882a593Smuzhiyun
482*4882a593Smuzhiyun        self.build_pids[pid] = task
483*4882a593Smuzhiyun        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)
484*4882a593Smuzhiyun
485*4882a593Smuzhiyun    def process_waitpid(self):
486*4882a593Smuzhiyun        """
487*4882a593Smuzhiyun        Return none is there are no processes awaiting result collection, otherwise
488*4882a593Smuzhiyun        collect the process exit codes and close the information pipe.
489*4882a593Smuzhiyun        """
490*4882a593Smuzhiyun        try:
491*4882a593Smuzhiyun            pid, status = os.waitpid(-1, os.WNOHANG)
492*4882a593Smuzhiyun            if pid == 0 or os.WIFSTOPPED(status):
493*4882a593Smuzhiyun                return False
494*4882a593Smuzhiyun        except OSError:
495*4882a593Smuzhiyun            return False
496*4882a593Smuzhiyun
497*4882a593Smuzhiyun        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))
498*4882a593Smuzhiyun
499*4882a593Smuzhiyun        if os.WIFEXITED(status):
500*4882a593Smuzhiyun            status = os.WEXITSTATUS(status)
501*4882a593Smuzhiyun        elif os.WIFSIGNALED(status):
502*4882a593Smuzhiyun            # Per shell conventions for $?, when a process exits due to
503*4882a593Smuzhiyun            # a signal, we return an exit code of 128 + SIGNUM
504*4882a593Smuzhiyun            status = 128 + os.WTERMSIG(status)
505*4882a593Smuzhiyun
506*4882a593Smuzhiyun        task = self.build_pids[pid]
507*4882a593Smuzhiyun        del self.build_pids[pid]
508*4882a593Smuzhiyun
509*4882a593Smuzhiyun        self.build_pipes[pid].close()
510*4882a593Smuzhiyun        del self.build_pipes[pid]
511*4882a593Smuzhiyun
512*4882a593Smuzhiyun        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")
513*4882a593Smuzhiyun
514*4882a593Smuzhiyun        return True
515*4882a593Smuzhiyun
516*4882a593Smuzhiyun    def handle_finishnow(self, _):
517*4882a593Smuzhiyun        if self.build_pids:
518*4882a593Smuzhiyun            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
519*4882a593Smuzhiyun            for k, v in iter(self.build_pids.items()):
520*4882a593Smuzhiyun                try:
521*4882a593Smuzhiyun                    os.kill(-k, signal.SIGTERM)
522*4882a593Smuzhiyun                    os.waitpid(-1, 0)
523*4882a593Smuzhiyun                except:
524*4882a593Smuzhiyun                    pass
525*4882a593Smuzhiyun        for pipe in self.build_pipes:
526*4882a593Smuzhiyun            self.build_pipes[pipe].read()
527*4882a593Smuzhiyun
528*4882a593Smuzhiyuntry:
529*4882a593Smuzhiyun    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
530*4882a593Smuzhiyun    if not profiling:
531*4882a593Smuzhiyun        worker.serve()
532*4882a593Smuzhiyun    else:
533*4882a593Smuzhiyun        profname = "profile-worker.log"
534*4882a593Smuzhiyun        prof = profile.Profile()
535*4882a593Smuzhiyun        try:
536*4882a593Smuzhiyun            profile.Profile.runcall(prof, worker.serve)
537*4882a593Smuzhiyun        finally:
538*4882a593Smuzhiyun            prof.dump_stats(profname)
539*4882a593Smuzhiyun            bb.utils.process_profilelog(profname)
540*4882a593Smuzhiyunexcept BaseException as e:
541*4882a593Smuzhiyun    if not normalexit:
542*4882a593Smuzhiyun        import traceback
543*4882a593Smuzhiyun        sys.stderr.write(traceback.format_exc())
544*4882a593Smuzhiyun        sys.stderr.write(str(e))
545*4882a593Smuzhiyunfinally:
546*4882a593Smuzhiyun    worker_thread_exit = True
547*4882a593Smuzhiyun    worker_thread.join()
548*4882a593Smuzhiyun
549*4882a593Smuzhiyunworkerlog_write("exiting")
550*4882a593Smuzhiyunif not normalexit:
551*4882a593Smuzhiyun    sys.exit(1)
552*4882a593Smuzhiyunsys.exit(0)
553