#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

import os
import sys
import logging
import signal
import time
import socket
import io
import sqlite3
import prserv
import prserv.db
import errno
import bb.asyncrpc

logger = logging.getLogger("BitBake.PRserv")

# Template for the daemon pidfile; filled with (ip, port).
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
# PRServSingleton created by auto_start(), or None when no
# auto-started server exists.
singleton = None


class PRServerClient(bb.asyncrpc.AsyncServerConnection):
    """
    Server-side connection object created by PRServer.accept_client().

    Registers the PR service message handlers and answers them using the
    table object handed in by the server.
    """

    def __init__(self, reader, writer, table, read_only):
        super().__init__(reader, writer, 'PRSERVICE', logger)
        self.handlers.update({
            'get-pr': self.handle_get_pr,
            'import-one': self.handle_import_one,
            'export': self.handle_export,
            'is-readonly': self.handle_is_readonly,
        })
        self.table = table
        self.read_only = read_only

    def validate_proto_version(self):
        """Return True only for protocol version (1, 0)."""
        return (self.proto_version == (1, 0))

    async def dispatch_message(self, msg):
        """
        Dispatch one message, then sync the table.

        On any exception the table is synced unconditionally before the
        exception is re-raised; on success it is synced only if dirty.
        """
        try:
            await super().dispatch_message(msg)
        except BaseException:
            # Same reach as a bare "except:"; the exception is always
            # re-raised after the sync.
            self.table.sync()
            raise

        self.table.sync_if_dirty()

    async def handle_get_pr(self, request):
        """
        Reply with {'value': ...} for the requested (version, pkgarch,
        checksum), or with None when the lookup fails.
        """
        version = request['version']
        pkgarch = request['pkgarch']
        checksum = request['checksum']

        response = None
        try:
            value = self.table.getValue(version, pkgarch, checksum)
            response = {'value': value}
        except prserv.NotFoundError:
            logger.error("can not find value for (%s, %s)",version, checksum)
        except sqlite3.Error as exc:
            logger.error(str(exc))

        self.write_message(response)

    async def handle_import_one(self, request):
        """
        Import a single value into the table unless the server is
        read-only. Replies with {'value': ...} when importone() returns a
        value, otherwise with None.
        """
        response = None
        if not self.read_only:
            version = request['version']
            pkgarch = request['pkgarch']
            checksum = request['checksum']
            value = request['value']

            value = self.table.importone(version, pkgarch, checksum, value)
            if value is not None:
                response = {'value': value}

        self.write_message(response)

    async def handle_export(self, request):
        """
        Reply with {'metainfo': ..., 'datainfo': ...} from the table's
        export(); both entries are None if sqlite3 reports an error.
        """
        version = request['version']
        pkgarch = request['pkgarch']
        checksum = request['checksum']
        colinfo = request['colinfo']

        try:
            (metainfo, datainfo) = self.table.export(version, pkgarch, checksum, colinfo)
        except sqlite3.Error as exc:
            logger.error(str(exc))
            metainfo = datainfo = None

        response = {'metainfo': metainfo, 'datainfo': datainfo}
        self.write_message(response)

    async def handle_is_readonly(self, request):
        """Reply with {'readonly': <bool>} reflecting the server mode."""
        response = {'readonly': self.read_only}
        self.write_message(response)


class PRServer(bb.asyncrpc.AsyncServer):
    """
    PR service server backed by the "PRMAIN" table of a prserv.db.PRData
    database. The database is opened when serving starts, not at
    construction time.
    """

    def __init__(self, dbfile, read_only=False):
        super().__init__(logger)
        self.dbfile = dbfile
        # Set in _serve_forever(); stays None until the server is serving.
        self.table = None
        self.read_only = read_only

    def accept_client(self, reader, writer):
        """Create the connection handler for a newly accepted client."""
        return PRServerClient(reader, writer, self.table, self.read_only)

    def _serve_forever(self):
        """Open the database, serve, then sync and disconnect."""
        self.db = prserv.db.PRData(self.dbfile, read_only=self.read_only)
        self.table = self.db["PRMAIN"]

        logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s",
                    self.dbfile, self.address, str(os.getpid()))

        super()._serve_forever()

        self.table.sync_if_dirty()
        self.db.disconnect()

    def signal_handler(self):
        """Run the base handler, then sync the table if it has been opened."""
        super().signal_handler()
        if self.table:
            self.table.sync()


