1*4882a593Smuzhiyun# 2*4882a593Smuzhiyun# BitBake Process based server. 3*4882a593Smuzhiyun# 4*4882a593Smuzhiyun# Copyright (C) 2010 Bob Foerster <robert@erafx.com> 5*4882a593Smuzhiyun# 6*4882a593Smuzhiyun# SPDX-License-Identifier: GPL-2.0-only 7*4882a593Smuzhiyun# 8*4882a593Smuzhiyun 9*4882a593Smuzhiyun""" 10*4882a593Smuzhiyun This module implements a multiprocessing.Process based server for bitbake. 11*4882a593Smuzhiyun""" 12*4882a593Smuzhiyun 13*4882a593Smuzhiyunimport bb 14*4882a593Smuzhiyunimport bb.event 15*4882a593Smuzhiyunimport logging 16*4882a593Smuzhiyunimport multiprocessing 17*4882a593Smuzhiyunimport threading 18*4882a593Smuzhiyunimport array 19*4882a593Smuzhiyunimport os 20*4882a593Smuzhiyunimport sys 21*4882a593Smuzhiyunimport time 22*4882a593Smuzhiyunimport select 23*4882a593Smuzhiyunimport socket 24*4882a593Smuzhiyunimport subprocess 25*4882a593Smuzhiyunimport errno 26*4882a593Smuzhiyunimport re 27*4882a593Smuzhiyunimport datetime 28*4882a593Smuzhiyunimport pickle 29*4882a593Smuzhiyunimport traceback 30*4882a593Smuzhiyunimport gc 31*4882a593Smuzhiyunimport bb.server.xmlrpcserver 32*4882a593Smuzhiyunfrom bb import daemonize 33*4882a593Smuzhiyunfrom multiprocessing import queues 34*4882a593Smuzhiyun 35*4882a593Smuzhiyunlogger = logging.getLogger('BitBake') 36*4882a593Smuzhiyun 37*4882a593Smuzhiyunclass ProcessTimeout(SystemExit): 38*4882a593Smuzhiyun pass 39*4882a593Smuzhiyun 40*4882a593Smuzhiyundef serverlog(msg): 41*4882a593Smuzhiyun print(str(os.getpid()) + " " + datetime.datetime.now().strftime('%H:%M:%S.%f') + " " + msg) 42*4882a593Smuzhiyun sys.stdout.flush() 43*4882a593Smuzhiyun 44*4882a593Smuzhiyunclass ProcessServer(): 45*4882a593Smuzhiyun profile_filename = "profile.log" 46*4882a593Smuzhiyun profile_processed_filename = "profile.log.processed" 47*4882a593Smuzhiyun 48*4882a593Smuzhiyun def __init__(self, lock, lockname, sock, sockname, server_timeout, xmlrpcinterface): 49*4882a593Smuzhiyun self.command_channel = False 50*4882a593Smuzhiyun self.command_channel_reply = False 51*4882a593Smuzhiyun self.quit = False 52*4882a593Smuzhiyun self.heartbeat_seconds = 1 # default, BB_HEARTBEAT_EVENT will be checked once we have a datastore. 53*4882a593Smuzhiyun self.next_heartbeat = time.time() 54*4882a593Smuzhiyun 55*4882a593Smuzhiyun self.event_handle = None 56*4882a593Smuzhiyun self.hadanyui = False 57*4882a593Smuzhiyun self.haveui = False 58*4882a593Smuzhiyun self.maxuiwait = 30 59*4882a593Smuzhiyun self.xmlrpc = False 60*4882a593Smuzhiyun 61*4882a593Smuzhiyun self._idlefuns = {} 62*4882a593Smuzhiyun 63*4882a593Smuzhiyun self.bitbake_lock = lock 64*4882a593Smuzhiyun self.bitbake_lock_name = lockname 65*4882a593Smuzhiyun self.sock = sock 66*4882a593Smuzhiyun self.sockname = sockname 67*4882a593Smuzhiyun 68*4882a593Smuzhiyun self.server_timeout = server_timeout 69*4882a593Smuzhiyun self.timeout = self.server_timeout 70*4882a593Smuzhiyun self.xmlrpcinterface = xmlrpcinterface 71*4882a593Smuzhiyun 72*4882a593Smuzhiyun def register_idle_function(self, function, data): 73*4882a593Smuzhiyun """Register a function to be called while the server is idle""" 74*4882a593Smuzhiyun assert hasattr(function, '__call__') 75*4882a593Smuzhiyun self._idlefuns[function] = data 76*4882a593Smuzhiyun 77*4882a593Smuzhiyun def run(self): 78*4882a593Smuzhiyun 79*4882a593Smuzhiyun if self.xmlrpcinterface[0]: 80*4882a593Smuzhiyun self.xmlrpc = bb.server.xmlrpcserver.BitBakeXMLRPCServer(self.xmlrpcinterface, self.cooker, self) 81*4882a593Smuzhiyun 82*4882a593Smuzhiyun serverlog("Bitbake XMLRPC server address: %s, server port: %s" % (self.xmlrpc.host, self.xmlrpc.port)) 83*4882a593Smuzhiyun 84*4882a593Smuzhiyun try: 85*4882a593Smuzhiyun self.bitbake_lock.seek(0) 86*4882a593Smuzhiyun self.bitbake_lock.truncate() 87*4882a593Smuzhiyun if self.xmlrpc: 88*4882a593Smuzhiyun self.bitbake_lock.write("%s %s:%s\n" % (os.getpid(), self.xmlrpc.host, self.xmlrpc.port)) 89*4882a593Smuzhiyun else: 90*4882a593Smuzhiyun self.bitbake_lock.write("%s\n" % (os.getpid())) 91*4882a593Smuzhiyun self.bitbake_lock.flush() 92*4882a593Smuzhiyun except Exception as e: 93*4882a593Smuzhiyun serverlog("Error writing to lock file: %s" % str(e)) 94*4882a593Smuzhiyun pass 95*4882a593Smuzhiyun 96*4882a593Smuzhiyun if self.cooker.configuration.profile: 97*4882a593Smuzhiyun try: 98*4882a593Smuzhiyun import cProfile as profile 99*4882a593Smuzhiyun except: 100*4882a593Smuzhiyun import profile 101*4882a593Smuzhiyun prof = profile.Profile() 102*4882a593Smuzhiyun 103*4882a593Smuzhiyun ret = profile.Profile.runcall(prof, self.main) 104*4882a593Smuzhiyun 105*4882a593Smuzhiyun prof.dump_stats("profile.log") 106*4882a593Smuzhiyun bb.utils.process_profilelog("profile.log") 107*4882a593Smuzhiyun serverlog("Raw profiling information saved to profile.log and processed statistics to profile.log.processed") 108*4882a593Smuzhiyun 109*4882a593Smuzhiyun else: 110*4882a593Smuzhiyun ret = self.main() 111*4882a593Smuzhiyun 112*4882a593Smuzhiyun return ret 113*4882a593Smuzhiyun 114*4882a593Smuzhiyun def main(self): 115*4882a593Smuzhiyun self.cooker.pre_serve() 116*4882a593Smuzhiyun 117*4882a593Smuzhiyun bb.utils.set_process_name("Cooker") 118*4882a593Smuzhiyun 119*4882a593Smuzhiyun ready = [] 120*4882a593Smuzhiyun newconnections = [] 121*4882a593Smuzhiyun 122*4882a593Smuzhiyun self.controllersock = False 123*4882a593Smuzhiyun fds = [self.sock] 124*4882a593Smuzhiyun if self.xmlrpc: 125*4882a593Smuzhiyun fds.append(self.xmlrpc) 126*4882a593Smuzhiyun seendata = False 127*4882a593Smuzhiyun serverlog("Entering server connection loop") 128*4882a593Smuzhiyun 129*4882a593Smuzhiyun def disconnect_client(self, fds): 130*4882a593Smuzhiyun serverlog("Disconnecting Client") 131*4882a593Smuzhiyun if self.controllersock: 132*4882a593Smuzhiyun fds.remove(self.controllersock) 133*4882a593Smuzhiyun self.controllersock.close() 134*4882a593Smuzhiyun self.controllersock = False 135*4882a593Smuzhiyun if self.haveui: 136*4882a593Smuzhiyun fds.remove(self.command_channel) 137*4882a593Smuzhiyun bb.event.unregister_UIHhandler(self.event_handle, True) 138*4882a593Smuzhiyun self.command_channel_reply.writer.close() 139*4882a593Smuzhiyun self.event_writer.writer.close() 140*4882a593Smuzhiyun self.command_channel.close() 141*4882a593Smuzhiyun self.command_channel = False 142*4882a593Smuzhiyun del self.event_writer 143*4882a593Smuzhiyun self.lastui = time.time() 144*4882a593Smuzhiyun self.cooker.clientComplete() 145*4882a593Smuzhiyun self.haveui = False 146*4882a593Smuzhiyun ready = select.select(fds,[],[],0)[0] 147*4882a593Smuzhiyun if newconnections: 148*4882a593Smuzhiyun serverlog("Starting new client") 149*4882a593Smuzhiyun conn = newconnections.pop(-1) 150*4882a593Smuzhiyun fds.append(conn) 151*4882a593Smuzhiyun self.controllersock = conn 152*4882a593Smuzhiyun elif not self.timeout and not ready: 153*4882a593Smuzhiyun serverlog("No timeout, exiting.") 154*4882a593Smuzhiyun self.quit = True 155*4882a593Smuzhiyun 156*4882a593Smuzhiyun self.lastui = time.time() 157*4882a593Smuzhiyun while not self.quit: 158*4882a593Smuzhiyun if self.sock in ready: 159*4882a593Smuzhiyun while select.select([self.sock],[],[],0)[0]: 160*4882a593Smuzhiyun controllersock, address = self.sock.accept() 161*4882a593Smuzhiyun if self.controllersock: 162*4882a593Smuzhiyun serverlog("Queuing %s (%s)" % (str(ready), str(newconnections))) 163*4882a593Smuzhiyun newconnections.append(controllersock) 164*4882a593Smuzhiyun else: 165*4882a593Smuzhiyun serverlog("Accepting %s (%s)" % (str(ready), str(newconnections))) 166*4882a593Smuzhiyun self.controllersock = controllersock 167*4882a593Smuzhiyun fds.append(controllersock) 168*4882a593Smuzhiyun if self.controllersock in ready: 169*4882a593Smuzhiyun try: 170*4882a593Smuzhiyun serverlog("Processing Client") 171*4882a593Smuzhiyun ui_fds = recvfds(self.controllersock, 3) 172*4882a593Smuzhiyun serverlog("Connecting Client") 173*4882a593Smuzhiyun 174*4882a593Smuzhiyun # Where to write events to 175*4882a593Smuzhiyun writer = ConnectionWriter(ui_fds[0]) 176*4882a593Smuzhiyun self.event_handle = bb.event.register_UIHhandler(writer, True) 177*4882a593Smuzhiyun self.event_writer = writer 178*4882a593Smuzhiyun 179*4882a593Smuzhiyun # Where to read commands from 180*4882a593Smuzhiyun reader = ConnectionReader(ui_fds[1]) 181*4882a593Smuzhiyun fds.append(reader) 182*4882a593Smuzhiyun self.command_channel = reader 183*4882a593Smuzhiyun 184*4882a593Smuzhiyun # Where to send command return values to 185*4882a593Smuzhiyun writer = ConnectionWriter(ui_fds[2]) 186*4882a593Smuzhiyun self.command_channel_reply = writer 187*4882a593Smuzhiyun 188*4882a593Smuzhiyun self.haveui = True 189*4882a593Smuzhiyun self.hadanyui = True 190*4882a593Smuzhiyun 191*4882a593Smuzhiyun except (EOFError, OSError): 192*4882a593Smuzhiyun disconnect_client(self, fds) 193*4882a593Smuzhiyun 194*4882a593Smuzhiyun if not self.timeout == -1.0 and not self.haveui and self.timeout and \ 195*4882a593Smuzhiyun (self.lastui + self.timeout) < time.time(): 196*4882a593Smuzhiyun serverlog("Server timeout, exiting.") 197*4882a593Smuzhiyun self.quit = True 198*4882a593Smuzhiyun 199*4882a593Smuzhiyun # If we don't see a UI connection within maxuiwait, its unlikely we're going to see 200*4882a593Smuzhiyun # one. We have had issue with processes hanging indefinitely so timing out UI-less 201*4882a593Smuzhiyun # servers is useful. 202*4882a593Smuzhiyun if not self.hadanyui and not self.xmlrpc and not self.timeout and (self.lastui + self.maxuiwait) < time.time(): 203*4882a593Smuzhiyun serverlog("No UI connection within max timeout, exiting to avoid infinite loop.") 204*4882a593Smuzhiyun self.quit = True 205*4882a593Smuzhiyun 206*4882a593Smuzhiyun if self.command_channel in ready: 207*4882a593Smuzhiyun try: 208*4882a593Smuzhiyun command = self.command_channel.get() 209*4882a593Smuzhiyun except EOFError: 210*4882a593Smuzhiyun # Client connection shutting down 211*4882a593Smuzhiyun ready = [] 212*4882a593Smuzhiyun disconnect_client(self, fds) 213*4882a593Smuzhiyun continue 214*4882a593Smuzhiyun if command[0] == "terminateServer": 215*4882a593Smuzhiyun self.quit = True 216*4882a593Smuzhiyun continue 217*4882a593Smuzhiyun try: 218*4882a593Smuzhiyun serverlog("Running command %s" % command) 219*4882a593Smuzhiyun self.command_channel_reply.send(self.cooker.command.runCommand(command)) 220*4882a593Smuzhiyun serverlog("Command Completed") 221*4882a593Smuzhiyun except Exception as e: 222*4882a593Smuzhiyun stack = traceback.format_exc() 223*4882a593Smuzhiyun serverlog('Exception in server main event loop running command %s (%s)' % (command, stack)) 224*4882a593Smuzhiyun logger.exception('Exception in server main event loop running command %s (%s)' % (command, stack)) 225*4882a593Smuzhiyun 226*4882a593Smuzhiyun if self.xmlrpc in ready: 227*4882a593Smuzhiyun self.xmlrpc.handle_requests() 228*4882a593Smuzhiyun 229*4882a593Smuzhiyun if not seendata and hasattr(self.cooker, "data"): 230*4882a593Smuzhiyun heartbeat_event = self.cooker.data.getVar('BB_HEARTBEAT_EVENT') 231*4882a593Smuzhiyun if heartbeat_event: 232*4882a593Smuzhiyun try: 233*4882a593Smuzhiyun self.heartbeat_seconds = float(heartbeat_event) 234*4882a593Smuzhiyun except: 235*4882a593Smuzhiyun bb.warn('Ignoring invalid BB_HEARTBEAT_EVENT=%s, must be a float specifying seconds.' % heartbeat_event) 236*4882a593Smuzhiyun 237*4882a593Smuzhiyun self.timeout = self.server_timeout or self.cooker.data.getVar('BB_SERVER_TIMEOUT') 238*4882a593Smuzhiyun try: 239*4882a593Smuzhiyun if self.timeout: 240*4882a593Smuzhiyun self.timeout = float(self.timeout) 241*4882a593Smuzhiyun except: 242*4882a593Smuzhiyun bb.warn('Ignoring invalid BB_SERVER_TIMEOUT=%s, must be a float specifying seconds.' % self.timeout) 243*4882a593Smuzhiyun seendata = True 244*4882a593Smuzhiyun 245*4882a593Smuzhiyun ready = self.idle_commands(.1, fds) 246*4882a593Smuzhiyun 247*4882a593Smuzhiyun serverlog("Exiting") 248*4882a593Smuzhiyun # Remove the socket file so we don't get any more connections to avoid races 249*4882a593Smuzhiyun try: 250*4882a593Smuzhiyun os.unlink(self.sockname) 251*4882a593Smuzhiyun except: 252*4882a593Smuzhiyun pass 253*4882a593Smuzhiyun self.sock.close() 254*4882a593Smuzhiyun 255*4882a593Smuzhiyun try: 256*4882a593Smuzhiyun self.cooker.shutdown(True) 257*4882a593Smuzhiyun self.cooker.notifier.stop() 258*4882a593Smuzhiyun self.cooker.confignotifier.stop() 259*4882a593Smuzhiyun except: 260*4882a593Smuzhiyun pass 261*4882a593Smuzhiyun 262*4882a593Smuzhiyun self.cooker.post_serve() 263*4882a593Smuzhiyun 264*4882a593Smuzhiyun if len(threading.enumerate()) != 1: 265*4882a593Smuzhiyun serverlog("More than one thread left?: " + str(threading.enumerate())) 266*4882a593Smuzhiyun 267*4882a593Smuzhiyun # Flush logs before we release the lock 268*4882a593Smuzhiyun sys.stdout.flush() 269*4882a593Smuzhiyun sys.stderr.flush() 270*4882a593Smuzhiyun 271*4882a593Smuzhiyun # Finally release the lockfile but warn about other processes holding it open 272*4882a593Smuzhiyun lock = self.bitbake_lock 273*4882a593Smuzhiyun lockfile = self.bitbake_lock_name 274*4882a593Smuzhiyun 275*4882a593Smuzhiyun def get_lock_contents(lockfile): 276*4882a593Smuzhiyun try: 277*4882a593Smuzhiyun with open(lockfile, "r") as f: 278*4882a593Smuzhiyun return f.readlines() 279*4882a593Smuzhiyun except FileNotFoundError: 280*4882a593Smuzhiyun return None 281*4882a593Smuzhiyun 282*4882a593Smuzhiyun lockcontents = get_lock_contents(lockfile) 283*4882a593Smuzhiyun serverlog("Original lockfile contents: " + str(lockcontents)) 284*4882a593Smuzhiyun 285*4882a593Smuzhiyun lock.close() 286*4882a593Smuzhiyun lock = None 287*4882a593Smuzhiyun 288*4882a593Smuzhiyun while not lock: 289*4882a593Smuzhiyun i = 0 290*4882a593Smuzhiyun lock = None 291*4882a593Smuzhiyun while not lock and i < 30: 292*4882a593Smuzhiyun lock = bb.utils.lockfile(lockfile, shared=False, retry=False, block=False) 293*4882a593Smuzhiyun if not lock: 294*4882a593Smuzhiyun newlockcontents = get_lock_contents(lockfile) 295*4882a593Smuzhiyun if newlockcontents != lockcontents: 296*4882a593Smuzhiyun # A new server was started, the lockfile contents changed, we can exit 297*4882a593Smuzhiyun serverlog("Lockfile now contains different contents, exiting: " + str(newlockcontents)) 298*4882a593Smuzhiyun return 299*4882a593Smuzhiyun time.sleep(0.1) 300*4882a593Smuzhiyun i += 1 301*4882a593Smuzhiyun if lock: 302*4882a593Smuzhiyun # We hold the lock so we can remove the file (hide stale pid data) 303*4882a593Smuzhiyun # via unlockfile. 304*4882a593Smuzhiyun bb.utils.unlockfile(lock) 305*4882a593Smuzhiyun serverlog("Exiting as we could obtain the lock") 306*4882a593Smuzhiyun return 307*4882a593Smuzhiyun 308*4882a593Smuzhiyun if not lock: 309*4882a593Smuzhiyun # Some systems may not have lsof available 310*4882a593Smuzhiyun procs = None 311*4882a593Smuzhiyun try: 312*4882a593Smuzhiyun procs = subprocess.check_output(["lsof", '-w', lockfile], stderr=subprocess.STDOUT) 313*4882a593Smuzhiyun except subprocess.CalledProcessError: 314*4882a593Smuzhiyun # File was deleted? 315*4882a593Smuzhiyun continue 316*4882a593Smuzhiyun except OSError as e: 317*4882a593Smuzhiyun if e.errno != errno.ENOENT: 318*4882a593Smuzhiyun raise 319*4882a593Smuzhiyun if procs is None: 320*4882a593Smuzhiyun # Fall back to fuser if lsof is unavailable 321*4882a593Smuzhiyun try: 322*4882a593Smuzhiyun procs = subprocess.check_output(["fuser", '-v', lockfile], stderr=subprocess.STDOUT) 323*4882a593Smuzhiyun except subprocess.CalledProcessError: 324*4882a593Smuzhiyun # File was deleted? 325*4882a593Smuzhiyun continue 326*4882a593Smuzhiyun except OSError as e: 327*4882a593Smuzhiyun if e.errno != errno.ENOENT: 328*4882a593Smuzhiyun raise 329*4882a593Smuzhiyun 330*4882a593Smuzhiyun msg = ["Delaying shutdown due to active processes which appear to be holding bitbake.lock"] 331*4882a593Smuzhiyun if procs: 332*4882a593Smuzhiyun msg.append(":\n%s" % str(procs.decode("utf-8"))) 333*4882a593Smuzhiyun serverlog("".join(msg)) 334*4882a593Smuzhiyun 335*4882a593Smuzhiyun def idle_commands(self, delay, fds=None): 336*4882a593Smuzhiyun nextsleep = delay 337*4882a593Smuzhiyun if not fds: 338*4882a593Smuzhiyun fds = [] 339*4882a593Smuzhiyun 340*4882a593Smuzhiyun for function, data in list(self._idlefuns.items()): 341*4882a593Smuzhiyun try: 342*4882a593Smuzhiyun retval = function(self, data, False) 343*4882a593Smuzhiyun if retval is False: 344*4882a593Smuzhiyun del self._idlefuns[function] 345*4882a593Smuzhiyun nextsleep = None 346*4882a593Smuzhiyun elif retval is True: 347*4882a593Smuzhiyun nextsleep = None 348*4882a593Smuzhiyun elif isinstance(retval, float) and nextsleep: 349*4882a593Smuzhiyun if (retval < nextsleep): 350*4882a593Smuzhiyun nextsleep = retval 351*4882a593Smuzhiyun elif nextsleep is None: 352*4882a593Smuzhiyun continue 353*4882a593Smuzhiyun else: 354*4882a593Smuzhiyun fds = fds + retval 355*4882a593Smuzhiyun except SystemExit: 356*4882a593Smuzhiyun raise 357*4882a593Smuzhiyun except Exception as exc: 358*4882a593Smuzhiyun if not isinstance(exc, bb.BBHandledException): 359*4882a593Smuzhiyun logger.exception('Running idle function') 360*4882a593Smuzhiyun del self._idlefuns[function] 361*4882a593Smuzhiyun self.quit = True 362*4882a593Smuzhiyun 363*4882a593Smuzhiyun # Create new heartbeat event? 364*4882a593Smuzhiyun now = time.time() 365*4882a593Smuzhiyun if now >= self.next_heartbeat: 366*4882a593Smuzhiyun # We might have missed heartbeats. Just trigger once in 367*4882a593Smuzhiyun # that case and continue after the usual delay. 368*4882a593Smuzhiyun self.next_heartbeat += self.heartbeat_seconds 369*4882a593Smuzhiyun if self.next_heartbeat <= now: 370*4882a593Smuzhiyun self.next_heartbeat = now + self.heartbeat_seconds 371*4882a593Smuzhiyun if hasattr(self.cooker, "data"): 372*4882a593Smuzhiyun heartbeat = bb.event.HeartbeatEvent(now) 373*4882a593Smuzhiyun try: 374*4882a593Smuzhiyun bb.event.fire(heartbeat, self.cooker.data) 375*4882a593Smuzhiyun except Exception as exc: 376*4882a593Smuzhiyun if not isinstance(exc, bb.BBHandledException): 377*4882a593Smuzhiyun logger.exception('Running heartbeat function') 378*4882a593Smuzhiyun self.quit = True 379*4882a593Smuzhiyun if nextsleep and now + nextsleep > self.next_heartbeat: 380*4882a593Smuzhiyun # Shorten timeout so that we we wake up in time for 381*4882a593Smuzhiyun # the heartbeat. 382*4882a593Smuzhiyun nextsleep = self.next_heartbeat - now 383*4882a593Smuzhiyun 384*4882a593Smuzhiyun if nextsleep is not None: 385*4882a593Smuzhiyun if self.xmlrpc: 386*4882a593Smuzhiyun nextsleep = self.xmlrpc.get_timeout(nextsleep) 387*4882a593Smuzhiyun try: 388*4882a593Smuzhiyun return select.select(fds,[],[],nextsleep)[0] 389*4882a593Smuzhiyun except InterruptedError: 390*4882a593Smuzhiyun # Ignore EINTR 391*4882a593Smuzhiyun return [] 392*4882a593Smuzhiyun else: 393*4882a593Smuzhiyun return select.select(fds,[],[],0)[0] 394*4882a593Smuzhiyun 395*4882a593Smuzhiyun 396*4882a593Smuzhiyunclass ServerCommunicator(): 397*4882a593Smuzhiyun def __init__(self, connection, recv): 398*4882a593Smuzhiyun self.connection = connection 399*4882a593Smuzhiyun self.recv = recv 400*4882a593Smuzhiyun 401*4882a593Smuzhiyun def runCommand(self, command): 402*4882a593Smuzhiyun self.connection.send(command) 403*4882a593Smuzhiyun if not self.recv.poll(30): 404*4882a593Smuzhiyun logger.info("No reply from server in 30s") 405*4882a593Smuzhiyun if not self.recv.poll(30): 406*4882a593Smuzhiyun raise ProcessTimeout("Timeout while waiting for a reply from the bitbake server (60s)") 407*4882a593Smuzhiyun ret, exc = self.recv.get() 408*4882a593Smuzhiyun # Should probably turn all exceptions in exc back into exceptions? 409*4882a593Smuzhiyun # For now, at least handle BBHandledException 410*4882a593Smuzhiyun if exc and ("BBHandledException" in exc or "SystemExit" in exc): 411*4882a593Smuzhiyun raise bb.BBHandledException() 412*4882a593Smuzhiyun return ret, exc 413*4882a593Smuzhiyun 414*4882a593Smuzhiyun def updateFeatureSet(self, featureset): 415*4882a593Smuzhiyun _, error = self.runCommand(["setFeatures", featureset]) 416*4882a593Smuzhiyun if error: 417*4882a593Smuzhiyun logger.error("Unable to set the cooker to the correct featureset: %s" % error) 418*4882a593Smuzhiyun raise BaseException(error) 419*4882a593Smuzhiyun 420*4882a593Smuzhiyun def getEventHandle(self): 421*4882a593Smuzhiyun handle, error = self.runCommand(["getUIHandlerNum"]) 422*4882a593Smuzhiyun if error: 423*4882a593Smuzhiyun logger.error("Unable to get UI Handler Number: %s" % error) 424*4882a593Smuzhiyun raise BaseException(error) 425*4882a593Smuzhiyun 426*4882a593Smuzhiyun return handle 427*4882a593Smuzhiyun 428*4882a593Smuzhiyun def terminateServer(self): 429*4882a593Smuzhiyun self.connection.send(['terminateServer']) 430*4882a593Smuzhiyun return 431*4882a593Smuzhiyun 432*4882a593Smuzhiyunclass BitBakeProcessServerConnection(object): 433*4882a593Smuzhiyun def __init__(self, ui_channel, recv, eq, sock): 434*4882a593Smuzhiyun self.connection = ServerCommunicator(ui_channel, recv) 435*4882a593Smuzhiyun self.events = eq 436*4882a593Smuzhiyun # Save sock so it doesn't get gc'd for the life of our connection 437*4882a593Smuzhiyun self.socket_connection = sock 438*4882a593Smuzhiyun 439*4882a593Smuzhiyun def terminate(self): 440*4882a593Smuzhiyun self.socket_connection.close() 441*4882a593Smuzhiyun self.connection.connection.close() 442*4882a593Smuzhiyun self.connection.recv.close() 443*4882a593Smuzhiyun return 444*4882a593Smuzhiyun 445*4882a593Smuzhiyunstart_log_format = '--- Starting bitbake server pid %s at %s ---' 446*4882a593Smuzhiyunstart_log_datetime_format = '%Y-%m-%d %H:%M:%S.%f' 447*4882a593Smuzhiyun 448*4882a593Smuzhiyunclass BitBakeServer(object): 449*4882a593Smuzhiyun 450*4882a593Smuzhiyun def __init__(self, lock, sockname, featureset, server_timeout, xmlrpcinterface): 451*4882a593Smuzhiyun 452*4882a593Smuzhiyun self.server_timeout = server_timeout 453*4882a593Smuzhiyun self.xmlrpcinterface = xmlrpcinterface 454*4882a593Smuzhiyun self.featureset = featureset 455*4882a593Smuzhiyun self.sockname = sockname 456*4882a593Smuzhiyun self.bitbake_lock = lock 457*4882a593Smuzhiyun self.readypipe, self.readypipein = os.pipe() 458*4882a593Smuzhiyun 459*4882a593Smuzhiyun # Place the log in the builddirectory alongside the lock file 460*4882a593Smuzhiyun logfile = os.path.join(os.path.dirname(self.bitbake_lock.name), "bitbake-cookerdaemon.log") 461*4882a593Smuzhiyun self.logfile = logfile 462*4882a593Smuzhiyun 463*4882a593Smuzhiyun startdatetime = datetime.datetime.now() 464*4882a593Smuzhiyun bb.daemonize.createDaemon(self._startServer, logfile) 465*4882a593Smuzhiyun self.bitbake_lock.close() 466*4882a593Smuzhiyun os.close(self.readypipein) 467*4882a593Smuzhiyun 468*4882a593Smuzhiyun ready = ConnectionReader(self.readypipe) 469*4882a593Smuzhiyun r = ready.poll(5) 470*4882a593Smuzhiyun if not r: 471*4882a593Smuzhiyun bb.note("Bitbake server didn't start within 5 seconds, waiting for 90") 472*4882a593Smuzhiyun r = ready.poll(90) 473*4882a593Smuzhiyun if r: 474*4882a593Smuzhiyun try: 475*4882a593Smuzhiyun r = ready.get() 476*4882a593Smuzhiyun except EOFError: 477*4882a593Smuzhiyun # Trap the child exiting/closing the pipe and error out 478*4882a593Smuzhiyun r = None 479*4882a593Smuzhiyun if not r or r[0] != "r": 480*4882a593Smuzhiyun ready.close() 481*4882a593Smuzhiyun bb.error("Unable to start bitbake server (%s)" % str(r)) 482*4882a593Smuzhiyun if os.path.exists(logfile): 483*4882a593Smuzhiyun logstart_re = re.compile(start_log_format % ('([0-9]+)', '([0-9-]+ [0-9:.]+)')) 484*4882a593Smuzhiyun started = False 485*4882a593Smuzhiyun lines = [] 486*4882a593Smuzhiyun lastlines = [] 487*4882a593Smuzhiyun with open(logfile, "r") as f: 488*4882a593Smuzhiyun for line in f: 489*4882a593Smuzhiyun if started: 490*4882a593Smuzhiyun lines.append(line) 491*4882a593Smuzhiyun else: 492*4882a593Smuzhiyun lastlines.append(line) 493*4882a593Smuzhiyun res = logstart_re.search(line.rstrip()) 494*4882a593Smuzhiyun if res: 495*4882a593Smuzhiyun ldatetime = datetime.datetime.strptime(res.group(2), start_log_datetime_format) 496*4882a593Smuzhiyun if ldatetime >= startdatetime: 497*4882a593Smuzhiyun started = True 498*4882a593Smuzhiyun lines.append(line) 499*4882a593Smuzhiyun if len(lastlines) > 60: 500*4882a593Smuzhiyun lastlines = lastlines[-60:] 501*4882a593Smuzhiyun if lines: 502*4882a593Smuzhiyun if len(lines) > 60: 503*4882a593Smuzhiyun bb.error("Last 60 lines of server log for this session (%s):\n%s" % (logfile, "".join(lines[-60:]))) 504*4882a593Smuzhiyun else: 505*4882a593Smuzhiyun bb.error("Server log for this session (%s):\n%s" % (logfile, "".join(lines))) 506*4882a593Smuzhiyun elif lastlines: 507*4882a593Smuzhiyun bb.error("Server didn't start, last 60 loglines (%s):\n%s" % (logfile, "".join(lastlines))) 508*4882a593Smuzhiyun else: 509*4882a593Smuzhiyun bb.error("%s doesn't exist" % logfile) 510*4882a593Smuzhiyun 511*4882a593Smuzhiyun raise SystemExit(1) 512*4882a593Smuzhiyun 513*4882a593Smuzhiyun ready.close() 514*4882a593Smuzhiyun 515*4882a593Smuzhiyun def _startServer(self): 516*4882a593Smuzhiyun os.close(self.readypipe) 517*4882a593Smuzhiyun os.set_inheritable(self.bitbake_lock.fileno(), True) 518*4882a593Smuzhiyun os.set_inheritable(self.readypipein, True) 519*4882a593Smuzhiyun serverscript = os.path.realpath(os.path.dirname(__file__) + "/../../../bin/bitbake-server") 520*4882a593Smuzhiyun os.execl(sys.executable, "bitbake-server", serverscript, "decafbad", str(self.bitbake_lock.fileno()), str(self.readypipein), self.logfile, self.bitbake_lock.name, self.sockname, str(self.server_timeout or 0), str(self.xmlrpcinterface[0]), str(self.xmlrpcinterface[1])) 521*4882a593Smuzhiyun 522*4882a593Smuzhiyundef execServer(lockfd, readypipeinfd, lockname, sockname, server_timeout, xmlrpcinterface): 523*4882a593Smuzhiyun 524*4882a593Smuzhiyun import bb.cookerdata 525*4882a593Smuzhiyun import bb.cooker 526*4882a593Smuzhiyun 527*4882a593Smuzhiyun serverlog(start_log_format % (os.getpid(), datetime.datetime.now().strftime(start_log_datetime_format))) 528*4882a593Smuzhiyun 529*4882a593Smuzhiyun try: 530*4882a593Smuzhiyun bitbake_lock = os.fdopen(lockfd, "w") 531*4882a593Smuzhiyun 532*4882a593Smuzhiyun # Create server control socket 533*4882a593Smuzhiyun if os.path.exists(sockname): 534*4882a593Smuzhiyun os.unlink(sockname) 535*4882a593Smuzhiyun 536*4882a593Smuzhiyun sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 537*4882a593Smuzhiyun # AF_UNIX has path length issues so chdir here to workaround 538*4882a593Smuzhiyun cwd = os.getcwd() 539*4882a593Smuzhiyun try: 540*4882a593Smuzhiyun os.chdir(os.path.dirname(sockname)) 541*4882a593Smuzhiyun sock.bind(os.path.basename(sockname)) 542*4882a593Smuzhiyun finally: 543*4882a593Smuzhiyun os.chdir(cwd) 544*4882a593Smuzhiyun sock.listen(1) 545*4882a593Smuzhiyun 546*4882a593Smuzhiyun server = ProcessServer(bitbake_lock, lockname, sock, sockname, server_timeout, xmlrpcinterface) 547*4882a593Smuzhiyun writer = ConnectionWriter(readypipeinfd) 548*4882a593Smuzhiyun try: 549*4882a593Smuzhiyun featureset = [] 550*4882a593Smuzhiyun cooker = bb.cooker.BBCooker(featureset, server.register_idle_function) 551*4882a593Smuzhiyun except bb.BBHandledException: 552*4882a593Smuzhiyun return None 553*4882a593Smuzhiyun writer.send("r") 554*4882a593Smuzhiyun writer.close() 555*4882a593Smuzhiyun server.cooker = cooker 556*4882a593Smuzhiyun serverlog("Started bitbake server pid %d" % os.getpid()) 557*4882a593Smuzhiyun 558*4882a593Smuzhiyun server.run() 559*4882a593Smuzhiyun finally: 560*4882a593Smuzhiyun # Flush any messages/errors to the logfile before exit 561*4882a593Smuzhiyun sys.stdout.flush() 562*4882a593Smuzhiyun sys.stderr.flush() 563*4882a593Smuzhiyun 564*4882a593Smuzhiyundef connectProcessServer(sockname, featureset): 565*4882a593Smuzhiyun # Connect to socket 566*4882a593Smuzhiyun sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 567*4882a593Smuzhiyun # AF_UNIX has path length issues so chdir here to workaround 568*4882a593Smuzhiyun cwd = os.getcwd() 569*4882a593Smuzhiyun 570*4882a593Smuzhiyun readfd = writefd = readfd1 = writefd1 = readfd2 = writefd2 = None 571*4882a593Smuzhiyun eq = command_chan_recv = command_chan = None 572*4882a593Smuzhiyun 573*4882a593Smuzhiyun sock.settimeout(10) 574*4882a593Smuzhiyun 575*4882a593Smuzhiyun try: 576*4882a593Smuzhiyun try: 577*4882a593Smuzhiyun os.chdir(os.path.dirname(sockname)) 578*4882a593Smuzhiyun finished = False 579*4882a593Smuzhiyun while not finished: 580*4882a593Smuzhiyun try: 581*4882a593Smuzhiyun sock.connect(os.path.basename(sockname)) 582*4882a593Smuzhiyun finished = True 583*4882a593Smuzhiyun except IOError as e: 584*4882a593Smuzhiyun if e.errno == errno.EWOULDBLOCK: 585*4882a593Smuzhiyun pass 586*4882a593Smuzhiyun raise 587*4882a593Smuzhiyun finally: 588*4882a593Smuzhiyun os.chdir(cwd) 589*4882a593Smuzhiyun 590*4882a593Smuzhiyun # Send an fd for the remote to write events to 591*4882a593Smuzhiyun readfd, writefd = os.pipe() 592*4882a593Smuzhiyun eq = BBUIEventQueue(readfd) 593*4882a593Smuzhiyun # Send an fd for the remote to recieve commands from 594*4882a593Smuzhiyun readfd1, writefd1 = os.pipe() 595*4882a593Smuzhiyun command_chan = ConnectionWriter(writefd1) 596*4882a593Smuzhiyun # Send an fd for the remote to write commands results to 597*4882a593Smuzhiyun readfd2, writefd2 = os.pipe() 598*4882a593Smuzhiyun command_chan_recv = ConnectionReader(readfd2) 599*4882a593Smuzhiyun 600*4882a593Smuzhiyun sendfds(sock, [writefd, readfd1, writefd2]) 601*4882a593Smuzhiyun 602*4882a593Smuzhiyun server_connection = BitBakeProcessServerConnection(command_chan, command_chan_recv, eq, sock) 603*4882a593Smuzhiyun 604*4882a593Smuzhiyun # Close the ends of the pipes we won't use 605*4882a593Smuzhiyun for i in [writefd, readfd1, writefd2]: 606*4882a593Smuzhiyun os.close(i) 607*4882a593Smuzhiyun 608*4882a593Smuzhiyun server_connection.connection.updateFeatureSet(featureset) 609*4882a593Smuzhiyun 610*4882a593Smuzhiyun except (Exception, SystemExit) as e: 611*4882a593Smuzhiyun if command_chan_recv: 612*4882a593Smuzhiyun command_chan_recv.close() 613*4882a593Smuzhiyun if command_chan: 614*4882a593Smuzhiyun command_chan.close() 615*4882a593Smuzhiyun for i in [writefd, readfd1, writefd2]: 616*4882a593Smuzhiyun try: 617*4882a593Smuzhiyun if i: 618*4882a593Smuzhiyun os.close(i) 619*4882a593Smuzhiyun except OSError: 620*4882a593Smuzhiyun pass 621*4882a593Smuzhiyun sock.close() 622*4882a593Smuzhiyun raise 623*4882a593Smuzhiyun 624*4882a593Smuzhiyun return server_connection 625*4882a593Smuzhiyun 626*4882a593Smuzhiyundef sendfds(sock, fds): 627*4882a593Smuzhiyun '''Send an array of fds over an AF_UNIX socket.''' 628*4882a593Smuzhiyun fds = array.array('i', fds) 629*4882a593Smuzhiyun msg = bytes([len(fds) % 256]) 630*4882a593Smuzhiyun sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)]) 631*4882a593Smuzhiyun 632*4882a593Smuzhiyundef recvfds(sock, size): 633*4882a593Smuzhiyun '''Receive an array of fds over an AF_UNIX socket.''' 634*4882a593Smuzhiyun a = array.array('i') 635*4882a593Smuzhiyun bytes_size = a.itemsize * size 636*4882a593Smuzhiyun msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size)) 637*4882a593Smuzhiyun if not msg and not ancdata: 638*4882a593Smuzhiyun raise EOFError 639*4882a593Smuzhiyun try: 640*4882a593Smuzhiyun if len(ancdata) != 1: 641*4882a593Smuzhiyun raise RuntimeError('received %d items of ancdata' % 642*4882a593Smuzhiyun len(ancdata)) 643*4882a593Smuzhiyun cmsg_level, cmsg_type, cmsg_data = ancdata[0] 644*4882a593Smuzhiyun if (cmsg_level == socket.SOL_SOCKET and 645*4882a593Smuzhiyun cmsg_type == socket.SCM_RIGHTS): 646*4882a593Smuzhiyun if len(cmsg_data) % a.itemsize != 0: 647*4882a593Smuzhiyun raise ValueError 648*4882a593Smuzhiyun a.frombytes(cmsg_data) 649*4882a593Smuzhiyun assert len(a) % 256 == msg[0] 650*4882a593Smuzhiyun return list(a) 651*4882a593Smuzhiyun except (ValueError, IndexError): 652*4882a593Smuzhiyun pass 653*4882a593Smuzhiyun raise RuntimeError('Invalid data received') 654*4882a593Smuzhiyun 655*4882a593Smuzhiyunclass BBUIEventQueue: 656*4882a593Smuzhiyun def __init__(self, readfd): 657*4882a593Smuzhiyun 658*4882a593Smuzhiyun self.eventQueue = [] 659*4882a593Smuzhiyun self.eventQueueLock = threading.Lock() 660*4882a593Smuzhiyun self.eventQueueNotify = threading.Event() 661*4882a593Smuzhiyun 662*4882a593Smuzhiyun self.reader = ConnectionReader(readfd) 663*4882a593Smuzhiyun 664*4882a593Smuzhiyun self.t = threading.Thread() 665*4882a593Smuzhiyun self.t.daemon = True 666*4882a593Smuzhiyun self.t.run = self.startCallbackHandler 667*4882a593Smuzhiyun self.t.start() 668*4882a593Smuzhiyun 669*4882a593Smuzhiyun def getEvent(self): 670*4882a593Smuzhiyun self.eventQueueLock.acquire() 671*4882a593Smuzhiyun 672*4882a593Smuzhiyun if len(self.eventQueue) == 0: 673*4882a593Smuzhiyun self.eventQueueLock.release() 674*4882a593Smuzhiyun return None 675*4882a593Smuzhiyun 676*4882a593Smuzhiyun item = self.eventQueue.pop(0) 677*4882a593Smuzhiyun 678*4882a593Smuzhiyun if len(self.eventQueue) == 0: 679*4882a593Smuzhiyun self.eventQueueNotify.clear() 680*4882a593Smuzhiyun 681*4882a593Smuzhiyun self.eventQueueLock.release() 682*4882a593Smuzhiyun return item 683*4882a593Smuzhiyun 684*4882a593Smuzhiyun def waitEvent(self, delay): 685*4882a593Smuzhiyun self.eventQueueNotify.wait(delay) 686*4882a593Smuzhiyun return self.getEvent() 687*4882a593Smuzhiyun 688*4882a593Smuzhiyun def queue_event(self, event): 689*4882a593Smuzhiyun self.eventQueueLock.acquire() 690*4882a593Smuzhiyun self.eventQueue.append(event) 691*4882a593Smuzhiyun self.eventQueueNotify.set() 692*4882a593Smuzhiyun self.eventQueueLock.release() 693*4882a593Smuzhiyun 694*4882a593Smuzhiyun def send_event(self, event): 695*4882a593Smuzhiyun self.queue_event(pickle.loads(event)) 696*4882a593Smuzhiyun 697*4882a593Smuzhiyun def startCallbackHandler(self): 698*4882a593Smuzhiyun bb.utils.set_process_name("UIEventQueue") 699*4882a593Smuzhiyun while True: 700*4882a593Smuzhiyun try: 701*4882a593Smuzhiyun self.reader.wait() 702*4882a593Smuzhiyun event = self.reader.get() 703*4882a593Smuzhiyun self.queue_event(event) 704*4882a593Smuzhiyun except EOFError: 705*4882a593Smuzhiyun # Easiest way to exit is to close the file descriptor to cause an exit 706*4882a593Smuzhiyun break 707*4882a593Smuzhiyun self.reader.close() 708*4882a593Smuzhiyun 709*4882a593Smuzhiyunclass ConnectionReader(object): 710*4882a593Smuzhiyun 711*4882a593Smuzhiyun def __init__(self, fd): 712*4882a593Smuzhiyun self.reader = multiprocessing.connection.Connection(fd, writable=False) 713*4882a593Smuzhiyun self.rlock = multiprocessing.Lock() 714*4882a593Smuzhiyun 715*4882a593Smuzhiyun def wait(self, timeout=None): 716*4882a593Smuzhiyun return multiprocessing.connection.wait([self.reader], timeout) 717*4882a593Smuzhiyun 718*4882a593Smuzhiyun def poll(self, timeout=None): 719*4882a593Smuzhiyun return self.reader.poll(timeout) 720*4882a593Smuzhiyun 721*4882a593Smuzhiyun def get(self): 722*4882a593Smuzhiyun with self.rlock: 723*4882a593Smuzhiyun res = self.reader.recv_bytes() 724*4882a593Smuzhiyun return multiprocessing.reduction.ForkingPickler.loads(res) 725*4882a593Smuzhiyun 726*4882a593Smuzhiyun def fileno(self): 727*4882a593Smuzhiyun return self.reader.fileno() 728*4882a593Smuzhiyun 729*4882a593Smuzhiyun def close(self): 730*4882a593Smuzhiyun return self.reader.close() 731*4882a593Smuzhiyun 732*4882a593Smuzhiyun 733*4882a593Smuzhiyunclass ConnectionWriter(object): 734*4882a593Smuzhiyun 735*4882a593Smuzhiyun def __init__(self, fd): 736*4882a593Smuzhiyun self.writer = multiprocessing.connection.Connection(fd, readable=False) 737*4882a593Smuzhiyun self.wlock = multiprocessing.Lock() 738*4882a593Smuzhiyun # Why bb.event needs this I have no idea 739*4882a593Smuzhiyun self.event = self 740*4882a593Smuzhiyun 741*4882a593Smuzhiyun def _send(self, obj): 742*4882a593Smuzhiyun gc.disable() 743*4882a593Smuzhiyun with self.wlock: 744*4882a593Smuzhiyun self.writer.send_bytes(obj) 745*4882a593Smuzhiyun gc.enable() 746*4882a593Smuzhiyun 747*4882a593Smuzhiyun def send(self, obj): 748*4882a593Smuzhiyun obj = multiprocessing.reduction.ForkingPickler.dumps(obj) 749*4882a593Smuzhiyun # See notes/code in CookerParser 750*4882a593Smuzhiyun # We must not terminate holding this lock else processes will hang. 751*4882a593Smuzhiyun # For SIGTERM, raising afterwards avoids this. 752*4882a593Smuzhiyun # For SIGINT, we don't want to have written partial data to the pipe. 753*4882a593Smuzhiyun # pthread_sigmask block/unblock would be nice but doesn't work, https://bugs.python.org/issue47139 754*4882a593Smuzhiyun process = multiprocessing.current_process() 755*4882a593Smuzhiyun if process and hasattr(process, "queue_signals"): 756*4882a593Smuzhiyun with process.signal_threadlock: 757*4882a593Smuzhiyun process.queue_signals = True 758*4882a593Smuzhiyun self._send(obj) 759*4882a593Smuzhiyun process.queue_signals = False 760*4882a593Smuzhiyun try: 761*4882a593Smuzhiyun for sig in process.signal_received.pop(): 762*4882a593Smuzhiyun process.handle_sig(sig, None) 763*4882a593Smuzhiyun except IndexError: 764*4882a593Smuzhiyun pass 765*4882a593Smuzhiyun else: 766*4882a593Smuzhiyun self._send(obj) 767*4882a593Smuzhiyun 768*4882a593Smuzhiyun def fileno(self): 769*4882a593Smuzhiyun return self.writer.fileno() 770*4882a593Smuzhiyun 771*4882a593Smuzhiyun def close(self): 772*4882a593Smuzhiyun return self.writer.close() 773