1#!/usr/bin/env python3 2# 3# Copyright BitBake Contributors 4# 5# SPDX-License-Identifier: GPL-2.0-only 6# 7 8import os 9import sys 10import warnings 11warnings.simplefilter("default") 12sys.path.insert(0, os.path.join(os.path.dirname(os.path.dirname(sys.argv[0])), 'lib')) 13from bb import fetch2 14import logging 15import bb 16import select 17import errno 18import signal 19import pickle 20import traceback 21import queue 22import shlex 23import subprocess 24from multiprocessing import Lock 25from threading import Thread 26 27if sys.getfilesystemencoding() != "utf-8": 28 sys.exit("Please use a locale setting which supports UTF-8 (such as LANG=en_US.UTF-8).\nPython can't change the filesystem locale after loading so we need a UTF-8 when Python starts or things won't work.") 29 30# Users shouldn't be running this code directly 31if len(sys.argv) != 2 or not sys.argv[1].startswith("decafbad"): 32 print("bitbake-worker is meant for internal execution by bitbake itself, please don't use it standalone.") 33 sys.exit(1) 34 35profiling = False 36if sys.argv[1].startswith("decafbadbad"): 37 profiling = True 38 try: 39 import cProfile as profile 40 except: 41 import profile 42 43# Unbuffer stdout to avoid log truncation in the event 44# of an unorderly exit as well as to provide timely 45# updates to log files for use with tail 46try: 47 if sys.stdout.name == '<stdout>': 48 import fcntl 49 fl = fcntl.fcntl(sys.stdout.fileno(), fcntl.F_GETFL) 50 fl |= os.O_SYNC 51 fcntl.fcntl(sys.stdout.fileno(), fcntl.F_SETFL, fl) 52 #sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0) 53except: 54 pass 55 56logger = logging.getLogger("BitBake") 57 58worker_pipe = sys.stdout.fileno() 59bb.utils.nonblockingfd(worker_pipe) 60# Need to guard against multiprocessing being used in child processes 61# and multiple processes trying to write to the parent at the same time 62worker_pipe_lock = None 63 64handler = bb.event.LogHandler() 65logger.addHandler(handler) 66 67if 0: 68 # Code to write out a log file of all events passing through the worker 69 logfilename = "/tmp/workerlogfile" 70 format_str = "%(levelname)s: %(message)s" 71 conlogformat = bb.msg.BBLogFormatter(format_str) 72 consolelog = logging.FileHandler(logfilename) 73 consolelog.setFormatter(conlogformat) 74 logger.addHandler(consolelog) 75 76worker_queue = queue.Queue() 77 78def worker_fire(event, d): 79 data = b"<event>" + pickle.dumps(event) + b"</event>" 80 worker_fire_prepickled(data) 81 82def worker_fire_prepickled(event): 83 global worker_queue 84 85 worker_queue.put(event) 86 87# 88# We can end up with write contention with the cooker, it can be trying to send commands 89# and we can be trying to send event data back. Therefore use a separate thread for writing 90# back data to cooker. 91# 92worker_thread_exit = False 93 94def worker_flush(worker_queue): 95 worker_queue_int = b"" 96 global worker_pipe, worker_thread_exit 97 98 while True: 99 try: 100 worker_queue_int = worker_queue_int + worker_queue.get(True, 1) 101 except queue.Empty: 102 pass 103 while (worker_queue_int or not worker_queue.empty()): 104 try: 105 (_, ready, _) = select.select([], [worker_pipe], [], 1) 106 if not worker_queue.empty(): 107 worker_queue_int = worker_queue_int + worker_queue.get() 108 written = os.write(worker_pipe, worker_queue_int) 109 worker_queue_int = worker_queue_int[written:] 110 except (IOError, OSError) as e: 111 if e.errno != errno.EAGAIN and e.errno != errno.EPIPE: 112 raise 113 if worker_thread_exit and worker_queue.empty() and not worker_queue_int: 114 return 115 116worker_thread = Thread(target=worker_flush, args=(worker_queue,)) 117worker_thread.start() 118 119def worker_child_fire(event, d): 120 global worker_pipe 121 global worker_pipe_lock 122 123 data = b"<event>" + pickle.dumps(event) + b"</event>" 124 try: 125 worker_pipe_lock.acquire() 126 while(len(data)): 127 written = worker_pipe.write(data) 128 data = data[written:] 129 worker_pipe_lock.release() 130 except IOError: 131 sigterm_handler(None, None) 132 raise 133 134bb.event.worker_fire = worker_fire 135 136lf = None 137#lf = open("/tmp/workercommandlog", "w+") 138def workerlog_write(msg): 139 if lf: 140 lf.write(msg) 141 lf.flush() 142 143def sigterm_handler(signum, frame): 144 signal.signal(signal.SIGTERM, signal.SIG_DFL) 145 os.killpg(0, signal.SIGTERM) 146 sys.exit() 147 148def fork_off_task(cfg, data, databuilder, workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, extraconfigdata, quieterrors=False, dry_run_exec=False): 149 # We need to setup the environment BEFORE the fork, since 150 # a fork() or exec*() activates PSEUDO... 151 152 envbackup = {} 153 fakeroot = False 154 fakeenv = {} 155 umask = None 156 157 uid = os.getuid() 158 gid = os.getgid() 159 160 161 taskdep = workerdata["taskdeps"][fn] 162 if 'umask' in taskdep and taskname in taskdep['umask']: 163 umask = taskdep['umask'][taskname] 164 elif workerdata["umask"]: 165 umask = workerdata["umask"] 166 if umask: 167 # umask might come in as a number or text string.. 168 try: 169 umask = int(umask, 8) 170 except TypeError: 171 pass 172 173 dry_run = cfg.dry_run or dry_run_exec 174 175 # We can't use the fakeroot environment in a dry run as it possibly hasn't been built 176 if 'fakeroot' in taskdep and taskname in taskdep['fakeroot'] and not dry_run: 177 fakeroot = True 178 envvars = (workerdata["fakerootenv"][fn] or "").split() 179 for key, value in (var.split('=') for var in envvars): 180 envbackup[key] = os.environ.get(key) 181 os.environ[key] = value 182 fakeenv[key] = value 183 184 fakedirs = (workerdata["fakerootdirs"][fn] or "").split() 185 for p in fakedirs: 186 bb.utils.mkdirhier(p) 187 logger.debug2('Running %s:%s under fakeroot, fakedirs: %s' % 188 (fn, taskname, ', '.join(fakedirs))) 189 else: 190 envvars = (workerdata["fakerootnoenv"][fn] or "").split() 191 for key, value in (var.split('=') for var in envvars): 192 envbackup[key] = os.environ.get(key) 193 os.environ[key] = value 194 fakeenv[key] = value 195 196 sys.stdout.flush() 197 sys.stderr.flush() 198 199 try: 200 pipein, pipeout = os.pipe() 201 pipein = os.fdopen(pipein, 'rb', 4096) 202 pipeout = os.fdopen(pipeout, 'wb', 0) 203 pid = os.fork() 204 except OSError as e: 205 logger.critical("fork failed: %d (%s)" % (e.errno, e.strerror)) 206 sys.exit(1) 207 208 if pid == 0: 209 def child(): 210 global worker_pipe 211 global worker_pipe_lock 212 pipein.close() 213 214 bb.utils.signal_on_parent_exit("SIGTERM") 215 216 # Save out the PID so that the event can include it the 217 # events 218 bb.event.worker_pid = os.getpid() 219 bb.event.worker_fire = worker_child_fire 220 worker_pipe = pipeout 221 worker_pipe_lock = Lock() 222 223 # Make the child the process group leader and ensure no 224 # child process will be controlled by the current terminal 225 # This ensures signals sent to the controlling terminal like Ctrl+C 226 # don't stop the child processes. 227 os.setsid() 228 229 signal.signal(signal.SIGTERM, sigterm_handler) 230 # Let SIGHUP exit as SIGTERM 231 signal.signal(signal.SIGHUP, sigterm_handler) 232 233 # No stdin 234 newsi = os.open(os.devnull, os.O_RDWR) 235 os.dup2(newsi, sys.stdin.fileno()) 236 237 if umask: 238 os.umask(umask) 239 240 try: 241 bb_cache = bb.cache.NoCache(databuilder) 242 (realfn, virtual, mc) = bb.cache.virtualfn2realfn(fn) 243 the_data = databuilder.mcdata[mc] 244 the_data.setVar("BB_WORKERCONTEXT", "1") 245 the_data.setVar("BB_TASKDEPDATA", taskdepdata) 246 the_data.setVar('BB_CURRENTTASK', taskname.replace("do_", "")) 247 if cfg.limited_deps: 248 the_data.setVar("BB_LIMITEDDEPS", "1") 249 the_data.setVar("BUILDNAME", workerdata["buildname"]) 250 the_data.setVar("DATE", workerdata["date"]) 251 the_data.setVar("TIME", workerdata["time"]) 252 for varname, value in extraconfigdata.items(): 253 the_data.setVar(varname, value) 254 255 bb.parse.siggen.set_taskdata(workerdata["sigdata"]) 256 if "newhashes" in workerdata: 257 bb.parse.siggen.set_taskhashes(workerdata["newhashes"]) 258 ret = 0 259 260 the_data = bb_cache.loadDataFull(fn, appends) 261 the_data.setVar('BB_TASKHASH', taskhash) 262 the_data.setVar('BB_UNIHASH', unihash) 263 264 bb.utils.set_process_name("%s:%s" % (the_data.getVar("PN"), taskname.replace("do_", ""))) 265 266 if not the_data.getVarFlag(taskname, 'network', False): 267 if bb.utils.is_local_uid(uid): 268 logger.debug("Attempting to disable network for %s" % taskname) 269 bb.utils.disable_network(uid, gid) 270 else: 271 logger.debug("Skipping disable network for %s since %s is not a local uid." % (taskname, uid)) 272 273 # exported_vars() returns a generator which *cannot* be passed to os.environ.update() 274 # successfully. We also need to unset anything from the environment which shouldn't be there 275 exports = bb.data.exported_vars(the_data) 276 277 bb.utils.empty_environment() 278 for e, v in exports: 279 os.environ[e] = v 280 281 for e in fakeenv: 282 os.environ[e] = fakeenv[e] 283 the_data.setVar(e, fakeenv[e]) 284 the_data.setVarFlag(e, 'export', "1") 285 286 task_exports = the_data.getVarFlag(taskname, 'exports') 287 if task_exports: 288 for e in task_exports.split(): 289 the_data.setVarFlag(e, 'export', '1') 290 v = the_data.getVar(e) 291 if v is not None: 292 os.environ[e] = v 293 294 if quieterrors: 295 the_data.setVarFlag(taskname, "quieterrors", "1") 296 297 except Exception: 298 if not quieterrors: 299 logger.critical(traceback.format_exc()) 300 os._exit(1) 301 try: 302 if dry_run: 303 return 0 304 try: 305 ret = bb.build.exec_task(fn, taskname, the_data, cfg.profile) 306 finally: 307 if fakeroot: 308 fakerootcmd = shlex.split(the_data.getVar("FAKEROOTCMD")) 309 subprocess.run(fakerootcmd + ['-S'], check=True, stdout=subprocess.PIPE) 310 return ret 311 except: 312 os._exit(1) 313 if not profiling: 314 os._exit(child()) 315 else: 316 profname = "profile-%s.log" % (fn.replace("/", "-") + "-" + taskname) 317 prof = profile.Profile() 318 try: 319 ret = profile.Profile.runcall(prof, child) 320 finally: 321 prof.dump_stats(profname) 322 bb.utils.process_profilelog(profname) 323 os._exit(ret) 324 else: 325 for key, value in iter(envbackup.items()): 326 if value is None: 327 del os.environ[key] 328 else: 329 os.environ[key] = value 330 331 return pid, pipein, pipeout 332 333class runQueueWorkerPipe(): 334 """ 335 Abstraction for a pipe between a worker thread and the worker server 336 """ 337 def __init__(self, pipein, pipeout): 338 self.input = pipein 339 if pipeout: 340 pipeout.close() 341 bb.utils.nonblockingfd(self.input) 342 self.queue = b"" 343 344 def read(self): 345 start = len(self.queue) 346 try: 347 self.queue = self.queue + (self.input.read(102400) or b"") 348 except (OSError, IOError) as e: 349 if e.errno != errno.EAGAIN: 350 raise 351 352 end = len(self.queue) 353 index = self.queue.find(b"</event>") 354 while index != -1: 355 msg = self.queue[:index+8] 356 assert msg.startswith(b"<event>") and msg.count(b"<event>") == 1 357 worker_fire_prepickled(msg) 358 self.queue = self.queue[index+8:] 359 index = self.queue.find(b"</event>") 360 return (end > start) 361 362 def close(self): 363 while self.read(): 364 continue 365 if len(self.queue) > 0: 366 print("Warning, worker child left partial message: %s" % self.queue) 367 self.input.close() 368 369normalexit = False 370 371class BitbakeWorker(object): 372 def __init__(self, din): 373 self.input = din 374 bb.utils.nonblockingfd(self.input) 375 self.queue = b"" 376 self.cookercfg = None 377 self.databuilder = None 378 self.data = None 379 self.extraconfigdata = None 380 self.build_pids = {} 381 self.build_pipes = {} 382 383 signal.signal(signal.SIGTERM, self.sigterm_exception) 384 # Let SIGHUP exit as SIGTERM 385 signal.signal(signal.SIGHUP, self.sigterm_exception) 386 if "beef" in sys.argv[1]: 387 bb.utils.set_process_name("Worker (Fakeroot)") 388 else: 389 bb.utils.set_process_name("Worker") 390 391 def sigterm_exception(self, signum, stackframe): 392 if signum == signal.SIGTERM: 393 bb.warn("Worker received SIGTERM, shutting down...") 394 elif signum == signal.SIGHUP: 395 bb.warn("Worker received SIGHUP, shutting down...") 396 self.handle_finishnow(None) 397 signal.signal(signal.SIGTERM, signal.SIG_DFL) 398 os.kill(os.getpid(), signal.SIGTERM) 399 400 def serve(self): 401 while True: 402 (ready, _, _) = select.select([self.input] + [i.input for i in self.build_pipes.values()], [] , [], 1) 403 if self.input in ready: 404 try: 405 r = self.input.read() 406 if len(r) == 0: 407 # EOF on pipe, server must have terminated 408 self.sigterm_exception(signal.SIGTERM, None) 409 self.queue = self.queue + r 410 except (OSError, IOError): 411 pass 412 if len(self.queue): 413 self.handle_item(b"cookerconfig", self.handle_cookercfg) 414 self.handle_item(b"extraconfigdata", self.handle_extraconfigdata) 415 self.handle_item(b"workerdata", self.handle_workerdata) 416 self.handle_item(b"newtaskhashes", self.handle_newtaskhashes) 417 self.handle_item(b"runtask", self.handle_runtask) 418 self.handle_item(b"finishnow", self.handle_finishnow) 419 self.handle_item(b"ping", self.handle_ping) 420 self.handle_item(b"quit", self.handle_quit) 421 422 for pipe in self.build_pipes: 423 if self.build_pipes[pipe].input in ready: 424 self.build_pipes[pipe].read() 425 if len(self.build_pids): 426 while self.process_waitpid(): 427 continue 428 429 430 def handle_item(self, item, func): 431 if self.queue.startswith(b"<" + item + b">"): 432 index = self.queue.find(b"</" + item + b">") 433 while index != -1: 434 try: 435 func(self.queue[(len(item) + 2):index]) 436 except pickle.UnpicklingError: 437 workerlog_write("Unable to unpickle data: %s\n" % ":".join("{:02x}".format(c) for c in self.queue)) 438 raise 439 self.queue = self.queue[(index + len(item) + 3):] 440 index = self.queue.find(b"</" + item + b">") 441 442 def handle_cookercfg(self, data): 443 self.cookercfg = pickle.loads(data) 444 self.databuilder = bb.cookerdata.CookerDataBuilder(self.cookercfg, worker=True) 445 self.databuilder.parseBaseConfiguration(worker=True) 446 self.data = self.databuilder.data 447 448 def handle_extraconfigdata(self, data): 449 self.extraconfigdata = pickle.loads(data) 450 451 def handle_workerdata(self, data): 452 self.workerdata = pickle.loads(data) 453 bb.build.verboseShellLogging = self.workerdata["build_verbose_shell"] 454 bb.build.verboseStdoutLogging = self.workerdata["build_verbose_stdout"] 455 bb.msg.loggerDefaultLogLevel = self.workerdata["logdefaultlevel"] 456 bb.msg.loggerDefaultDomains = self.workerdata["logdefaultdomain"] 457 for mc in self.databuilder.mcdata: 458 self.databuilder.mcdata[mc].setVar("PRSERV_HOST", self.workerdata["prhost"]) 459 self.databuilder.mcdata[mc].setVar("BB_HASHSERVE", self.workerdata["hashservaddr"]) 460 461 def handle_newtaskhashes(self, data): 462 self.workerdata["newhashes"] = pickle.loads(data) 463 464 def handle_ping(self, _): 465 workerlog_write("Handling ping\n") 466 467 logger.warning("Pong from bitbake-worker!") 468 469 def handle_quit(self, data): 470 workerlog_write("Handling quit\n") 471 472 global normalexit 473 normalexit = True 474 sys.exit(0) 475 476 def handle_runtask(self, data): 477 fn, task, taskname, taskhash, unihash, quieterrors, appends, taskdepdata, dry_run_exec = pickle.loads(data) 478 workerlog_write("Handling runtask %s %s %s\n" % (task, fn, taskname)) 479 480 pid, pipein, pipeout = fork_off_task(self.cookercfg, self.data, self.databuilder, self.workerdata, fn, task, taskname, taskhash, unihash, appends, taskdepdata, self.extraconfigdata, quieterrors, dry_run_exec) 481 482 self.build_pids[pid] = task 483 self.build_pipes[pid] = runQueueWorkerPipe(pipein, pipeout) 484 485 def process_waitpid(self): 486 """ 487 Return none is there are no processes awaiting result collection, otherwise 488 collect the process exit codes and close the information pipe. 489 """ 490 try: 491 pid, status = os.waitpid(-1, os.WNOHANG) 492 if pid == 0 or os.WIFSTOPPED(status): 493 return False 494 except OSError: 495 return False 496 497 workerlog_write("Exit code of %s for pid %s\n" % (status, pid)) 498 499 if os.WIFEXITED(status): 500 status = os.WEXITSTATUS(status) 501 elif os.WIFSIGNALED(status): 502 # Per shell conventions for $?, when a process exits due to 503 # a signal, we return an exit code of 128 + SIGNUM 504 status = 128 + os.WTERMSIG(status) 505 506 task = self.build_pids[pid] 507 del self.build_pids[pid] 508 509 self.build_pipes[pid].close() 510 del self.build_pipes[pid] 511 512 worker_fire_prepickled(b"<exitcode>" + pickle.dumps((task, status)) + b"</exitcode>") 513 514 return True 515 516 def handle_finishnow(self, _): 517 if self.build_pids: 518 logger.info("Sending SIGTERM to remaining %s tasks", len(self.build_pids)) 519 for k, v in iter(self.build_pids.items()): 520 try: 521 os.kill(-k, signal.SIGTERM) 522 os.waitpid(-1, 0) 523 except: 524 pass 525 for pipe in self.build_pipes: 526 self.build_pipes[pipe].read() 527 528try: 529 worker = BitbakeWorker(os.fdopen(sys.stdin.fileno(), 'rb')) 530 if not profiling: 531 worker.serve() 532 else: 533 profname = "profile-worker.log" 534 prof = profile.Profile() 535 try: 536 profile.Profile.runcall(prof, worker.serve) 537 finally: 538 prof.dump_stats(profname) 539 bb.utils.process_profilelog(profname) 540except BaseException as e: 541 if not normalexit: 542 import traceback 543 sys.stderr.write(traceback.format_exc()) 544 sys.stderr.write(str(e)) 545finally: 546 worker_thread_exit = True 547 worker_thread.join() 548 549workerlog_write("exiting") 550if not normalexit: 551 sys.exit(1) 552sys.exit(0) 553