-
Notifications
You must be signed in to change notification settings - Fork 0
/
rfid_reader.py
109 lines (88 loc) · 3.81 KB
/
rfid_reader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
""" This implemenetation of an RFID Reader expects a keyboard-like device
which registers with linux's event sub-system.
`evdev` does all the heavy lifting of interacting with the device and emits
individual InputEvents for analysis, which represent keyboard key interactions.
"down" presses are ignored in favor of readering "up" events (e.g. key-release)
and the return key represents the end of an string of input.
That string of input is treated as an identifier, and this component
interprets this identifier as a request to open the front door.
The main loop waits for input from the event subsystem and analyzes each event
to fit the input criteria, storing key values until return is detected.
"""
import asyncio
import evdev
from comms import create_comms
def make_request(source_id, identifier):
""" Helper function to creates permission request targetting the
front_door_latch, adding the identifier to the permission's context
"""
req = {
"source_id": source_id,
"target_id": "front_door_latch",
"permissions": [{
"perm": "/open",
"ctx": {"identity": identifier}}]
}
return req
class RfidReader:
def __init__(self, name, dev_name):
self.name = name
self.comms = create_comms(name)
self.dev = self.find_ev_device(dev_name)
def find_ev_device(self, label):
""" Queries devices for one with the name `label` and returns an
evdev.InputDevice, if found.
If no device is found with such a name, exit the interpretter.
"""
devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
try:
rfid_reader = [d for d in devices if d.name == label][0]
except IndexError:
self.comms.logger.error(f"No RFID reader: {label}")
exit()
if rfid_reader:
self.comms.logger.debug(f"Found {label}")
for d in devices:
if d != rfid_reader:
d.close()
return rfid_reader
async def process(self):
""" Main loop for this component. Waits on events from the event
device and checks if they are Key Presses. If they are, it will
filter out Key Down (press) in favor of Key Up (release). The
value of the keys are derived from the key codes (e.g. KEY_5).
non-integers are filtered out except for Return, which represents
the end of an input string.
When enter is found, the string is assemled and sent to the
`authorizer` component with a permission request to open the front
door.
"""
keys = []
if not self.dev:
raise ValueError(f"Missing device for {self.name}")
async for ev in self.dev.async_read_loop():
if ev.type != evdev.ecodes.EV_KEY or ev.value != 0:
continue
if ev.code != evdev.ecodes.KEY_ENTER:
c = evdev.categorize(ev)
self.comms.logger.debug(
f"Appending keycode: {c.keycode}, {c.keycode[4:]}")
try:
keys.append(int(c.keycode[4:]))
except ValueError as e:
pass
else:
identifier = "".join(map(str, keys))
try:
# ignores response
await self.comms.request(
"authorizer",
make_request(self.name, identifier))
except ValueError as e:
self.comms.logger.error(f"Auth request failed with {e}")
keys = []
self.comms.logger.debug("Read complete")
if __name__ == "__main__":
front_door_reader = RfidReader("front_door_rfid", "Barcode Reader ")
loop = asyncio.get_event_loop()
loop.run_until_complete(front_door_reader.process())