aboutsummaryrefslogtreecommitdiff
path: root/custom_components
diff options
context:
space:
mode:
authorlonkaars <loek@pipeframe.xyz>2023-12-25 11:59:07 +0100
committerlonkaars <loek@pipeframe.xyz>2023-12-25 11:59:07 +0100
commite40d981609d206193df520a65abbf673cfb13ed1 (patch)
tree0757b3b69d6e9d92ae6fc3538d7ea62d30c468ba /custom_components
parentf2c20b7a2c2c45532b2784746e57b60dd4627784 (diff)
restructure repo for HACS0.2.0
Diffstat (limited to 'custom_components')
-rw-r--r--custom_components/beken/__init__.py11
-rw-r--r--custom_components/beken/driver.py61
-rw-r--r--custom_components/beken/light.py140
3 files changed, 212 insertions, 0 deletions
diff --git a/custom_components/beken/__init__.py b/custom_components/beken/__init__.py
new file mode 100644
index 0000000..eb66857
--- /dev/null
+++ b/custom_components/beken/__init__.py
@@ -0,0 +1,11 @@
+from homeassistant.core import HomeAssistant
+
+DOMAIN = "beken"
+PLATFORMS = ["light"]
+
+def setup(hass: HomeAssistant, config):
+ hass.data[DOMAIN] = {}
+ return True
+
+def setup_entry(hass, entry):
+ return True
diff --git a/custom_components/beken/driver.py b/custom_components/beken/driver.py
new file mode 100644
index 0000000..5280d7d
--- /dev/null
+++ b/custom_components/beken/driver.py
@@ -0,0 +1,61 @@
+#!/bin/python3
+from bluepy.btle import Peripheral, ADDR_TYPE_PUBLIC, BTLEDisconnectError
+import threading
+import time
+import sys
+
+BEKEN_CHARACTERISTIC_NULL = 0x0001
+BEKEN_CHARACTERISTIC_LAMP = 0x002a
+
+def makemsg(r, g, b, l=0):
+ return bytes([
+ int(g > 0), g,
+ 0x00, 0x00,
+ int(b > 0), b,
+ int(r > 0), r,
+ int(l > 0), l,
+ ])
+
+class BekenConnection:
+ def __init__(self, mac):
+ self.mac = mac
+ self.dev = None
+ self.messages = []
+
+ def keep_alive(self):
+ while True:
+ self.send(BEKEN_CHARACTERISTIC_NULL, bytes(10))
+ time.sleep(10)
+
+ def send(self, characteristic, message):
+ self.messages.append((characteristic, message))
+
+ def verify_connection(self):
+ while self.dev == None or self.dev.getState() == 'disc':
+ try:
+ self.dev = Peripheral(self.mac, ADDR_TYPE_PUBLIC)
+ except BTLEDisconnectError as e:
+ continue
+
+ def message_handler(self):
+ self.verify_connection()
+ while True:
+ if len(self.messages) < 1: continue
+ message = self.messages.pop(0)
+ self.verify_connection()
+ self.dev.writeCharacteristic(message[0], bytearray(message[1]))
+
+ def start_threads(self):
+ threading.Thread(target=self.message_handler).start()
+ threading.Thread(target=self.keep_alive).start()
+
+if __name__ == "__main__":
+ mac = sys.argv[1]
+ con = BekenConnection(mac)
+ con.start_threads()
+ def user_input():
+ for line in sys.stdin:
+ r, g, b, l = [ int(x, 16) for x in [ line.strip()[i:i+2] for i in range(0, 8, 2) ] ]
+ con.send(BEKEN_CHARACTERISTIC_LAMP, makemsg(r, g, b, l))
+ threading.Thread(target=user_input).start()
+
diff --git a/custom_components/beken/light.py b/custom_components/beken/light.py
new file mode 100644
index 0000000..69b9b86
--- /dev/null
+++ b/custom_components/beken/light.py
@@ -0,0 +1,140 @@
+import homeassistant.helpers.config_validation as cv
+import voluptuous as vol
+from math import floor
+from threading import Thread
+from homeassistant.const import CONF_MAC
+from .driver import BekenConnection, makemsg, BEKEN_CHARACTERISTIC_LAMP
+from homeassistant.components.light import (
+ LightEntity,
+
+ ATTR_BRIGHTNESS,
+ ATTR_RGBW_COLOR,
+ ATTR_TRANSITION,
+
+ PLATFORM_SCHEMA,
+
+ LightEntityFeature,
+ ColorMode,
+)
+from time import sleep
+
+PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend({
+ vol.Required("name"): cv.string,
+ vol.Required("address"): cv.string
+})
+
+def setup_platform(hass, config, add_entities, discovery_info=None):
+ add_entities([ BekenLight(name=config["name"], address=config["address"]) ])
+
+class BekenLight(LightEntity):
+ def __init__(self, **kwargs):
+ self._name = kwargs["name"]
+ self._address = kwargs["address"]
+ self._on = False
+ self._brightness = 255
+ self._rgb = (255, 255, 255)
+ self._w = 255
+ self._connection = BekenConnection(self._address)
+ self._connection.start_threads()
+ self._thread = Thread()
+ self._thread.start()
+ self._transitioning = False
+
+ @property
+ def color_mode(self):
+ return ColorMode.RGBW
+
+ @property
+ def supported_color_modes(self):
+ return set([ ColorMode.RGBW ])
+
+ @property
+ def supported_features(self):
+ return LightEntityFeature.TRANSITION
+
+ @property
+ def unique_id(self):
+ return self._address
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def is_on(self):
+ return self._on
+
+ @property
+ def brightness(self):
+ return self._brightness
+
+ @property
+ def rgbw_color(self):
+ return self._rgb + (self._w,)
+
+ def turn_on(self, **kwargs):
+ on_old = self._on
+ w_old = self._w
+ brightness_old = self._brightness
+ rgb_old = self._rgb
+
+ self._on = True
+
+ brightness = kwargs.get(ATTR_BRIGHTNESS)
+ if brightness != None: self._brightness = brightness
+
+ rgbw = kwargs.get(ATTR_RGBW_COLOR)
+ if rgbw != None:
+ self._rgb = rgbw[0:3]
+ self._w = rgbw[3]
+
+ self.terminate_thread()
+ transition = kwargs.get(ATTR_TRANSITION)
+ if transition != None:
+ self.interpolate(brightness_old if on_old else 0, self._brightness, rgb_old if on_old else (0, 0, 0,), self._rgb, w_old if on_old else 0, self._w, transition)
+ else:
+ self.update_beken_lamp()
+
+ def turn_off(self, **kwargs):
+ self.terminate_thread()
+ self._on = False
+ self.update_beken_lamp()
+
+ def interpolate(self, brightness_old, brightness, rgb_old, rgb, w_old, w, transition):
+ self._thread = Thread(target=self.interpolate_thread, args=(brightness_old, brightness, rgb_old, rgb, w_old, w, transition, ))
+ self._thread.start()
+
+ def terminate_thread(self):
+ self._transitioning = False
+ self._thread.join()
+
+ def interpolate_thread(self, brightness_old, brightness, rgb_old, rgb, w_old, w, transition):
+ step_duration = 0.250
+ steps = int(transition / step_duration)
+ if rgb_old == None: rgb_old = (0, 0, 0,)
+ if rgb == None: rgb = (0, 0, 0,)
+ if brightness_old == None: brightness_old = 0
+ if brightness == None: brightness = 0
+ if w_old == None: w_old = 0
+ if w == None: w = 0
+ self._transitioning = True
+ for x in range(steps):
+ if not self._transitioning: break
+ weight = (x + 1) / steps
+ r = rgb_old[0] * (1 - weight) + rgb[0] * weight
+ g = rgb_old[1] * (1 - weight) + rgb[1] * weight
+ b = rgb_old[2] * (1 - weight) + rgb[2] * weight
+ self._rgb = (r, g, b,)
+ self._w = w_old * (1 - weight) + w * weight
+ self._brightness = brightness_old * (1 - weight) + brightness * weight
+ self.update_beken_lamp()
+ sleep(step_duration)
+ self._transitioning = False
+
+ def update_beken_lamp(self):
+ r = int( int(self._on) * self._rgb[0] * ( self._brightness / 255 ) )
+ g = int( int(self._on) * self._rgb[1] * ( self._brightness / 255 ) )
+ b = int( int(self._on) * self._rgb[2] * ( self._brightness / 255 ) )
+ l = int( int(self._on) * self._w * ( self._brightness / 255 ) )
+ self._connection.send(BEKEN_CHARACTERISTIC_LAMP, makemsg(r, g, b, l))
+