#!/usr/bin/env python3
#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

import os
import sys
import warnings
warnings.simplefilter("default")
# The bb package lives in ../lib relative to this script, so the path has to
# be extended before any bb import below.
sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib'))
from bb import fetch2
import logging
import bb
import select
import errno
import signal
import pickle
import traceback
import queue
import shlex
import subprocess
from multiprocessing import Lock
from threading import Thread

if sys.getfilesystemencoding() != "utf-8":
    sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.")

# Users shouldn't be running this code directly
if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"):
    print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.")
    sys.exit(1)

profiling = False
if sys.argv[1].startswith("decafbadbad"):
    profiling = True
    try:
        import cProfile as profile
    except ImportError:
        import profile

# Unbuffer stdout to avoid log truncation in the event
# of an unorderly exit as well as to provide timely
# updates to log files for use with tail
try:
    if sys.stdout.name == '<stdout>':
        import fcntl
        fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL)
        fl |= os.O_SYNC
        fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl)
        #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)
except Exception:
    # Best effort only: carry on with default buffering if this fails.
    pass

logger = logging.getLogger("BitBake")

worker_pipe = sys.stdout.fileno()
bb.utils.nonblockingfd(worker_pipe)
# Need to guard against multiprocessing being used in child processes
# and multiple processes trying to write to the parent at the same time
worker_pipe_lock = None

handler = bb.event.LogHandler()
logger.addHandler(handler)

if 0:
    # Code to write out a log file of all events passing through the worker
    logfilename = "/tmp/workerlogfile"
    format_str = "%(levelname)s: %(message)s"
    conlogformat = bb.msg.BBLogFormatter(format_str)
    consolelog = logging.FileHandler(logfilename)
    consolelog.setFormatter(conlogformat)
    logger.addHandler(consolelog)

worker_queue = queue.Queue()

def worker_fire(event, d):
    """
    Pickle an event, wrap it in <event> tags and queue it for the writer thread.

    The d argument is accepted but not used here.
    """
    data = b"<event>" + pickle.dumps(event) + b"</event>"
    worker_fire_prepickled(data)

def worker_fire_prepickled(event):
    """
    Queue already-framed bytes for worker_flush() to write to worker_pipe.
    """
    worker_queue.put(event)

#
# We can end up with write contention with the cooker, it can be trying to send commands
# and we can be trying to send event data back. Therefore use a separate thread for writing
# back data to cooker.
#
worker_thread_exit = False

def worker_flush(worker_queue):
    """
    Writer thread body: drain worker_queue into worker_pipe.

    Returns once worker_thread_exit is set, the queue is empty and nothing
    is left in the local buffer.
    """
    global worker_pipe, worker_thread_exit
    worker_queue_int = b""

    while True:
        try:
            # Block for up to a second so the exit flag is polled regularly
            worker_queue_int = worker_queue_int + worker_queue.get(True, 1)
        except queue.Empty:
            pass
        while (worker_queue_int or not worker_queue.empty()):
            try:
                # Wait (at most a second) for the pipe to become writable; the
                # write is attempted either way and EAGAIN is tolerated below.
                (_, ready, _) = select.select([], [worker_pipe], [], 1)
                if not worker_queue.empty():
                    worker_queue_int = worker_queue_int + worker_queue.get()
                written = os.write(worker_pipe, worker_queue_int)
                worker_queue_int = worker_queue_int[written:]
            except (IOError, OSError) as e:
                if e.errno != errno.EAGAIN and e.errno != errno.EPIPE:
                    raise
        if worker_thread_exit and worker_queue.empty() and not worker_queue_int:
            return

worker_thread = Thread(target=worker_flush, args=(worker_queue,))
worker_thread.start()

def worker_child_fire(event, d):
    """
    Event hook installed in forked task children (see fork_off_task).

    Writes the framed, pickled event straight to worker_pipe, which in the
    child is the write end of the pipe back to the worker. The d argument is
    accepted but not used here.
    """
    global worker_pipe
    global worker_pipe_lock

    data = b"<event>" + pickle.dumps(event) + b"</event>"
    try:
        # The context manager guarantees the lock is released even when the
        # write fails part way through.
        with worker_pipe_lock:
            while(len(data)):
                written = worker_pipe.write(data)
                data = data[written:]
    except IOError:
        sigterm_handler(None, None)
        raise

bb.event.worker_fire = worker_fire

lf = None
#lf = open("/tmp/workercommandlog", "w+")
def workerlog_write(msg):
    """
    Write msg to the debug command log; does nothing while lf is None.
    """
    if lf:
        lf.write(msg)
        lf.flush()

def sigterm_handler(signum, frame):
    """
    Restore the default SIGTERM action, SIGTERM our process group, then exit.
    """
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    os.killpg(0, signal.SIGTERM)
    sys.exit()

