# Copyright (C) 2019 Garmin Ltd.
#
# SPDX-License-Identifier: GPL-2.0-only
#

import logging
import socket
import bb.asyncrpc
from . import create_async_client


logger = logging.getLogger("hashserv.client")


class AsyncClient(bb.asyncrpc.AsyncClient):
    """Asynchronous hash equivalence client.

    The client tracks which of two modes the connection is in:

    * ``MODE_NORMAL`` - requests are sent with ``send_message()``.
    * ``MODE_GET_STREAM`` - newline terminated text lines are exchanged with
      ``send_stream()``; this mode is only used by ``get_unihash()``.

    Every public request method switches to the mode it needs before
    sending, so callers never have to manage the mode themselves.
    """

    MODE_NORMAL = 0
    MODE_GET_STREAM = 1

    def __init__(self):
        super().__init__('OEHASHEQUIV', '1.1', logger)
        self.mode = self.MODE_NORMAL

    async def setup_connection(self):
        """Set up the connection and re-enter the mode that was active."""
        await super().setup_connection()
        # _set_mode() only sends something when the mode actually changes, so
        # record the wanted mode, mark ourselves as being in normal mode and
        # transition again.
        # NOTE(review): presumably a freshly set up connection always starts
        # in normal mode on the server side - confirm against the server.
        cur_mode = self.mode
        self.mode = self.MODE_NORMAL
        await self._set_mode(cur_mode)

    async def send_stream(self, msg):
        """Send one line of text and return the one-line reply.

        ``msg`` is written followed by a newline, encoded as UTF-8. The reply
        line is decoded as UTF-8 and returned with trailing whitespace
        removed. The exchange is run through ``self._send_wrapper()``.

        Raises:
            ConnectionError: if reading the reply returns no data.
        """

        async def proc():
            self.writer.write(("%s\n" % msg).encode("utf-8"))
            await self.writer.drain()
            l = await self.reader.readline()
            if not l:
                # readline() returned nothing at all, not even a newline
                raise ConnectionError("Connection closed")
            return l.decode("utf-8").rstrip()

        return await self._send_wrapper(proc)

    async def _set_mode(self, new_mode):
        """Switch the connection to ``new_mode`` if it is not already in it.

        Raises:
            ConnectionError: if the server does not answer ``"ok"`` to the
                mode change request.
            Exception: if the requested transition is not one of
                normal -> get-stream or get-stream -> normal.
        """
        if new_mode == self.MODE_NORMAL and self.mode == self.MODE_GET_STREAM:
            # Leave stream mode by sending the END line
            r = await self.send_stream("END")
            if r != "ok":
                raise ConnectionError("Bad response from server %r" % r)
        elif new_mode == self.MODE_GET_STREAM and self.mode == self.MODE_NORMAL:
            # Enter stream mode with a regular "get-stream" message
            r = await self.send_message({"get-stream": None})
            if r != "ok":
                raise ConnectionError("Bad response from server %r" % r)
        elif new_mode != self.mode:
            raise Exception(
                "Undefined mode transition %r -> %r" % (self.mode, new_mode)
            )

        # Only reached when the transition succeeded or was a no-op
        self.mode = new_mode

    async def get_unihash(self, method, taskhash):
        """Query the unihash for ``method`` and ``taskhash`` in stream mode.

        Returns the reply line, or None when the reply is empty.
        """
        await self._set_mode(self.MODE_GET_STREAM)
        r = await self.send_stream("%s %s" % (method, taskhash))
        if not r:
            return None
        return r

    async def report_unihash(self, taskhash, method, outhash, unihash, extra=None):
        """Send a "report" message and return the reply.

        ``extra`` is an optional mapping of additional fields. It is copied,
        never modified, and the ``taskhash``, ``method``, ``outhash`` and
        ``unihash`` keys are then set from the arguments (overriding any
        entries of the same name in ``extra``).
        """
        await self._set_mode(self.MODE_NORMAL)
        # None instead of a shared mutable {} default; the copy keeps the
        # caller's mapping untouched.
        m = extra.copy() if extra is not None else {}
        m["taskhash"] = taskhash
        m["method"] = method
        m["outhash"] = outhash
        m["unihash"] = unihash
        return await self.send_message({"report": m})

    async def report_unihash_equiv(self, taskhash, method, unihash, extra=None):
        """Send a "report-equiv" message and return the reply.

        ``extra`` is an optional mapping of additional fields. It is copied,
        never modified, and the ``taskhash``, ``method`` and ``unihash`` keys
        are then set from the arguments (overriding any entries of the same
        name in ``extra``).
        """
        await self._set_mode(self.MODE_NORMAL)
        # None instead of a shared mutable {} default; the copy keeps the
        # caller's mapping untouched.
        m = extra.copy() if extra is not None else {}
        m["taskhash"] = taskhash
        m["method"] = method
        m["unihash"] = unihash
        return await self.send_message({"report-equiv": m})

    async def get_taskhash(self, method, taskhash, all_properties=False):
        """Send a "get" message for ``method``/``taskhash``; return the reply.

        ``all_properties`` is passed on as the ``"all"`` field of the request.
        """
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message(
            {"get": {"taskhash": taskhash, "method": method, "all": all_properties}}
        )

    async def get_outhash(self, method, outhash, taskhash):
        """Send a "get-outhash" message and return the reply."""
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message(
            {"get-outhash": {"outhash": outhash, "taskhash": taskhash, "method": method}}
        )

    async def get_stats(self):
        """Send a "get-stats" message and return the reply."""
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message({"get-stats": None})

    async def reset_stats(self):
        """Send a "reset-stats" message and return the reply."""
        await self._set_mode(self.MODE_NORMAL)
        return await self.send_message({"reset-stats": None})

    async def backfill_wait(self):
        """Send a "backfill-wait" message; return the reply's "tasks" entry."""
        await self._set_mode(self.MODE_NORMAL)
        return (await self.send_message({"backfill-wait": None}))["tasks"]


class Client(bb.asyncrpc.Client):
    """Client front end that registers the AsyncClient request methods.

    The method names are handed to ``_add_methods()`` of the base class and
    ``_get_async_client()`` supplies the ``AsyncClient`` instance.
    """

    def __init__(self):
        super().__init__()
        self._add_methods(
            "connect_tcp",
            "get_unihash",
            "report_unihash",
            "report_unihash_equiv",
            "get_taskhash",
            "get_outhash",
            "get_stats",
            "reset_stats",
            "backfill_wait",
        )

    def _get_async_client(self):
        """Return a new AsyncClient instance."""
        return AsyncClient()