xref: /OK3568_Linux_fs/yocto/poky/bitbake/lib/hashserv/__init__.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1*4882a593Smuzhiyun# Copyright (C) 2018-2019 Garmin Ltd.
2*4882a593Smuzhiyun#
3*4882a593Smuzhiyun# SPDX-License-Identifier: GPL-2.0-only
4*4882a593Smuzhiyun#
5*4882a593Smuzhiyun
6*4882a593Smuzhiyunimport asyncio
7*4882a593Smuzhiyunfrom contextlib import closing
8*4882a593Smuzhiyunimport re
9*4882a593Smuzhiyunimport sqlite3
10*4882a593Smuzhiyunimport itertools
11*4882a593Smuzhiyunimport json
12*4882a593Smuzhiyun
# Scheme prefix that selects a unix domain socket address in parse_address()
UNIX_PREFIX = "unix://"

# Address type tags returned by parse_address() and dispatched on by
# create_server()/create_client()/create_async_client()
ADDR_TYPE_UNIX = 0
ADDR_TYPE_TCP = 1

# The Python async server defaults to a 64K receive buffer, so we hardcode our
# maximum chunk size. It would be better if the client and server reported to
# each other what the maximum chunk sizes were, but that will slow down the
# connection setup with a round trip delay so I'd rather not do that unless it
# is necessary
DEFAULT_MAX_CHUNK = 32 * 1024

# Table schemas are (column name, SQL type, flags) triples; _make_table()
# renders each as "<name> <type>" and puts every column whose flags contain
# "UNIQUE" into a single table-level UNIQUE(...) constraint.
UNIHASH_TABLE_DEFINITION = (
    ("method", "TEXT NOT NULL", "UNIQUE"),
    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
    ("unihash", "TEXT NOT NULL", ""),
)

UNIHASH_TABLE_COLUMNS = tuple(name for name, _, _ in UNIHASH_TABLE_DEFINITION)

OUTHASH_TABLE_DEFINITION = (
    ("method", "TEXT NOT NULL", "UNIQUE"),
    ("taskhash", "TEXT NOT NULL", "UNIQUE"),
    ("outhash", "TEXT NOT NULL", "UNIQUE"),
    ("created", "DATETIME", ""),

    # Optional fields
    ("owner", "TEXT", ""),
    ("PN", "TEXT", ""),
    ("PV", "TEXT", ""),
    ("PR", "TEXT", ""),
    ("task", "TEXT", ""),
    ("outhash_siginfo", "TEXT", ""),
)

OUTHASH_TABLE_COLUMNS = tuple(name for name, _, _ in OUTHASH_TABLE_DEFINITION)
49*4882a593Smuzhiyun
50*4882a593Smuzhiyundef _make_table(cursor, name, definition):
51*4882a593Smuzhiyun    cursor.execute('''
52*4882a593Smuzhiyun        CREATE TABLE IF NOT EXISTS {name} (
53*4882a593Smuzhiyun            id INTEGER PRIMARY KEY AUTOINCREMENT,
54*4882a593Smuzhiyun            {fields}
55*4882a593Smuzhiyun            UNIQUE({unique})
56*4882a593Smuzhiyun            )
57*4882a593Smuzhiyun        '''.format(
58*4882a593Smuzhiyun            name=name,
59*4882a593Smuzhiyun            fields=" ".join("%s %s," % (name, typ) for name, typ, _ in definition),
60*4882a593Smuzhiyun            unique=", ".join(name for name, _, flags in definition if "UNIQUE" in flags)
61*4882a593Smuzhiyun    ))
62*4882a593Smuzhiyun
63*4882a593Smuzhiyun
def setup_database(database, sync=True):
    """Open (creating/upgrading as needed) the hash equivalence database.

    Connects to the sqlite database at *database*, ensures the v2 tables and
    v3 indexes exist, and drops obsolete indexes/tables.  With ``sync=False``
    the connection trades durability for speed (PRAGMA synchronous = OFF).
    Returns the open connection, configured with ``sqlite3.Row`` rows.
    """
    db = sqlite3.connect(database)
    db.row_factory = sqlite3.Row

    with closing(db.cursor()) as cursor:
        _make_table(cursor, "unihashes_v2", UNIHASH_TABLE_DEFINITION)
        _make_table(cursor, "outhashes_v2", OUTHASH_TABLE_DEFINITION)

        maintenance = [
            'PRAGMA journal_mode = WAL',
            'PRAGMA synchronous = %s' % ('NORMAL' if sync else 'OFF'),

            # Drop old indexes
            'DROP INDEX IF EXISTS taskhash_lookup',
            'DROP INDEX IF EXISTS outhash_lookup',
            'DROP INDEX IF EXISTS taskhash_lookup_v2',
            'DROP INDEX IF EXISTS outhash_lookup_v2',

            # TODO: Upgrade from tasks_v2?
            'DROP TABLE IF EXISTS tasks_v2',

            # Create new indexes
            'CREATE INDEX IF NOT EXISTS taskhash_lookup_v3 ON unihashes_v2 (method, taskhash)',
            'CREATE INDEX IF NOT EXISTS outhash_lookup_v3 ON outhashes_v2 (method, outhash)',
        ]
        for statement in maintenance:
            cursor.execute(statement)

    return db
