1# 2# BitBake Process based server. 3# 4# Copyright (C) 2010 Bob Foerster <robert@erafx.com> 5# 6# SPDX-License-Identifier: GPL-2.0-only 7# 8 9""" 10 This module implements a multiprocessing.Process based server for bitbake. 11""" 12 13import bb 14import bb.event 15import logging 16import multiprocessing 17import threading 18import array 19import os 20import sys 21import time 22import select 23import socket 24import subprocess 25import errno 26import re 27import datetime 28import pickle 29import traceback 30import gc 31import bb.server.xmlrpcserver 32from bb import daemonize 33from multiprocessing import queues 34 35logger = logging.getLogger('BitBake') 36 37class ProcessTimeout(SystemExit): 38 pass 39 40def serverlog(msg): 41 print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg) 42 sys.stdout.flush() 43 44class ProcessServer(): 45 profile_filename = "profile.log" 46 profile_processed_filename = "profile.log.processed" 47 48 def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface): 49 self.command_channel = False 50 self.command_channel_reply = False 51 self.quit = False 52 self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. 53 self.next_heartbeat = time.time() 54 55 self.event_handle = None 56 self.hadanyui = False 57 self.haveui = False 58 self.maxuiwait = 30 59 self.xmlrpc = False 60 61 self._idlefuns = {} 62 63 self.bitbake_lock = lock 64 self.bitbake_lock_name = lockname 65 self.sock = sock 66 self.sockname = sockname 67 68 self.server_timeout = server_timeout 69 self.timeout = self.server_timeout 70 self.xmlrpcinterface = xmlrpcinterface 71 72 def register_idle_function(self, function, data): 73 """Register a function to be called while the server is idle""" 74 assert hasattr(function, '__call__') 75 self._idlefuns[function] = data 76 77 def run(self): 78 79 if self.xmlrpcinterface[0]: 80 self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self) 81 82 serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port)) 83 84 try: 85 self.bitbake_lock.seek(0) 86 self.bitbake_lock.truncate() 87 if self.xmlrpc: 88 self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port)) 89 else: 90 self.bitbake_lock.write("%s\n" % (os.getpid())) 91 self.bitbake_lock.flush() 92 except Exception as e: 93 serverlog("Error writing to lock file: %s" % str(e)) 94 pass 95 96 if self.cooker.configuration.profile: 97 try: 98 import cProfile as profile 99 except: 100 import profile 101 prof = profile.Profile() 102 103 ret = profile.Profile.runcall(prof, self.main) 104 105 prof.dump_stats("profile.log") 106 bb.utils.process_profilelog("profile.log") 107 serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed") 108 109 else: 110 ret = self.main() 111 112 return ret 113 114 def main(self): 115 self.cooker.pre_serve() 116 117 bb.utils.set_process_name("Cooker") 118 119 ready = [] 120 newconnections = [] 121 122 self.controllersock = False 123 fds = [self.sock] 124 if self.xmlrpc: 125 fds.append(self.xmlrpc) 126 seendata = False 127 serverlog("Entering server connection loop") 128 129 def disconnect_client(self, fds): 130 serverlog("Disconnecting Client") 131 if self.controllersock: 132 fds.remove(self.controllersock) 133 self.controllersock.close() 134 self.controllersock = False 135 if self.haveui: 136 fds.remove(self.command_channel) 137 bb.event.unregister_UIHhandler(self.event_handle, True) 138 self.command_channel_reply.writer.close() 139 self.event_writer.writer.close() 140 self.command_channel.close() 141 self.command_channel = False 142 del self.event_writer 143 self.lastui = time.time() 144 self.cooker.clientComplete() 145 self.haveui = False 146 ready = select.select(fds,[],[],0)[0] 147 if newconnections: 148 serverlog("Starting new client") 149 conn = newconnections.pop(-1) 150 fds.append(conn) 151 self.controllersock = conn 152 elif not self.timeout and not ready: 153 serverlog("No timeout, exiting.") 154 self.quit = True 155 156 self.lastui = time.time() 157 while not self.quit: 158 if self.sock in ready: 159 while select.select([self.sock],[],[],0)[0]: 160 controllersock, address = self.sock.accept() 161 if self.controllersock: 162 serverlog("Queuing %s (%s)" % (str(ready), str(newconnections))) 163 newconnections.append(controllersock) 164 else: 165 serverlog("Accepting %s (%s)" % (str(ready), str(newconnections))) 166 self.controllersock = controllersock 167 fds.append(controllersock) 168 if self.controllersock in ready: 169 try: 170 serverlog("Processing Client") 171 ui_fds = recvfds(self.controllersock, 3) 172 serverlog("Connecting Client") 173 174 # Where to write events to 175 writer = ConnectionWriter(ui_fds[0]) 176 self.event_handle = bb.event.register_UIHhandler(writer, True) 177 self.event_writer = writer 178 179 # Where to read commands from 180 reader = ConnectionReader(ui_fds[1]) 181 fds.append(reader) 182 self.command_channel = reader 183 184 # Where to send command return values to 185 writer = ConnectionWriter(ui_fds[2]) 186 self.command_channel_reply = writer 187 188 self.haveui = True 189 self.hadanyui = True 190 191 except (EOFError, OSError): 192 disconnect_client(self, fds) 193 194 if not self.timeout == -1.0 and not self.haveui and self.timeout and \ 195 (self.lastui + self.timeout) < time.time(): 196 serverlog("Server timeout, exiting.") 197 self.quit = True 198 199 # If we don't see a UI connection within maxuiwait, its unlikely we're going to see 200 # one. We have had issue with processes hanging indefinitely so timing out UI-less 201 # servers is useful. 202 if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time(): 203 serverlog("No UI connection within max timeout, exiting to avoid infinite loop.") 204 self.quit = True 205 206 if self.command_channel in ready: 207 try: 208 command = self.command_channel.get() 209 except EOFError: 210 # Client connection shutting down 211 ready = [] 212 disconnect_client(self, fds) 213 continue 214 if command[0] == "terminateServer": 215 self.quit = True 216 continue 217 try: 218 serverlog("Running command %s" % command) 219 self.command_channel_reply.send(self.cooker.command.runCommand(command)) 220 serverlog("Command Completed") 221 except Exception as e: 222 stack = traceback.format_exc() 223 serverlog('Exception in server main event loop running command %s (%s)' % (command, stack)) 224 logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack)) 225 226 if self.xmlrpc in ready: 227 self.xmlrpc.handle_requests() 228 229 if not seendata and hasattr(self.cooker, "data"): 230 heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') 231 if heartbeat_event: 232 try: 233 self.heartbeat_seconds = float(heartbeat_event) 234 except: 235 bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) 236 237 self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT') 238 try: 239 if self.timeout: 240 self.timeout = float(self.timeout) 241 except: 242 bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) 243 seendata = True 244 245 ready = self.idle_commands(.1, fds) 246 247 serverlog("Exiting") 248 # Remove the socket file so we don't get any more connections to avoid races 249 try: 250 os.unlink(self.sockname) 251 except: 252 pass 253 self.sock.close() 254 255 try: 256 self.cooker.shutdown(True) 257 self.cooker.notifier.stop() 258 self.cooker.confignotifier.stop() 259 except: 260 pass 261 262 self.cooker.post_serve() 263 264 if len(threading.enumerate()) != 1: 265 serverlog("More than one thread left?: " + str(threading.enumerate())) 266 267 # Flush logs before we release the lock 268 sys.stdout.flush() 269 sys.stderr.flush() 270 271 # Finally release the lockfile but warn about other processes holding it open 272 lock = self.bitbake_lock 273 lockfile = self.bitbake_lock_name 274 275 def get_lock_contents(lockfile): 276 try: 277 with open(lockfile, "r") as f: 278 return f.readlines() 279 except FileNotFoundError: 280 return None 281 282 lockcontents = get_lock_contents(lockfile) 283 serverlog("Original lockfile contents: " + str(lockcontents)) 284 285 lock.close() 286 lock = None 287 288 while not lock: 289 i = 0 290 lock = None 291 while not lock and i < 30: 292 lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) 293 if not lock: 294 newlockcontents = get_lock_contents(lockfile) 295 if newlockcontents != lockcontents: 296 # A new server was started, the lockfile contents changed, we can exit 297 serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) 298 return 299 time.sleep(0.1) 300 i += 1 301 if lock: 302 # We hold the lock so we can remove the file (hide stale pid data) 303 # via unlockfile. 304 bb.utils.unlockfile(lock) 305 serverlog("Exiting as we could obtain the lock") 306 return 307 308 if not lock: 309 # Some systems may not have lsof available 310 procs = None 311 try: 312 procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) 313 except subprocess.CalledProcessError: 314 # File was deleted? 315 continue 316 except OSError as e: 317 if e.errno != errno.ENOENT: 318 raise 319 if procs is None: 320 # Fall back to fuser if lsof is unavailable 321 try: 322 procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) 323 except subprocess.CalledProcessError: 324 # File was deleted? 325 continue 326 except OSError as e: 327 if e.errno != errno.ENOENT: 328 raise 329 330 msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"] 331 if procs: 332 msg.append(":\n%s" % str(procs.decode("utf-8"))) 333 serverlog("".join(msg)) 334 335 def idle_commands(self, delay, fds=None): 336 nextsleep = delay 337 if not fds: 338 fds = [] 339 340 for function, data in list(self._idlefuns.items()): 341 try: 342 retval = function(self, data, False) 343 if retval is False: 344 del self._idlefuns[function] 345 nextsleep = None 346 elif retval is True: 347 nextsleep = None 348 elif isinstance(retval, float) and nextsleep: 349 if (retval < nextsleep): 350 nextsleep = retval 351 elif nextsleep is None: 352 continue 353 else: 354 fds = fds + retval 355 except SystemExit: 356 raise 357 except Exception as exc: 358 if not isinstance(exc, bb.BBHandledException): 359 logger.exception('Running idle function') 360 del self._idlefuns[function] 361 self.quit = True 362 363 # Create new heartbeat event? 364 now = time.time() 365 if now >= self.next_heartbeat: 366 # We might have missed heartbeats. Just trigger once in 367 # that case and continue after the usual delay. 368 self.next_heartbeat += self.heartbeat_seconds 369 if self.next_heartbeat <= now: 370 self.next_heartbeat = now + self.heartbeat_seconds 371 if hasattr(self.cooker, "data"): 372 heartbeat = bb.event.HeartbeatEvent(now) 373 try: 374 bb.event.fire(heartbeat, self.cooker.data) 375 except Exception as exc: 376 if not isinstance(exc, bb.BBHandledException): 377 logger.exception('Running heartbeat function') 378 self.quit = True 379 if nextsleep and now + nextsleep > self.next_heartbeat: 380 # Shorten timeout so that we we wake up in time for 381 # the heartbeat. 382 nextsleep = self.next_heartbeat - now 383 384 if nextsleep is not None: 385 if self.xmlrpc: 386 nextsleep = self.xmlrpc.get_timeout(nextsleep) 387 try: 388 return select.select(fds,[],[],nextsleep)[0] 389 except InterruptedError: 390 # Ignore EINTR 391 return [] 392 else: 393 return select.select(fds,[],[],0)[0] 394 395 396class ServerCommunicator(): 397 def __init__(self, connection, recv): 398 self.connection = connection 399 self.recv = recv 400 401 def runCommand(self, command): 402 self.connection.send(command) 403 if not self.recv.poll(30): 404 logger.info("No reply from server in 30s") 405 if not self.recv.poll(30): 406 raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)") 407 ret, exc = self.recv.get() 408 # Should probably turn all exceptions in exc back into exceptions? 409 # For now, at least handle BBHandledException 410 if exc and ("BBHandledException" in exc or "SystemExit" in exc): 411 raise bb.BBHandledException() 412 return ret, exc 413 414 def updateFeatureSet(self, featureset): 415 _, error = self.runCommand(["setFeatures", featureset]) 416 if error: 417 logger.error("Unable to set the cooker to the correct featureset: %s" % error) 418 raise BaseException(error) 419 420 def getEventHandle(self): 421 handle, error = self.runCommand(["getUIHandlerNum"]) 422 if error: 423 logger.error("Unable to get UI Handler Number: %s" % error) 424 raise BaseException(error) 425 426 return handle 427 428 def terminateServer(self): 429 self.connection.send(['terminateServer']) 430 return 431 432class BitBakeProcessServerConnection(object): 433 def __init__(self, ui_channel, recv, eq, sock): 434 self.connection = ServerCommunicator(ui_channel, recv) 435 self.events = eq 436 # Save sock so it doesn't get gc'd for the life of our connection 437 self.socket_connection = sock 438 439 def terminate(self): 440 self.socket_connection.close() 441 self.connection.connection.close() 442 self.connection.recv.close() 443 return 444 445start_log_format = '--- Starting bitbake server pid %s at %s ---' 446start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' 447 448class BitBakeServer(object): 449 450 def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface): 451 452 self.server_timeout = server_timeout 453 self.xmlrpcinterface = xmlrpcinterface 454 self.featureset = featureset 455 self.sockname = sockname 456 self.bitbake_lock = lock 457 self.readypipe, self.readypipein = os.pipe() 458 459 # Place the log in the builddirectory alongside the lock file 460 logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log") 461 self.logfile = logfile 462 463 startdatetime = datetime.datetime.now() 464 bb.daemonize.createDaemon(self._startServer, logfile) 465 self.bitbake_lock.close() 466 os.close(self.readypipein) 467 468 ready = ConnectionReader(self.readypipe) 469 r = ready.poll(5) 470 if not r: 471 bb.note("Bitbake server didn't start within 5 seconds, waiting for 90") 472 r = ready.poll(90) 473 if r: 474 try: 475 r = ready.get() 476 except EOFError: 477 # Trap the child exiting/closing the pipe and error out 478 r = None 479 if not r or r[0] != "r": 480 ready.close() 481 bb.error("Unable to start bitbake server (%s)" % str(r)) 482 if os.path.exists(logfile): 483 logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)')) 484 started = False 485 lines = [] 486 lastlines = [] 487 with open(logfile, "r") as f: 488 for line in f: 489 if started: 490 lines.append(line) 491 else: 492 lastlines.append(line) 493 res = logstart_re.search(line.rstrip()) 494 if res: 495 ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format) 496 if ldatetime >= startdatetime: 497 started = True 498 lines.append(line) 499 if len(lastlines) > 60: 500 lastlines = lastlines[-60:] 501 if lines: 502 if len(lines) > 60: 503 bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:]))) 504 else: 505 bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines))) 506 elif lastlines: 507 bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines))) 508 else: 509 bb.error("%s doesn't exist" % logfile) 510 511 raise SystemExit(1) 512 513 ready.close() 514 515 def _startServer(self): 516 os.close(self.readypipe) 517 os.set_inheritable(self.bitbake_lock.fileno(), True) 518 os.set_inheritable(self.readypipein, True) 519 serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") 520 os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) 521 522def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface): 523 524 import bb.cookerdata 525 import bb.cooker 526 527 serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format))) 528 529 try: 530 bitbake_lock = os.fdopen(lockfd, "w") 531 532 # Create server control socket 533 if os.path.exists(sockname): 534 os.unlink(sockname) 535 536 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 537 # AF_UNIX has path length issues so chdir here to workaround 538 cwd = os.getcwd() 539 try: 540 os.chdir(os.path.dirname(sockname)) 541 sock.bind(os.path.basename(sockname)) 542 finally: 543 os.chdir(cwd) 544 sock.listen(1) 545 546 server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface) 547 writer = ConnectionWriter(readypipeinfd) 548 try: 549 featureset = [] 550 cooker = bb.cooker.BBCooker(featureset, server.register_idle_function) 551 except bb.BBHandledException: 552 return None 553 writer.send("r") 554 writer.close() 555 server.cooker = cooker 556 serverlog("Started bitbake server pid %d" % os.getpid()) 557 558 server.run() 559 finally: 560 # Flush any messages/errors to the logfile before exit 561 sys.stdout.flush() 562 sys.stderr.flush() 563 564def connectProcessServer(sockname, featureset): 565 # Connect to socket 566 sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 567 # AF_UNIX has path length issues so chdir here to workaround 568 cwd = os.getcwd() 569 570 readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None 571 eq = command_chan_recv = command_chan = None 572 573 sock.settimeout(10) 574 575 try: 576 try: 577 os.chdir(os.path.dirname(sockname)) 578 finished = False 579 while not finished: 580 try: 581 sock.connect(os.path.basename(sockname)) 582 finished = True 583 except IOError as e: 584 if e.errno == errno.EWOULDBLOCK: 585 pass 586 raise 587 finally: 588 os.chdir(cwd) 589 590 # Send an fd for the remote to write events to 591 readfd, writefd = os.pipe() 592 eq = BBUIEventQueue(readfd) 593 # Send an fd for the remote to recieve commands from 594 readfd1, writefd1 = os.pipe() 595 command_chan = ConnectionWriter(writefd1) 596 # Send an fd for the remote to write commands results to 597 readfd2, writefd2 = os.pipe() 598 command_chan_recv = ConnectionReader(readfd2) 599 600 sendfds(sock, [writefd, readfd1, writefd2]) 601 602 server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock) 603 604 # Close the ends of the pipes we won't use 605 for i in [writefd, readfd1, writefd2]: 606 os.close(i) 607 608 server_connection.connection.updateFeatureSet(featureset) 609 610 except (Exception, SystemExit) as e: 611 if command_chan_recv: 612 command_chan_recv.close() 613 if command_chan: 614 command_chan.close() 615 for i in [writefd, readfd1, writefd2]: 616 try: 617 if i: 618 os.close(i) 619 except OSError: 620 pass 621 sock.close() 622 raise 623 624 return server_connection 625 626def sendfds(sock, fds): 627 '''Send an array of fds over an AF_UNIX socket.''' 628 fds = array.array('i', fds) 629 msg = bytes([len(fds) % 256]) 630 sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) 631 632def recvfds(sock, size): 633 '''Receive an array of fds over an AF_UNIX socket.''' 634 a = array.array('i') 635 bytes_size = a.itemsize * size 636 msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size)) 637 if not msg and not ancdata: 638 raise EOFError 639 try: 640 if len(ancdata) != 1: 641 raise RuntimeError('received %d items of ancdata' % 642 len(ancdata)) 643 cmsg_level, cmsg_type, cmsg_data = ancdata[0] 644 if (cmsg_level == socket.SOL_SOCKET and 645 cmsg_type == socket.SCM_RIGHTS): 646 if len(cmsg_data) % a.itemsize != 0: 647 raise ValueError 648 a.frombytes(cmsg_data) 649 assert len(a) % 256 == msg[0] 650 return list(a) 651 except (ValueError, IndexError): 652 pass 653 raise RuntimeError('Invalid data received') 654 655class BBUIEventQueue: 656 def __init__(self, readfd): 657 658 self.eventQueue = [] 659 self.eventQueueLock = threading.Lock() 660 self.eventQueueNotify = threading.Event() 661 662 self.reader = ConnectionReader(readfd) 663 664 self.t = threading.Thread() 665 self.t.daemon = True 666 self.t.run = self.startCallbackHandler 667 self.t.start() 668 669 def getEvent(self): 670 self.eventQueueLock.acquire() 671 672 if len(self.eventQueue) == 0: 673 self.eventQueueLock.release() 674 return None 675 676 item = self.eventQueue.pop(0) 677 678 if len(self.eventQueue) == 0: 679 self.eventQueueNotify.clear() 680 681 self.eventQueueLock.release() 682 return item 683 684 def waitEvent(self, delay): 685 self.eventQueueNotify.wait(delay) 686 return self.getEvent() 687 688 def queue_event(self, event): 689 self.eventQueueLock.acquire() 690 self.eventQueue.append(event) 691 self.eventQueueNotify.set() 692 self.eventQueueLock.release() 693 694 def send_event(self, event): 695 self.queue_event(pickle.loads(event)) 696 697 def startCallbackHandler(self): 698 bb.utils.set_process_name("UIEventQueue") 699 while True: 700 try: 701 self.reader.wait() 702 event = self.reader.get() 703 self.queue_event(event) 704 except EOFError: 705 # Easiest way to exit is to close the file descriptor to cause an exit 706 break 707 self.reader.close() 708 709class ConnectionReader(object): 710 711 def __init__(self, fd): 712 self.reader = multiprocessing.connection.Connection(fd, writable=False) 713 self.rlock = multiprocessing.Lock() 714 715 def wait(self, timeout=None): 716 return multiprocessing.connection.wait([self.reader], timeout) 717 718 def poll(self, timeout=None): 719 return self.reader.poll(timeout) 720 721 def get(self): 722 with self.rlock: 723 res = self.reader.recv_bytes() 724 return multiprocessing.reduction.ForkingPickler.loads(res) 725 726 def fileno(self): 727 return self.reader.fileno() 728 729 def close(self): 730 return self.reader.close() 731 732 733class ConnectionWriter(object): 734 735 def __init__(self, fd): 736 self.writer = multiprocessing.connection.Connection(fd, readable=False) 737 self.wlock = multiprocessing.Lock() 738 # Why bb.event needs this I have no idea 739 self.event = self 740 741 def _send(self, obj): 742 gc.disable() 743 with self.wlock: 744 self.writer.send_bytes(obj) 745 gc.enable() 746 747 def send(self, obj): 748 obj = multiprocessing.reduction.ForkingPickler.dumps(obj) 749 # See notes/code in CookerParser 750 # We must not terminate holding this lock else processes will hang. 751 # For SIGTERM, raising afterwards avoids this. 752 # For SIGINT, we don't want to have written partial data to the pipe. 753 # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139 754 process = multiprocessing.current_process() 755 if process and hasattr(process, "queue_signals"): 756 with process.signal_threadlock: 757 process.queue_signals = True 758 self._send(obj) 759 process.queue_signals = False 760 try: 761 for sig in process.signal_received.pop(): 762 process.handle_sig(sig, None) 763 except IndexError: 764 pass 765 else: 766 self._send(obj) 767 768 def fileno(self): 769 return self.writer.fileno() 770 771 def close(self): 772 return self.writer.close() 773