-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathtimed-ble-beacon-mpy-led
211 lines (174 loc) · 8.38 KB
/
timed-ble-beacon-mpy-led
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# -*- mode: python -*-
# Micropython fw script to passively scan for HMAC-authenticated BLE beacons,
# and change/activate something for the duration while they're being broadcast.
import hashlib as hl, collections as cs
import asyncio, machine, bluetooth, time, struct, random
# TimeDeltaRNG: one weighted random-delay rule - roll succeeds with probability
# "chance" [0-1.0], then a delay is picked uniformly from td_min..td_max [seconds].
tdr = cs.namedtuple('TimeDeltaRNG', ('chance', 'td_min', 'td_max'))
def tdr_ms(delay):
    """Resolve a delay spec into an integer number of milliseconds.

    delay is either a plain int/float number of seconds (fixed delay),
    or an iterable of TimeDeltaRNG-like rules (chance, td_min, td_max).
    Rules are tried in order; the first one whose chance-roll succeeds
    yields a delay picked uniformly from its td_min..td_max range.
    Returns 0 if no rule succeeds (or the iterable is empty).
    """
    if isinstance(delay, (int, float)):
        return round(delay * 1000)
    for rule in delay:
        roll = random.random()
        if roll > rule.chance:
            continue  # chance-roll failed, try next rule
        span = rule.td_max - rule.td_min
        return round(1000 * (rule.td_min + random.random() * span))
    return 0
class Conf:
    """Script configuration: class attributes are defaults, kwargs override them."""

    verbose = False

    ble_mid = 61_634 # uint16 manufacturer id to match in broadcasts
    ble_secret = b'timed-beacon-test' # HMAC key

    # BLE scan duty cycle - radio listens for scan_window_us within each scan_interval_us
    scan_interval_us = 60_000
    scan_window_us = 30_000

    # "clear" is the normal state, "beacon" is a state when valid beacons are detected
    # All time values below are in seconds
    clear_sample_time = 4.0 # how long to run each BLE scan for, using scan_* duty cycle
    clear_sample_interval = 3.3 * 60 # sleep delay between scans

    beacon_span_checks = 8 # wakeup for N checks within timespan sent in the beacon
    beacon_sample_time = 3.0 # same as clear_sample_time, but for "beacon" state
    beacon_sample_interval_min = 2.7 * 60 # won't get lower than this in a "beacon" state
    beacon_sample_interval_max = 11.1 * 60 # beacon_span_checks won't raise it higher
    beacon_repeat_grace = 6.0 * 60 # ok to repeat counter within this td

    # List of LED(s) to blink in either "clear" or "beacon" state.
    # Time values can be simple ints/floats for a fixed delay or a list of 3-value tuples.
    # List-of-tuples is read as TimeDeltaRNG (chance [0-1.0], td-min [s], td-max [s])
    # values, evaluated in order on each blink, until first successful chance-roll,
    # and interval then picked uniformly from min-max range specified in that tuple.
    # Dict keys omitted in a user-supplied leds list are filled in from default leds[0].
    leds = [dict(
        pin='LED',
        beacon=False, # True = blink while beacons are detected, instead of while "clear"
        time_on=[tdr(0.5, 0.08, 0.5), tdr(0.9, 0.02, 1), tdr(1, 0.5, 3)],
        time_off=[
            tdr(0.6, 0.1, 0.5), tdr(0.6, 0.3, 1),
            tdr(0.6, 0.8, 10), tdr(0.7, 5, 90), tdr(1, 60, 3*60) ] )]

    def __init__(self, **opts):
        """Override defaults from keyword options.

        Any option name other than "leds" must match an existing attribute,
        otherwise the getattr() probe raises AttributeError.
        "leds" is an iterable of dicts, each merged over the default leds[0],
        with list-typed time_on/time_off entries converted to TimeDeltaRNG tuples.
        """
        for name, value in opts.items():
            if name == 'leds':
                merged = list(value)
                for idx, overrides in enumerate(merged):
                    led = merged[idx] = dict(self.leds[0], **overrides)
                    for key in ('time_on', 'time_off'):
                        if isinstance(led[key], list):
                            led[key] = [tdr(*td) for td in led[key]]
                self.leds = merged
            else:
                getattr(self, name)  # fails early on unknown/misspelled option names
                setattr(self, name, value)
