xref: /OK3568_Linux_fs/yocto/poky/bitbake/lib/prserv/serv.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1#
2# Copyright BitBake Contributors
3#
4# SPDX-License-Identifier: GPL-2.0-only
5#
6
7import os,sys,logging
8import signal, time
9import socket
10import io
11import sqlite3
12import prserv
13import prserv.db
14import errno
15import bb.asyncrpc
16
# Logger shared by every class and function in this module.
logger = logging.getLogger("BitBake.PRserv")

# Pidfile path template; start_daemon()/stop_daemon() fill it with (ip, port).
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
# The PRServSingleton created by auto_start(), or None when none is active.
singleton = None
21
class PRServerClient(bb.asyncrpc.AsyncServerConnection):
    """
    One client connection to the PR server.

    Registers the 'get-pr', 'import-one', 'export' and 'is-readonly'
    message handlers and answers each of them from the PR table handed in
    by the server.
    """
    def __init__(self, reader, writer, table, read_only):
        super().__init__(reader, writer, 'PRSERVICE', logger)
        pr_handlers = {
            'get-pr': self.handle_get_pr,
            'import-one': self.handle_import_one,
            'export': self.handle_export,
            'is-readonly': self.handle_is_readonly,
        }
        self.handlers.update(pr_handlers)
        self.table = table
        self.read_only = read_only

    def validate_proto_version(self):
        """Only protocol version (1, 0) is accepted."""
        return self.proto_version == (1, 0)

    async def dispatch_message(self, msg):
        """
        Dispatch msg through the base class, then flush the table.

        A failed dispatch forces a full sync before the error is re-raised;
        a successful one only syncs when the table reports itself dirty.
        """
        try:
            await super().dispatch_message(msg)
        except BaseException:
            self.table.sync()
            raise
        else:
            self.table.sync_if_dirty()

    async def handle_get_pr(self, request):
        """Reply with {'value': ...} for the requested key, or None on failure."""
        version, pkgarch, checksum = (
            request[key] for key in ('version', 'pkgarch', 'checksum')
        )

        reply = None
        try:
            reply = {'value': self.table.getValue(version, pkgarch, checksum)}
        except prserv.NotFoundError:
            logger.error("can not find value for (%s, %s)",version, checksum)
        except sqlite3.Error as exc:
            logger.error(str(exc))

        self.write_message(reply)

    async def handle_import_one(self, request):
        """Import a single entry; reply None when read-only or nothing was stored."""
        if self.read_only:
            self.write_message(None)
            return

        stored = self.table.importone(
            request['version'],
            request['pkgarch'],
            request['checksum'],
            request['value'],
        )
        self.write_message(None if stored is None else {'value': stored})

    async def handle_export(self, request):
        """Reply with the table export; both fields are None on a database error."""
        export_args = [
            request[key] for key in ('version', 'pkgarch', 'checksum', 'colinfo')
        ]

        try:
            metainfo, datainfo = self.table.export(*export_args)
        except sqlite3.Error as exc:
            logger.error(str(exc))
            metainfo = datainfo = None

        self.write_message({'metainfo': metainfo, 'datainfo': datainfo})

    async def handle_is_readonly(self, request):
        """Tell the client whether this connection is read-only."""
        self.write_message({'readonly': self.read_only})
94
class PRServer(bb.asyncrpc.AsyncServer):
    """
    PR server bound to one database file.

    The database is opened when serving starts; each accepted client
    shares the resulting "PRMAIN" table.
    """
    def __init__(self, dbfile, read_only=False):
        super().__init__(logger)
        self.dbfile, self.table, self.read_only = dbfile, None, read_only

    def accept_client(self, reader, writer):
        """Build the connection handler for a newly accepted client."""
        client = PRServerClient(reader, writer, self.table, self.read_only)
        return client

    def _serve_forever(self):
        """Open the database, run the base serving loop, then flush and disconnect."""
        database = prserv.db.PRData(self.dbfile, read_only=self.read_only)
        self.db = database
        self.table = database["PRMAIN"]

        startup_details = (self.dbfile, self.address, str(os.getpid()))
        logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
                     startup_details)

        super()._serve_forever()

        # Serving has ended: write out anything pending before closing
        self.table.sync_if_dirty()
        self.db.disconnect()

    def signal_handler(self):
        """Run the base signal handling, then sync the table if one is open."""
        super().signal_handler()
        table = self.table
        if table:
            table.sync()
121
class PRServSingleton(object):
    """
    Holder for the locally auto-started PR server.

    dbfile  -- database file passed to PRServer
    logfile -- log file path (stored only)
    host    -- host name resolved and bound by start()
    port    -- port to bind; if falsy, start() replaces it with the port
               taken from the server's address
    """
    def __init__(self, dbfile, logfile, host, port):
        self.dbfile = dbfile
        self.logfile = logfile
        self.host = host
        self.port = port
        # Filled in by start(). Defined here so that code looking at the
        # singleton after a failed or not-yet-run start() sees None rather
        # than hitting an AttributeError.
        self.prserv = None
        self.process = None

    def start(self):
        """
        Create the server, bind it and run it as a separate process.

        Raises PRServiceConfigError if the server reports no address.
        """
        self.prserv = PRServer(self.dbfile)
        self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
        self.process = self.prserv.serve_as_process()

        if not self.prserv.address:
            raise PRServiceConfigError
        if not self.port:
            # Port was left for the server to choose: read it from "<host>:<port>"
            self.port = int(self.prserv.address.rsplit(':', 1)[1])
