xref: /OK3568_Linux_fs/yocto/poky/bitbake/lib/bb/asyncrpc/client.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
#
# Copyright BitBake Contributors
#
# SPDX-License-Identifier: GPL-2.0-only
#
6*4882a593Smuzhiyun
7*4882a593Smuzhiyunimport abc
8*4882a593Smuzhiyunimport asyncio
9*4882a593Smuzhiyunimport json
10*4882a593Smuzhiyunimport os
11*4882a593Smuzhiyunimport socket
12*4882a593Smuzhiyunimport sys
13*4882a593Smuzhiyunfrom . import chunkify, DEFAULT_MAX_CHUNK
14*4882a593Smuzhiyun
15*4882a593Smuzhiyun
class AsyncClient(object):
    """Asyncio client for a line-oriented JSON request/response protocol.

    An endpoint is first selected with connect_tcp() or connect_unix();
    neither opens a connection by itself.  The connection is established
    lazily by connect(), which send_message() invokes (through
    _send_wrapper()) before every request, so a dropped connection is
    transparently re-opened.
    """

    def __init__(self, proto_name, proto_version, logger, timeout=30):
        """Store the client configuration; no I/O is performed.

        proto_name and proto_version are sent to the server as the
        handshake line (see setup_connection()).  logger must provide a
        warning() method.  timeout is the number of seconds to wait for
        each line of a server reply.
        """
        self.reader = None
        self.writer = None
        self.max_chunk = DEFAULT_MAX_CHUNK
        self.proto_name = proto_name
        self.proto_version = proto_version
        self.logger = logger
        self.timeout = timeout

    async def connect_tcp(self, address, port):
        """Select a TCP endpoint; the connection is opened by connect()."""
        async def connect_sock():
            return await asyncio.open_connection(address, port)

        self._connect_sock = connect_sock

    async def connect_unix(self, path):
        """Select a UNIX domain socket endpoint; connect() opens it."""
        async def connect_sock():
            # AF_UNIX has path length issues so chdir here to workaround
            cwd = os.getcwd()
            sock = None
            try:
                # A bare file name has an empty dirname and os.chdir("")
                # would fail, so only change directory when there is one.
                dirname = os.path.dirname(path)
                if dirname:
                    os.chdir(dirname)
                # The socket must be opened synchronously so that CWD doesn't get
                # changed out from underneath us so we pass as a sock into asyncio
                sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
                sock.connect(os.path.basename(path))
            except BaseException:
                # Don't leak the descriptor when the connect attempt fails
                if sock is not None:
                    sock.close()
                raise
            finally:
                os.chdir(cwd)
            return await asyncio.open_unix_connection(sock=sock)

        self._connect_sock = connect_sock

    async def setup_connection(self):
        """Send the "<proto_name> <proto_version>" handshake header.

        The header line is followed by an empty line.
        """
        s = '%s %s\n\n' % (self.proto_name, self.proto_version)
        self.writer.write(s.encode("utf-8"))
        await self.writer.drain()

    async def connect(self):
        """Open the connection and send the handshake if not connected.

        Requires that connect_tcp() or connect_unix() was called first.
        """
        if self.reader is None or self.writer is None:
            (self.reader, self.writer) = await self._connect_sock()
            await self.setup_connection()

    async def close(self):
        """Close the connection, if any, and forget the streams."""
        self.reader = None

        if self.writer is not None:
            self.writer.close()
            self.writer = None

    async def _send_wrapper(self, proc):
        """Run the coroutine function proc, reconnecting on failure.

        proc is attempted up to four times (one try plus three retries);
        the connection is closed after every failed attempt so that the
        next one starts on a fresh connection.

        Returns whatever proc returns.  Raises ConnectionError once all
        attempts have failed; other error types are converted and chained
        to the ConnectionError as its cause.
        """
        count = 0
        while True:
            try:
                await self.connect()
                return await proc()
            except (
                OSError,
                ConnectionError,
                json.JSONDecodeError,
                UnicodeDecodeError,
            ) as e:
                self.logger.warning("Error talking to server: %s", e)
                # The stream may hold a partial or stale reply at this
                # point, so never leave it around for a later request.
                await self.close()
                if count >= 3:
                    if not isinstance(e, ConnectionError):
                        raise ConnectionError(str(e)) from e
                    raise
                count += 1

    async def send_message(self, msg):
        """Send msg as JSON and return the decoded JSON reply.

        The request is written in the pieces produced by chunkify().  A
        reply that is a JSON object containing the key "chunk-stream"
        announces that the real reply follows as a series of lines
        terminated by an empty line; those lines are concatenated and
        decoded instead.

        Raises ConnectionError if the exchange keeps failing (see
        _send_wrapper()).
        """
        async def get_line():
            try:
                line = await asyncio.wait_for(self.reader.readline(), self.timeout)
            except asyncio.TimeoutError as e:
                raise ConnectionError("Timed out waiting for server") from e

            # readline() returns an empty result at end of stream
            if not line:
                raise ConnectionError("Connection closed")

            line = line.decode("utf-8")

            # A line without its terminator means the stream ended early
            if not line.endswith("\n"):
                raise ConnectionError("Bad message %r" % (line))

            return line

        async def proc():
            for c in chunkify(json.dumps(msg), self.max_chunk):
                self.writer.write(c.encode("utf-8"))
            await self.writer.drain()

            reply = await get_line()

            m = json.loads(reply)
            # Only a JSON object can be the chunk-stream marker; a plain
            # "in" test would also match substrings of a string reply and
            # raise TypeError for a numeric one.
            if isinstance(m, dict) and "chunk-stream" in m:
                lines = []
                while True:
                    part = (await get_line()).rstrip("\n")
                    if not part:
                        break
                    lines.append(part)

                m = json.loads("".join(lines))

            return m

        return await self._send_wrapper(proc)

    async def ping(self):
        """Send a "ping" request and return the server's reply."""
        return await self.send_message(
            {'ping': {}}
        )