class BLE_MSE_Scanner:
    """Passive BLE scanner yielding manufacturer-specific (0xff) advertisement fields.

    Usage: `with scanner.duration(ms): data = await scanner.get_data()`.
    Entering the context activates the radio and starts a scan,
    leaving it stops collection and deactivates the radio.
    """

    def __init__(self, interval_us, window_us):
        self.scan_td = 5_000  # default scan duration [ms]
        self.scan_dc = interval_us, window_us  # duty cycle args for gap_scan
        self.scan = False  # True while a scan is in progress
        self.queue = []  # raw advertisement payloads collected by the irq handler
        self.check = asyncio.ThreadSafeFlag()  # set when queue/scan state changes
        self.ble = bluetooth.BLE()
        self.ble.irq(self._irq_handler)

    def duration(self, duration_ms):
        """Set duration for the next scan, returns self for use in a with-statement."""
        self.scan_td = duration_ms
        return self

    def __enter__(self):
        if self.scan:
            raise RuntimeError('Concurrent scans not allowed')
        self.scan = True
        self.queue.clear()
        self.check.clear()
        self.ble.active(True)
        interval_us, window_us = self.scan_dc
        self.ble.gap_scan(self.scan_td, interval_us, window_us)

    def __exit__(self, *err):
        self.scan = False
        self.check.set()  # wake up any pending get_data() so it can return
        self.ble.active(False)

    async def get_data(self):
        """Wait for and return next non-empty manufacturer-data field from the scan.

        Returns a falsy value (None or empty bytes) once the scan
        is over and no more matching data is left in the queue.
        """
        while True:
            await self.check.wait()
            payload = None
            try:
                while self.queue:  # look for first adv_type_manufacturer field
                    payload = self._decode_field(self.queue.pop(), 0xff)
                    if payload:
                        break
            finally:
                # Flag is cleared by wait() - re-arm it if there is more to process,
                # or scan has ended, so that the next call does not block forever
                if self.queue or not self.scan:
                    self.check.set()
            if payload or not self.scan:
                return payload

    def _irq_handler(self, ev, data):
        """BLE event callback: queue payloads of matching scan results."""
        if not self.scan:
            return
        if ev == 6:  # scan_done
            self.scan = False
            self.check.set()
        elif ev == 5:  # scan_result
            addr_type, addr, adv_type, rssi, adv_data = data
            # Only adv_type=3 broadcasts are collected (listed as
            # ADV_NONCONN_IND in MicroPython bluetooth module docs)
            if adv_type == 3:
                self.queue.append(bytes(adv_data))  # copy, as it is only valid in irq
                self.check.set()

    def _decode_field(self, pkt, ft, n=0):
        """Return value of the first field with type ft in advertisement payload pkt.

        Payload is a sequence of [length, type, value...] records,
        where length counts type + value bytes. Returns None if not found.
        """
        if not pkt:
            return
        size = len(pkt)
        while n + 1 < size:
            field_len = pkt[n]
            if pkt[n + 1] == ft:
                return pkt[n + 2 : n + field_len + 1]
            n += field_len + 1
