#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

import os,sys,logging
import signal, time
import socket
import io
import sqlite3
import prserv
import prserv.db
import errno
import bb.asyncrpc

logger = logging.getLogger("BitBake.PRserv")

# Template for the daemon pidfile path; filled in with (ip, port).
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
# The PRServSingleton started by auto_start(), or None when there is none.
singleton = None


class PRServerClient(bb.asyncrpc.AsyncServerConnection):
    """Connection handler that answers PR service requests for one client.

    Registers the 'get-pr', 'import-one', 'export' and 'is-readonly'
    message handlers on top of whatever the base connection class provides.
    """

    def __init__(self, reader, writer, table, read_only):
        """Store the PR table and read-only flag and register the handlers.

        reader/writer are passed straight to the base class; table is the
        object the handlers query (PRServer passes its "PRMAIN" table).
        """
        super().__init__(reader, writer, 'PRSERVICE', logger)
        self.handlers.update({
            'get-pr': self.handle_get_pr,
            'import-one': self.handle_import_one,
            'export': self.handle_export,
            'is-readonly': self.handle_is_readonly,
        })
        self.table = table
        self.read_only = read_only

    def validate_proto_version(self):
        """Return True only when the client speaks protocol version (1, 0)."""
        return self.proto_version == (1, 0)

    async def dispatch_message(self, msg):
        """Dispatch msg via the base class, syncing the table afterwards.

        On any failure (including cancellation) the table is synced
        unconditionally before the exception is re-raised; on success it is
        only synced if it reports itself dirty.
        """
        try:
            await super().dispatch_message(msg)
        except BaseException:
            self.table.sync()
            raise

        self.table.sync_if_dirty()

    async def handle_get_pr(self, request):
        """Reply with {'value': ...} for the requested key, or None on error.

        Reads 'version', 'pkgarch' and 'checksum' from request. A missing
        entry or an sqlite3 error is logged and answered with None.
        """
        version = request['version']
        pkgarch = request['pkgarch']
        checksum = request['checksum']

        response = None
        try:
            value = self.table.getValue(version, pkgarch, checksum)
            response = {'value': value}
        except prserv.NotFoundError:
            logger.error("can not find value for (%s, %s)", version, checksum)
        except sqlite3.Error as exc:
            logger.error(str(exc))

        self.write_message(response)

    async def handle_import_one(self, request):
        """Import a single entry unless the server is read-only.

        Replies with {'value': ...} when the table returns a value, and with
        None when the server is read-only or the import returned None.
        """
        response = None
        if not self.read_only:
            version = request['version']
            pkgarch = request['pkgarch']
            checksum = request['checksum']
            value = request['value']

            value = self.table.importone(version, pkgarch, checksum, value)
            if value is not None:
                response = {'value': value}

        self.write_message(response)

    async def handle_export(self, request):
        """Reply with the table's export result as 'metainfo' and 'datainfo'.

        Both fields are None when the export raises an sqlite3 error (the
        error is logged).
        """
        version = request['version']
        pkgarch = request['pkgarch']
        checksum = request['checksum']
        colinfo = request['colinfo']

        try:
            (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo)
        except sqlite3.Error as exc:
            logger.error(str(exc))
            metainfo = datainfo = None

        response = {'metainfo': metainfo, 'datainfo': datainfo}
        self.write_message(response)

    async def handle_is_readonly(self, request):
        """Reply with {'readonly': <bool>} reflecting the server mode."""
        response = {'readonly': self.read_only}
        self.write_message(response)


class PRServer(bb.asyncrpc.AsyncServer):
    """Async RPC server backed by the PR database file."""

    def __init__(self, dbfile, read_only=False):
        """Remember the database path; the database is opened when serving."""
        super().__init__(logger)
        self.dbfile = dbfile
        # Set in _serve_forever(); None until the database has been opened.
        self.table = None
        self.read_only = read_only

    def accept_client(self, reader, writer):
        """Create the connection handler for a newly accepted client."""
        return PRServerClient(reader, writer, self.table, self.read_only)

    def _serve_forever(self):
        """Open the database, run the base serve loop, then flush and close."""
        self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only)
        self.table = self.db["PRMAIN"]

        logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s",
                    self.dbfile, self.address, str(os.getpid()))

        super()._serve_forever()

        self.table.sync_if_dirty()
        self.db.disconnect()

    def signal_handler(self):
        """Run the base signal handling, then sync the table if it is open."""
        super().signal_handler()
        if self.table:
            self.table.sync()


