xref: /OK3568_Linux_fs/yocto/poky/bitbake/lib/bb/server/process.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1*4882a593Smuzhiyun#
2*4882a593Smuzhiyun# BitBake Process based server.
3*4882a593Smuzhiyun#
4*4882a593Smuzhiyun# Copyright (C) 2010 Bob Foerster <robert@erafx.com>
5*4882a593Smuzhiyun#
6*4882a593Smuzhiyun# SPDX-License-Identifier: GPL-2.0-only
7*4882a593Smuzhiyun#
8*4882a593Smuzhiyun
9*4882a593Smuzhiyun"""
10*4882a593Smuzhiyun    This module implements a multiprocessing.Process based server for bitbake.
11*4882a593Smuzhiyun"""
12*4882a593Smuzhiyun
13*4882a593Smuzhiyunimport bb
14*4882a593Smuzhiyunimport bb.event
15*4882a593Smuzhiyunimport logging
16*4882a593Smuzhiyunimport multiprocessing
17*4882a593Smuzhiyunimport threading
18*4882a593Smuzhiyunimport array
19*4882a593Smuzhiyunimport os
20*4882a593Smuzhiyunimport sys
21*4882a593Smuzhiyunimport time
22*4882a593Smuzhiyunimport select
23*4882a593Smuzhiyunimport socket
24*4882a593Smuzhiyunimport subprocess
25*4882a593Smuzhiyunimport errno
26*4882a593Smuzhiyunimport re
27*4882a593Smuzhiyunimport datetime
28*4882a593Smuzhiyunimport pickle
29*4882a593Smuzhiyunimport traceback
30*4882a593Smuzhiyunimport gc
31*4882a593Smuzhiyunimport bb.server.xmlrpcserver
32*4882a593Smuzhiyunfrom bb import daemonize
33*4882a593Smuzhiyunfrom multiprocessing import queues
34*4882a593Smuzhiyun
# Shared 'BitBake' logger used by the classes and functions in this module
logger = logging.getLogger('BitBake')
36*4882a593Smuzhiyun
class ProcessTimeout(SystemExit):
    """Raised by ServerCommunicator.runCommand() when the server sends no reply in time.

    Derives from SystemExit, so handlers that only catch Exception do not
    intercept it.
    """
39*4882a593Smuzhiyun
def serverlog(msg):
    """Write msg to stdout, prefixed with our pid and the current time, and flush.

    msg must be a str; the output has the form "<pid> <HH:MM:SS.ffffff> <msg>".
    """
    timestamp = datetime.datetime.now().strftime('%H:%M:%S.%f')
    # str.join (like the '+' concatenation it replaces) rejects non-str msg
    print(" ".join((str(os.getpid()), timestamp, msg)))
    sys.stdout.flush()
