diff options
author | Loek Le Blansch <32883851+lonkaars@users.noreply.github.com> | 2021-08-13 14:03:33 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-08-13 14:03:33 +0200 |
commit | f1872e3b036e679d8e6b5f8e08c31da18caaa37a (patch) | |
tree | d5ff7c3656868780a4f29461eb3bbec535ec3cfa /driver.py | |
parent | b8c9fc19e6e35679a1b9125396448987d7a6e8d0 (diff) | |
parent | 886630c94bd76b5bec9aa5b2eeb8a6da7d6a0cc2 (diff) |
Merge pull request #1 from lonkaars/homeassistant
homebridge to homeassistant
Diffstat (limited to 'driver.py')
-rw-r--r-- | driver.py | 61 |
1 files changed, 61 insertions, 0 deletions
diff --git a/driver.py b/driver.py new file mode 100644 index 0000000..5280d7d --- /dev/null +++ b/driver.py @@ -0,0 +1,61 @@ +#!/bin/python3 +from bluepy.btle import Peripheral, ADDR_TYPE_PUBLIC, BTLEDisconnectError +import threading +import time +import sys + +BEKEN_CHARACTERISTIC_NULL = 0x0001 +BEKEN_CHARACTERISTIC_LAMP = 0x002a + +def makemsg(r, g, b, l=0): + return bytes([ + int(g > 0), g, + 0x00, 0x00, + int(b > 0), b, + int(r > 0), r, + int(l > 0), l, + ]) + +class BekenConnection: + def __init__(self, mac): + self.mac = mac + self.dev = None + self.messages = [] + + def keep_alive(self): + while True: + self.send(BEKEN_CHARACTERISTIC_NULL, bytes(10)) + time.sleep(10) + + def send(self, characteristic, message): + self.messages.append((characteristic, message)) + + def verify_connection(self): + while self.dev == None or self.dev.getState() == 'disc': + try: + self.dev = Peripheral(self.mac, ADDR_TYPE_PUBLIC) + except BTLEDisconnectError as e: + continue + + def message_handler(self): + self.verify_connection() + while True: + if len(self.messages) < 1: continue + message = self.messages.pop(0) + self.verify_connection() + self.dev.writeCharacteristic(message[0], bytearray(message[1])) + + def start_threads(self): + threading.Thread(target=self.message_handler).start() + threading.Thread(target=self.keep_alive).start() + +if __name__ == "__main__": + mac = sys.argv[1] + con = BekenConnection(mac) + con.start_threads() + def user_input(): + for line in sys.stdin: + r, g, b, l = [ int(x, 16) for x in [ line.strip()[i:i+2] for i in range(0, 8, 2) ] ] + con.send(BEKEN_CHARACTERISTIC_LAMP, makemsg(r, g, b, l)) + threading.Thread(target=user_input).start() + |