class PRServSingleton(object):
    """Holder for the single PR server started automatically by auto_start()."""

    def __init__(self, dbfile, logfile, host, port):
        self.dbfile = dbfile
        self.logfile = logfile
        self.host = host
        self.port = port
        # Initialised here so auto_shutdown() can safely test them even when
        # start() failed before assigning them.
        self.prserv = None
        self.process = None

    def start(self):
        """Start the server as a separate process and record its port.

        Raises PRServiceConfigError when the server reports no address.
        When no port was requested, the port is taken from the text after
        the last ':' of the server's address.
        """
        self.prserv = PRServer(self.dbfile)
        self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
        self.process = self.prserv.serve_as_process()

        if not self.prserv.address:
            raise PRServiceConfigError
        if not self.port:
            self.port = int(self.prserv.address.rsplit(':', 1)[1])


def _read_pidfile(pidfile):
    """Return the pid stored on the first line of pidfile, or None.

    None is returned when the file cannot be opened or its first line is
    not an integer (e.g. an empty or truncated pidfile).
    """
    try:
        with open(pidfile) as pf:
            return int(pf.readline().strip())
    except (IOError, ValueError):
        return None


def _remove_pidfile(pidfile):
    """Remove pidfile, ignoring the case where it is already gone.

    Both the daemon and stop_daemon() remove the pidfile, so either may find
    that the other got there first.
    """
    try:
        os.remove(pidfile)
    except FileNotFoundError:
        pass


def run_as_daemon(func, pidfile, logfile):
    """
    Run func() in a daemonized process.

    See Advanced Programming in the UNIX Environment, Sec 13.3

    The calling process waits for the intermediate child and then returns
    that child's pid. The daemon itself writes its pid to pidfile, redirects
    stdin to /dev/null and stdout/stderr to logfile, runs func(), removes
    the pidfile and exits without returning.

    Raises Exception when either fork() fails.
    """
    try:
        pid = os.fork()
        if pid > 0:
            os.waitpid(pid, 0)
            #parent return instead of exit to give control
            return pid
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno)) from e

    os.setsid()
    # fork again to make sure the daemon is not session leader,
    # which prevents it from acquiring controlling terminal
    try:
        pid = os.fork()
        if pid > 0: #parent
            os._exit(0)
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno)) from e

    os.chdir("/")

    sys.stdout.flush()
    sys.stderr.flush()

    # We could be called from a python thread with io.StringIO as
    # stdout/stderr or it could be 'real' unix fd forking where we need
    # to physically close the fds to prevent the program launching us from
    # potentially hanging on a pipe. Handle both cases.
    # The two files below are deliberately left open: they back the
    # daemon's standard streams for the rest of its life.
    si = open('/dev/null', 'r')
    try:
        os.dup2(si.fileno(), sys.stdin.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdin = si
    so = open(logfile, 'a+')
    try:
        os.dup2(so.fileno(), sys.stdout.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdout = so
    try:
        os.dup2(so.fileno(), sys.stderr.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stderr = so

    # Clear out all log handlers prior to the fork() to avoid calling
    # event handlers not part of the PRserver
    for logger_iter in list(logging.Logger.manager.loggerDict):
        logging.getLogger(logger_iter).handlers = []

    # Ensure logging makes it to the logfile
    streamhandler = logging.StreamHandler()
    streamhandler.setLevel(logging.DEBUG)
    formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
    streamhandler.setFormatter(formatter)
    logger.addHandler(streamhandler)

    # write pidfile
    pid = str(os.getpid())
    with open(pidfile, 'w') as pf:
        pf.write("%s\n" % pid)

    try:
        func()
    finally:
        # Always clean up, so a failed server does not leave a stale pidfile
        # that makes start_daemon() refuse to start the next one.
        _remove_pidfile(pidfile)
    os._exit(0)


def start_daemon(dbfile, host, port, logfile, read_only=False):
    """Start a daemonized PR server for host:port.

    Returns 1 (after writing a message to stderr) when a pidfile holding a
    pid already exists for this address, otherwise 0 once run_as_daemon()
    has returned in the calling process.
    """
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    pid = _read_pidfile(pidfile)

    if pid:
        sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
                            % pidfile)
        return 1

    # Resolve paths now: the daemon changes directory to "/".
    dbfile = os.path.abspath(dbfile)
    def daemon_main():
        server = PRServer(dbfile, read_only=read_only)
        server.start_tcp_server(ip, port)
        server.serve_forever()

    run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
    return 0


def stop_daemon(host, port):
    """Stop the daemonized PR server for host:port.

    Returns 1 when no usable pidfile is found (listing the ports of any
    other pidfiles for the same ip on stderr), otherwise 0 after sending
    SIGTERM to a running daemon and removing the pidfile.
    """
    import glob
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    pid = _read_pidfile(pidfile)

    if not pid:
        # when server starts at port=0 (i.e. localhost:0), server actually takes another port,
        # so at least advise the user which ports the corresponding server is listening
        portstr = ""
        ports = [os.path.splitext(os.path.basename(pf))[0].split('_')[-1]
                 for pf in glob.glob(PIDPREFIX % (ip, '*'))]
        if ports:
            portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports))

        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
                         % (pidfile, portstr))
        return 1

    try:
        if is_running(pid):
            print("Sending SIGTERM to pr-server.")
            os.kill(pid, signal.SIGTERM)
            time.sleep(0.1)
    except OSError as e:
        # The process may exit between the check and the kill; that is fine.
        if e.errno != errno.ESRCH:
            raise

    _remove_pidfile(pidfile)

    return 0