43*4882a593Smuzhiyun
class ProcessServer():
    """
    The main loop of the bitbake server process.

    The server listens on a UNIX socket for a client (UI), receives three
    file descriptors from it (event output, command input, command reply
    output), runs the commands it reads and calls the registered idle
    functions in between. Optionally an XMLRPC server is serviced from the
    same loop.

    The 'cooker' attribute is not created here; the caller (see execServer)
    must assign it before run() is called.
    """
    # Where run() stores raw profiling data when profiling is enabled
    profile_filename = "profile.log"
    # Only used in the log message emitted by run().
    # NOTE(review): presumably bb.utils.process_profilelog() writes to
    # profile_filename + ".processed" - keep the two names in sync.
    profile_processed_filename = "profile.log.processed"

    def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface):
        """
        lock            -- open file object for the bitbake lock file
        lockname        -- path of the lock file (used when re-taking the lock on exit)
        sock            -- listening socket on which clients connect
        sockname        -- path of that socket, removed when the server exits
        server_timeout  -- seconds to stay alive without a UI; -1.0 disables the
                           timeout check, a false value defers to BB_SERVER_TIMEOUT
        xmlrpcinterface -- sequence whose first item, if true, enables the XMLRPC server
        """
        self.command_channel = False
        self.command_channel_reply = False
        self.quit = False
        self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore.
        self.next_heartbeat = time.time()

        self.event_handle = None
        self.hadanyui = False
        self.haveui = False
        self.maxuiwait = 30
        self.xmlrpc = False

        # Assigned by the creator of the server once the cooker exists
        self.cooker = None

        # Maps idle function -> data passed back to it on every call
        self._idlefuns = {}

        self.bitbake_lock = lock
        self.bitbake_lock_name = lockname
        self.sock = sock
        self.sockname = sockname

        self.server_timeout = server_timeout
        self.timeout = self.server_timeout
        self.xmlrpcinterface = xmlrpcinterface

    def register_idle_function(self, function, data):
        """Register a function to be called while the server is idle"""
        assert callable(function)
        self._idlefuns[function] = data

    def run(self):
        """
        Start the optional XMLRPC server, record our pid (and XMLRPC address)
        in the lock file and run main(), under the profiler if the cooker
        configuration asks for it.

        Returns whatever main() returns.
        """

        if self.xmlrpcinterface[0]:
            self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self)

            serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port))

        try:
            self.bitbake_lock.seek(0)
            self.bitbake_lock.truncate()
            if self.xmlrpc:
                self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port))
            else:
                self.bitbake_lock.write("%s\n" % (os.getpid()))
            self.bitbake_lock.flush()
        except Exception as e:
            # Not fatal: the lock file contents are informational only here
            serverlog("Error writing to lock file: %s" % str(e))

        if self.cooker.configuration.profile:
            try:
                import cProfile as profile
            except ImportError:
                import profile
            prof = profile.Profile()

            ret = profile.Profile.runcall(prof, self.main)

            prof.dump_stats(self.profile_filename)
            bb.utils.process_profilelog(self.profile_filename)
            serverlog("Raw profiling information saved to %s and processed statistics to %s" % (self.profile_filename, self.profile_processed_filename))

        else:
            ret = self.main()

        return ret

    def main(self):
        """
        Serve clients until self.quit is set, then shut the cooker down and
        release the bitbake lock file.

        Only one client is served at a time; further connections are queued
        and picked up when the current client disconnects.
        """
        self.cooker.pre_serve()

        bb.utils.set_process_name("Cooker")

        ready = []
        newconnections = []

        self.controllersock = False
        fds = [self.sock]
        if self.xmlrpc:
            fds.append(self.xmlrpc)
        seendata = False
        serverlog("Entering server connection loop")

        def disconnect_client(self, fds):
            # Tear down the current client, then either start the next queued
            # connection or, with no timeout configured and nothing pending,
            # ask the main loop to exit.
            serverlog("Disconnecting Client")
            if self.controllersock:
                fds.remove(self.controllersock)
                self.controllersock.close()
                self.controllersock = False
            if self.haveui:
                fds.remove(self.command_channel)
                bb.event.unregister_UIHhandler(self.event_handle, True)
                self.command_channel_reply.writer.close()
                self.event_writer.writer.close()
                self.command_channel.close()
                self.command_channel = False
                del self.event_writer
                self.lastui = time.time()
                self.cooker.clientComplete()
                self.haveui = False
            # Note: this 'ready' is local to disconnect_client, it does not
            # replace the 'ready' list of the enclosing main loop.
            ready = select.select(fds,[],[],0)[0]
            if newconnections:
                serverlog("Starting new client")
                conn = newconnections.pop(-1)
                fds.append(conn)
                self.controllersock = conn
            elif not self.timeout and not ready:
                serverlog("No timeout, exiting.")
                self.quit = True

        self.lastui = time.time()
        while not self.quit:
            if self.sock in ready:
                # Accept everything that is pending; only the first connection
                # becomes the active client, the rest wait in newconnections.
                while select.select([self.sock],[],[],0)[0]:
                    controllersock, address = self.sock.accept()
                    if self.controllersock:
                        serverlog("Queuing %s (%s)" % (str(ready), str(newconnections)))
                        newconnections.append(controllersock)
                    else:
                        serverlog("Accepting %s (%s)" % (str(ready), str(newconnections)))
                        self.controllersock = controllersock
                        fds.append(controllersock)
            if self.controllersock in ready:
                try:
                    serverlog("Processing Client")
                    ui_fds = recvfds(self.controllersock, 3)
                    serverlog("Connecting Client")

                    # Where to write events to
                    writer = ConnectionWriter(ui_fds[0])
                    self.event_handle = bb.event.register_UIHhandler(writer, True)
                    self.event_writer = writer

                    # Where to read commands from
                    reader = ConnectionReader(ui_fds[1])
                    fds.append(reader)
                    self.command_channel = reader

                    # Where to send command return values to
                    writer = ConnectionWriter(ui_fds[2])
                    self.command_channel_reply = writer

                    self.haveui = True
                    self.hadanyui = True

                except (EOFError, OSError):
                    disconnect_client(self, fds)

            if not self.timeout == -1.0 and not self.haveui and self.timeout and \
                    (self.lastui + self.timeout) < time.time():
                serverlog("Server timeout, exiting.")
                self.quit = True

            # If we don't see a UI connection within maxuiwait, its unlikely we're going to see
            # one. We have had issue with processes hanging indefinitely so timing out UI-less
            # servers is useful.
            if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time():
                serverlog("No UI connection within max timeout, exiting to avoid infinite loop.")
                self.quit = True

            if self.command_channel in ready:
                try:
                    command = self.command_channel.get()
                except EOFError:
                    # Client connection shutting down
                    ready = []
                    disconnect_client(self, fds)
                    continue
                if command[0] == "terminateServer":
                    self.quit = True
                    continue
                try:
                    serverlog("Running command %s" % command)
                    self.command_channel_reply.send(self.cooker.command.runCommand(command))
                    serverlog("Command Completed")
                except Exception as e:
                    stack = traceback.format_exc()
                    serverlog('Exception in server main event loop running command %s (%s)' % (command, stack))
                    logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack))

            if self.xmlrpc in ready:
                self.xmlrpc.handle_requests()

            # One-off: pick up settings from the datastore once the cooker has one
            if not seendata and hasattr(self.cooker, "data"):
                heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT')
                if heartbeat_event:
                    try:
                        self.heartbeat_seconds = float(heartbeat_event)
                    except (TypeError, ValueError):
                        bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event)

                self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT')
                try:
                    if self.timeout:
                        self.timeout = float(self.timeout)
                except (TypeError, ValueError):
                    bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout)
                    # Really ignore it: leaving the unparsed value in place would
                    # make the "lastui + timeout" check above raise TypeError.
                    self.timeout = None
                seendata = True

            ready = self.idle_commands(.1, fds)

        serverlog("Exiting")
        # Remove the socket file so we don't get any more connections to avoid races
        try:
            os.unlink(self.sockname)
        except OSError:
            pass
        self.sock.close()

        # Best effort: a failing shutdown must not stop us releasing the lock
        try:
            self.cooker.shutdown(True)
            self.cooker.notifier.stop()
            self.cooker.confignotifier.stop()
        except:
            pass

        self.cooker.post_serve()

        if len(threading.enumerate()) != 1:
            serverlog("More than one thread left?: " + str(threading.enumerate()))

        # Flush logs before we release the lock
        sys.stdout.flush()
        sys.stderr.flush()

        # Finally release the lockfile but warn about other processes holding it open
        lock = self.bitbake_lock
        lockfile = self.bitbake_lock_name

        def get_lock_contents(lockfile):
            # Lines of the lock file, or None if it no longer exists
            try:
                with open(lockfile, "r") as f:
                    return f.readlines()
            except FileNotFoundError:
                return None

        lockcontents = get_lock_contents(lockfile)
        serverlog("Original lockfile contents: " + str(lockcontents))

        lock.close()
        lock = None

        while not lock:
            i = 0
            lock = None
            # Try to re-take the lock for roughly three seconds (30 * 0.1s)
            while not lock and i < 30:
                lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False)
                if not lock:
                    newlockcontents = get_lock_contents(lockfile)
                    if newlockcontents != lockcontents:
                        # A new server was started, the lockfile contents changed, we can exit
                        serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents))
                        return
                    time.sleep(0.1)
                i += 1
            if lock:
                # We hold the lock so we can remove the file (hide stale pid data)
                # via unlockfile.
                bb.utils.unlockfile(lock)
                serverlog("Exiting as we could obtain the lock")
                return

            if not lock:
                # Some systems may not have lsof available
                procs = None
                try:
                    procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT)
                except subprocess.CalledProcessError:
                    # File was deleted?
                    continue
                except OSError as e:
                    if e.errno != errno.ENOENT:
                        raise
                if procs is None:
                    # Fall back to fuser if lsof is unavailable
                    try:
                        procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT)
                    except subprocess.CalledProcessError:
                        # File was deleted?
                        continue
                    except OSError as e:
                        if e.errno != errno.ENOENT:
                            raise

                msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"]
                if procs:
                    msg.append(":\n%s" % str(procs.decode("utf-8")))
                serverlog("".join(msg))

    def idle_commands(self, delay, fds=None):
        """
        Call every registered idle function once, fire a heartbeat event if
        one is due, then wait on fds and return those that are readable.

        Idle functions are called as function(self, data, False). Their return
        value controls the wait:
          False       -- the function is unregistered; do not sleep
          True        -- do not sleep
          a float     -- sleep at most this many seconds
          other       -- taken as a list of extra file descriptors to wait on
        An idle function that raises is unregistered and the server is told
        to quit (SystemExit is propagated instead).

        delay is the maximum time to sleep, in seconds.
        """
        nextsleep = delay
        if not fds:
            fds = []

        # Iterate over a copy as functions may be removed while looping
        for function, data in list(self._idlefuns.items()):
            try:
                retval = function(self, data, False)
                if retval is False:
                    del self._idlefuns[function]
                    nextsleep = None
                elif retval is True:
                    nextsleep = None
                elif isinstance(retval, float) and nextsleep:
                    if (retval < nextsleep):
                        nextsleep = retval
                elif nextsleep is None:
                    continue
                else:
                    fds = fds + retval
            except SystemExit:
                raise
            except Exception as exc:
                if not isinstance(exc, bb.BBHandledException):
                    logger.exception('Running idle function')
                del self._idlefuns[function]
                self.quit = True

        # Create new heartbeat event?
        now = time.time()
        if now >= self.next_heartbeat:
            # We might have missed heartbeats. Just trigger once in
            # that case and continue after the usual delay.
            self.next_heartbeat += self.heartbeat_seconds
            if self.next_heartbeat <= now:
                self.next_heartbeat = now + self.heartbeat_seconds
            if hasattr(self.cooker, "data"):
                heartbeat = bb.event.HeartbeatEvent(now)
                try:
                    bb.event.fire(heartbeat, self.cooker.data)
                except Exception as exc:
                    if not isinstance(exc, bb.BBHandledException):
                        logger.exception('Running heartbeat function')
                    self.quit = True
        if nextsleep and now + nextsleep > self.next_heartbeat:
            # Shorten timeout so that we we wake up in time for
            # the heartbeat.
            nextsleep = self.next_heartbeat - now

        if nextsleep is not None:
            if self.xmlrpc:
                nextsleep = self.xmlrpc.get_timeout(nextsleep)
            try:
                return select.select(fds,[],[],nextsleep)[0]
            except InterruptedError:
                # Ignore EINTR
                return []
        else:
            # An idle function wants to run again straight away: just poll
            return select.select(fds,[],[],0)[0]
