aboutsummaryrefslogtreecommitdiff
path: root/driver.py
diff options
context:
space:
mode:
authorLoek Le Blansch <32883851+lonkaars@users.noreply.github.com>2021-08-13 14:03:33 +0200
committerGitHub <noreply@github.com>2021-08-13 14:03:33 +0200
commitf1872e3b036e679d8e6b5f8e08c31da18caaa37a (patch)
treed5ff7c3656868780a4f29461eb3bbec535ec3cfa /driver.py
parentb8c9fc19e6e35679a1b9125396448987d7a6e8d0 (diff)
parent886630c94bd76b5bec9aa5b2eeb8a6da7d6a0cc2 (diff)
Merge pull request #1 from lonkaars/homeassistant
homebridge to homeassistant
Diffstat (limited to 'driver.py')
-rw-r--r--driver.py61
1 files changed, 61 insertions, 0 deletions
diff --git a/driver.py b/driver.py
new file mode 100644
index 0000000..5280d7d
--- /dev/null
+++ b/driver.py
@@ -0,0 +1,61 @@
+#!/bin/python3
+from bluepy.btle import Peripheral, ADDR_TYPE_PUBLIC, BTLEDisconnectError
+import threading
+import time
+import sys
+
+BEKEN_CHARACTERISTIC_NULL = 0x0001
+BEKEN_CHARACTERISTIC_LAMP = 0x002a
+
+def makemsg(r, g, b, l=0):
+ return bytes([
+ int(g > 0), g,
+ 0x00, 0x00,
+ int(b > 0), b,
+ int(r > 0), r,
+ int(l > 0), l,
+ ])
+
+class BekenConnection:
+ def __init__(self, mac):
+ self.mac = mac
+ self.dev = None
+ self.messages = []
+
+ def keep_alive(self):
+ while True:
+ self.send(BEKEN_CHARACTERISTIC_NULL, bytes(10))
+ time.sleep(10)
+
+ def send(self, characteristic, message):
+ self.messages.append((characteristic, message))
+
+ def verify_connection(self):
+ while self.dev == None or self.dev.getState() == 'disc':
+ try:
+ self.dev = Peripheral(self.mac, ADDR_TYPE_PUBLIC)
+ except BTLEDisconnectError as e:
+ continue
+
+ def message_handler(self):
+ self.verify_connection()
+ while True:
+ if len(self.messages) < 1: continue
+ message = self.messages.pop(0)
+ self.verify_connection()
+ self.dev.writeCharacteristic(message[0], bytearray(message[1]))
+
+ def start_threads(self):
+ threading.Thread(target=self.message_handler).start()
+ threading.Thread(target=self.keep_alive).start()
+
+if __name__ == "__main__":
+ mac = sys.argv[1]
+ con = BekenConnection(mac)
+ con.start_threads()
+ def user_input():
+ for line in sys.stdin:
+ r, g, b, l = [ int(x, 16) for x in [ line.strip()[i:i+2] for i in range(0, 8, 2) ] ]
+ con.send(BEKEN_CHARACTERISTIC_LAMP, makemsg(r, g, b, l))
+ threading.Thread(target=user_input).start()
+