class PRServSingleton(object):
    """Holder for the PR server that auto_start() runs as a child process."""

    def __init__(self, dbfile, logfile, host, port):
        self.dbfile = dbfile
        self.logfile = logfile
        self.host = host
        self.port = port
        # Defined up front so auto_shutdown() can safely inspect
        # singleton.process even when start() failed part-way through.
        self.prserv = None
        self.process = None

    def start(self):
        """
        Start the server process.

        If no port was configured, the port is taken from the address the
        server reports after starting.

        Raises PRServiceConfigError if the server reports no address.
        """
        self.prserv = PRServer(self.dbfile)
        self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
        self.process = self.prserv.serve_as_process()

        if not self.prserv.address:
            raise PRServiceConfigError
        if not self.port:
            self.port = int(self.prserv.address.rsplit(':', 1)[1])


def run_as_daemon(func, pidfile, logfile):
    """
    Run func() in a double-forked daemon process.

    See Advanced Programming in the UNIX, Sec 13.3

    The calling process waits for the first child and then returns that
    child's pid. The daemon redirects stdin to /dev/null and stdout/stderr
    to logfile, writes its pid to pidfile, calls func(), removes pidfile
    and exits without returning.
    """
    try:
        pid = os.fork()
        if pid > 0:
            os.waitpid(pid, 0)
            # parent returns instead of exiting, to give control back
            return pid
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno)) from e

    os.setsid()
    # fork again to make sure the daemon is not session leader,
    # which prevents it from acquiring controlling terminal
    try:
        pid = os.fork()
        if pid > 0: #parent
            os._exit(0)
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno)) from e

    os.chdir("/")

    sys.stdout.flush()
    sys.stderr.flush()

    # We could be called from a python thread with io.StringIO as
    # stdout/stderr or it could be 'real' unix fd forking where we need
    # to physically close the fds to prevent the program launching us from
    # potentially hanging on a pipe. Handle both cases.
    si = open('/dev/null', 'r')
    try:
        os.dup2(si.fileno(), sys.stdin.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdin = si
    so = open(logfile, 'a+')
    try:
        os.dup2(so.fileno(), sys.stdout.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdout = so
    try:
        os.dup2(so.fileno(), sys.stderr.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stderr = so

    # Clear out all log handlers prior to the fork() to avoid calling
    # event handlers not part of the PRserver
    for logger_iter in logging.Logger.manager.loggerDict.keys():
        logging.getLogger(logger_iter).handlers = []

    # Ensure logging makes it to the logfile
    streamhandler = logging.StreamHandler()
    streamhandler.setLevel(logging.DEBUG)
    formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
    streamhandler.setFormatter(formatter)
    logger.addHandler(streamhandler)

    # write pidfile
    pid = str(os.getpid())
    with open(pidfile, 'w') as pf:
        pf.write("%s\n" % pid)

    try:
        func()
    finally:
        # Remove the pidfile even when func() raises; a leftover pidfile
        # makes start_daemon() refuse to start a new server.
        os.remove(pidfile)
    os._exit(0)


def _read_pidfile(pidfile):
    """
    Return the pid stored on the first line of pidfile, or None if the
    file cannot be opened or does not contain an integer.
    """
    try:
        with open(pidfile) as pf:
            return int(pf.readline().strip())
    except (IOError, ValueError):
        return None


def start_daemon(dbfile, host, port, logfile, read_only=False):
    """
    Start a PR server daemon for host:port.

    Returns 1 (after writing a message to stderr) if the pidfile for this
    address already holds a pid, otherwise daemonizes the server and
    returns 0.
    """
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    pid = _read_pidfile(pidfile)

    if pid:
        sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
                            % pidfile)
        return 1

    # Resolve before daemonizing: run_as_daemon() changes directory to "/".
    dbfile = os.path.abspath(dbfile)
    def daemon_main():
        server = PRServer(dbfile, read_only=read_only)
        server.start_tcp_server(ip, port)
        server.serve_forever()

    run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
    return 0


def stop_daemon(host, port):
    """
    Stop the PR server daemon recorded in the pidfile for host:port.

    Returns 1 (after writing a message to stderr) if no pid could be read
    from the pidfile, otherwise sends SIGTERM to a running daemon, removes
    the pidfile and returns 0.

    Raises OSError for any failure other than the process no longer
    existing.
    """
    import glob
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    pid = _read_pidfile(pidfile)

    if not pid:
        # when server starts at port=0 (i.e. localhost:0), server actually takes another port,
        # so at least advise the user which ports the corresponding server is listening
        ports = []
        portstr = ""
        for pf in glob.glob(PIDPREFIX % (ip, '*')):
            bn = os.path.basename(pf)
            root, _ = os.path.splitext(bn)
            ports.append(root.split('_')[-1])
        if ports:
            portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports))

        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
                         % (pidfile, portstr))
        return 1

    try:
        if is_running(pid):
            print("Sending SIGTERM to pr-server.")
            os.kill(pid, signal.SIGTERM)
            time.sleep(0.1)

        if os.path.exists(pidfile):
            os.remove(pidfile)

    except OSError as e:
        # Only "no such process" is tolerated. Compare the error code
        # rather than the message text, which varies with the locale.
        if e.errno != errno.ESRCH:
            raise

    return 0


