xref: /OK3568_Linux_fs/yocto/poky/bitbake/lib/prserv/serv.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1*4882a593Smuzhiyun#
2*4882a593Smuzhiyun# Copyright BitBake Contributors
3*4882a593Smuzhiyun#
4*4882a593Smuzhiyun# SPDX-License-Identifier: GPL-2.0-only
5*4882a593Smuzhiyun#
6*4882a593Smuzhiyun
7*4882a593Smuzhiyunimport os,sys,logging
8*4882a593Smuzhiyunimport signal, time
9*4882a593Smuzhiyunimport socket
10*4882a593Smuzhiyunimport io
11*4882a593Smuzhiyunimport sqlite3
12*4882a593Smuzhiyunimport prserv
13*4882a593Smuzhiyunimport prserv.db
14*4882a593Smuzhiyunimport errno
15*4882a593Smuzhiyunimport bb.asyncrpc
16*4882a593Smuzhiyun
# Logger used by everything in this module.
logger = logging.getLogger("BitBake.PRserv")

# Template for the daemon pidfile path; filled in with (ip, port).
PIDPREFIX = "/tmp/PRServer_%s_%s.pid"
# The PRServSingleton created by auto_start() in this process, or None.
singleton = None
21*4882a593Smuzhiyun
class PRServerClient(bb.asyncrpc.AsyncServerConnection):
    """Handles the PR service messages of a single client connection.

    On top of whatever handlers the base connection provides, this adds
    'get-pr', 'import-one', 'export' and 'is-readonly', all of which operate
    on the table object handed in by the server.
    """

    def __init__(self, reader, writer, table, read_only):
        super().__init__(reader, writer, 'PRSERVICE', logger)
        pr_handlers = {
            'get-pr': self.handle_get_pr,
            'import-one': self.handle_import_one,
            'export': self.handle_export,
            'is-readonly': self.handle_is_readonly,
        }
        self.handlers.update(pr_handlers)
        self.table = table
        self.read_only = read_only

    def validate_proto_version(self):
        """Return True only when self.proto_version is exactly (1, 0)."""
        return self.proto_version == (1, 0)

    async def dispatch_message(self, msg):
        """Dispatch msg through the base class, then flush the table.

        A failed dispatch forces a sync before the exception is re-raised;
        a successful one syncs only if the table reports itself dirty.
        """
        try:
            await super().dispatch_message(msg)
        except BaseException:
            # Catch everything (same as a bare except) so the table is
            # synced no matter what interrupted the handler.
            self.table.sync()
            raise
        else:
            self.table.sync_if_dirty()

    async def handle_get_pr(self, request):
        """Reply with {'value': ...} for the requested entry.

        The reply is None when the lookup raises prserv.NotFoundError or an
        sqlite3 error; both cases are logged.
        """
        version = request['version']
        pkgarch = request['pkgarch']
        checksum = request['checksum']

        reply = None
        try:
            reply = {'value': self.table.getValue(version, pkgarch, checksum)}
        except prserv.NotFoundError:
            logger.error("can not find value for (%s, %s)",version, checksum)
        except sqlite3.Error as exc:
            logger.error(str(exc))

        self.write_message(reply)

    async def handle_import_one(self, request):
        """Import one entry into the table and reply with the resulting value.

        The reply is None when the connection is read-only or when
        importone() returns None.
        """
        if self.read_only:
            # Read-only connections never touch the request or the table.
            self.write_message(None)
            return

        imported = self.table.importone(
            request['version'],
            request['pkgarch'],
            request['checksum'],
            request['value'],
        )
        self.write_message(None if imported is None else {'value': imported})

    async def handle_export(self, request):
        """Reply with the 'metainfo' and 'datainfo' returned by table.export().

        Both are None when the export raises an sqlite3 error (logged).
        """
        export_args = (
            request['version'],
            request['pkgarch'],
            request['checksum'],
            request['colinfo'],
        )

        try:
            metainfo, datainfo = self.table.export(*export_args)
        except sqlite3.Error as exc:
            logger.error(str(exc))
            metainfo = datainfo = None

        self.write_message({'metainfo': metainfo, 'datainfo': datainfo})

    async def handle_is_readonly(self, request):
        """Reply with this connection's read_only flag."""
        self.write_message({'readonly': self.read_only})