138
def run_as_daemon(func, pidfile, logfile):
    """
    Run func() in a detached daemon process created with a double fork.

    See Advanced Programming in the UNIX, Sec 13.3

    The calling process forks, waits for the intermediate child to exit and
    returns that child's pid. The detached process redirects stdin from
    /dev/null and stdout/stderr to logfile, resets the logging handlers,
    writes its pid to pidfile, runs func(), removes pidfile and terminates
    with os._exit(); it never returns to the caller.

    func    -- callable taking no arguments
    pidfile -- path the daemon's pid is written to
    logfile -- path stdout/stderr are appended to

    Raises Exception("<strerror> [<errno>]") if forking fails.
    """
    try:
        pid = os.fork()
        if pid > 0:
            os.waitpid(pid, 0)
            #parent return instead of exit to give control
            return pid
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno)) from e

    os.setsid()
    # fork again to make sure the daemon is not session leader,
    # which prevents it from acquiring controlling terminal
    try:
        pid = os.fork()
        if pid > 0: #parent
            os._exit(0)
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno)) from e

    os.chdir("/")

    sys.stdout.flush()
    sys.stderr.flush()

    # We could be called from a python thread with io.StringIO as
    # stdout/stderr or it could be 'real' unix fd forking where we need
    # to physically close the fds to prevent the program launching us from
    # potentially hanging on a pipe. Handle both cases.
    si = open('/dev/null', 'r')
    try:
        os.dup2(si.fileno(),sys.stdin.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdin = si
    so = open(logfile, 'a+')
    try:
        os.dup2(so.fileno(),sys.stdout.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdout = so
    try:
        os.dup2(so.fileno(),sys.stderr.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stderr = so

    # Clear out all log handlers prior to the fork() to avoid calling
    # event handlers not part of the PRserver
    for logger_iter in logging.Logger.manager.loggerDict.keys():
        logging.getLogger(logger_iter).handlers = []

    # Ensure logging makes it to the logfile
    streamhandler = logging.StreamHandler()
    streamhandler.setLevel(logging.DEBUG)
    formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
    streamhandler.setFormatter(formatter)
    logger.addHandler(streamhandler)

    # write pidfile
    pid = str(os.getpid())
    with open(pidfile, 'w') as pf:
        pf.write("%s\n" % pid)

    # From here on this process must never unwind back into the caller's
    # code: whatever happens, clean up the pidfile and leave via os._exit().
    exit_code = 0
    try:
        func()
    except Exception:
        logger.exception("PR server daemon terminated by an unhandled error")
        exit_code = 1
    finally:
        try:
            os.remove(pidfile)
        except FileNotFoundError:
            # stop_daemon() may already have removed it
            pass
        except OSError as e:
            logger.error("Unable to remove pidfile %s: %s", pidfile, e)
        finally:
            os._exit(exit_code)
208
def start_daemon(dbfile, host, port, logfile, read_only=False):
    """
    Start a PR server daemon for host:port backed by dbfile.

    The pidfile name is derived from the resolved ip of host and from port.
    If that pidfile already records a pid, nothing is started.

    dbfile    -- database file (made absolute before daemonizing)
    host      -- host name to resolve and bind
    port      -- TCP port to bind
    logfile   -- file receiving the daemon's stdout/stderr
    read_only -- passed through to PRServer

    Returns 1 if a pidfile with a pid already exists, otherwise 0.
    """
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile) as pf:
            pid = int(pf.readline().strip())
    except (IOError, ValueError):
        # No pidfile, or one that is unreadable, empty or not a number:
        # either way there is no recorded pid to respect.
        pid = None

    if pid:
        sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
                            % pidfile)
        return 1

    # The daemon changes directory to "/", so relative paths must be
    # resolved before handing over.
    dbfile = os.path.abspath(dbfile)
    def daemon_main():
        server = PRServer(dbfile, read_only=read_only)
        server.start_tcp_server(ip, port)
        server.serve_forever()

    run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
    return 0
231
def stop_daemon(host, port):
    """
    Stop the PR server daemon recorded in the pidfile for host:port.

    Sends SIGTERM to the recorded pid if that process is running, then
    removes the pidfile.

    host -- host name the daemon was started with
    port -- port the daemon was started with

    Returns 1 if no pid could be read from the pidfile (after listing any
    other ports that have a pidfile for the same ip), otherwise 0.
    Raises OSError if signalling fails for a reason other than the process
    having already exited.
    """
    import glob
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile) as pf:
            pid = int(pf.readline().strip())
    except IOError:
        pid = None

    if not pid:
        # when server starts at port=0 (i.e. localhost:0), server actually takes another port,
        # so at least advise the user which ports the corresponding server is listening
        ports = [
            os.path.splitext(os.path.basename(pf))[0].split('_')[-1]
            for pf in glob.glob(PIDPREFIX % (ip,'*'))
        ]
        portstr = ""
        if ports:
            portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports))

        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
                         % (pidfile,portstr))
        return 1

    if is_running(pid):
        print("Sending SIGTERM to pr-server.")
        try:
            os.kill(pid, signal.SIGTERM)
            time.sleep(0.1)
        except ProcessLookupError:
            # The process exited between the check and the signal
            pass

    # The daemon removes its own pidfile on exit, so it may vanish at any
    # moment; a missing file is the desired end state, not an error.
    try:
        os.remove(pidfile)
    except FileNotFoundError:
        pass

    return 0
