xref: /OK3568_Linux_fs/yocto/poky/bitbake/lib/bb/asyncrpc/client.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1#
2# Copyright BitBake Contributors
3#
4# SPDX-License-Identifier: GPL-2.0-only
5#
6
7import abc
8import asyncio
9import json
10import os
11import socket
12import sys
13from . import chunkify, DEFAULT_MAX_CHUNK
14
15
class AsyncClient(object):
    """Asynchronous client for a line-oriented JSON RPC server.

    A transport is selected with connect_tcp() or connect_unix(); the actual
    connection is opened lazily by connect(), which send_message() calls
    before every request so that a dropped connection is re-established.
    """

    def __init__(self, proto_name, proto_version, logger, timeout=30):
        """Store the protocol identification and per-client settings.

        proto_name / proto_version are sent to the server in the handshake
        line written by setup_connection(). logger receives a warning for
        every failed request attempt. timeout is the number of seconds to
        wait for each reply line from the server.
        """
        self.reader = None
        self.writer = None
        # Maximum chunk size handed to chunkify() when sending a message
        self.max_chunk = DEFAULT_MAX_CHUNK
        self.proto_name = proto_name
        self.proto_version = proto_version
        self.logger = logger
        self.timeout = timeout

    async def connect_tcp(self, address, port):
        """Select TCP as the transport.

        Nothing is connected here; this only records how connect() should
        open the socket.
        """
        async def connect_sock():
            return await asyncio.open_connection(address, port)

        self._connect_sock = connect_sock

    async def connect_unix(self, path):
        """Select a UNIX domain socket at 'path' as the transport.

        Nothing is connected here; this only records how connect() should
        open the socket.
        """
        async def connect_sock():
            # AF_UNIX has path length issues so chdir here to workaround
            dirname = os.path.dirname(path)
            cwd = os.getcwd()
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                try:
                    # A bare file name has an empty dirname and os.chdir("")
                    # would fail, so only change directory when needed
                    if dirname:
                        os.chdir(dirname)
                    # The socket must be opened synchronously so that CWD
                    # doesn't get changed out from underneath us so we pass
                    # as a sock into asyncio
                    sock.connect(os.path.basename(path))
                finally:
                    os.chdir(cwd)
                return await asyncio.open_unix_connection(sock=sock)
            except BaseException:
                # Don't leak the file descriptor if connecting failed
                sock.close()
                raise

        self._connect_sock = connect_sock

    async def setup_connection(self):
        """Send the handshake: '<proto_name> <proto_version>' and a blank line."""
        s = '%s %s\n\n' % (self.proto_name, self.proto_version)
        self.writer.write(s.encode("utf-8"))
        await self.writer.drain()

    async def connect(self):
        """Open the connection and perform the handshake if not connected."""
        if self.reader is None or self.writer is None:
            (self.reader, self.writer) = await self._connect_sock()
            try:
                await self.setup_connection()
            except Exception:
                # Drop the half-initialised connection, otherwise the next
                # connect() would treat it as ready and skip the handshake
                await self.close()
                raise

    async def close(self):
        """Forget the reader and close the writer, if any."""
        self.reader = None

        if self.writer is not None:
            self.writer.close()
            self.writer = None

    async def _send_wrapper(self, proc):
        """Run the coroutine function 'proc' with reconnect-and-retry.

        The connection is (re)established before each attempt. On a
        communication or decoding error the connection is closed and the
        call retried; after the fourth failed attempt the error is raised.
        Errors that are not already a ConnectionError are converted to one,
        with the original exception attached as its cause.
        """
        count = 0
        while True:
            try:
                await self.connect()
                return await proc()
            except (
                OSError,
                ConnectionError,
                json.JSONDecodeError,
                UnicodeDecodeError,
            ) as e:
                self.logger.warning("Error talking to server: %s", e)
                if count >= 3:
                    if not isinstance(e, ConnectionError):
                        raise ConnectionError(str(e)) from e
                    raise
                await self.close()
                count += 1

    async def send_message(self, msg):
        """Send 'msg' as JSON and return the decoded JSON reply.

        The request is split with chunkify() using self.max_chunk. If the
        reply object contains a "chunk-stream" key, the real reply follows
        as a series of lines terminated by an empty line; those lines are
        concatenated and decoded as a single JSON document.

        Raises ConnectionError if the server cannot be reached, times out,
        closes the connection or sends malformed data (after retries).
        """
        async def get_line():
            # Read one newline-terminated line, decoded as UTF-8
            try:
                line = await asyncio.wait_for(self.reader.readline(), self.timeout)
            except asyncio.TimeoutError as exc:
                raise ConnectionError("Timed out waiting for server") from exc

            if not line:
                raise ConnectionError("Connection closed")

            line = line.decode("utf-8")

            if not line.endswith("\n"):
                raise ConnectionError("Bad message %r" % (line))

            return line

        async def proc():
            for c in chunkify(json.dumps(msg), self.max_chunk):
                self.writer.write(c.encode("utf-8"))
            await self.writer.drain()

            reply = json.loads(await get_line())
            if reply and "chunk-stream" in reply:
                lines = []
                while True:
                    line = (await get_line()).rstrip("\n")
                    # An empty line marks the end of the chunked reply
                    if not line:
                        break
                    lines.append(line)

                reply = json.loads("".join(lines))

            return reply

        return await self._send_wrapper(proc)

    async def ping(self):
        """Send a 'ping' request and return the server's reply."""
        return await self.send_message(
            {'ping': {}}
        )
