#!/usr/bin/env python3 import sys, math, datetime, logging, struct, atexit from enum import Enum from argparse import ArgumentParser from functools import partial import dbus from gi.repository import GObject, GLib from dbus.mainloop.glib import DBusGMainLoop BLUEZ_SERVICE_NAME = 'org.bluez' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' BLUEZ_ADAPTER_IFACE = 'org.bluez.Adapter1' BLUEZ_DEVICE_IFACE = 'org.bluez.Device1' GATT_SERVICE_IFACE = 'org.bluez.GattService1' GATT_CHAR_IFACE = 'org.bluez.GattCharacteristic1' WEAHOME_SERVICE = '74e7fe00-c6a4-11e2-b7a9-0002a5d5c51b' DEVINFO_CHAR = '74e78e02-c6a4-11e2-b7a9-0002a5d5c51b' COMMAND_CHAR = '74e78e03-c6a4-11e2-b7a9-0002a5d5c51b' INDOOR_CH_1_3_CHAR = '74e78e10-c6a4-11e2-b7a9-0002a5d5c51b' CH_4_7_CHAR = '74e78e14-c6a4-11e2-b7a9-0002a5d5c51b' PRESSURE_CHAR = '74e78e20-c6a4-11e2-b7a9-0002a5d5c51b' SETTINGS_CHAR = '74e78e2c-c6a4-11e2-b7a9-0002a5d5c51b' BATTERY_SERVICE = '0000180f-0000-1000-8000-00805f9b34fb' BATTERY_CHAR = '00002a19-0000-1000-8000-00805f9b34fb' DEVINFO_SERVICE = '0000180a-0000-1000-8000-00805f9b34fb' MODEL_NAME_CHAR = '00002a24-0000-1000-8000-00805f9b34fb' MANUFACTURER_CHAR = '00002a29-0000-1000-8000-00805f9b34fb' MAX_SENSORS = 16 DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() log = logging.getLogger(__name__) def _get_bluez_objects(): obj_manager = bus.get_object(BLUEZ_SERVICE_NAME, '/') return obj_manager.GetManagedObjects(dbus_interface=DBUS_OM_IFACE) def _get_bluez_iface_objs(objects, iface): for path, interfaces in objects.items(): if iface in interfaces.keys(): obj = bus.get_object(BLUEZ_SERVICE_NAME, path) yield obj, path, interfaces class Event: __slots__ = ['_observers'] def __init__(self): self._observers = [] def observe(self, callback): self._observers.append(callback) def notify(self, *args, **kwargs): for cb in self._observers: cb(*args, **kwargs) class Bitset: __slots__ = ['_val', '_len'] def _compute_len(self): return (math.log(self._val, 2)) + 1 def __init__(self, value=0, length=0): self._val = int(value) self._len = int(length) if self._len == 0 and self._val > 0: self._len = self._compute_len() def __len__(self): return self._len def __getitem__(self, i): # Note: LSB first if i < 0 or i >= self._len: raise IndexError return bool(self._val & (1 << i)) def __str__(self): return ''.join(('1' if self[i] else '0') for i in reversed(range(self._len))) class WeahomeWeather(Enum): PARTLY_CLOUDY = 0 CLOUDY = 1 RAINY = 2 SUNNY = 3 SNOWY = 4 NO_DATA = 5 class WeahomeSettings: __slots__ = [ 'status', 'datetime', 'moon_phase', 'longitude', 'latitude', 'sun_rise', 'sun_set', 'moon_rise', 'moon_set' ] def __init__(self): self.status = None self.datetime = datetime.datetime.now() self.moon_phase = 0 self.longitude = 0 self.latitude = 0 self.sun_rise = datetime.time() self.sun_set = datetime.time() self.moon_rise = datetime.time() self.moon_set = datetime.time() @staticmethod def unpack(value): status, \ year, month, day, hour, minute, second, \ moon_phase, \ longitude, latitude, \ sun_rise_hour, sun_rise_minute, sun_set_hour, sun_set_minute, \ moon_rise_hour, moon_rise_minute, moon_set_hour, moon_set_minute \ = struct.unpack(' 0) @property def name(self): return str(self._get_device_prop('Name')) @property def model(self): if MODEL_NAME_CHAR in self._chars: data = self._read_char(MODEL_NAME_CHAR) return ''.join(map(chr, data)) else: return "Unknown" @property def address(self): return str(self._get_device_prop("Address")) def connect(self): self._device.Connect(dbus_interface=BLUEZ_DEVICE_IFACE) def disconnect(self): self._device.Disconnect(dbus_interface=BLUEZ_DEVICE_IFACE) def sync_time(self): settings = WeahomeSettings() settings.status = 0 settings.datetime = datetime.datetime.now() self._write_settings(settings) def _is_weahome_device(dbus_obj): uuids = dbus_obj.Get(BLUEZ_DEVICE_IFACE, 'UUIDs', dbus_interface=DBUS_PROP_IFACE) return WEAHOME_SERVICE in uuids def scan_for_weahome_devices(): objects = _get_bluez_objects() devices = [] for obj, path, _ in _get_bluez_iface_objs(objects, BLUEZ_DEVICE_IFACE): if not _is_weahome_device(obj): continue device = WeahomeDevice(path) devices.append(device) return devices def connect_to_weahome_device(address): address = str(address).upper() objects = _get_bluez_objects() # First, find preexisting devices with the same address and return it for obj, path, _ in _get_bluez_iface_objs(objects, BLUEZ_DEVICE_IFACE): device_addr = obj.Get(BLUEZ_DEVICE_IFACE, 'Address', dbus_interface=DBUS_PROP_IFACE) if device_addr == address: device = WeahomeDevice(path) device.connect() return device # TODO Adapter1 ConnectDevice is still experimental... # No device, no adapters... raise KeyError("Device with address '{0}' not found".format(address)) def print_sensor(device, sensor): print("{0} Sensor {1}:".format(device.name, sensor.name)) if sensor.has_temperature: print(" temperature = {0} C".format(sensor.temperature), end='') if sensor.has_temperature_mem: print(" ({0}C/{1}C)".format(sensor.temperature_min, sensor.temperature_max), end='') if sensor.has_temperature_trend: print(" ", sensor.temperature_trend, end='') print("") if sensor.has_humidity: print(" humidity = {0} %".format(sensor.humidity), end='') if sensor.has_humidity_mem: print(" ({0}%/{1}%)".format(sensor.humidity_min, sensor.humidity_max), end='') if sensor.has_humidity_trend: print(" ", sensor.humidity_trend, end='') print("") if sensor.has_pressure: print(" pressure = {0} mbar".format(sensor.pressure)) if sensor.has_weather: print(" weather = {0}".format(sensor.weather)) # Example client program: def _connected_changed_cb(sender): name = sender.name connected = sender.connected print("{0} Connected: {1}".format(name, connected)) if connected and args.sync: print("{0} Syncing time".format(name)) sender.sync_time() if all(not device.connected for device in devices): print("All disconnected") mainloop.quit() def _new_reading_cb(sender, sensors): for s in sensors: sensor = sender.sensors[s] print_sensor(sender, sensor) def _settings_changed_cb(sender): settings = sender.settings log.info("{0} Settings: status={1} datetime={2} moon_phase={3} position={4},{5} sun={6},{7} moon={8},{9}".format(sender.name, settings.status, settings.datetime, settings.moon_phase, settings.latitude, settings.longitude, settings.sun_rise, settings.sun_set, settings.moon_rise, settings.moon_set)) def _disconnect_all_devices(): print("Disconnecting devices") for device in devices: if device.connected: device.disconnect() if __name__ == '__main__': parser = ArgumentParser(description="Weather@home") parser.add_argument("--device", "-d", action='append', help="Connect to specific address") parser.add_argument("--sync", "-s", action='store_true', help="Sync time") parser.add_argument("--verbose", "-v", action='store_true', help="Verbose log") args = parser.parse_args() mainloop = GLib.MainLoop() if args.verbose: logging.basicConfig(level=logging.DEBUG) devices = [] if args.device: for address in args.device: device = connect_to_weahome_device(address) devices.append(device) else: devices = scan_for_weahome_devices() log.info("Found {0} weather@home devices: {1}".format(len(devices), [dev.name for dev in devices])) atexit.register(_disconnect_all_devices) for device in devices: device.connected_changed.observe(_connected_changed_cb) device.new_reading.observe(_new_reading_cb) device.settings_changed.observe(_settings_changed_cb) if device.connected: _connected_changed_cb(device) else: device.connect() if len(devices) > 0: mainloop.run()