-
Notifications
You must be signed in to change notification settings - Fork 42
/
dbus.py
76 lines (60 loc) · 2.23 KB
/
dbus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
from gi.repository import GLib, Gio
class Server(object):
def __init__(self, bus, path):
self.loop = GLib.MainLoop()
interface_info = Gio.DBusNodeInfo.new_for_xml(self.__doc__).interfaces[0]
method_outargs = {}
method_inargs = {}
for method in interface_info.methods:
method_outargs[method.name] = '(' + ''.join([arg.signature for arg in method.out_args]) + ')'
method_inargs[method.name] = tuple(arg.signature for arg in method.in_args)
self.method_inargs = method_inargs
self.method_outargs = method_outargs
bus.register_object(object_path=path, interface_info=interface_info, method_call_closure=self.on_method_call)
def run(self):
self.loop.run()
def quit(self):
self.loop.quit()
def on_method_call(self,
connection,
sender,
object_path,
interface_name,
method_name,
parameters,
invocation):
args = list(parameters.unpack())
for i, sig in enumerate(self.method_inargs[method_name]):
if sig is 'h':
msg = invocation.get_message()
fd_list = msg.get_unix_fd_list()
args[i] = fd_list.get(args[i])
result = getattr(self, method_name)(*args)
if type(result) is list:
result = tuple(result)
elif not type(result) is tuple:
result = (result,)
out_args = self.method_outargs[method_name]
if out_args != '()':
invocation.return_value(GLib.Variant(out_args, result))
class Foo(Server):
'''
<node>
<interface name='net.lvht.Foo1'>
<method name='HelloWorld'>
<arg type='s' name='a' direction='in'/>
<arg type='i' name='b' direction='in'/>
<arg type='s' name='c' direction='out'/>
<arg type='s' name='d' direction='out'/>
</method>
</interface>
</node>
'''
def HelloWorld(self, a, b):
return ('+' + a, '+{}'.format(b))
if __name__ == '__main__':
from pydbus import SessionBus
bus = SessionBus()
bus.own_name(name = 'net.lvht')
foo = Foo(bus=bus.con, path='/net/lvht/Foo')
foo.run()