394*4882a593Smuzhiyun
395*4882a593Smuzhiyun
class ServerCommunicator():
    """
    Client side helper which sends commands to the server over 'connection'
    and reads the replies from 'recv'.
    """
    def __init__(self, connection, recv):
        self.connection, self.recv = connection, recv

    def runCommand(self, command):
        """
        Send command to the server and return its (result, error) reply.

        Raises ProcessTimeout if nothing arrives within two 30 second polls
        and bb.BBHandledException if the error text names BBHandledException
        or SystemExit.
        """
        self.connection.send(command)

        replied = self.recv.poll(30)
        if not replied:
            logger.info("No reply from server in 30s")
            replied = self.recv.poll(30)
        if not replied:
            raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)")

        result, error = self.recv.get()
        # Should probably turn all exceptions in error back into exceptions?
        # For now, at least handle BBHandledException
        if error:
            for marker in ("BBHandledException", "SystemExit"):
                if marker in error:
                    raise bb.BBHandledException()
        return result, error

    def updateFeatureSet(self, featureset):
        """Ask the server to switch to featureset; raises BaseException on failure."""
        _ignored, failure = self.runCommand(["setFeatures", featureset])
        if not failure:
            return
        logger.error("Unable to set the cooker to the correct featureset: %s" % failure)
        raise BaseException(failure)

    def getEventHandle(self):
        """Return the server's UI handler number; raises BaseException on failure."""
        handler_num, failure = self.runCommand(["getUIHandlerNum"])
        if failure:
            logger.error("Unable to get UI Handler Number: %s" % failure)
            raise BaseException(failure)
        return handler_num

    def terminateServer(self):
        """Tell the server to exit; no reply is waited for."""
        self.connection.send(['terminateServer'])
