#!/usr/bin/env python3 import sys, math, datetime, logging, struct, atexit from enum import Enum from argparse import ArgumentParser from functools import partial import dbus from gi.repository import GObject from dbus.mainloop.glib import DBusGMainLoop BLUEZ_SERVICE_NAME = 'org.bluez' DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager' DBUS_PROP_IFACE = 'org.freedesktop.DBus.Properties' BLUEZ_DEVICE_IFACE = 'org.bluez.Device1' GATT_SERVICE_IFACE = 'org.bluez.GattService1' GATT_CHAR_IFACE = 'org.bluez.GattCharacteristic1' WEAHOME_SERVICE = '74e7fe00-c6a4-11e2-b7a9-0002a5d5c51b' DEVINFO_CHAR = '74e78e02-c6a4-11e2-b7a9-0002a5d5c51b' COMMAND_CHAR = '74e78e03-c6a4-11e2-b7a9-0002a5d5c51b' INDOOR_CH_1_3_CHAR = '74e78e10-c6a4-11e2-b7a9-0002a5d5c51b' CH_4_7_CHAR = '74e78e14-c6a4-11e2-b7a9-0002a5d5c51b' PRESSURE_CHAR = '74e78e20-c6a4-11e2-b7a9-0002a5d5c51b' SETTINGS_CHAR = '74e78e2c-c6a4-11e2-b7a9-0002a5d5c51b' BATTERY_SERVICE = '0000180f-0000-1000-8000-00805f9b34fb' BATTERY_CHAR = '00002a19-0000-1000-8000-00805f9b34fb' DEVINFO_SERVICE = '0000180a-0000-1000-8000-00805f9b34fb' MODEL_NAME_CHAR = '00002a24-0000-1000-8000-00805f9b34fb' MANUFACTURER_CHAR = '00002a29-0000-1000-8000-00805f9b34fb' MAX_SENSORS = 16 DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() log = logging.getLogger(__name__) def _get_bluez_objects(): obj_manager = bus.get_object(BLUEZ_SERVICE_NAME, '/') return obj_manager.GetManagedObjects(dbus_interface=DBUS_OM_IFACE) class Event: __slots__ = ['_observers'] def __init__(self): self._observers = [] def observe(self, callback): self._observers.append(callback) def notify(self, *args, **kwargs): for cb in self._observers: cb(*args, **kwargs) class Bitset: __slots__ = ['_val', '_len'] def _compute_len(self): return (math.log(self._val, 2)) + 1 def __init__(self, value=0, length=0): self._val = int(value) self._len = int(length) if self._len == 0 and self._val > 0: self._len = self._compute_len() def __len__(self): return self._len def __getitem__(self, i): # Note: LSB first if i < 0 or i >= self._len: raise IndexError return bool(self._val & (1 << i)) def __str__(self): return ''.join(('1' if self[i] else '0') for i in reversed(range(self._len))) class WeahomeWeather(Enum): PARTLY_CLOUDY = 0 CLOUDY = 1 RAINY = 2 SUNNY = 3 SNOWY = 4 NO_DATA = 5 class WeahomeSettings: __slots__ = [ 'status', 'datetime', 'moon_phase', 'longitude', 'latitude', 'sun_rise', 'sun_set', 'moon_rise', 'moon_set' ] def __init__(self): self.status = None self.datetime = datetime.datetime.now() self.moon_phase = 0 self.longitude = 0 self.latitude = 0 self.sun_rise = datetime.time() self.sun_set = datetime.time() self.moon_rise = datetime.time() self.moon_set = datetime.time() @staticmethod def unpack(value): status, \ year, month, day, hour, minute, second, \ moon_phase, \ longitude, latitude, \ sun_rise_hour, sun_rise_minute, sun_set_hour, sun_set_minute, \ moon_rise_hour, moon_rise_minute, moon_set_hour, moon_set_minute \ = struct.unpack(' 0) @property def name(self): return str(self._get_device_prop('Name')) @property def model(self): if MODEL_NAME_CHAR in self._chars: data = self._read_char(MODEL_NAME_CHAR) return ''.join(map(chr, data)) else: return "Unknown" def connect(self): self._device.Connect(dbus_interface=BLUEZ_DEVICE_IFACE) def disconnect(self): self._device.Disconnect(dbus_interface=BLUEZ_DEVICE_IFACE) def sync_time(self): settings = WeahomeSettings() settings.status = 0 settings.datetime = datetime.datetime.now() self._write_settings(settings) def _is_weahome_device(path): device = bus.get_object(BLUEZ_SERVICE_NAME, path) uuids = device.Get(BLUEZ_DEVICE_IFACE, 'UUIDs', dbus_interface=DBUS_PROP_IFACE) return WEAHOME_SERVICE in uuids def scan_for_weahome_devices(): objects = _get_bluez_objects() devices = [] for path, interfaces in objects.items(): if BLUEZ_DEVICE_IFACE not in interfaces.keys(): continue if not _is_weahome_device(path): continue device = WeahomeDevice(path) devices.append(device) return devices def print_sensor(device, sensor): print("{0} Sensor {1}:".format(device.name, sensor.name)) if sensor.has_temperature: print(" temperature = {0} C".format(sensor.temperature), end='') if sensor.has_temperature_mem: print(" ({0}C/{1}C)".format(sensor.temperature_min, sensor.temperature_max), end='') if sensor.has_temperature_trend: print(" ", sensor.temperature_trend, end='') print("") if sensor.has_humidity: print(" humidity = {0} %".format(sensor.humidity), end='') if sensor.has_humidity_mem: print(" ({0}%/{1}%)".format(sensor.humidity_min, sensor.humidity_max), end='') if sensor.has_humidity_trend: print(" ", sensor.humidity_trend, end='') print("") if sensor.has_pressure: print(" pressure = {0} mbar".format(sensor.pressure)) if sensor.has_weather: print(" weather = {0}".format(sensor.weather)) def disconnect_all_devices(): print("Disconnecting devices") for device in devices: if device.connected: device.disconnect() # Example client program: def _connected_changed_cb(sender): name = sender.name connected = sender.connected print("{0} Connected: {1}".format(name, connected)) if connected and args.sync: print("{0} Syncing time".format(name)) sender.sync_time() if all(not device.connected for device in devices): print("All disconnected") mainloop.quit() def _new_reading_cb(sender, sensors): for s in sensors: sensor = sender.sensors[s] print_sensor(sender, sensor) def _settings_changed_cb(sender): settings = sender.settings log.info("{0} Settings: status={1} datetime={2} moon_phase={3} position={4},{5} sun={6},{7} moon={8},{9}".format(sender.name, settings.status, settings.datetime, settings.moon_phase, settings.latitude, settings.longitude, settings.sun_rise, settings.sun_set, settings.moon_rise, settings.moon_set)) if __name__ == '__main__': parser = ArgumentParser(description="Weather@home") parser.add_argument("--sync", "-s", action='store_true', help="Sync time") parser.add_argument("--verbose", "-v", action='store_true', help="Verbose log") args = parser.parse_args() mainloop = GObject.MainLoop() if args.verbose: logging.basicConfig(level=logging.DEBUG) devices = scan_for_weahome_devices() log.info("Found {0} weather@home devices: {1}".format(len(devices), [dev.name for dev in devices])) atexit.register(disconnect_all_devices) for device in devices: device.connected_changed.observe(_connected_changed_cb) device.new_reading.observe(_new_reading_cb) device.settings_changed.observe(_settings_changed_cb) if device.connected: _connected_changed_cb(device) else: device.connect() if len(devices) > 0: mainloop.run()