# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

from contextlib import closing, contextmanager
from datetime import datetime
import enum
import asyncio
import logging
import math
import time
from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
import bb.asyncrpc


logger = logging.getLogger('hashserv.server')

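# Lightweight instrumentation used to time request handling: a Measurement
# times a single block, a Sample aggregates measurements for one request, and
# Stats accumulates per-request totals exposed via the 'get-stats' command.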
class Measurement(object):
    def __init__(self, sample):
        self.sample = sample

    def start(self):
        self.start_time = time.perf_counter()

    def end(self):
        self.sample.add(time.perf_counter() - self.start_time)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()


class Sample(object):
    def __init__(self, stats):
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def measure(self):
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def add(self, elapsed):
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        if self.num_samples:
            self.stats.add(self.elapsed)
            self.num_samples = 0
            self.elapsed = 0


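# Running request statistics. The mean and variance are maintained with
# Welford's online algorithm, so stdev can be reported without storing
# individual samples.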
class Stats(object):
    def __init__(self):
        self.reset()

    def reset(self):
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        self.num += 1
        if self.num == 1:
            self.m = elapsed
            self.s = 0
        else:
            last_m = self.m
            self.m = last_m + (elapsed - last_m) / self.num
            self.s = self.s + (elapsed - last_m) * (elapsed - self.m)

        self.total_time += elapsed

        if self.max_time < elapsed:
            self.max_time = elapsed

    def start_sample(self):
        return Sample(self)

    @property
    def average(self):
        if self.num == 0:
            return 0
        return self.total_time / self.num

    @property
    def stdev(self):
        if self.num <= 1:
            return 0
        return math.sqrt(self.s / (self.num - 1))

    def todict(self):
        return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}


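# Conflict resolution strategies for insert_table(), mapped onto SQLite's
# INSERT OR IGNORE / INSERT OR REPLACE clauses.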
@enum.unique
class Resolve(enum.Enum):
    FAIL = enum.auto()
    IGNORE = enum.auto()
    REPLACE = enum.auto()


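# Build and execute a parameterized INSERT for the given table and column
# dictionary. Returns (lastrowid, inserted), where 'inserted' is True only if
# a new row was actually added (i.e. lastrowid changed).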
def insert_table(cursor, table, data, on_conflict):
    resolve = {
        Resolve.FAIL: "",
        Resolve.IGNORE: " OR IGNORE",
        Resolve.REPLACE: " OR REPLACE",
    }[on_conflict]

    keys = sorted(data.keys())
    query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format(
        resolve=resolve,
        table=table,
        fields=", ".join(keys),
        values=", ".join(":" + k for k in keys),
    )
    prevrowid = cursor.lastrowid
    cursor.execute(query, data)
    logger.debug(
        "Inserting %r into %s, %s",
        data,
        table,
        on_conflict
    )
    return (cursor.lastrowid, cursor.lastrowid != prevrowid)

def insert_unihash(cursor, data, on_conflict):
    return insert_table(cursor, "unihashes_v2", data, on_conflict)

def insert_outhash(cursor, data, on_conflict):
    return insert_table(cursor, "outhashes_v2", data, on_conflict)

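# Fetch the unihash entry for (method, taskhash) from an upstream server and
# cache it in the local database. Used by the backfill worker in Server.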
async def copy_unihash_from_upstream(client, db, method, taskhash):
    d = await client.get_taskhash(method, taskhash)
    if d is not None:
        with closing(db.cursor()) as cursor:
            insert_unihash(
                cursor,
                {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
                Resolve.IGNORE,
            )
            db.commit()
    return d


class ServerCursor(object):
    def __init__(self, db, cursor, upstream):
        self.db = db
        self.cursor = cursor
        self.upstream = upstream


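# Per-connection handler. Each connected client gets a ServerClient that maps
# incoming request names ('get', 'get-outhash', 'get-stream', 'get-stats' and,
# on writable servers, 'report', 'report-equiv', 'reset-stats' and
# 'backfill-wait') to handler coroutines.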
class ServerClient(bb.asyncrpc.AsyncServerConnection):
    def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
        super().__init__(reader, writer, 'OEHASHEQUIV', logger)
        self.db = db
        self.request_stats = request_stats
        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
        self.backfill_queue = backfill_queue
        self.upstream = upstream

        self.handlers.update({
            'get': self.handle_get,
            'get-outhash': self.handle_get_outhash,
            'get-stream': self.handle_get_stream,
            'get-stats': self.handle_get_stats,
        })

        if not read_only:
            self.handlers.update({
                'report': self.handle_report,
                'report-equiv': self.handle_equivreport,
                'reset-stats': self.handle_reset_stats,
                'backfill-wait': self.handle_backfill_wait,
            })

    def validate_proto_version(self):
        return (self.proto_version > (1, 0) and self.proto_version <= (1, 1))

    async def process_requests(self):
        if self.upstream is not None:
            self.upstream_client = await create_async_client(self.upstream)
        else:
            self.upstream_client = None

        await super().process_requests()

        if self.upstream_client is not None:
            await self.upstream_client.close()

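    # Dispatch a decoded message to the matching handler. Streaming requests
    # manage their own timing, so only non-stream requests are wrapped in a
    # request_stats sample here.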
    async def dispatch_message(self, msg):
        for k in self.handlers.keys():
            if k in msg:
                logger.debug('Handling %s' % k)
                if 'stream' in k:
                    await self.handlers[k](msg[k])
                else:
                    with self.request_stats.start_sample() as self.request_sample, \
                            self.request_sample.measure():
                        await self.handlers[k](msg[k])
                return

        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)

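    # 'get' request: look up the unihash for a (method, taskhash) pair,
    # optionally returning all known columns when 'all' is set. Falls back to
    # the upstream server when the hash is not known locally.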
    async def handle_get(self, request):
        method = request['method']
        taskhash = request['taskhash']
        fetch_all = request.get('all', False)

        with closing(self.db.cursor()) as cursor:
            d = await self.get_unihash(cursor, method, taskhash, fetch_all)

        self.write_message(d)

    async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
        d = None

        if fetch_all:
            cursor.execute(
                '''
                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                ''',
                {
                    'method': method,
                    'taskhash': taskhash,
                }
            )
            row = cursor.fetchone()

            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash, True)
                self.update_unified(cursor, d)
                self.db.commit()
        else:
            row = self.query_equivalent(cursor, method, taskhash)

            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash)
                # Guard against the upstream server not knowing the taskhash
                if d is not None:
                    d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
                    insert_unihash(cursor, d, Resolve.IGNORE)
                    self.db.commit()

        return d

    async def handle_get_outhash(self, request):
        method = request['method']
        outhash = request['outhash']
        taskhash = request['taskhash']

        with closing(self.db.cursor()) as cursor:
            d = await self.get_outhash(cursor, method, outhash, taskhash)

        self.write_message(d)

    async def get_outhash(self, cursor, method, outhash, taskhash):
        d = None
        cursor.execute(
            '''
            SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
            INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
            WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
            ORDER BY outhashes_v2.created ASC
            LIMIT 1
            ''',
            {
                'method': method,
                'outhash': outhash,
            }
        )
        row = cursor.fetchone()

        if row is not None:
            d = {k: row[k] for k in row.keys()}
        elif self.upstream_client is not None:
            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
            self.update_unified(cursor, d)
            self.db.commit()

        return d

    def update_unified(self, cursor, data):
        if data is None:
            return

        insert_unihash(
            cursor,
            {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
            Resolve.IGNORE
        )
        insert_outhash(
            cursor,
            {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
            Resolve.IGNORE
        )

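    # 'get-stream' switches the connection into a line-oriented fast path:
    # the client sends "<method> <taskhash>" lines and receives the matching
    # unihash (or an empty line) per request, until it sends "END". Misses are
    # queued for the backfill worker when an upstream server is configured.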
    async def handle_get_stream(self, request):
        self.write_message('ok')

        while True:
            upstream = None

            l = await self.reader.readline()
            if not l:
                return

            try:
                # This inner loop is very sensitive and must be as fast as
                # possible (which is why the request sample is handled manually
                # instead of using 'with', and also why logging statements are
                # commented out).
                self.request_sample = self.request_stats.start_sample()
                request_measure = self.request_sample.measure()
                request_measure.start()

                l = l.decode('utf-8').rstrip()
                if l == 'END':
                    self.writer.write('ok\n'.encode('utf-8'))
                    return

                (method, taskhash) = l.split()
                #logger.debug('Looking up %s %s' % (method, taskhash))
                cursor = self.db.cursor()
                try:
                    row = self.query_equivalent(cursor, method, taskhash)
                finally:
                    cursor.close()

                if row is not None:
                    msg = ('%s\n' % row['unihash']).encode('utf-8')
                    #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
                elif self.upstream_client is not None:
                    upstream = await self.upstream_client.get_unihash(method, taskhash)
                    if upstream:
                        msg = ("%s\n" % upstream).encode("utf-8")
                    else:
                        msg = "\n".encode("utf-8")
                else:
                    msg = '\n'.encode('utf-8')

                self.writer.write(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

            await self.writer.drain()

            # Post to the backfill queue after writing the result to minimize
            # the turn around time on a request
            if upstream is not None:
                await self.backfill_queue.put((method, taskhash))

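    # 'report' request: record an outhash for a task and assign it a unihash.
    # If another taskhash already produced the same outhash, its unihash is
    # reused so the two tasks become equivalent; otherwise the reported
    # unihash (or one obtained from upstream) is stored.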
    async def handle_report(self, data):
        with closing(self.db.cursor()) as cursor:
            outhash_data = {
                'method': data['method'],
                'outhash': data['outhash'],
                'taskhash': data['taskhash'],
                'created': datetime.now()
            }

            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                if k in data:
                    outhash_data[k] = data[k]

            # Insert the new entry, unless it already exists
            (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)

            if inserted:
                # If this row is new, check if it is equivalent to another
                # output hash
                cursor.execute(
                    '''
                    SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
                    INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
                    -- Select any matching output hash except the one we just inserted
                    WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
                    -- Pick the oldest hash
                    ORDER BY outhashes_v2.created ASC
                    LIMIT 1
                    ''',
                    {
                        'method': data['method'],
                        'outhash': data['outhash'],
                        'taskhash': data['taskhash'],
                    }
                )
                row = cursor.fetchone()

                if row is not None:
                    # A matching output hash was found. Set our taskhash to the
                    # same unihash since they are equivalent
                    unihash = row['unihash']
                    resolve = Resolve.IGNORE
                else:
                    # No matching output hash was found. This is probably the
                    # first outhash to be added.
                    unihash = data['unihash']
                    resolve = Resolve.IGNORE

                    # Query upstream to see if it has a unihash we can use
                    if self.upstream_client is not None:
                        upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash'])
                        if upstream_data is not None:
                            unihash = upstream_data['unihash']

                insert_unihash(
                    cursor,
                    {
                        'method': data['method'],
                        'taskhash': data['taskhash'],
                        'unihash': unihash,
                    },
                    resolve
                )

            unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash'])
            if unihash_data is not None:
                unihash = unihash_data['unihash']
            else:
                unihash = data['unihash']

            self.db.commit()

            d = {
                'taskhash': data['taskhash'],
                'method': data['method'],
                'unihash': unihash,
            }

        self.write_message(d)

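    # 'report-equiv' request: insert a client-supplied taskhash -> unihash
    # mapping (ignored if a mapping already exists) and reply with the mapping
    # the database actually holds for that taskhash.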
    async def handle_equivreport(self, data):
        with closing(self.db.cursor()) as cursor:
            insert_data = {
                'method': data['method'],
                'taskhash': data['taskhash'],
                'unihash': data['unihash'],
            }
            insert_unihash(cursor, insert_data, Resolve.IGNORE)
            self.db.commit()

            # Fetch the unihash that will be reported for the taskhash. If the
            # unihash matches, it means this row was inserted (or the mapping
            # was already valid)
            row = self.query_equivalent(cursor, data['method'], data['taskhash'])

            if row['unihash'] == data['unihash']:
                logger.info('Adding taskhash equivalence for %s with unihash %s',
                                data['taskhash'], row['unihash'])

            d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)

    async def handle_get_stats(self, request):
        d = {
            'requests': self.request_stats.todict(),
        }

        self.write_message(d)

    async def handle_reset_stats(self, request):
        d = {
            'requests': self.request_stats.todict(),
        }

        self.request_stats.reset()
        self.write_message(d)

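    # 'backfill-wait' request: wait for the backfill queue to drain, then
    # reply with the number of tasks that were pending when the request
    # arrived.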
    async def handle_backfill_wait(self, request):
        d = {
            'tasks': self.backfill_queue.qsize(),
        }
        await self.backfill_queue.join()
        self.write_message(d)

    def query_equivalent(self, cursor, method, taskhash):
        # This is part of the inner loop and must be as fast as possible
        cursor.execute(
            'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash',
            {
                'method': method,
                'taskhash': taskhash,
            }
        )
        return cursor.fetchone()


class Server(bb.asyncrpc.AsyncServer):
    def __init__(self, db, upstream=None, read_only=False):
        if upstream and read_only:
            raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server")

        super().__init__(logger)

        self.request_stats = Stats()
        self.db = db
        self.upstream = upstream
        self.read_only = read_only

    def accept_client(self, reader, writer):
        return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)

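    # Run a background task, for the lifetime of the server loop, that copies
    # unihashes queued by 'get-stream' misses from the upstream server into
    # the local database. A None item on the queue is the shutdown sentinel.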
    @contextmanager
    def _backfill_worker(self):
        async def backfill_worker_task():
            client = await create_async_client(self.upstream)
            try:
                while True:
                    item = await self.backfill_queue.get()
                    if item is None:
                        self.backfill_queue.task_done()
                        break
                    method, taskhash = item
                    await copy_unihash_from_upstream(client, self.db, method, taskhash)
                    self.backfill_queue.task_done()
            finally:
                await client.close()

        async def join_worker(worker):
            await self.backfill_queue.put(None)
            await worker

        if self.upstream is not None:
            worker = asyncio.ensure_future(backfill_worker_task())
            try:
                yield
            finally:
                self.loop.run_until_complete(join_worker(worker))
        else:
            yield

    def run_loop_forever(self):
        self.backfill_queue = asyncio.Queue()

        with self._backfill_worker():
            super().run_loop_forever()