#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#

import abc
import asyncio
import json
import os
import socket
import sys
from . import chunkify, DEFAULT_MAX_CHUNK


class AsyncClient(object):
    """Asynchronous client for a line-oriented JSON RPC protocol.

    A connection method (connect_tcp() or connect_unix()) must be called
    first to record how the server is reached; the connection itself is
    opened lazily by connect(), which send_message() calls before every
    request.
    """

    def __init__(self, proto_name, proto_version, logger, timeout=30):
        """Initialize an unconnected client.

        proto_name / proto_version: sent to the server as the handshake
            header by setup_connection().
        logger: object whose warning() method is used to report
            communication errors.
        timeout: seconds to wait for each response line from the server.
        """
        self.reader = None
        self.writer = None
        self.max_chunk = DEFAULT_MAX_CHUNK
        self.proto_name = proto_name
        self.proto_version = proto_version
        self.logger = logger
        self.timeout = timeout

    async def connect_tcp(self, address, port):
        """Select TCP as the transport.

        This only records how to connect; no connection is opened until
        connect() is called.
        """
        async def connect_sock():
            return await asyncio.open_connection(address, port)

        self._connect_sock = connect_sock

    async def connect_unix(self, path):
        """Select a UNIX domain socket at 'path' as the transport.

        This only records how to connect; no connection is opened until
        connect() is called.
        """
        async def connect_sock():
            # AF_UNIX has path length issues so chdir here to workaround
            cwd = os.getcwd()
            try:
                # A bare file name has an empty dirname and os.chdir("")
                # would fail, so only change directory when there is one.
                dirname = os.path.dirname(path)
                if dirname:
                    os.chdir(dirname)
                # The socket must be opened synchronously so that CWD doesn't get
                # changed out from underneath us so we pass as a sock into asyncio
                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
                try:
                    sock.connect(os.path.basename(path))
                except BaseException:
                    # Don't leak the descriptor when the connect fails
                    sock.close()
                    raise
            finally:
                os.chdir(cwd)
            return await asyncio.open_unix_connection(sock=sock)

        self._connect_sock = connect_sock

    async def setup_connection(self):
        """Send the handshake header on a freshly opened connection.

        The header is "<proto_name> <proto_version>" followed by a blank
        line.
        """
        s = '%s %s\n\n' % (self.proto_name, self.proto_version)
        self.writer.write(s.encode("utf-8"))
        await self.writer.drain()

    async def connect(self):
        """Open the connection and send the handshake if not connected.

        Does nothing when both the reader and the writer already exist.
        """
        if self.reader is None or self.writer is None:
            (self.reader, self.writer) = await self._connect_sock()
            await self.setup_connection()

    async def close(self):
        """Drop the current connection, if any.

        A later connect() opens a new one.
        """
        self.reader = None

        if self.writer is not None:
            self.writer.close()
            self.writer = None

    async def _send_wrapper(self, proc):
        """Run the coroutine function 'proc' with reconnect-and-retry.

        The connection is (re)established before each attempt. When an
        attempt fails with a communication or decoding error, a warning
        is logged, the connection is closed and the call is retried. The
        first attempt is followed by up to three retries.

        Returns whatever 'proc' returns.

        Raises ConnectionError when the final attempt fails; errors of
        other caught types are wrapped in a ConnectionError with the
        original exception attached as its cause.
        """
        count = 0
        while True:
            try:
                await self.connect()
                return await proc()
            except (
                OSError,
                ConnectionError,
                json.JSONDecodeError,
                UnicodeDecodeError,
            ) as e:
                self.logger.warning("Error talking to server: %s", e)
                if count >= 3:
                    if not isinstance(e, ConnectionError):
                        # Callers only need to handle ConnectionError; keep
                        # the original error available as __cause__
                        raise ConnectionError(str(e)) from e
                    raise
                await self.close()
                count += 1

    async def send_message(self, msg):
        """Send 'msg' to the server as JSON and return the decoded reply.

        The encoded message is passed through chunkify() with the current
        max_chunk and each resulting piece is written to the server. If
        the reply is an object containing the key "chunk-stream", the
        actual reply is read as a sequence of lines terminated by an
        empty line, which are concatenated and decoded as JSON.

        Raises ConnectionError if the exchange still fails after the
        retries performed by _send_wrapper().
        """
        async def get_line():
            # Read one complete, newline-terminated line of text
            try:
                line = await asyncio.wait_for(self.reader.readline(), self.timeout)
            except asyncio.TimeoutError:
                raise ConnectionError("Timed out waiting for server")

            if not line:
                raise ConnectionError("Connection closed")

            line = line.decode("utf-8")

            if not line.endswith("\n"):
                raise ConnectionError("Bad message %r" % (line))

            return line

        async def proc():
            for c in chunkify(json.dumps(msg), self.max_chunk):
                self.writer.write(c.encode("utf-8"))
            await self.writer.drain()

            l = await get_line()

            m = json.loads(l)
            if m and "chunk-stream" in m:
                # The reply was too large for one line; collect the
                # pieces until the empty terminator line
                lines = []
                while True:
                    l = (await get_line()).rstrip("\n")
                    if not l:
                        break
                    lines.append(l)

                m = json.loads("".join(lines))

            return m

        return await self._send_wrapper(proc)

    async def ping(self):
        """Send a 'ping' message and return the server's reply."""
        return await self.send_message(
            {'ping': {}}
        )


