# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

import logging
import socket
import bb.asyncrpc
from . import create_async_client


logger = logging.getLogger("hashserv.client")


class AsyncClient(bb.asyncrpc.AsyncClient):
    """Asynchronous client for the hash equivalence server.

    The client tracks which of two modes the connection is in:

    * ``MODE_NORMAL`` - requests are sent with ``send_message()``.
    * ``MODE_GET_STREAM`` - line-based queries are sent with
      ``send_stream()``.

    Every public request method switches to the mode it needs via
    ``_set_mode()`` before talking to the server.
    """

    MODE_NORMAL = 0
    MODE_GET_STREAM = 1

    def __init__(self):
        """Create a client that starts out in ``MODE_NORMAL``."""
        super().__init__('OEHASHEQUIV', '1.1', logger)
        self.mode = self.MODE_NORMAL

    async def setup_connection(self):
        """Set up the connection and restore the previously active mode.

        The recorded mode is reset to ``MODE_NORMAL`` after the base class
        setup and the mode that was active beforehand is then re-applied
        through ``_set_mode()``.
        """
        await super().setup_connection()
        # NOTE(review): presumably a freshly set up connection always starts
        # in normal mode on the server side, hence the reset before
        # re-applying the earlier mode - confirm against the server.
        previous_mode = self.mode
        self.mode = self.MODE_NORMAL
        await self._set_mode(previous_mode)

    async def send_stream(self, msg):
        """Send one line to the server and return its one-line reply.

        ``msg`` is written followed by a newline, UTF-8 encoded. The reply
        is decoded as UTF-8 and returned with trailing whitespace removed.

        Raises:
            ConnectionError: if the server returned no data for the reply.
        """
        async def proc():
            self.writer.write(("%s\n" % msg).encode("utf-8"))
            await self.writer.drain()
            line = await self.reader.readline()
            if not line:
                raise ConnectionError("Connection closed")
            return line.decode("utf-8").rstrip()

        # The exchange is delegated to the base class wrapper.
        return await self._send_wrapper(proc)

    async def _set_mode(self, new_mode):
        """Switch the connection to ``new_mode`` if it is not already in it.

        Entering stream mode sends a ``get-stream`` message; leaving it
        sends the ``END`` line. In both cases the server must answer
        ``"ok"``.

        Raises:
            ConnectionError: if the server does not answer ``"ok"``.
            Exception: if the requested transition is not one of the two
                known ones and ``new_mode`` differs from the current mode.
        """
        if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
            r = await self.send_stream("END")
            if r != "ok":
                raise ConnectionError("Bad response from server %r" % r)
        elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
            r = await self.send_message({"get-stream": None})
            if r != "ok":
                raise ConnectionError("Bad response from server %r" % r)
        elif new_mode != self.mode:
            raise Exception(
                "Undefined mode transition %r -> %r" % (self.mode, new_mode)
            )

        self.mode = new_mode

    async def get_unihash(self, method, taskhash):
        """Query the server in stream mode for ``method`` and ``taskhash``.

        Returns the server's reply line, or ``None`` when the reply is
        empty.
        """
        await self._set_mode(self.MODE_GET_STREAM)
        r = await self.send_stream("%s %s" % (method, taskhash))
        return r or None

    async def report_unihash(self, taskhash, method, outhash, unihash, extra=None):
        """Send a ``report`` message and return the server's response.

        ``extra`` is an optional dict of additional fields. It is copied,
        never modified, and the ``taskhash``, ``method``, ``outhash`` and
        ``unihash`` keys are set on the copy.
        """
        await self._set_mode(self.MODE_NORMAL)
        m = {} if extra is None else extra.copy()
        m["taskhash"] = taskhash
        m["method"] = method
        m["outhash"] = outhash
        m["unihash"] = unihash
        return await self.send_message({"report": m})

    async def report_unihash_equiv(self, taskhash, method, unihash, extra=None):
        """Send a ``report-equiv`` message and return the server's response.

        ``extra`` is an optional dict of additional fields. It is copied,
        never modified, and the ``taskhash``, ``method`` and ``unihash``
        keys are set on the copy.
        """
        await self._set_mode(self.MODE_NORMAL)
        m = {} if extra is None else extra.copy()
        m["taskhash"] = taskhash
        m["method"] = method
        m["unihash"] = unihash
        return await self.send_message({"report-equiv": m})

    async def get_taskhash(self, method, taskhash, all_properties=False):
        """Send a ``get`` message and return the server's response.

        ``all_properties`` is passed to the server as the ``all`` field.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message(
            {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
        )

    async def get_outhash(self, method, outhash, taskhash):
        """Send a ``get-outhash`` message and return the server's response."""
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message(
            {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method}}
        )

    async def get_stats(self):
        """Send a ``get-stats`` message and return the server's response."""
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message({"get-stats": None})

    async def reset_stats(self):
        """Send a ``reset-stats`` message and return the server's response."""
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message({"reset-stats": None})

    async def backfill_wait(self):
        """Send ``backfill-wait`` and return the ``tasks`` field of the reply."""
        await self._set_mode(self.MODE_NORMAL)
        return (await self.send_message({"backfill-wait": None}))["tasks"]


class Client(bb.asyncrpc.Client):
    """Client built on ``bb.asyncrpc.Client`` that uses ``AsyncClient``.

    The method names listed in ``__init__`` are registered through the base
    class ``_add_methods()`` helper.
    """

    def __init__(self):
        """Initialise the base client and register the supported methods."""
        super().__init__()
        self._add_methods(
            "connect_tcp",
            "get_unihash",
            "report_unihash",
            "report_unihash_equiv",
            "get_taskhash",
            "get_outhash",
            "get_stats",
            "reset_stats",
            "backfill_wait",
        )

    def _get_async_client(self):
        """Return a new ``AsyncClient`` instance."""
        return AsyncClient()