# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

from contextlib import closing, contextmanager
from datetime import datetime
import enum
import asyncio
import logging
import math
import time
from . import create_async_client, UNIHASH_TABLE_COLUMNS, OUTHASH_TABLE_COLUMNS
import bb.asyncrpc


logger = logging.getLogger('hashserv.server')


class Measurement(object):
    """Times a single operation and reports the elapsed wall time to a Sample.

    Usable either manually (start()/end()) or as a context manager.
    """

    def __init__(self, sample):
        self.sample = sample

    def start(self):
        self.start_time = time.perf_counter()

    def end(self):
        self.sample.add(time.perf_counter() - self.start_time)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args, **kwargs):
        self.end()


class Sample(object):
    """Accumulates one or more Measurements into a single statistics sample.

    The total elapsed time is only folded into the parent Stats object when
    the sample is ended (explicitly or via context-manager exit) and at
    least one measurement was recorded.
    """

    def __init__(self, stats):
        self.stats = stats
        self.num_samples = 0
        self.elapsed = 0

    def measure(self):
        """Return a new Measurement that reports into this sample."""
        return Measurement(self)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.end()

    def add(self, elapsed):
        self.num_samples += 1
        self.elapsed += elapsed

    def end(self):
        # Only report non-empty samples, and reset so a sample object can
        # be ended more than once without double counting.
        if self.num_samples:
            self.stats.add(self.elapsed)
            self.num_samples = 0
            self.elapsed = 0