94*4882a593Smuzhiyun
class PRServer(bb.asyncrpc.AsyncServer):
    """asyncrpc server that answers PR requests from a PRData database file."""

    def __init__(self, dbfile, read_only=False):
        super().__init__(logger)
        self.read_only = read_only
        self.dbfile = dbfile
        # Stays None until _serve_forever() has opened the database.
        self.table = None

    def accept_client(self, reader, writer):
        """Create the connection handler for a newly accepted client."""
        connection = PRServerClient(reader, writer, self.table, self.read_only)
        return connection

    def _serve_forever(self):
        """Open the database, run the base server loop, then close down.

        Once the base loop returns, the "PRMAIN" table is synced if dirty
        and the database is disconnected.
        """
        database = prserv.db.PRData(self.dbfile, read_only=self.read_only)
        self.db = database
        self.table = database["PRMAIN"]

        startup_details = (self.dbfile, self.address, str(os.getpid()))
        logger.info("Started PRServer with DBfile: %s, Address: %s, PID: %s" %
                    startup_details)

        super()._serve_forever()

        self.table.sync_if_dirty()
        self.db.disconnect()

    def signal_handler(self):
        """Run the base signal handling, then sync the table if one is open."""
        super().signal_handler()
        table = self.table
        if table:
            table.sync()
121*4882a593Smuzhiyun
class PRServSingleton(object):
    """Bookkeeping for a PR server that is run as a separate process.

    Attributes:
        dbfile, logfile, host, port: values given to the constructor; port
            is replaced by the actually bound port in start() when it was
            given as a false value (e.g. 0).
        prserv: the PRServer instance, or None before start().
        process: the object returned by serve_as_process(), or None before
            start() has got that far.
    """

    def __init__(self, dbfile, logfile, host, port):
        self.dbfile = dbfile
        self.logfile = logfile
        self.host = host
        self.port = port
        # Define these up front: auto_shutdown() reads .process, and it must
        # not hit an AttributeError when start() failed part-way or was
        # never called.
        self.prserv = None
        self.process = None

    def start(self):
        """Start the PR server on host:port as a separate process.

        Raises:
            PRServiceConfigError: the server reports no address after start.
        """
        self.prserv = PRServer(self.dbfile)
        self.prserv.start_tcp_server(socket.gethostbyname(self.host), self.port)
        self.process = self.prserv.serve_as_process()

        if not self.prserv.address:
            raise PRServiceConfigError
        if not self.port:
            # A false port means "pick one"; read the chosen port back from
            # the "<host>:<port>" address string.
            self.port = int(self.prserv.address.rsplit(':', 1)[1])