def _override_environment(assignments, envbackup, fakeenv):
    """
    Apply whitespace separated KEY=VALUE assignments to os.environ.

    The previous value of each key (None if it was unset) is recorded in
    envbackup so the caller can restore it, and the applied value is
    recorded in fakeenv.
    """
    for var in assignments.split():
        # Split on the first '=' only so values may themselves contain '='
        key, value = var.split('=', 1)
        # Keep the first recorded value so a repeated key cannot replace the
        # real original with an already overridden one.
        envbackup.setdefault(key, os.environ.get(key))
        os.environ[key] = value
        fakeenv[key] = value

def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False):
    """
    Fork a child process which sets up the datastore and environment for
    taskname of recipe fn and executes it.

    Returns (pid, pipein, pipeout) in the parent, where the two pipe objects
    are the ends of the event pipe created for the child. The child never
    returns from this function; it always leaves through os._exit().
    The data and task arguments are accepted but not used in this function.
    """
    # We need to setup the environment BEFORE the fork, since
    # a fork() or exec*() activates PSEUDO...

    envbackup = {}
    fakeroot = False
    fakeenv = {}
    umask = None

    uid = os.getuid()
    gid = os.getgid()

    taskdep = workerdata["taskdeps"][fn]
    if 'umask' in taskdep and taskname in taskdep['umask']:
        umask = taskdep['umask'][taskname]
    elif workerdata["umask"]:
        umask = workerdata["umask"]
    if umask:
        # umask might come in as a number or text string..
        try:
            umask = int(umask, 8)
        except TypeError:
            pass

    dry_run = cfg.dry_run or dry_run_exec

    # We can't use the fakeroot environment in a dry run as it possibly hasn't been built
    if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run:
        fakeroot = True
        _override_environment(workerdata["fakerootenv"][fn] or "", envbackup, fakeenv)

        fakedirs = (workerdata["fakerootdirs"][fn] or "").split()
        for p in fakedirs:
            bb.utils.mkdirhier(p)
        logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' %
                        (fn, taskname, ', '.join(fakedirs)))
    else:
        _override_environment(workerdata["fakerootnoenv"][fn] or "", envbackup, fakeenv)

    sys.stdout.flush()
    sys.stderr.flush()

    try:
        pipein, pipeout = os.pipe()
        pipein = os.fdopen(pipein, 'rb', 4096)
        pipeout = os.fdopen(pipeout, 'wb', 0)
        pid = os.fork()
    except OSError as e:
        logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror))
        sys.exit(1)

    if pid == 0:
        def child():
            global worker_pipe
            global worker_pipe_lock
            pipein.close()

            bb.utils.signal_on_parent_exit("SIGTERM")

            # Save out the PID so that the event can include it in the
            # events
            bb.event.worker_pid = os.getpid()
            bb.event.worker_fire = worker_child_fire
            worker_pipe = pipeout
            worker_pipe_lock = Lock()

            # Make the child the process group leader and ensure no
            # child process will be controlled by the current terminal
            # This ensures signals sent to the controlling terminal like Ctrl+C
            # don't stop the child processes.
            os.setsid()

            signal.signal(signal.SIGTERM, sigterm_handler)
            # Let SIGHUP exit as SIGTERM
            signal.signal(signal.SIGHUP, sigterm_handler)

            # No stdin
            newsi = os.open(os.devnull, os.O_RDWR)
            os.dup2(newsi, sys.stdin.fileno())

            if umask:
                os.umask(umask)

            try:
                bb_cache = bb.cache.NoCache(databuilder)
                (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn)
                the_data = databuilder.mcdata[mc]
                the_data.setVar("BB_WORKERCONTEXT", "1")
                the_data.setVar("BB_TASKDEPDATA", taskdepdata)
                the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", ""))
                if cfg.limited_deps:
                    the_data.setVar("BB_LIMITEDDEPS", "1")
                the_data.setVar("BUILDNAME", workerdata["buildname"])
                the_data.setVar("DATE", workerdata["date"])
                the_data.setVar("TIME", workerdata["time"])
                for varname, value in extraconfigdata.items():
                    the_data.setVar(varname, value)

                bb.parse.siggen.set_taskdata(workerdata["sigdata"])
                if "newhashes" in workerdata:
                    bb.parse.siggen.set_taskhashes(workerdata["newhashes"])
                ret = 0

                the_data = bb_cache.loadDataFull(fn, appends)
                the_data.setVar('BB_TASKHASH', taskhash)
                the_data.setVar('BB_UNIHASH', unihash)

                bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", "")))

                if not the_data.getVarFlag(taskname, 'network', False):
                    if bb.utils.is_local_uid(uid):
                        logger.debug("Attempting to disable network for %s" % taskname)
                        bb.utils.disable_network(uid, gid)
                    else:
                        logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid))

                # exported_vars() returns a generator which *cannot* be passed to os.environ.update()
                # successfully. We also need to unset anything from the environment which shouldn't be there
                exports = bb.data.exported_vars(the_data)

                bb.utils.empty_environment()
                for e, v in exports:
                    os.environ[e] = v

                for e in fakeenv:
                    os.environ[e] = fakeenv[e]
                    the_data.setVar(e, fakeenv[e])
                    the_data.setVarFlag(e, 'export', "1")

                task_exports = the_data.getVarFlag(taskname, 'exports')
                if task_exports:
                    for e in task_exports.split():
                        the_data.setVarFlag(e, 'export', '1')
                        v = the_data.getVar(e)
                        if v is not None:
                            os.environ[e] = v

                if quieterrors:
                    the_data.setVarFlag(taskname, "quieterrors", "1")

            except Exception:
                if not quieterrors:
                    logger.critical(traceback.format_exc())
                os._exit(1)
            try:
                if dry_run:
                    return 0
                try:
                    ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile)
                finally:
                    if fakeroot:
                        fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD"))
                        subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE)
                return ret
            except:
                # Deliberately catches everything: whatever goes wrong, the
                # forked child must exit here and not unwind into the code
                # inherited from the worker.
                os._exit(1)
        if not profiling:
            os._exit(child())
        else:
            profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname)
            prof = profile.Profile()
            # Default to failure so the finally clause always has an exit code
            ret = 1
            try:
                ret = profile.Profile.runcall(prof, child)
            finally:
                prof.dump_stats(profname)
                bb.utils.process_profilelog(profname)
                os._exit(ret)
    else:
        # Parent: undo the environment changes made before the fork
        for key, value in envbackup.items():
            if value is None:
                del os.environ[key]
            else:
                os.environ[key] = value

    return pid, pipein, pipeout

