#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

import abc
import asyncio
import json
import os
import socket
import sys
from . import chunkify, DEFAULT_MAX_CHUNK


class AsyncClient(object):
    """Asyncio client exchanging newline-terminated JSON messages with a server.

    The transport is selected with connect_tcp() or connect_unix(); the
    connection itself is opened lazily by connect(), which send_message()
    calls (through _send_wrapper()) before every request.
    """

    def __init__(self, proto_name, proto_version, logger, timeout=30):
        """Store the protocol identity, logger and per-line read timeout.

        proto_name and proto_version are sent to the server as the handshake
        header by setup_connection(). timeout is handed to asyncio.wait_for()
        for every line read from the server.
        """
        self.reader = None
        self.writer = None
        self.max_chunk = DEFAULT_MAX_CHUNK
        self.proto_name = proto_name
        self.proto_version = proto_version
        self.logger = logger
        self.timeout = timeout

    async def connect_tcp(self, address, port):
        """Select a TCP transport; the connection is opened later by connect()."""
        async def connect_sock():
            return await asyncio.open_connection(address, port)

        self._connect_sock = connect_sock

    async def connect_unix(self, path):
        """Select a UNIX socket transport; the connection is opened by connect()."""
        async def connect_sock():
            # AF_UNIX has path length issues so chdir here to workaround
            cwd = os.getcwd()
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                # A bare file name has an empty dirname and os.chdir("")
                # raises FileNotFoundError, so stay in the current directory.
                os.chdir(os.path.dirname(path) or ".")
                # The socket must be opened synchronously so that CWD doesn't get
                # changed out from underneath us so we pass as a sock into asyncio
                sock.connect(os.path.basename(path))
            except BaseException:
                # Do not leak the descriptor when the connection attempt
                # fails; _send_wrapper() retries and would leak one per try.
                sock.close()
                raise
            finally:
                os.chdir(cwd)
            return await asyncio.open_unix_connection(sock=sock)

        self._connect_sock = connect_sock

    async def setup_connection(self):
        """Send the "<proto_name> <proto_version>" header followed by a blank line."""
        s = '%s %s\n\n' % (self.proto_name, self.proto_version)
        self.writer.write(s.encode("utf-8"))
        await self.writer.drain()

    async def connect(self):
        """Open the connection and send the header unless already connected.

        connect_tcp() or connect_unix() must have been called first, since
        they are what define self._connect_sock.
        """
        if self.reader is None or self.writer is None:
            (self.reader, self.writer) = await self._connect_sock()
            await self.setup_connection()

    async def close(self):
        """Drop the reader and close the writer, if there is one."""
        self.reader = None

        if self.writer is not None:
            self.writer.close()
            self.writer = None

    async def _send_wrapper(self, proc):
        """Run the coroutine function proc on a live connection, with retries.

        On OSError, ConnectionError, json.JSONDecodeError or
        UnicodeDecodeError a warning is logged, the connection is closed and
        the whole connect-and-proc sequence is tried again. The fourth
        consecutive failure is raised to the caller as a ConnectionError;
        other exception types are wrapped so callers only need to handle
        ConnectionError.
        """
        count = 0
        while True:
            try:
                await self.connect()
                return await proc()
            except (
                OSError,
                ConnectionError,
                json.JSONDecodeError,
                UnicodeDecodeError,
            ) as e:
                self.logger.warning("Error talking to server: %s" % e)
                if count >= 3:
                    if not isinstance(e, ConnectionError):
                        # Chain explicitly so the original failure is kept
                        # as the cause of the reported ConnectionError.
                        raise ConnectionError(str(e)) from e
                    raise
                await self.close()
                count += 1

    async def send_message(self, msg):
        """Send msg as JSON and return the decoded JSON reply.

        The encoded message is passed through chunkify() with self.max_chunk
        and each resulting piece is written to the server. If the first reply
        line decodes to a non-empty value containing "chunk-stream", further
        lines are read until an empty line, concatenated, and decoded as the
        actual reply.

        Raises ConnectionError if the exchange still fails after the retries
        performed by _send_wrapper().
        """
        async def get_line():
            # Read one newline-terminated line, converting a timeout, EOF or
            # a truncated line into ConnectionError.
            try:
                line = await asyncio.wait_for(self.reader.readline(), self.timeout)
            except asyncio.TimeoutError:
                raise ConnectionError("Timed out waiting for server")

            if not line:
                raise ConnectionError("Connection closed")

            line = line.decode("utf-8")

            if not line.endswith("\n"):
                raise ConnectionError("Bad message %r" % (line))

            return line

        async def proc():
            for chunk in chunkify(json.dumps(msg), self.max_chunk):
                self.writer.write(chunk.encode("utf-8"))
            await self.writer.drain()

            reply = json.loads(await get_line())
            if reply and "chunk-stream" in reply:
                lines = []
                while True:
                    line = (await get_line()).rstrip("\n")
                    if not line:
                        break
                    lines.append(line)

                reply = json.loads("".join(lines))

            return reply

        return await self._send_wrapper(proc)

    async def ping(self):
        """Send a 'ping' message and return the server's reply."""
        return await self.send_message(
            {'ping': {}}
        )


class Client(object):
    """Blocking wrapper that drives an AsyncClient-style object on a private loop.

    Subclasses provide the asynchronous client through _get_async_client().
    'connect_tcp' and 'ping' are exposed as blocking methods; subclasses can
    expose more with _add_methods().
    """

    def __init__(self):
        self.client = self._get_async_client()
        self.loop = asyncio.new_event_loop()

        # Override any pre-existing loop.
        # Without this, the PR server export selftest triggers a hang
        # when running with Python 3.7. The drawback is that there is
        # potential for issues if the PR and hash equiv (or some new)
        # clients need to both be instantiated in the same process.
        # This should be revisited if/when Python 3.9 becomes the
        # minimum required version for BitBake, as it seems not
        # required (but harmless) with it.
        asyncio.set_event_loop(self.loop)

        self._add_methods('connect_tcp', 'ping')

    # NOTE: this class does not use abc.ABCMeta, so the decorator below is
    # advisory only; a subclass that fails to override the method gets None.
    @abc.abstractmethod
    def _get_async_client(self):
        """Return the asynchronous client object that this wrapper drives."""
        pass

    def _get_downcall_wrapper(self, downcall):
        """Return a blocking function that runs the coroutine function downcall."""
        def wrapper(*args, **kwargs):
            return self.loop.run_until_complete(downcall(*args, **kwargs))

        return wrapper

    def _add_methods(self, *methods):
        """Expose the named coroutine methods of self.client as blocking methods."""
        for m in methods:
            downcall = getattr(self.client, m)
            setattr(self, m, self._get_downcall_wrapper(downcall))

    def connect_unix(self, path):
        """Select the UNIX socket at path and connect to it immediately."""
        self.loop.run_until_complete(self.client.connect_unix(path))
        self.loop.run_until_complete(self.client.connect())

    @property
    def max_chunk(self):
        """Maximum chunk size, stored on the wrapped asynchronous client."""
        return self.client.max_chunk

    @max_chunk.setter
    def max_chunk(self, value):
        self.client.max_chunk = value

    def close(self):
        """Close the wrapped client, then shut down and close the event loop."""
        try:
            self.loop.run_until_complete(self.client.close())
            if sys.version_info >= (3, 6):
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        finally:
            # Release the loop's resources even if the steps above fail.
            self.loop.close()