138*4882a593Smuzhiyun
def run_as_daemon(func, pidfile, logfile):
    """
    Detach from the caller with a double fork and run func() as a daemon.

    See Advanced Programming in the UNIX, Sec 13.3

    In the calling process this waits for the first child to exit and then
    returns that child's pid.  The daemon process redirects stdin to
    /dev/null and stdout/stderr to logfile, writes its own pid to pidfile,
    runs func(), removes pidfile and leaves through os._exit().

    Args:
        func: callable taking no arguments; the daemon's main function.
        pidfile: path the daemon writes its pid to.
        logfile: path that receives the daemon's stdout/stderr and logging.

    Raises:
        Exception: a fork (or waiting for the first child) failed.
    """
    try:
        pid = os.fork()
        if pid > 0:
            os.waitpid(pid, 0)
            #parent return instead of exit to give control
            return pid
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno)) from e

    os.setsid()
    # fork again to make sure the daemon is not session leader,
    # which prevents it from acquiring controlling terminal
    try:
        pid = os.fork()
        if pid > 0: #parent
            os._exit(0)
    except OSError as e:
        raise Exception("%s [%d]" % (e.strerror, e.errno)) from e

    os.chdir("/")

    sys.stdout.flush()
    sys.stderr.flush()

    # We could be called from a python thread with io.StringIO as
    # stdout/stderr or it could be 'real' unix fd forking where we need
    # to physically close the fds to prevent the program launching us from
    # potentially hanging on a pipe. Handle both cases.
    si = open('/dev/null', 'r')
    try:
        os.dup2(si.fileno(),sys.stdin.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdin = si
    so = open(logfile, 'a+')
    try:
        os.dup2(so.fileno(),sys.stdout.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stdout = so
    try:
        os.dup2(so.fileno(),sys.stderr.fileno())
    except (AttributeError, io.UnsupportedOperation):
        sys.stderr = so

    # Clear out all log handlers prior to the fork() to avoid calling
    # event handlers not part of the PRserver
    for logger_iter in logging.Logger.manager.loggerDict.keys():
        logging.getLogger(logger_iter).handlers = []

    # Ensure logging makes it to the logfile
    streamhandler = logging.StreamHandler()
    streamhandler.setLevel(logging.DEBUG)
    formatter = bb.msg.BBLogFormatter("%(levelname)s: %(message)s")
    streamhandler.setFormatter(formatter)
    logger.addHandler(streamhandler)

    # write pidfile
    pid = str(os.getpid())
    with open(pidfile, 'w') as pf:
        pf.write("%s\n" % pid)

    status = 0
    try:
        func()
    except Exception:
        # Without this the daemon would unwind into the caller's code and
        # keep running it as if it were the original process.
        logger.exception("PR server daemon terminated by an unhandled exception")
        status = 1
    finally:
        # Always drop the pidfile, otherwise a crashed daemon blocks the
        # next start_daemon().  stop_daemon() may have removed it already.
        try:
            os.remove(pidfile)
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                logger.warning("Unable to remove pidfile %s: %s", pidfile, exc)
    os._exit(status)
208*4882a593Smuzhiyun
def start_daemon(dbfile, host, port, logfile, read_only=False):
    """Start a PR server daemon for host:port unless a pidfile for it exists.

    Args:
        dbfile: database file path (made absolute before daemonizing).
        host: host name; resolved to an IP for binding and the pidfile name.
        port: TCP port to listen on.
        logfile: daemon log path (made absolute before daemonizing).
        read_only: passed through to PRServer.

    Returns:
        1 when a pidfile for (ip, port) already exists, otherwise 0 once
        run_as_daemon() has returned in the calling process.
    """
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)

    pid = None
    pidfile_unparsable = False
    try:
        with open(pidfile) as pf:
            pid = int(pf.readline().strip())
    except IOError:
        pass
    except ValueError:
        # The pidfile is there but empty or corrupt.  Treat it like any
        # other existing pidfile instead of crashing on int().
        pidfile_unparsable = True

    if pid or pidfile_unparsable:
        sys.stderr.write("pidfile %s already exist. Daemon already running?\n"
                            % pidfile)
        return 1

    # The daemon chdir()s to "/", so relative paths must be resolved first.
    dbfile = os.path.abspath(dbfile)
    def daemon_main():
        server = PRServer(dbfile, read_only=read_only)
        server.start_tcp_server(ip, port)
        server.serve_forever()

    run_as_daemon(daemon_main, pidfile, os.path.abspath(logfile))
    return 0
231*4882a593Smuzhiyun
def stop_daemon(host, port):
    """Stop the PR server daemon recorded in the pidfile for host:port.

    Sends SIGTERM to the recorded pid if that process is running, then
    removes the pidfile.

    Returns:
        0 on success; 1 when the pidfile is missing or does not hold a pid
        (a hint listing other ports with pidfiles is written to stderr).

    Raises:
        OSError: signalling the process or removing the pidfile failed for a
            reason other than the process/file already being gone.
    """
    import glob
    ip = socket.gethostbyname(host)
    pidfile = PIDPREFIX % (ip, port)
    try:
        with open(pidfile) as pf:
            pid = int(pf.readline().strip())
    except IOError:
        pid = None
    except ValueError:
        # Empty or corrupt pidfile: there is no pid we could signal.
        sys.stderr.write("pidfile %s does not contain a valid pid.\n" % pidfile)
        return 1

    if not pid:
        # when server starts at port=0 (i.e. localhost:0), server actually takes another port,
        # so at least advise the user which ports the corresponding server is listening
        ports = [os.path.splitext(os.path.basename(pf))[0].split('_')[-1]
                 for pf in glob.glob(PIDPREFIX % (ip,'*'))]
        portstr = ""
        if ports:
            portstr = "Wrong port? Other ports listening at %s: %s" % (host, ' '.join(ports))

        sys.stderr.write("pidfile %s does not exist. Daemon not running? %s\n"
                         % (pidfile,portstr))
        return 1

    try:
        if is_running(pid):
            print("Sending SIGTERM to pr-server.")
            os.kill(pid, signal.SIGTERM)
            time.sleep(0.1)
    except OSError as e:
        # The process may exit between the check and the kill.  Compare the
        # errno rather than the (locale dependent) message text.
        if e.errno != errno.ESRCH:
            raise

    try:
        os.remove(pidfile)
    except OSError as e:
        # The daemon removes its own pidfile on exit, so it may be gone.
        if e.errno != errno.ENOENT:
            raise

    return 0
273*4882a593Smuzhiyun
def is_running(pid):
    """Return False if signalling pid fails with ESRCH, True otherwise.

    Signal 0 performs the existence/permission check without delivering a
    signal.  Any OSError other than ESRCH still counts as running.
    """
    try:
        os.kill(pid, 0)
    except OSError as probe_error:
        return probe_error.errno != errno.ESRCH
    return True
281*4882a593Smuzhiyun
def is_local_special(host, port):
    """Return True for 'localhost' or '127.0.0.1' combined with a false port."""
    if port:
        return False
    return host in ('localhost', '127.0.0.1')
287*4882a593Smuzhiyun
class PRServiceConfigError(Exception):
    """Raised when the PR service is misconfigured or cannot be reached."""
290*4882a593Smuzhiyun
def auto_start(d):
    """Make sure the PR service named by PRSERV_HOST is available.

    For the special value "localhost:0" (or "127.0.0.1:0") a local server is
    started, backed by prserv.sqlite3 in PERSISTENT_DIR (or CACHE), and
    remembered in the module-level singleton.  The service is then pinged.

    Args:
        d: datastore-like object providing getVar().

    Returns:
        "<host>:<port>" of the reachable service, or None when PRSERV_HOST
        is unset/empty (any existing local server is shut down).

    Raises:
        PRServiceConfigError: PRSERV_HOST is malformed, no cache directory is
            configured, or the service does not answer the ping.
    """
    global singleton

    host_params = list(filter(None, (d.getVar('PRSERV_HOST') or '').split(':')))
    if not host_params:
        # Shutdown any existing PR Server
        auto_shutdown()
        return None

    port = None
    if len(host_params) == 2:
        try:
            port = int(host_params[1])
        except ValueError:
            # Non-numeric port: report it as a format error below instead of
            # leaking a bare ValueError to the caller.
            port = None

    if port is None:
        # Shutdown any existing PR Server
        auto_shutdown()
        logger.critical('\n'.join(['PRSERV_HOST: incorrect format',
                'Usage: PRSERV_HOST = "<hostname>:<port>"']))
        raise PRServiceConfigError

    host = host_params[0].strip().lower()
    if is_local_special(host, port):
        import bb.utils
        cachedir = (d.getVar("PERSISTENT_DIR") or d.getVar("CACHE"))
        if not cachedir:
            logger.critical("Please set the 'PERSISTENT_DIR' or 'CACHE' variable")
            raise PRServiceConfigError
        dbfile = os.path.join(cachedir, "prserv.sqlite3")
        logfile = os.path.join(cachedir, "prserv.log")
        if singleton:
            if singleton.dbfile != dbfile:
               # Shutdown any existing PR Server as doesn't match config
               auto_shutdown()
        if not singleton:
            bb.utils.mkdirhier(cachedir)
            singleton = PRServSingleton(os.path.abspath(dbfile), os.path.abspath(logfile), host, port)
            singleton.start()
    if singleton:
        # A running local server decides the real address (its port was
        # chosen at bind time).
        host = singleton.host
        port = singleton.port

    try:
        ping(host, port)
        return str(host) + ":" + str(port)

    except Exception:
        logger.critical("PRservice %s:%d not available" % (host, port))
        raise PRServiceConfigError
336*4882a593Smuzhiyun
def auto_shutdown():
    """Stop the auto-started PR server, if any, and forget the singleton.

    Terminates and joins the singleton's server process when it has one.
    A singleton without a process (its start() failed before the process
    was created) is simply discarded so a later auto_start() can retry.
    """
    global singleton
    if not singleton:
        return

    # getattr(): the attribute may be missing entirely when start() raised
    # before assigning it.
    process = getattr(singleton, 'process', None)
    if process:
        process.terminate()
        process.join()
    singleton = None
343*4882a593Smuzhiyun
def ping(host, port):
    """Connect a new PRClient to host:port over TCP and return its ping() result."""
    from . import client

    pr_client = client.PRClient()
    pr_client.connect_tcp(host, port)
    reply = pr_client.ping()
    return reply
350*4882a593Smuzhiyun
def connect(host, port):
    """Return a PRClient connected over TCP to the PR server at host:port.

    "localhost" with a false port refers to the server started by
    auto_start(); its actual port is taken from the module-level singleton.

    Raises:
        PRServiceConfigError: "localhost" with a false port was requested
            but no auto-started server exists in this process.
    """
    from . import client

    if host.strip().lower() == 'localhost' and not port:
        if singleton is None:
            # Without this check the lookup below fails with an opaque
            # AttributeError on None.
            raise PRServiceConfigError(
                "No auto-started PR server to connect to for %s:%s" % (host, port))
        host = 'localhost'
        port = singleton.port

    conn = client.PRClient()
    conn.connect_tcp(host, port)
    return conn
362*4882a593Smuzhiyun    return conn
363