class LEDBlinker:
    """Blinks a single LED pin with configured on/off delays while enabled."""

    def __init__(self, conf, verbose=False):
        """conf is a dict with pin, beacon, time_on and time_off keys."""
        pin = conf['pin']
        self.led = machine.Pin(pin, machine.Pin.OUT)
        self.led_state_invert = conf['beacon']  # flips state passed to blink_state
        self.enabled = asyncio.Event()  # set = blinking is active
        self.td_on = conf['time_on']
        self.td_off = conf['time_off']
        self.log = verbose and (lambda *a: print(f'[led.{pin}]', *a))

    def blink_state(self, state):
        """Enable or disable blinking, with state negated if LED is a "beacon" one."""
        if self.led_state_invert:
            state = bool(state) ^ 1
        if state == self.enabled.is_set():
            return  # already in the requested state
        if self.log:
            self.log('Blinking ' + ('enabled' if state else 'disabled'))
        if state:
            self.enabled.set()
        else:
            self.enabled.clear()

    async def led_enable(self, delay, state=True):
        """Set LED pin to state and sleep for the delay resolved via tdr_ms()."""
        self.led.value(state)
        td = tdr_ms(delay)
        if td >= 20_000 and self.log:  # only report unusually long delays
            self.log(f' blink state={int(state)} long-delay={td:,d}ms')
        return await asyncio.sleep_ms(td)

    async def run(self):
        """Blink forever: wait until enabled, then do one on/off cycle, repeat."""
        while True:
            await self.enabled.wait()
            await self.led_enable(self.td_on)
            await self.led_enable(self.td_off, False)
class LEDListBlinker:
    """Group of LEDBlinker objects, controlled and run together."""

    def __init__(self, conf_list, verbose=False):
        """Create one LEDBlinker for each led configuration dict in conf_list."""
        self.leds = [LEDBlinker(led_conf, verbose=verbose) for led_conf in conf_list]

    def blink_state(self, state):
        """Pass the same blink_state(state) call on to every LED."""
        for blinker in self.leds:
            blinker.blink_state(state)

    async def run(self):
        """Run blink loops of all LEDs concurrently."""
        loops = [blinker.run() for blinker in self.leds]
        await asyncio.gather(*loops)
def hmac_sha256(key, msg):
    """Return the 32-byte HMAC-SHA256 digest of msg under key.

    Implemented by hand on top of hashlib.sha256, without using an hmac module.
    key and msg are bytes-like objects; key is never modified.
    """
    block_size = 64  # SHA-256 block size in bytes
    # Work on an immutable private copy, as "key += ..." below would otherwise
    # extend a caller's bytearray in-place, changing their key after the call
    key = bytes(key)
    if len(key) > block_size:
        key = hl.sha256(key).digest()  # long keys are replaced by their hash
    key += bytes(block_size - len(key))  # zero-pad to the full block
    inner = hl.sha256(bytes(b ^ 0x36 for b in key))  # ipad
    outer = hl.sha256(bytes(b ^ 0x5c for b in key))  # opad
    inner.update(msg)
    outer.update(inner.digest())
    return outer.digest()
