diff options
author | Javier <dev.git@javispedro.com> | 2025-08-24 16:33:47 +0200 |
---|---|---|
committer | Javier <dev.git@javispedro.com> | 2025-08-24 16:33:47 +0200 |
commit | d98b141421a2f9325d748ad93361211e65603fce (patch) | |
tree | f7f5937e7de1a25954de7d307e53f998cf92607f | |
parent | 5b34010d8f1c319711b9808716b998345eb3d7d9 (diff) | |
download | ctbtw-d98b141421a2f9325d748ad93361211e65603fce.tar.gz ctbtw-d98b141421a2f9325d748ad93361211e65603fce.zip |
refactor UI, introduce connecting/disconnecting by name
-rwxr-xr-x | ctbtw.py | 270 |
1 files changed, 161 insertions, 109 deletions
@@ -1,6 +1,7 @@ #!/usr/bin/env python3 -# ctbtw -- a small program to control BT-W* devices (like BT-W6) +# ctbtw -- a small program to control BT-W* Bluetooth audio devices +# Tested mostly on CT BT-W6 # Copyright (C) 2025 Javier S. Pedro # This program is free software: you can redistribute it and/or modify @@ -14,6 +15,7 @@ # GNU General Public License for more details. from enum import Enum, Flag +from typing import NamedTuple from argparse import ArgumentParser import struct import hid @@ -76,9 +78,12 @@ class BTCodec(int, Flag): class BTAddr: def __init__(self, value=None): if isinstance(value, str): + if value.count(":") != 5: + raise ValueError("BTAddr must be xx:xx:xx:xx:xx:xx string") self.bytes = bytes.fromhex(value.replace(":", " ")) elif isinstance(value, bytes): - assert len(value) == 6 + if len(value) != 6: + raise ValueError("BTAddr must be 6 bytes") self.bytes = bytes(value[4:6] + value[0:2] + value[2:4]) elif value is None: self.bytes = bytes() @@ -246,7 +251,7 @@ class GetPairedDeviceDetailsMsg(BTMsg): self.index = int(index) self.valid = bool(valid) self.addr = BTAddr.unpack(addr) - self.displayName = displayName.decode("utf-8").rstrip('\x00') + self.displayName = displayName.rstrip(b'\x00').decode("utf-8") return self def pack(self): @@ -269,7 +274,7 @@ class GetInquiryDataMsg(BTMsg): assert prefix == cls.prefixId assert cmd == cls.commandId self.addr = BTAddr.unpack(addr) - self.displayName = displayName.decode("utf-8").rstrip('\x00') + self.displayName = displayName.rstrip(b'\x00').decode("utf-8") return self def pack(self): @@ -294,7 +299,7 @@ class GetStateMsg(BTMsg): assert cmd == cls.commandId self.state = BTState(state) self.addr = BTAddr.unpack(addr) - self.displayName = displayName.decode("utf-8").rstrip('\x00') + self.displayName = displayName.rstrip(b'\x00').decode("utf-8") return self def pack(self): @@ -491,23 +496,67 @@ class SetFeatureReq(BTReq): def __repr__(self): return "SetFeatureReq(feature=%s,value=%r)"%(self.feature,self.value) -# List of messages that can be received -msgs = [AckMsg, GetNumPairedDevicesMsg, GetPairedDeviceDetailsMsg, GetInquiryDataMsg, GetStateMsg, GetFeatureMsg] -# HID report number used to both receive/send messages -REPORT_ID = 3 -# Size of this report (hardcoded...) -REPORT_SIZE = 64 -# Vendor ID to search for -CT_VENDOR_ID = 0x041e -# Product IDs to search for -CT_PRODUCT_IDS = [0x3130, 0x3132] +class PairedDevice(NamedTuple): + name: str + addr: BTAddr + +class CTBTWDevice: + # List of messages that can be received + MSG_TYPES = [AckMsg, GetNumPairedDevicesMsg, GetPairedDeviceDetailsMsg, GetInquiryDataMsg, GetStateMsg, GetFeatureMsg] + # HID report number used to both receive/send messages + REPORT_ID = 3 + # Size of this report (hardcoded...) + REPORT_SIZE = 64 + # Vendor ID to search for + CT_VENDOR_ID = 0x041e + # Product IDs to search for. Hardcoded to [BT-W4, BT-W5, BT-W6] + CT_PRODUCT_IDS = [0x312B, 0x3130, 0x3132] + + def __init__(self, devpath, verbose=False): + self._hid = hid.device() + self._hid.open_path(devpath) + self._verbose = verbose + + def parse_msg(self, data): + assert len(data) == self.REPORT_SIZE + for msg in self.MSG_TYPES: + if msg.is_mine(data): + return msg.unpack(data) + return None + + def send_msg(self, msg): + if self._verbose: print(repr(msg)) + data = msg.pack() + assert len(data) <= self.REPORT_SIZE + if len(data) < self.REPORT_SIZE: # pad to report size + data += b"\0" * (self.REPORT_SIZE - len(data)) + data = struct.pack("B", self.REPORT_ID) + data + assert len(data) == 1 + self.REPORT_SIZE + if self._verbose: print('> ', data.hex(sep=' ')) + self._hid.write(data) + + def read_msgs(self, timeout_ms=50): + while True: + data = self._hid.read(1 + self.REPORT_SIZE, timeout_ms=timeout_ms) + if len(data) == 0: + break + if self._verbose: print('< ', bytes(data).hex(sep=' ')) + if len(data) == 1 + self.REPORT_SIZE and data[0] == self.REPORT_ID: + msg = dev.parse_msg(bytes(data[1:])) + if msg: + if self._verbose: print(' ', repr(msg)) + yield msg + else: + if self._verbose: print(" Unknown message with prefix %x command %x " % (data[1], data[2])) + + # Timeout (in msec) to use for short messages SHORT_TIMEOUT=50 +# Timeout (in msec) to use for e.g. connection messages +MID_TIMEOUT=1500 # Timeout (in msec) to use for messages related to pairing PAIR_TIMEOUT=30000 -_verbose = False - bt_codec_colors = { BTCodec.SBC : (0,102,179), BTCodec.aptXClassic : (139,198,64), @@ -527,57 +576,22 @@ def colored_dot(color): return colorstr + dot + reset def codec_name(codec): - global _use_color, codec_color + global use_color assert codec in BTCodec codec_color = bt_codec_colors.get(codec) - if _use_color and codec_color: + if use_color and codec_color: return colored_dot(codec_color) + " " + codec.name else: return codec.name -def parse_msg(data): - assert len(data) == REPORT_SIZE - for msg in msgs: - if msg.is_mine(data): - return msg.unpack(data) - return None - -def send_msg(dev, msg): - global _verbose - if _verbose: print(repr(msg)) - data = msg.pack() - assert len(data) <= REPORT_SIZE - if len(data) < REPORT_SIZE: # pad to report size - data += b"\0" * (REPORT_SIZE - len(data)) - data = struct.pack("B", REPORT_ID) + data - assert len(data) == 1 + REPORT_SIZE - if _verbose: print('> ', data.hex(sep=' ')) - dev.write(data) - -def read_msgs(dev, timeout_ms=1000): - global _verbose - while True: - data = dev.read(1 + REPORT_SIZE, timeout_ms=timeout_ms) - if len(data) == 0: - break - if _verbose: print('< ', bytes(data).hex(sep=' ')) - if len(data) == 1 + REPORT_SIZE and data[0] == REPORT_ID: - msg = parse_msg(bytes(data[1:])) - if msg: - if _verbose: print(' ', repr(msg)) - yield msg - else: - if _verbose: print(" Unknown message with prefix %x command %x " % (data[1], data[2])) - - -def find_device(args): - for device in hid.enumerate(vendor_id=CT_VENDOR_ID): - if device['vendor_id'] == CT_VENDOR_ID and device['product_id'] in CT_PRODUCT_IDS: +def find_device(): + for device in hid.enumerate(vendor_id=CTBTWDevice.CT_VENDOR_ID): + if device['vendor_id'] == CTBTWDevice.CT_VENDOR_ID and device['product_id'] in CTBTWDevice.CT_PRODUCT_IDS: return device return None -def read_and_explain_msgs(dev, timeout_ms=1000): - for msg in read_msgs(dev, timeout_ms=timeout_ms): +def read_and_explain_msgs(dev: CTBTWDevice, timeout_ms=SHORT_TIMEOUT): + for msg in dev.read_msgs(timeout_ms=timeout_ms): if isinstance(msg, GetStateMsg): print("Currently %s to device '%s'" % (msg.state.name, msg.displayName)) elif isinstance(msg, GetFeatureMsg): @@ -586,79 +600,99 @@ def read_and_explain_msgs(dev, timeout_ms=1000): elif msg.feature == BTFeatureGet.ExtendedCodecsSupported: print("Supported codecs: ", " ".join([codec_name(val) for val in msg.value])) +def get_paired_devices(dev: CTBTWDevice, timeout_ms=SHORT_TIMEOUT): + num_devices = None + msg = GetNumPairedDevicesReq() + dev.send_msg(msg) + for msg in dev.read_msgs(timeout_ms=timeout_ms): + if isinstance(msg, GetNumPairedDevicesMsg): + num_devices = int(msg.value) + break + if num_devices is None: + raise IOError("Could not get number of paired devices") + if num_devices: + # Have to send a GetDeviceDetails msg for every potential index + for dev_index in range(num_devices): + msg = GetPairedDeviceDetailsReq(index=dev_index) + dev.send_msg(msg) + for msg in dev.read_msgs(timeout_ms=SHORT_TIMEOUT): + if isinstance(msg, GetPairedDeviceDetailsMsg): + if msg.valid: + yield PairedDevice(msg.displayName, msg.addr) + break + +def get_paired_device_address(dev: CTBTWDevice, dev_name: str, timeout_ms=SHORT_TIMEOUT): + fdev_name = dev_name.casefold() + for paired_dev in get_paired_devices(dev, timeout_ms=timeout_ms): + if paired_dev.name.casefold() == fdev_name: + return paired_dev.addr + return None + +def get_currently_connected_device(dev: CTBTWDevice, timeout_ms=SHORT_TIMEOUT): + paired_dev = None + msg = GetStateReq() + dev.send_msg(msg) + for msg in dev.read_msgs(timeout_ms=SHORT_TIMEOUT): + if isinstance(msg, GetStateMsg): + if msg.state in (BTState.Connected, BTState.Connecting): + paired_dev = PairedDevice(msg.displayName, msg.addr) + # Continue through the loop to flush all state messages and avoid spurious ones later one + return paired_dev + if __name__ == '__main__': - parser = ArgumentParser(prog='ctbtw', description="Control CT BT-W* devices", epilog="") + parser = ArgumentParser(prog='ctbtw', description="Control CT BT-W* Bluetooth Audio Trasmitter devices", epilog="With no arguments, show current device state") parser.add_argument('--verbose', '-v', action='store_true', help="print protocol traces") actions = parser.add_mutually_exclusive_group() actions.add_argument('--aptxll', '-l', action='store_true', help="for aptX, prefer low-latency mode") - actions.add_argument('--aptxhq', '-q', action='store_true', help="for aptX, prefer high-quality mode (enables lossless)") - actions.add_argument('--disconnect', '-t', metavar="ADDR", action='store', help="disconnect from device with given address") - actions.add_argument('--connect', '-c', metavar="ADDR", action='store', help="connect to device with given address") - actions.add_argument('--devices', '-d', action='store_true', help="view currently paired devices") - actions.add_argument('--pair', '-p', nargs='?', metavar="ADDR", const="*", action='store', help='pair with specific bluetooth address, or any device if no ADDR specified') - actions.add_argument('--forget', '-f', action='store_true', help='forget all paired devices') - actions.add_argument('--get', action='append', choices=[val.name for val in BTFeatureGet], help="get the current value of a setting") - actions.add_argument('--set', nargs=2, action='append', metavar=("KEY", "VALUE"), help="set a setting") + actions.add_argument('--aptxhq', '-q', action='store_true', help="for aptX, prefer high-quality mode (also enables aptX lossless)") + actions.add_argument('--devices', '-s', action='store_true', help="list currently paired devices") + actions.add_argument('--connect', '-c', metavar="DEV", action='store', help="connect to (already paired) device with given name/address") + actions.add_argument('--disconnect', '-d', metavar="D", action='store', nargs='?', const='*', help="disconnect from device with given name/address, or current device if no %(metavar)s") + actions.add_argument('--pair', '-p', nargs='?', metavar="ADDR", const="*", action='store', help='pair with specific bluetooth address, or any device if no %(metavar)s specified') + actions.add_argument('--forget', action='store_true', help='forget all paired devices') + actions.add_argument('--get', action='append', choices=[val.name for val in BTFeatureGet], metavar="SETTING", help="get the current value of a setting from device. valid %(metavar)ss: %(choices)s") + actions.add_argument('--set', nargs=2, action='append', metavar=("SETTING", "VALUE"), help="set a device setting to a VALUE. valid SETTINGs: %s" % ", ".join(val.name for val in BTFeatureSet)) args = parser.parse_args() - _verbose = args.verbose - _use_color = os.getenv("NO_COLOR", "") == "" and os.getenv("TERM") != "dumb" and sys.stdout.isatty() + verbose = args.verbose + use_color = os.getenv("NO_COLOR", "") == "" and os.getenv("TERM") != "dumb" and sys.stdout.isatty() - devinfo = find_device(args) + devinfo = find_device() if not devinfo: print("Could not find device with valid vendor/product IDs") sys.exit(1) print("Using device '%s' at %s" % (devinfo['product_string'], os.fsdecode(devinfo['path']))) - dev = hid.device() - dev.open_path(devinfo['path']) + dev = CTBTWDevice(devinfo['path'], verbose=verbose) - if _verbose: print("Opened device") + if verbose: print("Opened device '%s'" % devinfo['product_string']) if args.aptxll or args.aptxhq: codec = BTCodec.aptXAdaptiveHighQuality if args.aptxhq else BTCodec.aptXAdaptiveLowLatency print("Switching preferred aptX codec to %s" % codec.name) msg = SetFeatureReq(BTFeatureSet.PreferredAptXCodecVariant, codec) - send_msg(dev, msg) + dev.send_msg(msg) read_and_explain_msgs(dev, timeout_ms=1000) # Use larger timeout to give time to print final codec used, if any elif args.devices: - num_devices = None - msg = GetNumPairedDevicesReq() - send_msg(dev, msg) - for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT): - if isinstance(msg, GetNumPairedDevicesMsg): - num_devices = int(msg.value) - break - if num_devices: - print("List of paired devices:") - for dev_index in range(num_devices): - msg = GetPairedDeviceDetailsReq(index=dev_index) - send_msg(dev, msg) - for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT): - if isinstance(msg, GetPairedDeviceDetailsMsg): - if msg.valid: - print("#%s : %r (%s)" % (msg.index, msg.displayName, msg.addr)) - break - elif num_devices == 0: - print("No paired devices") - else: - print("Cannot get list of paired devices") + print("List of paired devices:") + for paired_dev in get_paired_devices(dev, timeout_ms=SHORT_TIMEOUT): + print("%r (%s)" % (paired_dev.name, paired_dev.addr)) elif args.pair is not None: if args.pair=="*": # Pair with any device msg = SetInquiryModeReq(BTInquiryAction.InquiryWithPair) - send_msg(dev, msg) + dev.send_msg(msg) print("Entering pairing mode") else: addr = BTAddr(args.pair) msg = SetInquiryModeReq(BTInquiryAction.InquiryPairSpecificSink, addr) - send_msg(dev, msg) + dev.send_msg(msg) print("Pairing with %s"%addr) try: - for msg in read_msgs(dev, timeout_ms=PAIR_TIMEOUT): + for msg in dev.read_msgs(timeout_ms=PAIR_TIMEOUT): if isinstance(msg, GetInquiryDataMsg): print("Found device %r (%s)" % (msg.displayName, msg.addr)) elif isinstance(msg, GetStateMsg): @@ -678,22 +712,40 @@ if __name__ == '__main__': elif args.forget: # TODO I couldn't get forgetting about just one device to work; it is not exercised by app msg = ForgetDeviceReq(index=255) - send_msg(dev, msg) + dev.send_msg(msg) print("Erased all paired devices") # No point reading messages as device will self-reset elif args.connect or args.disconnect: - addr = BTAddr(args.connect if args.connect else args.disconnect) action = BTConnectionAction.Connect if args.connect else BTConnectionAction.Disconnect + name = args.connect if args.connect else args.disconnect + if args.disconnect == '*': + # Disconnect from current device + curdev = get_currently_connected_device(dev, timeout_ms=SHORT_TIMEOUT) + if curdev: + addr = curdev.addr + else: + print("Not connected to any device") + sys.exit(1) + else: + try: + addr = BTAddr(name) + except ValueError: + # Not a bluetooth address! + addr = get_paired_device_address(dev, name, timeout_ms=SHORT_TIMEOUT) + if not addr: + print("No such device: %s" % name) + sys.exit(1) + print("%s device '%s'..." % ("Connecting to" if args.connect else "Disconnecting from", addr)) msg = SetConnectionReq(action, addr) - send_msg(dev, msg) - read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT) + dev.send_msg(msg) + read_and_explain_msgs(dev, timeout_ms=MID_TIMEOUT) elif args.get: features = [val.name for val in BTFeatureGet] for feature in args.get: feature = BTFeatureGet(features.index(feature)) msg = GetFeatureReq(feature) - send_msg(dev, msg) - for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT): + dev.send_msg(msg) + for msg in dev.read_msgs(timeout_ms=SHORT_TIMEOUT): if isinstance(msg, GetFeatureMsg): if msg.feature == feature: print("%s : %s" % (msg.feature.name, msg.value)) @@ -713,13 +765,13 @@ if __name__ == '__main__': elif valuetype is int: value = int(value) msg = SetFeatureReq(feature, value) - send_msg(dev, msg) + dev.send_msg(msg) print("Setting %s to %r"%(msg.feature, msg.value)) - read_msgs(dev, timeout_ms=SHORT_TIMEOUT) + dev.read_msgs(timeout_ms=SHORT_TIMEOUT) else: msg = GetStateReq() - send_msg(dev, msg) + dev.send_msg(msg) read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT) - if _verbose: + if verbose: print("Printing events from device, ^C to quit") read_and_explain_msgs(dev, timeout_ms=0) |