431*4882a593Smuzhiyun
class BitBakeProcessServerConnection(object):
    """Bundle of everything a client holds for one connection to the server."""

    def __init__(self, ui_channel, recv, eq, sock):
        # Keep a reference to sock so it isn't gc'd while the connection is alive
        self.socket_connection = sock
        self.events = eq
        self.connection = ServerCommunicator(ui_channel, recv)

    def terminate(self):
        """Close the control socket, then the command and the reply channels."""
        communicator = self.connection
        self.socket_connection.close()
        communicator.connection.close()
        communicator.recv.close()
444*4882a593Smuzhiyun
# Banner execServer() logs when a server session starts; BitBakeServer turns it
# into a regex to find the current session's output in the server log.
start_log_format = '--- Starting bitbake server pid %s at %s ---'
# strftime/strptime format of the timestamp placed in that banner
start_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f'
447*4882a593Smuzhiyun
class BitBakeServer(object):
    """
    Starts a daemonised bitbake server and waits for it to signal, over a
    pipe, that it is ready. If it does not, the relevant part of the server
    log is reported and SystemExit(1) is raised.
    """

    def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface):

        self.server_timeout = server_timeout
        self.xmlrpcinterface = xmlrpcinterface
        self.featureset = featureset
        self.sockname = sockname
        self.bitbake_lock = lock
        self.readypipe, self.readypipein = os.pipe()

        # Place the log in the builddirectory alongside the lock file
        lockdir = os.path.dirname(self.bitbake_lock.name)
        logfile = os.path.join(lockdir, "bitbake-cookerdaemon.log")
        self.logfile = logfile

        launched_at = datetime.datetime.now()
        bb.daemonize.createDaemon(self._startServer, logfile)
        # The daemon has its own copies of these, drop ours
        self.bitbake_lock.close()
        os.close(self.readypipein)

        notifier = ConnectionReader(self.readypipe)
        reply = notifier.poll(5)
        if not reply:
            bb.note("Bitbake server didn't start within 5 seconds, waiting for 90")
            reply = notifier.poll(90)
        if reply:
            try:
                reply = notifier.get()
            except EOFError:
                # Trap the child exiting/closing the pipe and error out
                reply = None

        if reply and reply[0] == "r":
            # The server told us it is ready
            notifier.close()
            return

        notifier.close()
        bb.error("Unable to start bitbake server (%s)" % str(reply))
        if not os.path.exists(logfile):
            bb.error("%s doesn't exist" % logfile)
            raise SystemExit(1)

        session_start_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)'))
        in_session = False
        session_lines = []  # everything from this session's start banner onwards
        earlier_lines = []  # up to 60 most recent lines seen before that banner
        with open(logfile, "r") as log:
            for entry in log:
                if in_session:
                    session_lines.append(entry)
                else:
                    earlier_lines.append(entry)
                    banner = session_start_re.search(entry.rstrip())
                    if banner:
                        stamp = datetime.datetime.strptime(banner.group(2), start_log_datetime_format)
                        # Only a banner written after we launched belongs to our server
                        if stamp >= launched_at:
                            in_session = True
                            session_lines.append(entry)
                del earlier_lines[:-60]

        if len(session_lines) > 60:
            bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(session_lines[-60:])))
        elif session_lines:
            bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(session_lines)))
        elif earlier_lines:
            bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(earlier_lines)))

        raise SystemExit(1)

    def _startServer(self):
        """Runs in the daemon: replace the process with the bitbake-server script."""
        os.close(self.readypipe)
        lockfd = self.bitbake_lock.fileno()
        # Both descriptors must survive the exec below
        os.set_inheritable(lockfd, True)
        os.set_inheritable(self.readypipein, True)
        serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server")
        argv = [
            "bitbake-server",
            serverscript,
            "decafbad",  # NOTE(review): presumably a marker bitbake-server checks for - confirm
            str(lockfd),
            str(self.readypipein),
            self.logfile,
            self.bitbake_lock.name,
            self.sockname,
            str(self.server_timeout or 0),
            str(self.xmlrpcinterface[0]),
            str(self.xmlrpcinterface[1]),
        ]
        os.execl(sys.executable, *argv)