class Stats(object):
    """Running statistics (count, total, max, mean, stdev) of elapsed times.

    Mean and variance are maintained with Welford's online algorithm
    (fields ``m`` and ``s``) so no per-sample history is kept.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        self.num = 0
        self.total_time = 0
        self.max_time = 0
        self.m = 0
        self.s = 0
        self.current_elapsed = None

    def add(self, elapsed):
        self.num += 1
        if self.num == 1:
            self.m = elapsed
            self.s = 0
        else:
            last_m = self.m
            self.m = last_m + (elapsed - last_m) / self.num
            self.s = self.s + (elapsed - last_m) * (elapsed - self.m)

        self.total_time += elapsed

        if self.max_time < elapsed:
            self.max_time = elapsed

    def start_sample(self):
        return Sample(self)

    @property
    def average(self):
        if self.num == 0:
            return 0
        return self.total_time / self.num

    @property
    def stdev(self):
        # Sample standard deviation; undefined (reported as 0) for n <= 1.
        if self.num <= 1:
            return 0
        return math.sqrt(self.s / (self.num - 1))

    def todict(self):
        return {k: getattr(self, k) for k in ('num', 'total_time', 'max_time', 'average', 'stdev')}


@enum.unique
class Resolve(enum.Enum):
    """SQLite conflict-resolution strategy for insert_table()."""
    FAIL = enum.auto()
    IGNORE = enum.auto()
    REPLACE = enum.auto()


def insert_table(cursor, table, data, on_conflict):
    """Insert ``data`` (a column->value dict) into ``table``.

    ``on_conflict`` selects the SQLite conflict clause via Resolve.
    Returns a tuple (lastrowid, inserted) where ``inserted`` is True if a
    new row was actually written (detected by a change in lastrowid, which
    is why the pre-insert value is captured first).
    """
    resolve = {
        Resolve.FAIL: "",
        Resolve.IGNORE: " OR IGNORE",
        Resolve.REPLACE: " OR REPLACE",
    }[on_conflict]

    keys = sorted(data.keys())
    query = 'INSERT{resolve} INTO {table} ({fields}) VALUES({values})'.format(
        resolve=resolve,
        table=table,
        fields=", ".join(keys),
        values=", ".join(":" + k for k in keys),
    )
    prevrowid = cursor.lastrowid
    cursor.execute(query, data)
    # Use the module logger (was the root logger via logging.debug), so
    # this honors the 'hashserv.server' logger configuration like every
    # other log call in this file.
    logger.debug(
        "Inserting %r into %s, %s",
        data,
        table,
        on_conflict
    )
    return (cursor.lastrowid, cursor.lastrowid != prevrowid)

def insert_unihash(cursor, data, on_conflict):
    return insert_table(cursor, "unihashes_v2", data, on_conflict)

def insert_outhash(cursor, data, on_conflict):
    return insert_table(cursor, "outhashes_v2", data, on_conflict)

async def copy_unihash_from_upstream(client, db, method, taskhash):
    """Fetch a taskhash entry from the upstream server and cache it locally.

    Returns the upstream response dict (or None if upstream had no entry).
    """
    d = await client.get_taskhash(method, taskhash)
    if d is not None:
        with closing(db.cursor()) as cursor:
            insert_unihash(
                cursor,
                {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS},
                Resolve.IGNORE,
            )
            db.commit()
    return d


class ServerCursor(object):
    """Bundles a database handle, cursor and upstream client together."""

    def __init__(self, db, cursor, upstream):
        self.db = db
        self.cursor = cursor
        self.upstream = upstream


class ServerClient(bb.asyncrpc.AsyncServerConnection):
    """Handles a single client connection to the hash equivalence server.

    Read-only servers only register the query handlers; mutating handlers
    (report, reset-stats, backfill-wait) are added when writable.
    """

    def __init__(self, reader, writer, db, request_stats, backfill_queue, upstream, read_only):
        super().__init__(reader, writer, 'OEHASHEQUIV', logger)
        self.db = db
        self.request_stats = request_stats
        self.max_chunk = bb.asyncrpc.DEFAULT_MAX_CHUNK
        self.backfill_queue = backfill_queue
        self.upstream = upstream

        self.handlers.update({
            'get': self.handle_get,
            'get-outhash': self.handle_get_outhash,
            'get-stream': self.handle_get_stream,
            'get-stats': self.handle_get_stats,
        })

        if not read_only:
            self.handlers.update({
                'report': self.handle_report,
                'report-equiv': self.handle_equivreport,
                'reset-stats': self.handle_reset_stats,
                'backfill-wait': self.handle_backfill_wait,
            })

    def validate_proto_version(self):
        # Accept protocol versions in the half-open range (1.0, 1.1].
        return (self.proto_version > (1, 0) and self.proto_version <= (1, 1))

    async def process_requests(self):
        if self.upstream is not None:
            self.upstream_client = await create_async_client(self.upstream)
        else:
            self.upstream_client = None

        await super().process_requests()

        if self.upstream_client is not None:
            await self.upstream_client.close()

    async def dispatch_message(self, msg):
        for k in self.handlers.keys():
            if k in msg:
                logger.debug('Handling %s' % k)
                if 'stream' in k:
                    # Stream handlers manage their own per-request timing.
                    await self.handlers[k](msg[k])
                else:
                    with self.request_stats.start_sample() as self.request_sample, \
                            self.request_sample.measure():
                        await self.handlers[k](msg[k])
                return

        raise bb.asyncrpc.ClientError("Unrecognized command %r" % msg)

    async def handle_get(self, request):
        method = request['method']
        taskhash = request['taskhash']
        fetch_all = request.get('all', False)

        with closing(self.db.cursor()) as cursor:
            d = await self.get_unihash(cursor, method, taskhash, fetch_all)

        self.write_message(d)

    async def get_unihash(self, cursor, method, taskhash, fetch_all=False):
        """Look up the unihash for (method, taskhash).

        With ``fetch_all`` the full joined outhash row is returned,
        otherwise just the equivalence mapping. Falls back to the upstream
        server (caching the result locally) when there is no local row.
        Returns a dict, or None if nothing was found anywhere.
        """
        d = None

        if fetch_all:
            cursor.execute(
                '''
                SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
                INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
                WHERE outhashes_v2.method=:method AND outhashes_v2.taskhash=:taskhash
                ORDER BY outhashes_v2.created ASC
                LIMIT 1
                ''',
                {
                    'method': method,
                    'taskhash': taskhash,
                }

            )
            row = cursor.fetchone()

            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash, True)
                self.update_unified(cursor, d)
                self.db.commit()
        else:
            row = self.query_equivalent(cursor, method, taskhash)

            if row is not None:
                d = {k: row[k] for k in row.keys()}
            elif self.upstream_client is not None:
                d = await self.upstream_client.get_taskhash(method, taskhash)
                d = {k: v for k, v in d.items() if k in UNIHASH_TABLE_COLUMNS}
                insert_unihash(cursor, d, Resolve.IGNORE)
                self.db.commit()

        return d

    async def handle_get_outhash(self, request):
        method = request['method']
        outhash = request['outhash']
        taskhash = request['taskhash']

        with closing(self.db.cursor()) as cursor:
            d = await self.get_outhash(cursor, method, outhash, taskhash)

        self.write_message(d)

    async def get_outhash(self, cursor, method, outhash, taskhash):
        """Look up the oldest outhash row for (method, outhash).

        Falls back to the upstream server, caching its answer locally.
        Returns a dict, or None if nothing was found anywhere.
        """
        d = None
        cursor.execute(
            '''
            SELECT *, unihashes_v2.unihash AS unihash FROM outhashes_v2
            INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
            WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash
            ORDER BY outhashes_v2.created ASC
            LIMIT 1
            ''',
            {
                'method': method,
                'outhash': outhash,
            }
        )
        row = cursor.fetchone()

        if row is not None:
            d = {k: row[k] for k in row.keys()}
        elif self.upstream_client is not None:
            d = await self.upstream_client.get_outhash(method, outhash, taskhash)
            self.update_unified(cursor, d)
            self.db.commit()

        return d

    def update_unified(self, cursor, data):
        """Cache an upstream response in both local tables (no-op on None)."""
        if data is None:
            return

        insert_unihash(
            cursor,
            {k: v for k, v in data.items() if k in UNIHASH_TABLE_COLUMNS},
            Resolve.IGNORE
        )
        insert_outhash(
            cursor,
            {k: v for k, v in data.items() if k in OUTHASH_TABLE_COLUMNS},
            Resolve.IGNORE
        )

    async def handle_get_stream(self, request):
        """Line-oriented fast path: '<method> <taskhash>' in, unihash out."""
        self.write_message('ok')

        while True:
            upstream = None

            l = await self.reader.readline()
            if not l:
                return

            # This inner loop is very sensitive and must be as fast as
            # possible (which is why the request sample is handled manually
            # instead of using 'with', and also why logging statements are
            # commented out).
            #
            # The sample and measurement are created before the try block so
            # the finally clause can never reference an unbound name.
            self.request_sample = self.request_stats.start_sample()
            request_measure = self.request_sample.measure()
            request_measure.start()

            try:
                l = l.decode('utf-8').rstrip()
                if l == 'END':
                    self.writer.write('ok\n'.encode('utf-8'))
                    return

                (method, taskhash) = l.split()
                #logger.debug('Looking up %s %s' % (method, taskhash))
                cursor = self.db.cursor()
                try:
                    row = self.query_equivalent(cursor, method, taskhash)
                finally:
                    cursor.close()

                if row is not None:
                    msg = ('%s\n' % row['unihash']).encode('utf-8')
                    #logger.debug('Found equivalent task %s -> %s', (row['taskhash'], row['unihash']))
                elif self.upstream_client is not None:
                    upstream = await self.upstream_client.get_unihash(method, taskhash)
                    if upstream:
                        msg = ("%s\n" % upstream).encode("utf-8")
                    else:
                        msg = "\n".encode("utf-8")
                else:
                    msg = '\n'.encode('utf-8')

                self.writer.write(msg)
            finally:
                request_measure.end()
                self.request_sample.end()

            await self.writer.drain()

            # Post to the backfill queue after writing the result to minimize
            # the turn around time on a request
            if upstream is not None:
                await self.backfill_queue.put((method, taskhash))

    async def handle_report(self, data):
        """Record a reported outhash and compute/return its unihash.

        If the outhash already maps to an older taskhash, the existing
        unihash is reused so equivalent tasks share one unihash.
        """
        with closing(self.db.cursor()) as cursor:
            outhash_data = {
                'method': data['method'],
                'outhash': data['outhash'],
                'taskhash': data['taskhash'],
                'created': datetime.now()
            }

            for k in ('owner', 'PN', 'PV', 'PR', 'task', 'outhash_siginfo'):
                if k in data:
                    outhash_data[k] = data[k]

            # Insert the new entry, unless it already exists
            (rowid, inserted) = insert_outhash(cursor, outhash_data, Resolve.IGNORE)

            if inserted:
                # If this row is new, check if it is equivalent to another
                # output hash
                cursor.execute(
                    '''
                    SELECT outhashes_v2.taskhash AS taskhash, unihashes_v2.unihash AS unihash FROM outhashes_v2
                    INNER JOIN unihashes_v2 ON unihashes_v2.method=outhashes_v2.method AND unihashes_v2.taskhash=outhashes_v2.taskhash
                    -- Select any matching output hash except the one we just inserted
                    WHERE outhashes_v2.method=:method AND outhashes_v2.outhash=:outhash AND outhashes_v2.taskhash!=:taskhash
                    -- Pick the oldest hash
                    ORDER BY outhashes_v2.created ASC
                    LIMIT 1
                    ''',
                    {
                        'method': data['method'],
                        'outhash': data['outhash'],
                        'taskhash': data['taskhash'],
                    }
                )
                row = cursor.fetchone()

                if row is not None:
                    # A matching output hash was found. Set our taskhash to the
                    # same unihash since they are equivalent
                    unihash = row['unihash']
                    resolve = Resolve.IGNORE
                else:
                    # No matching output hash was found. This is probably the
                    # first outhash to be added.
                    unihash = data['unihash']
                    resolve = Resolve.IGNORE

                    # Query upstream to see if it has a unihash we can use
                    if self.upstream_client is not None:
                        upstream_data = await self.upstream_client.get_outhash(data['method'], data['outhash'], data['taskhash'])
                        if upstream_data is not None:
                            unihash = upstream_data['unihash']


                insert_unihash(
                    cursor,
                    {
                        'method': data['method'],
                        'taskhash': data['taskhash'],
                        'unihash': unihash,
                    },
                    resolve
                )

            unihash_data = await self.get_unihash(cursor, data['method'], data['taskhash'])
            if unihash_data is not None:
                unihash = unihash_data['unihash']
            else:
                unihash = data['unihash']

            self.db.commit()

            d = {
                'taskhash': data['taskhash'],
                'method': data['method'],
                'unihash': unihash,
            }

        self.write_message(d)

    async def handle_equivreport(self, data):
        """Record a client-asserted taskhash -> unihash equivalence."""
        with closing(self.db.cursor()) as cursor:
            insert_data = {
                'method': data['method'],
                'taskhash': data['taskhash'],
                'unihash': data['unihash'],
            }
            insert_unihash(cursor, insert_data, Resolve.IGNORE)
            self.db.commit()

            # Fetch the unihash that will be reported for the taskhash. If the
            # unihash matches, it means this row was inserted (or the mapping
            # was already valid)
            row = self.query_equivalent(cursor, data['method'], data['taskhash'])

            if row['unihash'] == data['unihash']:
                logger.info('Adding taskhash equivalence for %s with unihash %s',
                            data['taskhash'], row['unihash'])

            d = {k: row[k] for k in ('taskhash', 'method', 'unihash')}

        self.write_message(d)


    async def handle_get_stats(self, request):
        d = {
            'requests': self.request_stats.todict(),
        }

        self.write_message(d)

    async def handle_reset_stats(self, request):
        # Report the stats as they were just before the reset.
        d = {
            'requests': self.request_stats.todict(),
        }

        self.request_stats.reset()
        self.write_message(d)

    async def handle_backfill_wait(self, request):
        d = {
            'tasks': self.backfill_queue.qsize(),
        }
        await self.backfill_queue.join()
        self.write_message(d)

    def query_equivalent(self, cursor, method, taskhash):
        # This is part of the inner loop and must be as fast as possible
        cursor.execute(
            'SELECT taskhash, method, unihash FROM unihashes_v2 WHERE method=:method AND taskhash=:taskhash',
            {
                'method': method,
                'taskhash': taskhash,
            }
        )
        return cursor.fetchone()


class Server(bb.asyncrpc.AsyncServer):
    """The hash equivalence server: accepts clients and optionally backfills
    unihashes from an upstream server via a background worker task."""

    def __init__(self, db, upstream=None, read_only=False):
        if upstream and read_only:
            raise bb.asyncrpc.ServerError("Read-only hashserv cannot pull from an upstream server")

        super().__init__(logger)

        self.request_stats = Stats()
        self.db = db
        self.upstream = upstream
        self.read_only = read_only

    def accept_client(self, reader, writer):
        return ServerClient(reader, writer, self.db, self.request_stats, self.backfill_queue, self.upstream, self.read_only)

    @contextmanager
    def _backfill_worker(self):
        """Run the upstream backfill task for the duration of the context.

        A ``None`` queue item is the shutdown sentinel posted by
        join_worker().
        """
        async def backfill_worker_task():
            client = await create_async_client(self.upstream)
            try:
                while True:
                    item = await self.backfill_queue.get()
                    if item is None:
                        self.backfill_queue.task_done()
                        break
                    method, taskhash = item
                    await copy_unihash_from_upstream(client, self.db, method, taskhash)
                    self.backfill_queue.task_done()
            finally:
                await client.close()

        async def join_worker(worker):
            await self.backfill_queue.put(None)
            await worker

        if self.upstream is not None:
            worker = asyncio.ensure_future(backfill_worker_task())
            try:
                yield
            finally:
                self.loop.run_until_complete(join_worker(worker))
        else:
            yield

    def run_loop_forever(self):
        self.backfill_queue = asyncio.Queue()

        with self._backfill_worker():
            super().run_loop_forever()