def is_running(pid):
    """
    Probe pid with signal 0.

    Returns False only when the probe fails with ESRCH; any other outcome,
    including other OSError codes, is reported as True.
    """
    try:
        os.kill(pid, 0)
    except OSError as err:
        if err.errno == errno.ESRCH:
            return False
    return True


def is_local_special(host, port):
    """Return True for 'localhost' or '127.0.0.1' combined with a falsy port."""
    return host in ('localhost', '127.0.0.1') and not port


class PRServiceConfigError(Exception):
    """Raised when the PR service is misconfigured or not available."""
    pass


def auto_start(d):
    """
    Start, reuse or shut down the local PR server according to PRSERV_HOST.

    Returns None when PRSERV_HOST is unset or empty, otherwise the
    "<host>:<port>" string of a server that answered a ping.

    Raises PRServiceConfigError when PRSERV_HOST is malformed, when neither
    PERSISTENT_DIR nor CACHE is set for a local server, or when the server
    does not answer.
    """
    global singleton

    host_params = list(filter(None, (d.getVar('PRSERV_HOST') or '').split(':')))
    if not host_params:
        # Shutdown any existing PR Server
        auto_shutdown()
        return None

    if len(host_params) != 2:
        # Shutdown any existing PR Server
        auto_shutdown()
        logger.critical('\n'.join(['PRSERV_HOST: incorrect format',
                'Usage: PRSERV_HOST = "<hostname>:<port>"']))
        raise PRServiceConfigError

    host = host_params[0].strip().lower()
    try:
        port = int(host_params[1])
    except ValueError:
        # Report a non-numeric port the same way as any other malformed
        # PRSERV_HOST value instead of leaking a bare ValueError.
        logger.critical('\n'.join(['PRSERV_HOST: incorrect format',
                'Usage: PRSERV_HOST = "<hostname>:<port>"']))
        raise PRServiceConfigError
    if is_local_special(host, port):
        import bb.utils
        cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
        if not cachedir:
            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
            raise PRServiceConfigError
        dbfile = os.path.join(cachedir, "prserv.sqlite3")
        logfile = os.path.join(cachedir, "prserv.log")
        if singleton:
            if singleton.dbfile != dbfile:
                # Shutdown any existing PR Server as doesn't match config
                auto_shutdown()
        if not singleton:
            bb.utils.mkdirhier(cachedir)
            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port)
            singleton.start()
    if singleton:
        host = singleton.host
        port = singleton.port

    try:
        ping(host, port)
        return str(host) + ":" + str(port)

    except Exception:
        logger.critical("PRservice %s:%d not available", host, port)
        raise PRServiceConfigError


def auto_shutdown():
    """Terminate and join the auto-started server process, if any, and forget it."""
    global singleton
    if singleton and singleton.process:
        singleton.process.terminate()
        singleton.process.join()
        singleton = None


def ping(host, port):
    """Connect a PRClient to host:port and return the result of its ping()."""
    from . import client

    # NOTE(review): the connection is not explicitly closed here; confirm
    # whether PRClient needs an explicit close.
    conn = client.PRClient()
    conn.connect_tcp(host, port)
    return conn.ping()


def connect(host, port):
    """
    Return a PRClient connected to host:port.

    For 'localhost' with a falsy port, the port of the auto-started
    singleton server is used, so auto_start() must have created it first.
    """
    from . import client

    if host.strip().lower() == 'localhost' and not port:
        host = 'localhost'
        port = singleton.port

    conn = client.PRClient()
    conn.connect_tcp(host, port)
    return conn