521*4882a593Smuzhiyun
def execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface):
    """
    Entry point of the server process: create the control socket and the
    cooker, tell the parent (through readypipeinfd) that we are ready and
    run the ProcessServer main loop.

    lockfd and readypipeinfd are file descriptor numbers inherited from the
    parent. Returns None.
    """

    import bb.cookerdata
    import bb.cooker

    started = datetime.datetime.now().strftime(start_log_datetime_format)
    serverlog(start_log_format % (os.getpid(), started))

    try:
        bitbake_lock = os.fdopen(lockfd, "w")

        # Create server control socket, replacing any stale one
        if os.path.exists(sockname):
            os.unlink(sockname)

        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        # AF_UNIX has path length issues so chdir here to workaround
        sockdir, sockfile = os.path.split(sockname)
        origdir = os.getcwd()
        try:
            os.chdir(sockdir)
            sock.bind(sockfile)
        finally:
            os.chdir(origdir)
        sock.listen(1)

        server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface)
        ready_writer = ConnectionWriter(readypipeinfd)
        try:
            cooker = bb.cooker.BBCooker([], server.register_idle_function)
        except bb.BBHandledException:
            # Nothing is written to the ready pipe in this case
            return None
        ready_writer.send("r")
        ready_writer.close()
        server.cooker = cooker
        serverlog("Started bitbake server pid %d" % os.getpid())

        server.run()
    finally:
        # Flush any messages/errors to the logfile before exit
        sys.stdout.flush()
        sys.stderr.flush()
