xref: /OK3568_Linux_fs/yocto/poky/bitbake/lib/bb/server/process.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1#
2# BitBake Process based server.
3#
4# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5#
6# SPDX-License-Identifier: GPL-2.0-only
7#
8
9"""
10    This module implements a multiprocessing.Process based server for bitbake.
11"""
12
13import bb
14import bb.event
15import logging
16import multiprocessing
17import threading
18import array
19import os
20import sys
21import time
22import select
23import socket
24import subprocess
25import errno
26import re
27import datetime
28import pickle
29import traceback
30import gc
31import bb.server.xmlrpcserver
32from bb import daemonize
33from multiprocessing import queues
34
35logger = logging.getLogger('BitBake')
36
37class ProcessTimeout(SystemExit):
38    pass
39
40def serverlog(msg):
41    print(str(os.getpid()) + " " +  datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg)
42    sys.stdout.flush()
43
44class ProcessServer():
45    profile_filename = "profile.log"
46    profile_processed_filename = "profile.log.processed"
47
48    def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
49        self.command_channel = False
50        self.command_channel_reply = False
51        self.quit = False
52        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
53        self.next_heartbeat = time.time()
54
55        self.event_handle = None
56        self.hadanyui = False
57        self.haveui = False
58        self.maxuiwait = 30
59        self.xmlrpc = False
60
61        self._idlefuns = {}
62
63        self.bitbake_lock = lock
64        self.bitbake_lock_name = lockname
65        self.sock = sock
66        self.sockname = sockname
67
68        self.server_timeout = server_timeout
69        self.timeout = self.server_timeout
70        self.xmlrpcinterface = xmlrpcinterface
71
72    def register_idle_function(self, function, data):
73        """Register a function to be called while the server is idle"""
74        assert hasattr(function, '__call__')
75        self._idlefuns[function] = data
76
77    def run(self):
78
79        if self.xmlrpcinterface[0]:
80            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)
81
82            serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))
83
84        try:
85            self.bitbake_lock.seek(0)
86            self.bitbake_lock.truncate()
87            if self.xmlrpc:
88                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
89            else:
90                self.bitbake_lock.write("%s\n" % (os.getpid()))
91            self.bitbake_lock.flush()
92        except Exception as e:
93            serverlog("Error writing to lock file: %s" % str(e))
94            pass
95
96        if self.cooker.configuration.profile:
97            try:
98                import cProfile as profile
99            except:
100                import profile
101            prof = profile.Profile()
102
103            ret = profile.Profile.runcall(prof, self.main)
104
105            prof.dump_stats("profile.log")
106            bb.utils.process_profilelog("profile.log")
107            serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed")
108
109        else:
110            ret = self.main()
111
112        return ret
113
114    def main(self):
115        self.cooker.pre_serve()
116
117        bb.utils.set_process_name("Cooker")
118
119        ready = []
120        newconnections = []
121
122        self.controllersock = False
123        fds = [self.sock]
124        if self.xmlrpc:
125            fds.append(self.xmlrpc)
126        seendata = False
127        serverlog("Entering server connection loop")
128
129        def disconnect_client(self, fds):
130            serverlog("Disconnecting Client")
131            if self.controllersock:
132                fds.remove(self.controllersock)
133                self.controllersock.close()
134                self.controllersock = False
135            if self.haveui:
136                fds.remove(self.command_channel)
137                bb.event.unregister_UIHhandler(self.event_handle, True)
138                self.command_channel_reply.writer.close()
139                self.event_writer.writer.close()
140                self.command_channel.close()
141                self.command_channel = False
142                del self.event_writer
143                self.lastui = time.time()
144                self.cooker.clientComplete()
145                self.haveui = False
146            ready = select.select(fds,[],[],0)[0]
147            if newconnections:
148                serverlog("Starting new client")
149                conn = newconnections.pop(-1)
150                fds.append(conn)
151                self.controllersock = conn
152            elif not self.timeout and not ready:
153                serverlog("No timeout, exiting.")
154                self.quit = True
155
156        self.lastui = time.time()
157        while not self.quit:
158            if self.sock in ready:
159                while select.select([self.sock],[],[],0)[0]:
160                    controllersock, address = self.sock.accept()
161                    if self.controllersock:
162                        serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
163                        newconnections.append(controllersock)
164                    else:
165                        serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
166                        self.controllersock = controllersock
167                        fds.append(controllersock)
168            if self.controllersock in ready:
169                try:
170                    serverlog("Processing Client")
171                    ui_fds = recvfds(self.controllersock, 3)
172                    serverlog("Connecting Client")
173
174                    # Where to write events to
175                    writer = ConnectionWriter(ui_fds[0])
176                    self.event_handle = bb.event.register_UIHhandler(writer, True)
177                    self.event_writer = writer
178
179                    # Where to read commands from
180                    reader = ConnectionReader(ui_fds[1])
181                    fds.append(reader)
182                    self.command_channel = reader
183
184                    # Where to send command return values to
185                    writer = ConnectionWriter(ui_fds[2])
186                    self.command_channel_reply = writer
187
188                    self.haveui = True
189                    self.hadanyui = True
190
191                except (EOFError, OSError):
192                    disconnect_client(self, fds)
193
194            if not self.timeout == -1.0 and not self.haveui and self.timeout and \
195                    (self.lastui + self.timeout) < time.time():
196                serverlog("Server timeout, exiting.")
197                self.quit = True
198
199            # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
200            # one. We have had issue with processes hanging indefinitely so timing out UI-less
201            # servers is useful.
202            if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
203                serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
204                self.quit = True
205
206            if self.command_channel in ready:
207                try:
208                    command = self.command_channel.get()
209                except EOFError:
210                    # Client connection shutting down
211                    ready = []
212                    disconnect_client(self, fds)
213                    continue
214                if command[0] == "terminateServer":
215                    self.quit = True
216                    continue
217                try:
218                    serverlog("Running command %s" % command)
219                    self.command_channel_reply.send(self.cooker.command.runCommand(command))
220                    serverlog("Command Completed")
221                except Exception as e:
222                   stack = traceback.format_exc()
223                   serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
224                   logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))
225
226            if self.xmlrpc in ready:
227                self.xmlrpc.handle_requests()
228
229            if not seendata and hasattr(self.cooker, "data"):
230                heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
231                if heartbeat_event:
232                    try:
233                        self.heartbeat_seconds = float(heartbeat_event)
234                    except:
235                        bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)
236
237                self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
238                try:
239                    if self.timeout:
240                        self.timeout = float(self.timeout)
241                except:
242                    bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
243                seendata = True
244
245            ready = self.idle_commands(.1, fds)
246
247        serverlog("Exiting")
248        # Remove the socket file so we don't get any more connections to avoid races
249        try:
250            os.unlink(self.sockname)
251        except:
252            pass
253        self.sock.close()
254
255        try:
256            self.cooker.shutdown(True)
257            self.cooker.notifier.stop()
258            self.cooker.confignotifier.stop()
259        except:
260            pass
261
262        self.cooker.post_serve()
263
264        if len(threading.enumerate()) != 1:
265            serverlog("More than one thread left?: " + str(threading.enumerate()))
266
267        # Flush logs before we release the lock
268        sys.stdout.flush()
269        sys.stderr.flush()
270
271        # Finally release the lockfile but warn about other processes holding it open
272        lock = self.bitbake_lock
273        lockfile = self.bitbake_lock_name
274
275        def get_lock_contents(lockfile):
276            try:
277                with open(lockfile, "r") as f:
278                    return f.readlines()
279            except FileNotFoundError:
280                return None
281
282        lockcontents = get_lock_contents(lockfile)
283        serverlog("Original lockfile contents: " + str(lockcontents))
284
285        lock.close()
286        lock = None
287
288        while not lock:
289            i = 0
290            lock = None
291            while not lock and i < 30:
292                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
293                if not lock:
294                    newlockcontents = get_lock_contents(lockfile)
295                    if newlockcontents != lockcontents:
296                        # A new server was started, the lockfile contents changed, we can exit
297                        serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
298                        return
299                    time.sleep(0.1)
300                i += 1
301            if lock:
302                # We hold the lock so we can remove the file (hide stale pid data)
303                # via unlockfile.
304                bb.utils.unlockfile(lock)
305                serverlog("Exiting as we could obtain the lock")
306                return
307
308            if not lock:
309                # Some systems may not have lsof available
310                procs = None
311                try:
312                    procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
313                except subprocess.CalledProcessError:
314                    # File was deleted?
315                    continue
316                except OSError as e:
317                    if e.errno != errno.ENOENT:
318                        raise
319                if procs is None:
320                    # Fall back to fuser if lsof is unavailable
321                    try:
322                        procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
323                    except subprocess.CalledProcessError:
324                        # File was deleted?
325                        continue
326                    except OSError as e:
327                        if e.errno != errno.ENOENT:
328                            raise
329
330                msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
331                if procs:
332                    msg.append(":\n%s" % str(procs.decode("utf-8")))
333                serverlog("".join(msg))
334
335    def idle_commands(self, delay, fds=None):
336        nextsleep = delay
337        if not fds:
338            fds = []
339
340        for function, data in list(self._idlefuns.items()):
341            try:
342                retval = function(self, data, False)
343                if retval is False:
344                    del self._idlefuns[function]
345                    nextsleep = None
346                elif retval is True:
347                    nextsleep = None
348                elif isinstance(retval, float) and nextsleep:
349                    if (retval < nextsleep):
350                        nextsleep = retval
351                elif nextsleep is None:
352                    continue
353                else:
354                    fds = fds + retval
355            except SystemExit:
356                raise
357            except Exception as exc:
358                if not isinstance(exc, bb.BBHandledException):
359                    logger.exception('Running idle function')
360                del self._idlefuns[function]
361                self.quit = True
362
363        # Create new heartbeat event?
364        now = time.time()
365        if now >= self.next_heartbeat:
366            # We might have missed heartbeats. Just trigger once in
367            # that case and continue after the usual delay.
368            self.next_heartbeat += self.heartbeat_seconds
369            if self.next_heartbeat <= now:
370                self.next_heartbeat = now + self.heartbeat_seconds
371            if hasattr(self.cooker, "data"):
372                heartbeat = bb.event.HeartbeatEvent(now)
373                try:
374                    bb.event.fire(heartbeat, self.cooker.data)
375                except Exception as exc:
376                    if not isinstance(exc, bb.BBHandledException):
377                        logger.exception('Running heartbeat function')
378                    self.quit = True
379        if nextsleep and now + nextsleep > self.next_heartbeat:
380            # Shorten timeout so that we we wake up in time for
381            # the heartbeat.
382            nextsleep = self.next_heartbeat - now
383
384        if nextsleep is not None:
385            if self.xmlrpc:
386                nextsleep = self.xmlrpc.get_timeout(nextsleep)
387            try:
388                return select.select(fds,[],[],nextsleep)[0]
389            except InterruptedError:
390                # Ignore EINTR
391                return []
392        else:
393            return select.select(fds,[],[],0)[0]
394
395
396class ServerCommunicator():
397    def __init__(self, connection, recv):
398        self.connection = connection
399        self.recv = recv
400
401    def runCommand(self, command):
402        self.connection.send(command)
403        if not self.recv.poll(30):
404            logger.info("No reply from server in 30s")
405            if not self.recv.poll(30):
406                raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)")
407        ret, exc = self.recv.get()
408        # Should probably turn all exceptions in exc back into exceptions?
409        # For now, at least handle BBHandledException
410        if exc and ("BBHandledException" in exc or "SystemExit" in exc):
411            raise bb.BBHandledException()
412        return ret, exc
413
414    def updateFeatureSet(self, featureset):
415        _, error = self.runCommand(["setFeatures", featureset])
416        if error:
417            logger.error("Unable to set the cooker to the correct featureset: %s" % error)
418            raise BaseException(error)
419
420    def getEventHandle(self):
421        handle, error = self.runCommand(["getUIHandlerNum"])
422        if error:
423            logger.error("Unable to get UI Handler Number: %s" % error)
424            raise BaseException(error)
425
426        return handle
427
428    def terminateServer(self):
429        self.connection.send(['terminateServer'])
430        return
431
432class BitBakeProcessServerConnection(object):
433    def __init__(self, ui_channel, recv, eq, sock):
434        self.connection = ServerCommunicator(ui_channel, recv)
435        self.events = eq
436        # Save sock so it doesn't get gc'd for the life of our connection
437        self.socket_connection = sock
438
439    def terminate(self):
440        self.socket_connection.close()
441        self.connection.connection.close()
442        self.connection.recv.close()
443        return
444
445start_log_format = '--- Starting bitbake server pid %s at %s ---'
446start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
447
448class BitBakeServer(object):
449
450    def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface):
451
452        self.server_timeout = server_timeout
453        self.xmlrpcinterface = xmlrpcinterface
454        self.featureset = featureset
455        self.sockname = sockname
456        self.bitbake_lock = lock
457        self.readypipe, self.readypipein = os.pipe()
458
459        # Place the log in the builddirectory alongside the lock file
460        logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log")
461        self.logfile = logfile
462
463        startdatetime = datetime.datetime.now()
464        bb.daemonize.createDaemon(self._startServer, logfile)
465        self.bitbake_lock.close()
466        os.close(self.readypipein)
467
468        ready = ConnectionReader(self.readypipe)
469        r = ready.poll(5)
470        if not r:
471            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
472            r = ready.poll(90)
473        if r:
474            try:
475                r = ready.get()
476            except EOFError:
477                # Trap the child exiting/closing the pipe and error out
478                r = None
479        if not r or r[0] != "r":
480            ready.close()
481            bb.error("Unable to start bitbake server (%s)" % str(r))
482            if os.path.exists(logfile):
483                logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
484                started = False
485                lines = []
486                lastlines = []
487                with open(logfile, "r") as f:
488                    for line in f:
489                        if started:
490                            lines.append(line)
491                        else:
492                            lastlines.append(line)
493                            res = logstart_re.search(line.rstrip())
494                            if res:
495                                ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format)
496                                if ldatetime >= startdatetime:
497                                    started = True
498                                    lines.append(line)
499                        if len(lastlines) > 60:
500                            lastlines = lastlines[-60:]
501                if lines:
502                    if len(lines) > 60:
503                        bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:])))
504                    else:
505                        bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines)))
506                elif lastlines:
507                        bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines)))
508            else:
509                bb.error("%s doesn't exist" % logfile)
510
511            raise SystemExit(1)
512
513        ready.close()
514
515    def _startServer(self):
516        os.close(self.readypipe)
517        os.set_inheritable(self.bitbake_lock.fileno(), True)
518        os.set_inheritable(self.readypipein, True)
519        serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
520        os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname,  str(self.server_timeout or 0), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1]))
521
522def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface):
523
524    import bb.cookerdata
525    import bb.cooker
526
527    serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format)))
528
529    try:
530        bitbake_lock = os.fdopen(lockfd, "w")
531
532        # Create server control socket
533        if os.path.exists(sockname):
534            os.unlink(sockname)
535
536        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
537        # AF_UNIX has path length issues so chdir here to workaround
538        cwd = os.getcwd()
539        try:
540            os.chdir(os.path.dirname(sockname))
541            sock.bind(os.path.basename(sockname))
542        finally:
543            os.chdir(cwd)
544        sock.listen(1)
545
546        server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
547        writer = ConnectionWriter(readypipeinfd)
548        try:
549            featureset = []
550            cooker = bb.cooker.BBCooker(featureset, server.register_idle_function)
551        except bb.BBHandledException:
552            return None
553        writer.send("r")
554        writer.close()
555        server.cooker = cooker
556        serverlog("Started bitbake server pid %d" % os.getpid())
557
558        server.run()
559    finally:
560        # Flush any messages/errors to the logfile before exit
561        sys.stdout.flush()
562        sys.stderr.flush()
563
564def connectProcessServer(sockname, featureset):
565    # Connect to socket
566    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
567    # AF_UNIX has path length issues so chdir here to workaround
568    cwd = os.getcwd()
569
570    readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None
571    eq = command_chan_recv = command_chan = None
572
573    sock.settimeout(10)
574
575    try:
576        try:
577            os.chdir(os.path.dirname(sockname))
578            finished = False
579            while not finished:
580                try:
581                    sock.connect(os.path.basename(sockname))
582                    finished = True
583                except IOError as e:
584                    if e.errno == errno.EWOULDBLOCK:
585                        pass
586                    raise
587        finally:
588            os.chdir(cwd)
589
590        # Send an fd for the remote to write events to
591        readfd, writefd = os.pipe()
592        eq = BBUIEventQueue(readfd)
593        # Send an fd for the remote to recieve commands from
594        readfd1, writefd1 = os.pipe()
595        command_chan = ConnectionWriter(writefd1)
596        # Send an fd for the remote to write commands results to
597        readfd2, writefd2 = os.pipe()
598        command_chan_recv = ConnectionReader(readfd2)
599
600        sendfds(sock, [writefd, readfd1, writefd2])
601
602        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)
603
604        # Close the ends of the pipes we won't use
605        for i in [writefd, readfd1, writefd2]:
606            os.close(i)
607
608        server_connection.connection.updateFeatureSet(featureset)
609
610    except (Exception, SystemExit) as e:
611        if command_chan_recv:
612            command_chan_recv.close()
613        if command_chan:
614            command_chan.close()
615        for i in [writefd, readfd1, writefd2]:
616            try:
617                if i:
618                    os.close(i)
619            except OSError:
620                pass
621        sock.close()
622        raise
623
624    return server_connection
625
626def sendfds(sock, fds):
627        '''Send an array of fds over an AF_UNIX socket.'''
628        fds = array.array('i', fds)
629        msg = bytes([len(fds) % 256])
630        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
631
632def recvfds(sock, size):
633        '''Receive an array of fds over an AF_UNIX socket.'''
634        a = array.array('i')
635        bytes_size = a.itemsize * size
636        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
637        if not msg and not ancdata:
638            raise EOFError
639        try:
640            if len(ancdata) != 1:
641                raise RuntimeError('received %d items of ancdata' %
642                                   len(ancdata))
643            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
644            if (cmsg_level == socket.SOL_SOCKET and
645                cmsg_type == socket.SCM_RIGHTS):
646                if len(cmsg_data) % a.itemsize != 0:
647                    raise ValueError
648                a.frombytes(cmsg_data)
649                assert len(a) % 256 == msg[0]
650                return list(a)
651        except (ValueError, IndexError):
652            pass
653        raise RuntimeError('Invalid data received')
654
655class BBUIEventQueue:
656    def __init__(self, readfd):
657
658        self.eventQueue = []
659        self.eventQueueLock = threading.Lock()
660        self.eventQueueNotify = threading.Event()
661
662        self.reader = ConnectionReader(readfd)
663
664        self.t = threading.Thread()
665        self.t.daemon = True
666        self.t.run = self.startCallbackHandler
667        self.t.start()
668
669    def getEvent(self):
670        self.eventQueueLock.acquire()
671
672        if len(self.eventQueue) == 0:
673            self.eventQueueLock.release()
674            return None
675
676        item = self.eventQueue.pop(0)
677
678        if len(self.eventQueue) == 0:
679            self.eventQueueNotify.clear()
680
681        self.eventQueueLock.release()
682        return item
683
684    def waitEvent(self, delay):
685        self.eventQueueNotify.wait(delay)
686        return self.getEvent()
687
688    def queue_event(self, event):
689        self.eventQueueLock.acquire()
690        self.eventQueue.append(event)
691        self.eventQueueNotify.set()
692        self.eventQueueLock.release()
693
694    def send_event(self, event):
695        self.queue_event(pickle.loads(event))
696
697    def startCallbackHandler(self):
698        bb.utils.set_process_name("UIEventQueue")
699        while True:
700            try:
701                self.reader.wait()
702                event = self.reader.get()
703                self.queue_event(event)
704            except EOFError:
705                # Easiest way to exit is to close the file descriptor to cause an exit
706                break
707        self.reader.close()
708
709class ConnectionReader(object):
710
711    def __init__(self, fd):
712        self.reader = multiprocessing.connection.Connection(fd, writable=False)
713        self.rlock = multiprocessing.Lock()
714
715    def wait(self, timeout=None):
716        return multiprocessing.connection.wait([self.reader], timeout)
717
718    def poll(self, timeout=None):
719        return self.reader.poll(timeout)
720
721    def get(self):
722        with self.rlock:
723            res = self.reader.recv_bytes()
724        return multiprocessing.reduction.ForkingPickler.loads(res)
725
726    def fileno(self):
727        return self.reader.fileno()
728
729    def close(self):
730        return self.reader.close()
731
732
733class ConnectionWriter(object):
734
735    def __init__(self, fd):
736        self.writer = multiprocessing.connection.Connection(fd, readable=False)
737        self.wlock = multiprocessing.Lock()
738        # Why bb.event needs this I have no idea
739        self.event = self
740
741    def _send(self, obj):
742        gc.disable()
743        with self.wlock:
744            self.writer.send_bytes(obj)
745        gc.enable()
746
747    def send(self, obj):
748        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
749        # See notes/code in CookerParser
750        # We must not terminate holding this lock else processes will hang.
751        # For SIGTERM, raising afterwards avoids this.
752        # For SIGINT, we don't want to have written partial data to the pipe.
753        # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
754        process = multiprocessing.current_process()
755        if process and hasattr(process, "queue_signals"):
756            with process.signal_threadlock:
757                process.queue_signals = True
758                self._send(obj)
759                process.queue_signals = False
760                try:
761                    for sig in process.signal_received.pop():
762                        process.handle_sig(sig, None)
763                except IndexError:
764                    pass
765        else:
766            self._send(obj)
767
768    def fileno(self):
769        return self.writer.fileno()
770
771    def close(self):
772        return self.writer.close()
773