diff --git a/README.md b/README.md index 488f5c0..e506343 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,6 @@ HIDP协议栈全文可以从蓝牙官网下载到,但是是英文的,啃吧 - 无法支持多设备连接 - 无法很优雅地处理设备断掉连接的情况 -- 无法独自完成设备之间的认证和授权(依赖GNOME)工作 - 主程序不支持配置选项 - 不支持systemd集成 @@ -43,18 +42,18 @@ HIDP协议栈全文可以从蓝牙官网下载到,但是是英文的,啃吧 - 蓝牙硬件 - BlueZ(>=5) - python(>=2.7) +- [PyGObject](https://live.gnome.org/PyGObject) - [evdev](https://pypi.python.org/pypi/evdev) -- [dbus-python](https://pypi.python.org/pypi/dbus-python) +- [pydbus]() - [PyBlueZ](https://pypi.python.org/pypi/PyBluez) - iOS或者Android设备 -系统准备好之后需要对源代码做少许修改。我的电脑的键盘设备节点是`/dev/input/event3`,所以我就将路径硬编码到程序中了。大家需要找到自己的键盘设备节点,然后对btk.py中的第13行做相应更改。 - -```py -keyboard = kb.Keyboard('/dev/input/event3') +首先,运行脚本: +```bash +sudo python agent.py ``` -启动蓝牙之后以root用户运行btk.py脚本就可以了。 +然后使用手机搜索蓝牙,找到你的主机点击配对。这时agent.py脚本会提示你输入配对码。直接输入并回车即可开启体验之旅。 键鼠HID报告描述符 ================= diff --git a/agent.py b/agent.py index 6c57a8b..e2810d5 100644 --- a/agent.py +++ b/agent.py @@ -1,67 +1,77 @@ from __future__ import print_function -#from future.builtins import input -import sys -import dbus -import dbus.service -import dbus.mainloop.glib + import evdev as ev import glob -from btk import HIDProfile, PSM_CTRL, PSM_INTR -import uuid import bluetooth as bt -try: - from gi.repository import GObject as gobject -except ImportError: - import gobject - - -BUS_NAME = 'org.bluez' -AGENT_INTERFACE = 'org.bluez.Agent1' -AGENT_PATH = "/test/agent" +from gi.repository import GLib +from pydbus import SystemBus +from dbus import Server def ask(prompt): - return 1111 # input(prompt) + return 1111 def set_trusted(path): - props = dbus.Interface(bus.get_object("org.bluez", path), - "org.freedesktop.DBus.Properties") + props = bus.get('org.bluez', path)['org.freedesktop.DBus.Properties'] print('trust', path) - props.Set("org.bluez.Device1", "Trusted", True) + props.Set("org.bluez.Device1", "Trusted", GLib.Variant.new_boolean(True)) def dev_connect(path): - dev = dbus.Interface(bus.get_object("org.bluez", path), - "org.bluez.Device1") + dev = bus.get('org.bluez', path)['org.bluez.Device1'] dev.Connect() -class Rejected(dbus.DBusException): - _dbus_error_name = "org.bluez.Error.Rejected" - -class Agent(dbus.service.Object): - @dbus.service.method(AGENT_INTERFACE, - in_signature="", out_signature="") +class Agent(Server): + ''' + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ''' def Release(self): print("Release") - @dbus.service.method(AGENT_INTERFACE, - in_signature="os", out_signature="") def AuthorizeService(self, device, uuid): print("AuthorizeService (%s, %s)" % (device, uuid)) authorize = ask("Authorize connection (yes/no): ") if (authorize == "yes"): return - raise Rejected("Connection rejected by user") - @dbus.service.method(AGENT_INTERFACE, - in_signature="o", out_signature="s") def RequestPinCode(self, device): print("RequestPinCode (%s)" % (device)) set_trusted(device) return ask("Enter PIN Code: ") - @dbus.service.method(AGENT_INTERFACE, - in_signature="o", out_signature="u") def RequestPasskey(self, device): print("RequestPasskey (%s)" % (device)) passkey = "" @@ -81,81 +91,44 @@ def RequestPasskey(self, device): passkey = passkey + key set_trusted(device) - return dbus.UInt32(passkey) + return int(passkey) - @dbus.service.method(AGENT_INTERFACE, - in_signature="ouq", out_signature="") def DisplayPasskey(self, device, passkey, entered): print("DisplayPasskey (%s, %06u entered %u)" % (device, passkey, entered)) - @dbus.service.method(AGENT_INTERFACE, - in_signature="os", out_signature="") def DisplayPinCode(self, device, pincode): print("DisplayPinCode (%s, %s)" % (device, pincode)) - @dbus.service.method(AGENT_INTERFACE, - in_signature="ou", out_signature="") def RequestConfirmation(self, device, passkey): print("RequestConfirmation (%s, %06d)" % (device, passkey)) confirm = ask("Confirm passkey ([y]/n): ") - if (confirm != "n"): - set_trusted(device) - return - raise Rejected("Passkey doesn't match") - @dbus.service.method(AGENT_INTERFACE, - in_signature="o", out_signature="") def RequestAuthorization(self, device): print("RequestAuthorization (%s)" % (device)) auth = ask("Authorize? ([y]/n): ") - if (auth != "n"): - return - raise Rejected("Pairing rejected") - @dbus.service.method(AGENT_INTERFACE, - in_signature="", out_signature="") def Cancel(self): print("Cancel") -if __name__ == '__main__': - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) - bus = dbus.SystemBus() - - props = dbus.Interface( - bus.get_object("org.bluez", '/org/bluez/hci0'), - "org.freedesktop.DBus.Properties" - ) - props.Set("org.bluez.Adapter1", "Powered", dbus.Boolean(True)) - props.Set("org.bluez.Adapter1", "Discoverable", dbus.Boolean(True)) - +def open_hci(): + props = bus.get('org.bluez', '/org/bluez/hci0')['org.freedesktop.DBus.Properties'] + props.Set("org.bluez.Adapter1", "Powered", GLib.Variant.new_boolean(True)) + props.Set("org.bluez.Adapter1", "Discoverable", GLib.Variant.new_boolean(True)) +def register_agent(): capability = "KeyboardOnly" - path = "/test/agent" - agent = Agent(bus, path) - obj = bus.get_object('org.bluez', "/org/bluez"); - manager = dbus.Interface(obj, "org.bluez.AgentManager1") + path = "/net/lvht/btk/agent" + agent = Agent(bus.con, path) + manager = bus.get('org.bluez')['.AgentManager1'] manager.RegisterAgent(path, capability) manager.RequestDefaultAgent(path) +if __name__ == '__main__': + bus = SystemBus() + open_hci() + register_agent() + print('start hid') - sock = bt.BluetoothSocket(bt.L2CAP) - sock.setblocking(False) - sock.bind(('', PSM_INTR)) - sock.listen(1) - - obj_path = '/ml/jlyu/HIDProfile' - profile = HIDProfile(bus, obj_path, sock) - - opts = { - "PSM": dbus.UInt16(PSM_CTRL), - "ServiceRecord": open('./sdp_record.xml', 'r').read(), - "RequireAuthentication": dbus.Boolean(True), - "RequireAuthorization": dbus.Boolean(True), - } - dbus.Interface( - bus.get_object("org.bluez", "/org/bluez"), - "org.bluez.ProfileManager1" - ).RegisterProfile(obj_path, str(uuid.uuid4()), opts) - - gobject.MainLoop().run() + import btk + btk.loop() diff --git a/btk.py b/btk.py index 65125c1..f9abe4e 100644 --- a/btk.py +++ b/btk.py @@ -1,14 +1,10 @@ #! /usr/bin/env python from __future__ import print_function -import dbus -import dbus.mainloop.glib -import dbus.service -try: - from gi.repository import GObject as gobject -except ImportError: - import gobject +from gi.repository import GLib, Gio +from pydbus import SessionBus, SystemBus +from dbus import Server import os import sys @@ -18,10 +14,8 @@ import glob from inputdev import Keyboard, Mouse -from IPython import embed import struct -mainloop = None keyboard_dev_paths = glob.glob('/dev/input/by-path/*event-kbd') mouse_dev_paths = glob.glob('/dev/input/by-path/*event-mouse') @@ -53,9 +47,9 @@ class HIDConnection: def __init__(self, ctrl_fd): self.ctrl_fd = ctrl_fd - self.ctrl_io_id = gobject.io_add_watch( + self.ctrl_io_id = GLib.io_add_watch( self.ctrl_fd, - gobject.IO_IN, + GLib.IO_IN, self.ctrl_data_cb ) @@ -109,77 +103,76 @@ def register_intr_sock(self, sock): def close(self): pass -class HIDProfile(dbus.service.Object): +class HIDProfile(Server): + ''' + + + + + + + + + + + + + + ''' conns = {} sock = None def __init__(self, bus, path, sock): - dbus.service.Object.__init__(self, bus, path) - if (sock): - self.sock = sock + Server.__init__(self, bus, path) + self.sock = sock - @dbus.service.method("org.bluez.Profile1", - in_signature="", out_signature="") def Release(self): print("Release") - mainloop.quit() + self.quit() - @dbus.service.method("org.bluez.Profile1", - in_signature="o", out_signature="") - def RequestDisconnection(self, path): + def RequestDisconnection(self, device): print('RequestDisconnection') - conns.pop(path).close() + conns.pop(device).close() - @dbus.service.method("org.bluez.Profile1", - in_signature="oha{sv}", out_signature="") - def NewConnection(self, path, fd, properties): + def NewConnection(self, device, fd, fd_properties): print("new control connectin") - self.conns[path] = HIDConnection(fd.take()) + self.conns[device] = HIDConnection(fd) def new_intr_conn(ssock, ip_type): sock, info = ssock.accept() print("interrput connection:", info) - self.conns[path].register_intr_sock(sock) - + self.conns[device].register_intr_sock(sock) return False - gobject.io_add_watch(self.sock, gobject.IO_IN, new_intr_conn) - - -def main(): - dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + GLib.io_add_watch(self.sock, GLib.IO_IN, new_intr_conn) - bus = dbus.SystemBus() - obj_path = '/cn/lvht/bluez/HIDProfile' +def loop(): + bus = SystemBus() + bus.own_name('net.lvht.btk') + obj_path = '/net/lvht/btk/HIDProfile' sock = bt.BluetoothSocket(bt.L2CAP) sock.setblocking(False) try: sock.bind(('', PSM_INTR)) except: - print("Someone has the bluetooth HID socket open, it might be bluetoothd") print("For bluez5 add --noplugin=input to the bluetoothd commandline") - print("For bluez4 add PluginsDisable=input to /etc/bluetooth/main.conf") print("Else there is another application running that has it open.") sys.exit(errno.EACCES) sock.listen(1) - profile = HIDProfile(bus, obj_path, sock) + profile = HIDProfile(bus.con, obj_path, sock) opts = { - "PSM": dbus.UInt16(PSM_CTRL), - "ServiceRecord": open('./sdp_record.xml', 'r').read(), - "RequireAuthentication": dbus.Boolean(True), - "RequireAuthorization": dbus.Boolean(False), + "PSM": GLib.Variant.new_uint16(PSM_CTRL), + "ServiceRecord": GLib.Variant.new_string(open('./sdp_record.xml', 'r').read()), + "RequireAuthentication": GLib.Variant.new_boolean(True), + "RequireAuthorization": GLib.Variant.new_boolean(False), } - dbus.Interface( - bus.get_object("org.bluez", "/org/bluez"), - "org.bluez.ProfileManager1" - ).RegisterProfile(obj_path, str(uuid.uuid4()), opts) - - gobject.MainLoop().run() + manager = bus.get('org.bluez')['.ProfileManager1'] + manager.RegisterProfile(obj_path, str(uuid.uuid4()), opts) + profile.run() if __name__ == '__main__': - main() - + loop() diff --git a/dbus.py b/dbus.py new file mode 100644 index 0000000..8a03e5e --- /dev/null +++ b/dbus.py @@ -0,0 +1,76 @@ +from gi.repository import GLib, Gio + +class Server(object): + def __init__(self, bus, path): + self.loop = GLib.MainLoop() + + interface_info = Gio.DBusNodeInfo.new_for_xml(self.__doc__).interfaces[0] + + method_outargs = {} + method_inargs = {} + for method in interface_info.methods: + method_outargs[method.name] = '(' + ''.join([arg.signature for arg in method.out_args]) + ')' + method_inargs[method.name] = tuple(arg.signature for arg in method.in_args) + + self.method_inargs = method_inargs + self.method_outargs = method_outargs + + bus.register_object(object_path=path, interface_info=interface_info, method_call_closure=self.on_method_call) + + def run(self): + self.loop.run() + + def quit(self): + self.loop.quit() + + def on_method_call(self, + connection, + sender, + object_path, + interface_name, + method_name, + parameters, + invocation): + + args = list(parameters.unpack()) + for i, sig in enumerate(self.method_inargs[method_name]): + if sig is 'h': + msg = invocation.get_message() + fd_list = msg.get_unix_fd_list() + args[i] = fd_list.get(args[i]) + + result = getattr(self, method_name)(*args) + + if type(result) is list: + result = tuple(result) + elif not type(result) is tuple: + result = (result,) + + out_args = self.method_outargs[method_name] + if out_args != '()': + invocation.return_value(GLib.Variant(out_args, result)) + +class Foo(Server): + ''' + + + + + + + + + + + ''' + def HelloWorld(self, a, b): + return ('+' + a, '+{}'.format(b)) + + +if __name__ == '__main__': + from pydbus import SessionBus + bus = SessionBus() + bus.own_name(name = 'net.lvht') + + foo = Foo(bus=bus.con, path='/net/lvht/Foo') + foo.run() diff --git a/requirements.txt b/requirements.txt index 9e10bea..0a352e5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ future evdev PyBluez +pydbus