async def run_ble(conf, leds):
    """Scan for HMAC-authenticated beacons forever, switching LED blink state.

    Loop alternates between sleeping and running a short passive BLE scan.
    State is "clear" until a valid beacon is seen in a scan, "beacon" afterwards,
    and goes back to "clear" after a scan without one. leds.blink_state()
    is called with True in "clear" state and False in "beacon" state.

    Beacon is a 16-byte manufacturer-data field: uint16-LE manufacturer id,
    uint16-LE timespan (in 100s units) + uint32-LE counter, and first
    8 bytes of HMAC-SHA256 over those 6 timespan/counter bytes.
    Beacons with a counter lower than the last accepted one are rejected
    as replays, and so are ones with the same counter value,
    unless repeated within conf.beacon_repeat_grace.

    conf is a Conf-like object, leds is an object with blink_state(state) method.
    """
    st_beacon, st_clear = (states := ('beacon', 'clear'))
    # Per-state (sleep between scans, scan duration) pairs, seconds -> ms
    timings = dict(zip(states, ((round(s1*1000), round(s2*1000)) for s1, s2 in (
        (conf.beacon_sample_interval_min, conf.beacon_sample_time),
        (conf.clear_sample_interval, conf.clear_sample_time) ))))
    beacon_grace_td = round(conf.beacon_repeat_grace * 1000)
    p_log = conf.verbose and (lambda *a: print('[main]', *a))
    ble_scan = BLE_MSE_Scanner(conf.scan_interval_us, conf.scan_window_us)
    p_log and p_log('Starting main loop...')

    counter, st = 0, st_clear
    td_sleep, td_sample = timings[st]
    # td_st = timespan from the last parsed beacon [ms]
    # td_st_left = remaining sleep-chunks [ms] that timespan was split into
    td_st, td_st_left, td_st_split = 0, list(), conf.beacon_span_checks
    # Must be in ms, same as td_st and td values that it is compared/added to below.
    # Using raw seconds here made "chunk < limit" check always fail for any
    # chunk longer than minimum interval, producing way too short delays.
    td_st_limit = round(conf.beacon_sample_interval_max * 1000)
    beacon, beacon_ts = False, 0

    while True:
        leds.blink_state(st != st_beacon)
        beacon_expected = st == st_beacon
        if beacon != beacon_expected:
            # NOTE(review): source indentation was lost - assuming that a state
            # change skips the delay below and goes straight to the next scan
            st_new = st_beacon if beacon else st_clear
            p_log and p_log(f'State change (beacon={int(beacon)}): {st} -> {st_new}')
            st = st_new
            td_sleep, td_sample = timings[st]
        else:
            td = td_sleep
            if beacon_expected and beacon: # use td_st + with N rare checks in-between
                if abs(sum(td_st_left) - td_st) > td*2: td_st_left.clear() # changed/drifted
                if not td_st_left: # split td_st into chunks, check on those
                    if (td_st_chunk := td_st / td_st_split) > td:
                        td_st_left = (
                            ([td_st_chunk + td / td_st_split] * td_st_split)
                            if td_st_chunk < td_st_limit else # or use td_st_limit+ chunks
                            ([td_st_limit + td / td_st_split] * round(td_st / td_st_limit)) )
                if td_st_left: td = round(td_st_left.pop())
            elif td_st_left: td_st_left.clear() # beacon went away
            p_log and p_log(f'Delay (state={st}): {td:,d}ms')
            await asyncio.sleep_ms(td)

        p_log and p_log(f'Scanning for broadcasts ({td_sample:,d}ms)...')
        beacon = False
        with ble_scan.duration(td_sample): # scan until first hmac-match
            while data := await ble_scan.get_data():
                if len(data) != 16 or int.from_bytes(data[:2], 'little') != conf.ble_mid:
                    continue # not our beacon format or manufacturer id
                data, hmac = data[2:8], data[8:]
                td_st, n = struct.unpack('<HL', data)
                td_st *= 100_000 # 100s units -> ms
                p_log and p_log(f'Scan data #{n:,d} span={td_st:,d}ms [ {data} {hmac} ]')
                if (hmac_chk := hmac_sha256(conf.ble_secret, data)[:8]) == hmac: break
                p_log and p_log(f'Bad HMAC: #{n} expected={hmac_chk} actual={hmac}')
        p_log and p_log('Scan done' + (', no relevant data found' if not data else ''))
        if not data: continue

        # Same counter value is only accepted while within grace period since last beacon
        grace = (beacon_grace_td - time.ticks_diff(time.ticks_ms(), beacon_ts)) > 0
        if (grace and n < counter) or (not grace and n <= counter):
            p_log and p_log(f'Replayed/reset counter: #{n:,d} <= old #{counter:,d}')
            continue
        counter, beacon, beacon_ts = n, True, time.ticks_ms()

    p_log and p_log('Finished') # not reached - loop above has no exit
async def main(conf=None):
    """Run LED blinkers and the BLE scan loop concurrently.

    conf is a Conf instance, default Conf() is used if it is omitted.
    """
    conf = conf or Conf()
    blinkers = LEDListBlinker(conf.leds, verbose=conf.verbose)
    await asyncio.gather(blinkers.run(), run_ble(conf, blinkers))
def run(*args, **kws):
    """Blocking entry point - runs main() with same arguments in an asyncio loop."""
    asyncio.run(main(*args, **kws))
def run_with_conf(**conf_opts):
    """Blocking entry point - builds Conf from keyword overrides and runs with it."""
    conf = Conf(**conf_opts)
    run(conf)
# Start with default configuration when executed as a script
if __name__ == '__main__':
    run()