563*4882a593Smuzhiyun
def connectProcessServer(sockname, featureset):
    """Connect to the server listening on the AF_UNIX socket at sockname.

    Three pipes are created: one for events written by the server, one for
    commands sent to the server and one for command results.  The server-side
    ends are passed over the socket with sendfds() and then closed locally.
    Finally featureset is pushed to the server through updateFeatureSet().

    Returns the BitBakeProcessServerConnection wrapping the channels.

    On any Exception or SystemExit the command channels, any server-side pipe
    ends still open in this process and the socket are closed, and the
    exception is re-raised.  Connection errors are not retried here.
    """
    # Connect to socket
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    # AF_UNIX has path length issues so chdir here to workaround
    cwd = os.getcwd()

    # Pipe ends destined for the server which this process still has open.
    # Entries are removed as soon as they are closed so that the error path
    # can never close the same descriptor number twice.
    remote_fds = []
    eq = command_chan_recv = command_chan = None

    sock.settimeout(10)

    try:
        try:
            os.chdir(os.path.dirname(sockname))
            # Any failure (including EWOULDBLOCK) propagates to the caller
            sock.connect(os.path.basename(sockname))
        finally:
            os.chdir(cwd)

        # Send an fd for the remote to write events to
        readfd, writefd = os.pipe()
        remote_fds.append(writefd)
        eq = BBUIEventQueue(readfd)
        # Send an fd for the remote to receive commands from
        readfd1, writefd1 = os.pipe()
        remote_fds.append(readfd1)
        command_chan = ConnectionWriter(writefd1)
        # Send an fd for the remote to write commands results to
        readfd2, writefd2 = os.pipe()
        remote_fds.append(writefd2)
        command_chan_recv = ConnectionReader(readfd2)

        sendfds(sock, remote_fds)

        server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock)

        # Close the ends of the pipes we won't use
        while remote_fds:
            os.close(remote_fds.pop(0))

        server_connection.connection.updateFeatureSet(featureset)

    except (Exception, SystemExit):
        if command_chan_recv is not None:
            command_chan_recv.close()
        if command_chan is not None:
            command_chan.close()
        # Best-effort close of whatever server-side ends are still open
        for fd in remote_fds:
            try:
                os.close(fd)
            except OSError:
                pass
        sock.close()
        raise

    return server_connection