class runQueueWorkerPipe():
    """
    Abstraction for a pipe between a worker thread and the worker server
    """
    def __init__(self, pipein, pipeout):
        """
        Take over the read end (made non-blocking) and close the write end
        if one is given.
        """
        self.input = pipein
        if pipeout:
            pipeout.close()
        bb.utils.nonblockingfd(self.input)
        self.queue = b""

    def read(self):
        """
        Read whatever is available and forward every complete <event>
        message to the writer queue.

        Returns True if any new bytes were read.
        """
        start = len(self.queue)
        try:
            self.queue = self.queue + (self.input.read(102400) or b"")
        except (OSError, IOError) as e:
            if e.errno != errno.EAGAIN:
                raise

        end = len(self.queue)
        index = self.queue.find(b"</event>")
        while index != -1:
            # 8 == len(b"</event>"), so msg includes the closing tag
            msg = self.queue[:index+8]
            assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1
            worker_fire_prepickled(msg)
            self.queue = self.queue[index+8:]
            index = self.queue.find(b"</event>")
        return (end > start)

    def close(self):
        """
        Drain remaining data, warn about any partial message, close the pipe.
        """
        while self.read():
            continue
        if len(self.queue) > 0:
            print("Warning, worker child left partial message: %s" % self.queue)
        self.input.close()

normalexit = False

