1import asyncio 2from dbus_next.aio import MessageBus 3from dbus_next.service import ServiceInterface, method 4import dbus_next.introspection as intr 5from dbus_next import BusType 6 7 8class SampleInterface(ServiceInterface): 9 def __init__(self): 10 super().__init__('test.interface') 11 12 @method() 13 def Ping(self): 14 pass 15 16 @method() 17 def ConcatStrings(self, what1: 's', what2: 's') -> 's': # noqa: F821 18 return what1 + what2 19 20 21async def main(): 22 bus_name = 'dbus.next.sample' 23 24 bus = await MessageBus(bus_type=BusType.SYSTEM).connect() 25 bus2 = await MessageBus(bus_type=BusType.SYSTEM).connect() 26 27 await bus.request_name(bus_name) 28 29 service_interface = SampleInterface() 30 bus.export('/test/path', service_interface) 31 32 introspection = await bus2.introspect(bus_name, '/test/path') 33 assert type(introspection) is intr.Node 34 obj = bus2.get_proxy_object(bus_name, '/test/path', introspection) 35 interface = obj.get_interface(service_interface.name) 36 37 result = await interface.call_ping() 38 assert result is None 39 40 result = await interface.call_concat_strings('hello ', 'world') 41 assert result == 'hello world' 42 43 44asyncio.run(main()) 45