625*4882a593Smuzhiyun
def sendfds(sock, fds):
    """Pass the file descriptors in fds across the AF_UNIX socket sock.

    A single data byte holding the number of descriptors modulo 256 is sent,
    with the descriptors attached as SCM_RIGHTS ancillary data.
    """
    fd_array = array.array('i', fds)
    count_byte = bytes([len(fd_array) % 256])
    ancillary = [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fd_array)]
    sock.sendmsg([count_byte], ancillary)
631*4882a593Smuzhiyun
def recvfds(sock, size):
    """Receive an array of fds over an AF_UNIX socket.

    sock is the receiving socket and size the number of descriptors to make
    room for in the ancillary data buffer.

    Returns the received descriptors as a list of ints.

    Raises EOFError when neither data nor ancillary data arrived,
    AssertionError when the count byte does not match the number of
    descriptors received, and RuntimeError for any other malformed message.
    """
    a = array.array('i')
    bytes_size = a.itemsize * size
    msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
    if not msg and not ancdata:
        raise EOFError
    try:
        if len(ancdata) != 1:
            raise RuntimeError('received %d items of ancdata' %
                               len(ancdata))
        cmsg_level, cmsg_type, cmsg_data = ancdata[0]
        if (cmsg_level == socket.SOL_SOCKET and
                cmsg_type == socket.SCM_RIGHTS):
            if len(cmsg_data) % a.itemsize != 0:
                raise ValueError
            a.frombytes(cmsg_data)
            # Explicit raise rather than an assert statement so the check is
            # still performed when Python runs with -O.
            if len(a) % 256 != msg[0]:
                raise AssertionError(
                    "Len is {0:n} but msg[0] is {1!r}".format(len(a), msg[0]))
            return list(a)
    except (ValueError, IndexError):
        # Odd-sized payload or missing count byte: report as invalid data
        pass
    raise RuntimeError('Invalid data received')
