xref: /OK3568_Linux_fs/buildroot/support/testing/tests/package/sample_python_dbus_next.py (revision 4882a59341e53eb6f0b4789bf948001014eff981)
1import asyncio
2from dbus_next.aio import MessageBus
3from dbus_next.service import ServiceInterface, method
4import dbus_next.introspection as intr
5from dbus_next import BusType
6
7
8class SampleInterface(ServiceInterface):
9    def __init__(self):
10        super().__init__('test.interface')
11
12    @method()
13    def Ping(self):
14        pass
15
16    @method()
17    def ConcatStrings(self, what1: 's', what2: 's') -> 's':  # noqa: F821
18        return what1 + what2
19
20
21async def main():
22    bus_name = 'dbus.next.sample'
23
24    bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
25    bus2 = await MessageBus(bus_type=BusType.SYSTEM).connect()
26
27    await bus.request_name(bus_name)
28
29    service_interface = SampleInterface()
30    bus.export('/test/path', service_interface)
31
32    introspection = await bus2.introspect(bus_name, '/test/path')
33    assert type(introspection) is intr.Node
34    obj = bus2.get_proxy_object(bus_name, '/test/path', introspection)
35    interface = obj.get_interface(service_interface.name)
36
37    result = await interface.call_ping()
38    assert result is None
39
40    result = await interface.call_concat_strings('hello ', 'world')
41    assert result == 'hello world'
42
43
44asyncio.run(main())
45