def is_running(pid):
    """Return False when signalling pid fails with ESRCH, True otherwise."""
    try:
        os.kill(pid, 0)
    except OSError as err:
        if err.errno == errno.ESRCH:
            return False
    return True


def is_local_special(host, port):
    """Return True for 'localhost' or '127.0.0.1' combined with a falsy port."""
    return host in ('localhost', '127.0.0.1') and not port


class PRServiceConfigError(Exception):
    """Raised when the PR service is misconfigured or cannot be reached."""
    pass


def auto_start(d):
    """Start or reuse the PR server described by PRSERV_HOST.

    d must provide getVar(); PRSERV_HOST, PERSISTENT_DIR and CACHE are read
    from it. Returns None when PRSERV_HOST is unset or empty, otherwise the
    "host:port" string of a server that answered a ping.

    Raises PRServiceConfigError for a malformed PRSERV_HOST, a missing cache
    directory, or a server that does not respond.
    """
    global singleton

    host_params = list(filter(None, (d.getVar('PRSERV_HOST') or '').split(':')))
    if not host_params:
        # Shutdown any existing PR Server
        auto_shutdown()
        return None

    usage = '\n'.join(['PRSERV_HOST: incorrect format',
                       'Usage: PRSERV_HOST = "<hostname>:<port>"'])
    if len(host_params) != 2:
        # Shutdown any existing PR Server
        auto_shutdown()
        logger.critical(usage)
        raise PRServiceConfigError

    host = host_params[0].strip().lower()
    try:
        port = int(host_params[1])
    except ValueError as exc:
        # A non-numeric port is a configuration error like any other
        # malformed PRSERV_HOST, so report it the same way.
        auto_shutdown()
        logger.critical(usage)
        raise PRServiceConfigError from exc

    if is_local_special(host, port):
        import bb.utils
        cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
        if not cachedir:
            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
            raise PRServiceConfigError
        # Normalise before comparing: the singleton stores absolute paths.
        dbfile = os.path.abspath(os.path.join(cachedir, "prserv.sqlite3"))
        logfile = os.path.abspath(os.path.join(cachedir, "prserv.log"))
        if singleton:
            if singleton.dbfile != dbfile:
                # Shutdown any existing PR Server as doesn't match config
                auto_shutdown()
        if not singleton:
            bb.utils.mkdirhier(cachedir)
            singleton = PRServSingleton(dbfile, logfile, host, port)
            singleton.start()
    if singleton:
        host = singleton.host
        port = singleton.port

    try:
        ping(host, port)
        return str(host) + ":" + str(port)

    except Exception:
        logger.critical("PRservice %s:%d not available", host, port)
        raise PRServiceConfigError


def auto_shutdown():
    """Terminate the automatically started server process, if there is one."""
    global singleton
    if singleton and singleton.process:
        singleton.process.terminate()
        singleton.process.join()
        singleton = None


def ping(host, port):
    """Connect to the PR server at host:port and return its ping() result."""
    from . import client

    conn = client.PRClient()
    conn.connect_tcp(host, port)
    return conn.ping()


def connect(host, port):
    """Return a PRClient connected to host:port.

    A local host with a falsy port (see is_local_special) refers to the
    automatically started server, so its real port is substituted.

    Raises PRServiceConfigError when that server has not been started.
    """
    from . import client

    global singleton

    host = host.strip().lower()
    if is_local_special(host, port):
        if singleton is None:
            raise PRServiceConfigError(
                "No local PR server has been started for %s" % host)
        port = singleton.port

    conn = client.PRClient()
    conn.connect_tcp(host, port)
    return conn