654*4882a593Smuzhiyun
class BBUIEventQueue:
    """Queue of events read from a pipe by a background thread.

    A daemon thread reads objects from a ConnectionReader wrapped around
    readfd and appends them to eventQueue.  eventQueueNotify is set while
    the queue is believed to be non-empty.
    """

    def __init__(self, readfd):
        self.eventQueue = []
        self.eventQueueLock = threading.Lock()
        self.eventQueueNotify = threading.Event()

        self.reader = ConnectionReader(readfd)

        self.t = threading.Thread(target=self.startCallbackHandler)
        self.t.daemon = True
        self.t.start()

    def getEvent(self):
        """Pop and return the oldest queued event, or None if there is none."""
        with self.eventQueueLock:
            if not self.eventQueue:
                return None

            oldest = self.eventQueue.pop(0)

            # Nothing left, so waiters should block again
            if not self.eventQueue:
                self.eventQueueNotify.clear()

            return oldest

    def waitEvent(self, delay):
        """Wait up to delay for the notify flag, then behave like getEvent()."""
        self.eventQueueNotify.wait(delay)
        return self.getEvent()

    def queue_event(self, event):
        """Append event to the queue and set the notify flag."""
        with self.eventQueueLock:
            self.eventQueue.append(event)
            self.eventQueueNotify.set()

    def send_event(self, event):
        """Unpickle event and queue the resulting object."""
        unpickled = pickle.loads(event)
        self.queue_event(unpickled)

    def startCallbackHandler(self):
        """Thread body: queue everything read from the reader until EOF."""
        bb.utils.set_process_name("UIEventQueue")
        try:
            while True:
                self.reader.wait()
                self.queue_event(self.reader.get())
        except EOFError:
            # Easiest way to exit is to close the file descriptor to cause an exit
            pass
        self.reader.close()
708*4882a593Smuzhiyun
class ConnectionReader(object):
    """Read-only multiprocessing Connection over an existing file descriptor."""

    def __init__(self, fd):
        self.reader = multiprocessing.connection.Connection(fd, writable=False)
        self.rlock = multiprocessing.Lock()

    def wait(self, timeout=None):
        """Wait for the connection to become ready; returns the ready list."""
        watched = [self.reader]
        return multiprocessing.connection.wait(watched, timeout)

    def poll(self, timeout=None):
        """Return whether data is available, waiting up to timeout."""
        return self.reader.poll(timeout)

    def get(self):
        """Receive one message and return the unpickled object."""
        # Only the raw read is done under the lock; unpickling happens after
        with self.rlock:
            payload = self.reader.recv_bytes()
        return multiprocessing.reduction.ForkingPickler.loads(payload)

    def fileno(self):
        """Return the file descriptor of the underlying connection."""
        return self.reader.fileno()

    def close(self):
        """Close the underlying connection."""
        return self.reader.close()
731*4882a593Smuzhiyun
732*4882a593Smuzhiyun
class ConnectionWriter(object):
    """Write-only multiprocessing Connection over an existing file descriptor."""

    def __init__(self, fd):
        self.writer = multiprocessing.connection.Connection(fd, readable=False)
        self.wlock = multiprocessing.Lock()
        # Why bb.event needs this I have no idea
        self.event = self

    def _send(self, obj):
        """Write the already pickled bytes obj while holding the write lock.

        The garbage collector is disabled for the duration of the write and
        re-enabled afterwards, also when the write raises.
        """
        gc.disable()
        try:
            with self.wlock:
                self.writer.send_bytes(obj)
        finally:
            gc.enable()

    def send(self, obj):
        """Pickle obj and write it to the connection.

        If the current process object has a queue_signals attribute, signals
        are queued while the write is in progress and the queued ones are
        handed to process.handle_sig() afterwards.  Exceptions from the write
        propagate to the caller.
        """
        obj = multiprocessing.reduction.ForkingPickler.dumps(obj)
        # See notes/code in CookerParser
        # We must not terminate holding this lock else processes will hang.
        # For SIGTERM, raising afterwards avoids this.
        # For SIGINT, we don't want to have written partial data to the pipe.
        # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139
        process = multiprocessing.current_process()
        if process and hasattr(process, "queue_signals"):
            with process.signal_threadlock:
                process.queue_signals = True
                try:
                    self._send(obj)
                finally:
                    # Never leave the process stuck in queueing mode
                    process.queue_signals = False
                # Drain every signal that arrived during the write.
                # NOTE(review): assumes signal_received holds individual
                # signal numbers - confirm against the process class.
                while process.signal_received:
                    process.handle_sig(process.signal_received.pop(), None)
        else:
            self._send(obj)

    def fileno(self):
        """Return the file descriptor of the underlying connection."""
        return self.writer.fileno()

    def close(self):
        """Close the underlying connection."""
        return self.writer.close()
773