class BitbakeWorker(object):
    """
    Reads tagged commands from the input stream, forks tasks on request and
    reports task events and exit codes through the writer queue.
    """
    def __init__(self, din):
        self.input = din
        bb.utils.nonblockingfd(self.input)
        self.queue = b""
        self.cookercfg = None
        self.databuilder = None
        self.data = None
        self.extraconfigdata = None
        # Both dictionaries are keyed by the pid of the forked task
        self.build_pids = {}
        self.build_pipes = {}

        signal.signal(signal.SIGTERM, self.sigterm_exception)
        # Let SIGHUP exit as SIGTERM
        signal.signal(signal.SIGHUP, self.sigterm_exception)
        if "beef" in sys.argv[1]:
            bb.utils.set_process_name("Worker (Fakeroot)")
        else:
            bb.utils.set_process_name("Worker")

    def sigterm_exception(self, signum, stackframe):
        """
        Signal handler: terminate running tasks, then kill this process with
        the default SIGTERM action.
        """
        if signum == signal.SIGTERM:
            bb.warn("Worker received SIGTERM, shutting down...")
        elif signum == signal.SIGHUP:
            bb.warn("Worker received SIGHUP, shutting down...")
        self.handle_finishnow(None)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
        os.kill(os.getpid(), signal.SIGTERM)

    def serve(self):
        """
        Main loop: read commands, dispatch them, pump the task pipes and
        collect the exit codes of finished tasks.
        """
        while True:
            (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1)
            if self.input in ready:
                try:
                    r = self.input.read()
                    if len(r) == 0:
                        # EOF on pipe, server must have terminated
                        self.sigterm_exception(signal.SIGTERM, None)
                    self.queue = self.queue + r
                except (OSError, IOError):
                    pass
            if len(self.queue):
                self.handle_item(b"cookerconfig", self.handle_cookercfg)
                self.handle_item(b"extraconfigdata", self.handle_extraconfigdata)
                self.handle_item(b"workerdata", self.handle_workerdata)
                self.handle_item(b"newtaskhashes", self.handle_newtaskhashes)
                self.handle_item(b"runtask", self.handle_runtask)
                self.handle_item(b"finishnow", self.handle_finishnow)
                self.handle_item(b"ping", self.handle_ping)
                self.handle_item(b"quit", self.handle_quit)

            for pipe in self.build_pipes:
                if self.build_pipes[pipe].input in ready:
                    self.build_pipes[pipe].read()
            if len(self.build_pids):
                while self.process_waitpid():
                    continue

    def handle_item(self, item, func):
        """
        Pass each complete <item>...</item> message at the head of the queue
        to func and remove it from the queue.
        """
        opening = b"<" + item + b">"
        closing = b"</" + item + b">"
        # Re-check the opening tag on every pass: once one message has been
        # consumed the queue may start with a different command, whose bytes
        # must not be handed to this handler.
        while self.queue.startswith(opening):
            index = self.queue.find(closing)
            if index == -1:
                # Message not complete yet, wait for more data
                break
            try:
                func(self.queue[len(opening):index])
            except pickle.UnpicklingError:
                workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue))
                raise
            self.queue = self.queue[(index + len(closing)):]

    def handle_cookercfg(self, data):
        self.cookercfg = pickle.loads(data)
        self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True)
        self.databuilder.parseBaseConfiguration(worker=True)
        self.data = self.databuilder.data

    def handle_extraconfigdata(self, data):
        self.extraconfigdata = pickle.loads(data)

    def handle_workerdata(self, data):
        self.workerdata = pickle.loads(data)
        bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"]
        bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"]
        bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"]
        bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"]
        for mc in self.databuilder.mcdata:
            self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"])
            self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"])

    def handle_newtaskhashes(self, data):
        self.workerdata["newhashes"] = pickle.loads(data)

    def handle_ping(self, _):
        workerlog_write("Handling ping\n")

        logger.warning("Pong from bitbake-worker!")

    def handle_quit(self, data):
        workerlog_write("Handling quit\n")

        global normalexit
        normalexit = True
        sys.exit(0)

    def handle_runtask(self, data):
        """
        Unpickle a runtask request, fork the task and track its pid and pipe.
        """
        fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data)
        workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname))

        pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec)

        self.build_pids[pid] = task
        self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout)

    def process_waitpid(self):
        """
        Return False if there are no processes awaiting result collection, otherwise
        collect the process exit code, close the information pipe and return True.
        """
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
            if pid == 0 or os.WIFSTOPPED(status):
                return False
        except OSError:
            return False

        workerlog_write("Exit code of %s for pid %s\n" % (status, pid))

        if os.WIFEXITED(status):
            status = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            # Per shell conventions for $?, when a process exits due to
            # a signal, we return an exit code of 128 + SIGNUM
            status = 128 + os.WTERMSIG(status)

        task = self.build_pids[pid]
        del self.build_pids[pid]

        self.build_pipes[pid].close()
        del self.build_pipes[pid]

        worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>")

        return True

    def handle_finishnow(self, _):
        """
        SIGTERM the process group of every tracked task, then read what is
        left in their pipes.
        """
        if self.build_pids:
            logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids))
            for k in self.build_pids:
                try:
                    # Negative pid: signal the whole process group of the task
                    os.kill(-k, signal.SIGTERM)
                    os.waitpid(-1, 0)
                except OSError:
                    # The task may already be gone; nothing more to do for it
                    pass
        for pipe in self.build_pipes:
            self.build_pipes[pipe].read()

try:
    worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb'))
    if not profiling:
        worker.serve()
    else:
        profname = "profile-worker.log"
        prof = profile.Profile()
        try:
            profile.Profile.runcall(prof, worker.serve)
        finally:
            prof.dump_stats(profname)
            bb.utils.process_profilelog(profname)
except BaseException as e:
    if not normalexit:
        sys.stderr.write(traceback.format_exc())
        sys.stderr.write(str(e))
finally:
    # Tell the writer thread to finish once its queue has drained
    worker_thread_exit = True
    worker_thread.join()

workerlog_write("exiting")
if not normalexit:
    sys.exit(1)
sys.exit(0)