273
def is_running(pid):
    """
    Probe pid with signal 0.

    Returns False only when the probe fails with ESRCH; any other outcome,
    including a different OSError, counts as running.
    """
    try:
        os.kill(pid, 0)
    except OSError as probe_error:
        return probe_error.errno != errno.ESRCH
    else:
        return True
281
def is_local_special(host, port):
    """True when host is 'localhost' or '127.0.0.1' and port is falsy."""
    names_local_host = host in ('localhost', '127.0.0.1')
    return names_local_host and not port
287
class PRServiceConfigError(Exception):
    """Raised when the PR service is misconfigured or cannot be reached."""
290
def auto_start(d):
    """
    Make sure the PR service described by PRSERV_HOST is usable.

    d -- datastore; read through d.getVar() for PRSERV_HOST, PERSISTENT_DIR
         and CACHE

    With PRSERV_HOST unset or empty, any auto-started server is shut down
    and None is returned. For the local special case ('localhost' or
    '127.0.0.1' with a falsy port) a server is started on demand, and
    restarted when the database path no longer matches. The service is
    then pinged.

    Returns "<host>:<port>" of the reachable service.
    Raises PRServiceConfigError when PRSERV_HOST is malformed, no cache
    directory is configured, or the service does not answer the ping.
    """
    global singleton

    host_params = list(filter(None, (d.getVar('PRSERV_HOST') or '').split(':')))
    if not host_params:
        # Shutdown any existing PR Server
        auto_shutdown()
        return None

    format_error = '\n'.join(['PRSERV_HOST: incorrect format',
                'Usage: PRSERV_HOST = "<hostname>:<port>"'])

    if len(host_params) != 2:
        # Shutdown any existing PR Server
        auto_shutdown()
        logger.critical(format_error)
        raise PRServiceConfigError

    host = host_params[0].strip().lower()
    try:
        port = int(host_params[1])
    except ValueError:
        # A non-numeric port is a configuration error like any other
        # malformed PRSERV_HOST, not an unhandled ValueError
        auto_shutdown()
        logger.critical(format_error)
        raise PRServiceConfigError

    if is_local_special(host, port):
        import bb.utils
        cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
        if not cachedir:
            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
            raise PRServiceConfigError
        # The singleton stores absolute paths, so compare like with like;
        # otherwise a relative cachedir would force a restart on every call.
        dbfile = os.path.abspath(os.path.join(cachedir, "prserv.sqlite3"))
        logfile = os.path.abspath(os.path.join(cachedir, "prserv.log"))
        if singleton:
            if singleton.dbfile != dbfile:
               # Shutdown any existing PR Server as doesn't match config
               auto_shutdown()
        if not singleton:
            bb.utils.mkdirhier(cachedir)
            singleton = PRServSingleton(dbfile, logfile, host, port)
            singleton.start()
    if singleton:
        host = singleton.host
        port = singleton.port

    try:
        ping(host, port)
        return str(host) + ":" + str(port)

    except Exception:
        logger.critical("PRservice %s:%d not available" % (host, port))
        raise PRServiceConfigError
336
def auto_shutdown():
    """
    Stop the auto-started PR server, if any, and forget the singleton.

    A singleton whose server process was never created (start() failed
    before assigning it) is discarded as well, so that a later
    auto_start() can create a fresh one.
    """
    global singleton
    if not singleton:
        return

    # 'process' may be absent or None when start() did not get that far
    process = getattr(singleton, 'process', None)
    if process:
        process.terminate()
        process.join()
    singleton = None
343
def ping(host, port):
    """Open a client connection to host:port and return the result of its ping()."""
    from . import client as prclient

    connection = prclient.PRClient()
    connection.connect_tcp(host, port)
    reply = connection.ping()
    return reply
350
def connect(host, port):
    """
    Return a PRClient connected to host:port.

    When host is 'localhost' (ignoring case and surrounding whitespace)
    and port is falsy, the port of the auto-started server is used.

    Raises PRServiceConfigError if that port is needed but no server has
    been auto-started.
    """
    from . import client

    global singleton

    if host.strip().lower() == 'localhost' and not port:
        if not singleton:
            # Without a running singleton there is no port to substitute;
            # fail with a clear error rather than an AttributeError on None
            raise PRServiceConfigError(
                "no auto-started PR server to connect to for %s:%s" % (host, port))
        host = 'localhost'
        port = singleton.port

    conn = client.PRClient()
    conn.connect_tcp(host, port)
    return conn
363