128*4882a593Smuzhiyun
129*4882a593Smuzhiyun
class Client(object):
    """Blocking facade over an asynchronous client object.

    Subclasses supply the asynchronous client via _get_async_client().
    Each Client owns a private event loop and exposes selected coroutine
    methods of the asynchronous client as ordinary blocking methods (see
    _add_methods()).
    """

    def __init__(self):
        """Create the asynchronous client and the event loop that runs it."""
        self.client = self._get_async_client()
        self.loop = asyncio.new_event_loop()

        # Override any pre-existing loop.
        # Without this, the PR server export selftest triggers a hang
        # when running with Python 3.7.  The drawback is that there is
        # potential for issues if the PR and hash equiv (or some new)
        # clients need to both be instantiated in the same process.
        # This should be revisited if/when Python 3.9 becomes the
        # minimum required version for BitBake, as it seems not
        # required (but harmless) with it.
        asyncio.set_event_loop(self.loop)

        self._add_methods('connect_tcp', 'ping')

    @abc.abstractmethod
    def _get_async_client(self):
        """Return the asynchronous client to wrap; subclasses must override.

        This class does not use an ABC metaclass, so the decorator alone
        does not prevent instantiation without an override.  Fail loudly
        here instead of returning None and breaking later in __init__.
        """
        raise NotImplementedError(
            "%s must implement _get_async_client()" % type(self).__name__
        )

    def _get_downcall_wrapper(self, downcall):
        """Return a blocking function that runs coroutine function downcall.

        The returned function forwards its arguments to downcall and runs
        the resulting coroutine to completion on this client's loop.
        """
        def wrapper(*args, **kwargs):
            return self.loop.run_until_complete(downcall(*args, **kwargs))

        return wrapper

    def _add_methods(self, *methods):
        """Expose the named coroutine methods of the client as blocking ones."""
        for m in methods:
            downcall = getattr(self.client, m)
            setattr(self, m, self._get_downcall_wrapper(downcall))

    def connect_unix(self, path):
        """Select the UNIX socket at path and connect to it immediately."""
        self.loop.run_until_complete(self.client.connect_unix(path))
        self.loop.run_until_complete(self.client.connect())

    @property
    def max_chunk(self):
        """The max_chunk setting of the wrapped asynchronous client."""
        return self.client.max_chunk

    @max_chunk.setter
    def max_chunk(self, value):
        self.client.max_chunk = value

    def close(self):
        """Close the asynchronous client and shut down the event loop.

        The loop is closed even if closing the client fails; the error
        is still propagated.  Calling close() again afterwards is a no-op.
        """
        if self.loop.is_closed():
            return

        try:
            self.loop.run_until_complete(self.client.close())
            if sys.version_info >= (3, 6):
                self.loop.run_until_complete(self.loop.shutdown_asyncgens())
        finally:
            self.loop.close()
179