128
129
class Client(object):
    """Synchronous wrapper around an async client object.

    Subclasses provide the async client through _get_async_client(). Each
    instance owns a private event loop on which the async client's
    coroutines are run to completion. Instances can be used as context
    managers, in which case close() is called on exit.
    """

    def __init__(self):
        self.client = self._get_async_client()
        self.loop = asyncio.new_event_loop()

        # Override any pre-existing loop.
        # Without this, the PR server export selftest triggers a hang
        # when running with Python 3.7.  The drawback is that there is
        # potential for issues if the PR and hash equiv (or some new)
        # clients need to both be instantiated in the same process.
        # This should be revisited if/when Python 3.9 becomes the
        # minimum required version for BitBake, as it seems not
        # required (but harmless) with it.
        asyncio.set_event_loop(self.loop)

        self._add_methods('connect_tcp', 'ping')

    # NOTE: this class does not use abc.ABCMeta, so the decorator below is
    # not enforced at instantiation time; subclasses must still override
    # this method and return the async client object to wrap.
    @abc.abstractmethod
    def _get_async_client(self):
        pass

    def _get_downcall_wrapper(self, downcall):
        """Return a blocking function that runs coroutine function 'downcall'."""
        def wrapper(*args, **kwargs):
            return self.loop.run_until_complete(downcall(*args, **kwargs))

        return wrapper

    def _add_methods(self, *methods):
        """Expose the named async client methods as blocking methods on self."""
        for m in methods:
            downcall = getattr(self.client, m)
            setattr(self, m, self._get_downcall_wrapper(downcall))

    def connect_unix(self, path):
        """Select the UNIX socket at 'path' and connect to it immediately."""
        self.loop.run_until_complete(self.client.connect_unix(path))
        self.loop.run_until_complete(self.client.connect())

    @property
    def max_chunk(self):
        """Maximum chunk size used by the wrapped async client."""
        return self.client.max_chunk

    @max_chunk.setter
    def max_chunk(self, value):
        self.client.max_chunk = value

    def close(self):
        """Close the async client and release the event loop.

        The loop is closed even if shutting down the client fails; the
        failure is still propagated. Calling close() again is a no-op.
        """
        if self.loop.is_closed():
            return

        try:
            self.loop.run_until_complete(self.client.close())
            if sys.version_info >= (3, 6):
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        finally:
            # Always release the loop's resources, even on error
            self.loop.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()
        # Never suppress an exception raised inside the with block
        return False
179