class Client(object):
    """Synchronous wrapper around an asynchronous client.

    Subclasses provide the asynchronous client through
    _get_async_client(). Its coroutine methods are exposed as blocking
    methods that run on an event loop owned by this object.

    Instances can be used as context managers; close() is called when
    the 'with' block exits.
    """

    def __init__(self):
        self.client = self._get_async_client()
        self.loop = asyncio.new_event_loop()

        # Override any pre-existing loop.
        # Without this, the PR server export selftest triggers a hang
        # when running with Python 3.7.  The drawback is that there is
        # potential for issues if the PR and hash equiv (or some new)
        # clients need to both be instantiated in the same process.
        # This should be revisited if/when Python 3.9 becomes the
        # minimum required version for BitBake, as it seems not
        # required (but harmless) with it.
        asyncio.set_event_loop(self.loop)

        self._add_methods('connect_tcp', 'ping')

    # NOTE: this class does not use abc.ABCMeta, so the decorator below is
    # not enforced at instantiation time; subclasses must still override
    # this method.
    @abc.abstractmethod
    def _get_async_client(self):
        """Return the asynchronous client instance to wrap."""
        pass

    def _get_downcall_wrapper(self, downcall):
        """Return a blocking function that runs the coroutine function
        'downcall' to completion on this object's event loop."""
        def wrapper(*args, **kwargs):
            return self.loop.run_until_complete(downcall(*args, **kwargs))

        return wrapper

    def _add_methods(self, *methods):
        """Expose the named coroutine methods of the wrapped client as
        blocking methods of this object."""
        for m in methods:
            downcall = getattr(self.client, m)
            setattr(self, m, self._get_downcall_wrapper(downcall))

    def connect_unix(self, path):
        """Connect to the server's UNIX domain socket at 'path'.

        Unlike the wrapped connect_tcp(), this opens the connection
        immediately.
        """
        self.loop.run_until_complete(self.client.connect_unix(path))
        self.loop.run_until_complete(self.client.connect())

    @property
    def max_chunk(self):
        """The max_chunk value of the wrapped client."""
        return self.client.max_chunk

    @max_chunk.setter
    def max_chunk(self, value):
        self.client.max_chunk = value

    def close(self):
        """Close the wrapped client and the event loop.

        Calling close() again after the loop has been closed does
        nothing.
        """
        if self.loop.is_closed():
            return

        try:
            self.loop.run_until_complete(self.client.close())
            if sys.version_info >= (3, 6):
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        finally:
            # Release the loop even if closing the client failed
            self.loop.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        return False