From f6f47b786983159055ad29e9eef258ca94cebf5c Mon Sep 17 00:00:00 2001 From: Javier Date: Wed, 15 Aug 2018 23:23:31 +0200 Subject: initial import --- weahome.py | 550 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 550 insertions(+) create mode 100755 weahome.py diff --git a/weahome.py b/weahome.py new file mode 100755 index 0000000..96e8b6b --- /dev/null +++ b/weahome.py @@ -0,0 +1,550 @@ +#!/usr/bin/env python3 + +import sys, math, datetime, struct, atexit +from enum import Enum +from argparse import ArgumentParser +from functools import partial +import dbus +from gi.repository import GObject +from dbus.mainloop.glib import DBusGMainLoop + +BLUEZ_SERVICE_NAME = 'org.bluez' +DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' +DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' + +BLUEZ_DEVICE_IFACE = 'org.bluez.Device1' +GATT_SERVICE_IFACE = 'org.bluez.GattService1' +GATT_CHAR_IFACE = 'org.bluez.GattCharacteristic1' + +WEAHOME_SERVICE = '74e7fe00-c6a4-11e2-b7a9-0002a5d5c51b' +DEVINFO_CHAR = '74e78e02-c6a4-11e2-b7a9-0002a5d5c51b' +COMMAND_CHAR = '74e78e03-c6a4-11e2-b7a9-0002a5d5c51b' +INDOOR_CH_1_3_CHAR = '74e78e10-c6a4-11e2-b7a9-0002a5d5c51b' +CH_4_7_CHAR = '74e78e14-c6a4-11e2-b7a9-0002a5d5c51b' +PRESSURE_CHAR = '74e78e20-c6a4-11e2-b7a9-0002a5d5c51b' +SETTINGS_CHAR = '74e78e2c-c6a4-11e2-b7a9-0002a5d5c51b' + +BATTERY_SERVICE = '0000180f-0000-1000-8000-00805f9b34fb' +BATTERY_CHAR = '00002a19-0000-1000-8000-00805f9b34fb' + +DEVINFO_SERVICE = '0000180a-0000-1000-8000-00805f9b34fb' +MODEL_NAME_CHAR = '00002a24-0000-1000-8000-00805f9b34fb' +MANUFACTURER_CHAR = '00002a29-0000-1000-8000-00805f9b34fb' + +MAX_SENSORS = 16 + +parser = ArgumentParser(description="Weather@home") +parser.add_argument("--sync", "-s", action='store_true', help="Sync time") +args = parser.parse_args() + +DBusGMainLoop(set_as_default=True) +bus = dbus.SystemBus() +mainloop = GObject.MainLoop() + +def _get_bluez_objects(): + obj_manager = bus.get_object(BLUEZ_SERVICE_NAME, '/') + return obj_manager.GetManagedObjects(dbus_interface=DBUS_OM_IFACE) + +class Event: + __slots__ = ['_observers'] + + def __init__(self): + self._observers = [] + + def observe(self, callback): + self._observers.append(callback) + + def notify(self, *args, **kwargs): + for cb in self._observers: + cb(*args, **kwargs) + +class Bitset: + __slots__ = ['_val', '_len'] + + def _compute_len(self): + return (math.log(self._val, 2)) + 1 + + def __init__(self, value=0, length=0): + self._val = int(value) + self._len = int(length) + if self._len == 0 and self._val > 0: + self._len = self._compute_len() + + def __len__(self): + return self._len + + def __getitem__(self, i): # Note: LSB first + if i < 0 or i >= self._len: + raise IndexError + return bool(self._val & (1 << i)) + + def __str__(self): + return ''.join(('1' if self[i] else '0') for i in reversed(range(self._len))) + +class WeahomeWeather(Enum): + PARTLY_CLOUDY = 0 + CLOUDY = 1 + RAINY = 2 + SUNNY = 3 + SNOWY = 4 + NO_DATA = 5 + +class WeahomeSettings: + __slots__ = [ 'status', 'datetime', 'moon_phase', 'longitude', 'latitude', 'sun_rise', 'sun_set', 'moon_rise', 'moon_set' ] + + def __init__(self): + self.status = None + self.datetime = datetime.datetime.now() + self.moon_phase = 0 + self.longitude = 0 + self.latitude = 0 + self.sun_rise = datetime.time() + self.sun_set = datetime.time() + self.moon_rise = datetime.time() + self.moon_set = datetime.time() + + @staticmethod + def unpack(value): + status, \ + year, month, day, hour, minute, second, \ + moon_phase, \ + longitude, latitude, \ + sun_rise_hour, sun_rise_minute, sun_set_hour, sun_set_minute, \ + moon_rise_hour, moon_rise_minute, moon_set_hour, moon_set_minute \ + = struct.unpack(' 0) + + @property + def name(self): + return str(self._get_device_prop('Name')) + + @property + def model(self): + if MODEL_NAME_CHAR in self._chars: + data = self._read_char(MODEL_NAME_CHAR) + return ''.join(map(chr, data)) + else: + return "Unknown" + + def connect(self): + self._device.Connect(dbus_interface=BLUEZ_DEVICE_IFACE) + + def disconnect(self): + self._device.Disconnect(dbus_interface=BLUEZ_DEVICE_IFACE) + + def sync_time(self): + settings = WeahomeSettings() + settings.status = 0 + settings.datetime = datetime.datetime.now() + self._write_settings(settings) + +def _is_weahome_device(path): + device = bus.get_object(BLUEZ_SERVICE_NAME, path) + uuids = device.Get(BLUEZ_DEVICE_IFACE, 'UUIDs', dbus_interface=DBUS_PROP_IFACE) + return WEAHOME_SERVICE in uuids + +def scan_for_weahome_devices(): + objects = _get_bluez_objects() + devices = [] + + for path, interfaces in objects.items(): + if BLUEZ_DEVICE_IFACE not in interfaces.keys(): + continue + if not _is_weahome_device(path): + continue + + device = WeahomeDevice(path) + devices.append(device) + + return devices + +def print_sensor(device, sensor): + print("{0} Sensor {1}:".format(device.name, sensor.name)) + if sensor.has_temperature: + print(" temperature = {0} C".format(sensor.temperature), end='') + if sensor.has_temperature_mem: + print(" ({0}C/{1}C)".format(sensor.temperature_min, sensor.temperature_max), end='') + if sensor.has_temperature_trend: + print(" ", sensor.temperature_trend, end='') + print("") + + if sensor.has_humidity: + print(" humidity = {0} %".format(sensor.humidity), end='') + if sensor.has_humidity_mem: + print(" ({0}%/{1}%)".format(sensor.humidity_min, sensor.humidity_max), end='') + if sensor.has_humidity_trend: + print(" ", sensor.humidity_trend, end='') + print("") + + if sensor.has_pressure: + print(" pressure = {0} mbar".format(sensor.pressure)) + if sensor.has_weather: + print(" weather = {0}".format(sensor.weather)) + +def _connected_changed_cb(sender): + name = sender.name + connected = sender.connected + print("{0} Connected: {1}".format(name, connected)) + if connected and args.sync: + print("{0} Syncing time".format(name)) + sender.sync_time() + if all(not device.connected for device in devices): + print("All disconnected") + mainloop.quit() + +def _new_reading_cb(sender, sensors): + for s in sensors: + sensor = sender.sensors[s] + print_sensor(sender, sensor) + +def _settings_changed_cb(sender): + settings = sender.settings + print("{0} Settings: status={1} datetime={2} moon_phase={3} position={4},{5} sun={6},{7} moon={8},{9}".format(sender.name, settings.status, settings.datetime, settings.moon_phase, settings.latitude, settings.longitude, settings.sun_rise, settings.sun_set, settings.moon_rise, settings.moon_set)) + +@atexit.register +def disconnect_all_devices(): + print("Disconnecting devices") + for device in devices: + if device.connected: + device.disconnect() + +devices = scan_for_weahome_devices() +print("Found {0} weather@home devices: {1}".format(len(devices), [dev.name for dev in devices])) + +for device in devices: + device.connected_changed.observe(_connected_changed_cb) + device.new_reading.observe(_new_reading_cb) + device.settings_changed.observe(_settings_changed_cb) + if device.connected: + _connected_changed_cb(device) + else: + device.connect() + +if len(devices) > 0: + mainloop.run() + -- cgit v1.2.3