89*4882a593Smuzhiyun
90*4882a593Smuzhiyun
def parse_address(addr):
    """Parse an address string into an (address type, argument tuple) pair.

    "unix://PATH" yields ``(ADDR_TYPE_UNIX, (PATH,))``.  Anything else is
    treated as TCP and yields ``(ADDR_TYPE_TCP, (host, port))``, accepting
    both the bracketed "[host]:port" form (needed for IPv6 literals) and
    plain "host:port".
    """
    if addr.startswith(UNIX_PREFIX):
        path = addr[len(UNIX_PREFIX):]
        return (ADDR_TYPE_UNIX, (path,))

    # Bracketed form first; fall back to splitting on the single colon
    bracketed = re.match(r'\[(?P<host>[^\]]*)\]:(?P<port>\d+)$', addr)
    if bracketed is not None:
        host = bracketed.group('host')
        port = bracketed.group('port')
    else:
        host, port = addr.split(':')

    return (ADDR_TYPE_TCP, (host, int(port)))
103*4882a593Smuzhiyun
104*4882a593Smuzhiyun
def chunkify(msg, max_chunk):
    """Yield *msg* as newline-terminated pieces that each fit in *max_chunk*.

    When the message (plus its newline terminator) fits in one chunk, a
    single line is yielded.  Otherwise a '{"chunk-stream": null}' marker
    line is sent first, then the message in slices of ``max_chunk - 1``
    characters (leaving room for each newline), and finally a bare newline
    to signal the end of the chunk stream.
    """
    if len(msg) < max_chunk - 1:
        yield ''.join((msg, "\n"))
        return

    # Tell the peer a chunked transfer follows
    yield ''.join((json.dumps({
            'chunk-stream': None
        }), "\n"))

    step = max_chunk - 1
    source = iter(msg)
    while True:
        piece = ''.join(itertools.islice(source, step))
        if not piece:
            break
        yield piece + "\n"

    # Empty line terminates the chunk stream
    yield "\n"
117*4882a593Smuzhiyun
118*4882a593Smuzhiyun
def create_server(addr, dbname, *, sync=True, upstream=None, read_only=False):
    """Create a hash equivalence server listening on *addr*.

    Opens (and initializes) the database at *dbname*, wraps it in a
    ``server.Server`` and starts listening on either a unix socket or a
    TCP socket depending on the address form.  Returns the server object.
    """
    from . import server

    srv = server.Server(setup_database(dbname, sync=sync),
                        upstream=upstream, read_only=read_only)

    addr_type, addr_args = parse_address(addr)
    if addr_type == ADDR_TYPE_UNIX:
        srv.start_unix_server(*addr_args)
    else:
        srv.start_tcp_server(*addr_args)

    return srv
131*4882a593Smuzhiyun
132*4882a593Smuzhiyun
def create_client(addr):
    """Create a synchronous client connected to the server at *addr*.

    The address is parsed with parse_address(); unix socket and TCP forms
    are both supported.  Returns the connected ``client.Client``.
    """
    from . import client

    conn = client.Client()
    addr_type, addr_args = parse_address(addr)
    if addr_type == ADDR_TYPE_UNIX:
        conn.connect_unix(*addr_args)
    else:
        conn.connect_tcp(*addr_args)

    return conn
144*4882a593Smuzhiyun
async def create_async_client(addr):
    """Create an asyncio client connected to the server at *addr*.

    Async counterpart of create_client(); the address is parsed with
    parse_address() and either a unix socket or a TCP connection is
    awaited.  Returns the connected ``client.AsyncClient``.
    """
    from . import client

    conn = client.AsyncClient()
    addr_type, addr_args = parse_address(addr)
    if addr_type == ADDR_TYPE_UNIX:
        await conn.connect_unix(*addr_args)
    else:
        await conn.connect_tcp(*addr_args)

    return conn
156