#!/usr/bin/env python3 # ctbtw -- a small program to control BT-W* Bluetooth audio devices # Tested mostly on CT BT-W6 # Copyright (C) 2025 Javier S. Pedro # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. from enum import Enum, Flag from typing import NamedTuple from argparse import ArgumentParser import struct import hid import sys, os class Commands(int, Enum): Ack = 2 DevInfo = 7 BTCommand = 0x6B class BTState(int, Enum): Initializing = 0 PoweredOff = 1 TestMode = 2 Idle = 3 Connectable = 4 Discoverable = 5 Connecting = 6 Inquiring = 7 Connected = 8 Paging = 9 Disconnected = 10 A2DPMode = 11 HFPMode = 12 PagingFailed = 13 LEAudioUnicast = 14 LEAudioUnicastWithVoiceChat = 15 PublicAuracast = 16 PrivateAuracast = 17 class BTDualMode(int, Flag): Classic = 0 LEAudio = 2 class BTAudioMode(int, Flag): A2DP_HFP = 1 # "Enable Mic Input" A2DP = 2 # "Disable Mic Input" PublicAuracast = 8 PrivateAuracast = 0x10 class BTProfile(int, Flag): SBC = 1 HFP = 2 A2DP = 4 AVRCP = 8 LEAudio = 0x4000 class BTCodec(int, Flag): SBC = 1 Faststream = 2 aptXClassic = 4 aptXLowLatency = 8 aptXHD = 0x10 aptXAdaptiveLowLatency = 0x20 aptXAdaptiveHighQuality = 0x40 aptXLossless = 0x80 aptXLite = 0x100 LC3 = 0x200 class BTAddr: def __init__(self, value=None): if isinstance(value, str): if value.count(":") != 5: raise ValueError("BTAddr must be xx:xx:xx:xx:xx:xx string") self.bytes = bytes.fromhex(value.replace(":", " ")) elif isinstance(value, bytes): if len(value) != 6: raise ValueError("BTAddr must be 6 bytes") self.bytes = bytes(value[4:6] + value[0:2] + 
value[2:4]) elif value is None: self.bytes = bytes() else: raise TypeError("do know how to cast %r into BTAddr"%value) @staticmethod def unpack(data): self = BTAddr() # TODO Likely something wrong here assert len(data) == 6 self.bytes = bytes(data[4:6] + data[0:2] + data[2:4]) return self def pack(self): return bytes(self.bytes[2:4] + self.bytes[4:6] + self.bytes[0:2]) def __repr__(self): return "BTAddr('%s')"%self.bytes.hex(sep=':') def __str__(self): return self.bytes.hex(sep=':') def __bytes__(self): return self.pack() class BTOperation(int, Enum): GetNumPairedDevices = 1 GetPairedDeviceDetails = 2 SetConnectionAction = 3 SetInquiryMode = 4 GetInquiryData = 5 GetState = 6 ForgetDevice = 7 SetDeviceFriendlyName = 8 GetFeature = 9 SetFeature = 10 StopInquiryMode = 11 class BTFeatureGet(int, Enum): """Enum with types of feature bytes that can be read from device, and their datatype""" def __new__(cls, value, valuetype): obj = int.__new__(cls, value) obj._value_ = value obj.valuetype = valuetype return obj AutoReconnect = 0, bool MaxPairedDeviceSupported = 1, int A2DPCodecSupported = 2, BTCodec A2DPCodecCurrent = 3, BTCodec LocalBluetoothAddress = 4, BTAddr SupportedDeviceAudioMode = 5, BTAudioMode CurrentDeviceAudioMode = 6, BTAudioMode AutoMicActivation = 7, bool PreferredAptXCodecVariant = 8, BTCodec AuracastName = 9, str AuracastCode = 10, str ExtendedCodecsSupported = 11, BTCodec ExtendedCurrentCodec = 12, BTCodec PreferredDualMode = 13, BTDualMode ConnectedDeviceSupportedBTProfile = 14, BTProfile GamingModeState = 15, bool BroadcastEncryptedState = 16, bool class BTFeatureSet(int, Enum): """Enum with types of feature bytes that be sent to the device, and their datatype""" def __new__(cls, value, valuetype): obj = int.__new__(cls, value) obj._value_ = value obj.valuetype = valuetype return obj AutoReconnect = 0, bool CurrentDeviceAudioMode = 1, BTAudioMode AutoMicActivation = 2, bool PreferredAptXCodecVariant = 3, BTCodec AuracastName = 4, str AuracastCode = 5, 
str PreferredDualMode = 6, BTDualMode GamingMode = 7, bool BroadcastEncryptedState = 8, bool class BTConnectionAction(int, Enum): Connect = 0 Disconnect = 1 class BTInquiryAction(int, Enum): InquiryWithPair = 0 InquiryGetSinksOnly = 1 InquiryPairSpecificSink = 2 class Msg: """Base class for messages.""" prefixId = None commandId = None @classmethod def is_mine(cls, data): """Called by message receiver. Should return true if passed data corresponds to a message of this class type.""" assert cls.prefixId assert cls.commandId return data[0] == cls.prefixId and data[1] == cls.commandId.value class AckMsg(Msg): prefixId = 0x5a # TODO Unclear what this prefix actually means, so hardcoding it commandId = Commands.Ack @classmethod def unpack(cls, data): self = cls() # Ignoring arguments return self def __repr__(self): return "AckMsg()" class BTMsg(Msg): """Base class for all bluetooth-related messages.""" prefixId = 0x5a # TODO commandId = Commands.BTCommand operationId = None @classmethod def is_mine(cls, data): assert cls.operationId is not None return data[0] == cls.prefixId and data[1] == cls.commandId.value and data[3] == cls.operationId.value class GetNumPairedDevicesMsg(BTMsg): operationId = BTOperation.GetNumPairedDevices def __init__(self): self.value = None @classmethod def unpack(cls, data): self = cls() prefix, cmd, msglen, op, value = struct.unpack_from(" ', data.hex(sep=' ')) self._hid.write(data) def read_msgs(self, timeout_ms=50): while True: data = self._hid.read(1 + self.REPORT_SIZE, timeout_ms=timeout_ms) if len(data) == 0: break if self._verbose: print('< ', bytes(data).hex(sep=' ')) if len(data) == 1 + self.REPORT_SIZE and data[0] == self.REPORT_ID: msg = dev.parse_msg(bytes(data[1:])) if msg: if self._verbose: print(' ', repr(msg)) yield msg else: if self._verbose: print(" Unknown message with prefix %x command %x " % (data[1], data[2])) # Timeout (in msec) to use for short messages SHORT_TIMEOUT=50 # Timeout (in msec) to use for e.g. 
# connection messages
MID_TIMEOUT = 1500
# Timeout (in msec) to use for messages related to pairing
PAIR_TIMEOUT = 30000

# Color (red, green, blue) of the dot printed next to each codec name.
# Codecs without an entry are printed without a dot.
bt_codec_colors = {
    BTCodec.SBC: (0, 102, 179),
    BTCodec.aptXClassic: (139, 198, 64),
    BTCodec.aptXLowLatency: (0, 173, 239),
    BTCodec.aptXHD: (246, 148, 31),
    BTCodec.aptXAdaptiveLowLatency: (0, 173, 239),
    BTCodec.aptXAdaptiveHighQuality: (110, 44, 145),
    BTCodec.aptXLossless: (255, 241, 0),
    BTCodec.aptXLite: (0, 173, 239),
    BTCodec.LC3: (255, 255, 255),
}


def colored_dot(color):
    """Return a dot character wrapped in 24-bit ANSI color escapes.

    `color` is a 3-tuple of integers (red, green, blue).
    """
    colorstr = "\033[38;2;%d;%d;%dm" % color
    dot = "●"
    reset = "\033[0m"
    return colorstr + dot + reset


def codec_name(codec):
    """Return the name of `codec`, prefixed with a colored dot when enabled.

    The dot is only added when the module-level `use_color` flag is true and
    the codec has an entry in `bt_codec_colors`.
    """
    assert codec in BTCodec
    codec_color = bt_codec_colors.get(codec)
    # `use_color` is only assigned by the script section below; fall back to
    # plain output when this module is imported and the flag does not exist.
    if globals().get("use_color", False) and codec_color:
        return colored_dot(codec_color) + " " + codec.name
    return codec.name


def find_device():
    """Return the hid enumeration entry of the first supported device, or None."""
    for device in hid.enumerate(vendor_id=CTBTWDevice.CT_VENDOR_ID):
        if (device['vendor_id'] == CTBTWDevice.CT_VENDOR_ID
                and device['product_id'] in CTBTWDevice.CT_PRODUCT_IDS):
            return device
    return None


def read_and_explain_msgs(dev: CTBTWDevice, timeout_ms=SHORT_TIMEOUT):
    """Read messages from `dev` until a read times out, printing a
    human-readable line for state and codec messages.

    Other message types are consumed silently.
    """
    for msg in dev.read_msgs(timeout_ms=timeout_ms):
        if isinstance(msg, GetStateMsg):
            print("Currently %s to device '%s'" % (msg.state.name, msg.displayName))
        elif isinstance(msg, GetFeatureMsg):
            if msg.feature == BTFeatureGet.ExtendedCurrentCodec:
                print("Currently using codec '%s'" % codec_name(msg.value))
            elif msg.feature == BTFeatureGet.ExtendedCodecsSupported:
                print("Supported codecs: ", " ".join([codec_name(val) for val in msg.value]))


def get_paired_devices(dev: CTBTWDevice, timeout_ms=SHORT_TIMEOUT):
    """Generator yielding a PairedDevice(name, addr) for every valid paired device.

    First asks the device how many paired devices it has, then requests the
    details of each index in turn. `timeout_ms` is used for every read.

    Raises IOError (on first iteration) if the device does not report the
    number of paired devices.
    """
    dev.send_msg(GetNumPairedDevicesReq())
    num_devices = None
    for msg in dev.read_msgs(timeout_ms=timeout_ms):
        if isinstance(msg, GetNumPairedDevicesMsg):
            num_devices = int(msg.value)
            break

    if num_devices is None:
        raise IOError("Could not get number of paired devices")

    # Have to send a GetDeviceDetails msg for every potential index
    for dev_index in range(num_devices):
        dev.send_msg(GetPairedDeviceDetailsReq(index=dev_index))
        for msg in dev.read_msgs(timeout_ms=timeout_ms):
            if isinstance(msg, GetPairedDeviceDetailsMsg):
                if msg.valid:
                    yield PairedDevice(msg.displayName, msg.addr)
                # NOTE(review): stop reading once the reply for this index
                # arrived, valid or not; original indentation was lost.
                break


def get_paired_device_address(dev: CTBTWDevice, dev_name: str, timeout_ms=SHORT_TIMEOUT):
    """Return the address of the paired device called `dev_name`, or None.

    The name comparison is case-insensitive (casefold).
    """
    fdev_name = dev_name.casefold()
    for paired_dev in get_paired_devices(dev, timeout_ms=timeout_ms):
        if paired_dev.name.casefold() == fdev_name:
            return paired_dev.addr
    return None


def get_currently_connected_device(dev: CTBTWDevice, timeout_ms=SHORT_TIMEOUT):
    """Return a PairedDevice for the device `dev` is connected (or connecting)
    to, or None if no state message reports such a device.
    """
    paired_dev = None
    dev.send_msg(GetStateReq())
    for msg in dev.read_msgs(timeout_ms=timeout_ms):
        if isinstance(msg, GetStateMsg):
            if msg.state in (BTState.Connected, BTState.Connecting):
                paired_dev = PairedDevice(msg.displayName, msg.addr)
        # Continue through the loop to flush all state messages and avoid
        # spurious ones later on
    return paired_dev


if __name__ == '__main__':
    # NOTE: `dev` and `use_color` are deliberately module-level names:
    # read_msgs() refers to a global called `dev`, and codec_name() reads
    # `use_color`. Do not move this section into a function without
    # changing those first.
    parser = ArgumentParser(prog='ctbtw',
                            description="Control CT BT-W* Bluetooth Audio Trasmitter devices",
                            epilog="With no arguments, show current device state")
    parser.add_argument('--verbose', '-v', action='store_true',
                        help="print protocol traces")
    actions = parser.add_mutually_exclusive_group()
    actions.add_argument('--aptxll', '-l', action='store_true',
                         help="for aptX, prefer low-latency mode")
    actions.add_argument('--aptxhq', '-q', action='store_true',
                         help="for aptX, prefer high-quality mode (also enables aptX lossless)")
    actions.add_argument('--devices', '-s', action='store_true',
                         help="list currently paired devices")
    actions.add_argument('--connect', '-c', metavar="DEV", action='store',
                         help="connect to (already paired) device with given name/address")
    actions.add_argument('--disconnect', '-d', metavar="D", action='store', nargs='?', const='*',
                         help="disconnect from device with given name/address, or current device if no %(metavar)s")
    actions.add_argument('--pair', '-p', nargs='?', metavar="ADDR", const="*", action='store',
                         help='pair with specific bluetooth address, or any device if no %(metavar)s specified')
    actions.add_argument('--forget', action='store_true',
                         help='forget all paired devices')
    actions.add_argument('--get', action='append', choices=[val.name for val in BTFeatureGet],
                         metavar="SETTING",
                         help="get the current value of a setting from device. valid %(metavar)ss: %(choices)s")
    actions.add_argument('--set', nargs=2, action='append', metavar=("SETTING", "VALUE"),
                         help="set a device setting to a VALUE. valid SETTINGs: %s"
                              % ", ".join(val.name for val in BTFeatureSet))

    args = parser.parse_args()
    verbose = args.verbose
    use_color = os.getenv("NO_COLOR", "") == "" and os.getenv("TERM") != "dumb" and sys.stdout.isatty()

    devinfo = find_device()
    if not devinfo:
        print("Could not find device with valid vendor/product IDs")
        sys.exit(1)

    print("Using device '%s' at %s" % (devinfo['product_string'], os.fsdecode(devinfo['path'])))
    dev = CTBTWDevice(devinfo['path'], verbose=verbose)
    if verbose:
        print("Opened device '%s'" % devinfo['product_string'])

    if args.aptxll or args.aptxhq:
        codec = BTCodec.aptXAdaptiveHighQuality if args.aptxhq else BTCodec.aptXAdaptiveLowLatency
        print("Switching preferred aptX codec to %s" % codec.name)
        msg = SetFeatureReq(BTFeatureSet.PreferredAptXCodecVariant, codec)
        dev.send_msg(msg)
        # Use larger timeout to give time to print final codec used, if any
        read_and_explain_msgs(dev, timeout_ms=1000)
    elif args.devices:
        print("List of paired devices:")
        for paired_dev in get_paired_devices(dev, timeout_ms=SHORT_TIMEOUT):
            print("%r (%s)" % (paired_dev.name, paired_dev.addr))
    elif args.pair is not None:
        if args.pair == "*":
            # Pair with any device
            msg = SetInquiryModeReq(BTInquiryAction.InquiryWithPair)
            dev.send_msg(msg)
            print("Entering pairing mode")
        else:
            addr = BTAddr(args.pair)
            msg = SetInquiryModeReq(BTInquiryAction.InquiryPairSpecificSink, addr)
            dev.send_msg(msg)
            print("Pairing with %s" % addr)
        try:
            for msg in dev.read_msgs(timeout_ms=PAIR_TIMEOUT):
                if isinstance(msg, GetInquiryDataMsg):
                    print("Found device %r (%s)" % (msg.displayName, msg.addr))
                elif isinstance(msg, GetStateMsg):
                    if msg.state == BTState.Connected:
                        print("Connected to device %r" % msg.displayName)
                        break
        except OSError:
            # Received on SIGINT...
            pass
        finally:
            # Always leave inquiry mode, even if interrupted while printing
            try:
                print("Leaving pairing mode")
            except KeyboardInterrupt:
                pass
            finally:
                # Was a call to a bare send_msg(dev, msg); use the device
                # method like every other call site does.
                dev.send_msg(StopInquiryModeReq())
    elif args.forget:
        # TODO I couldn't get forgetting about just one device to work; it is not exercised by app
        msg = ForgetDeviceReq(index=255)
        dev.send_msg(msg)
        print("Erased all paired devices")
        # No point reading messages as device will self-reset
    elif args.connect or args.disconnect:
        action = BTConnectionAction.Connect if args.connect else BTConnectionAction.Disconnect
        name = args.connect if args.connect else args.disconnect
        if args.disconnect == '*':
            # Disconnect from current device
            curdev = get_currently_connected_device(dev, timeout_ms=SHORT_TIMEOUT)
            if curdev:
                addr = curdev.addr
            else:
                print("Not connected to any device")
                sys.exit(1)
        else:
            try:
                addr = BTAddr(name)
            except ValueError:
                # Not a bluetooth address! Treat it as a paired device name
                addr = get_paired_device_address(dev, name, timeout_ms=SHORT_TIMEOUT)
                if not addr:
                    print("No such device: %s" % name)
                    sys.exit(1)
        print("%s device '%s'..."
              % ("Connecting to" if args.connect else "Disconnecting from", addr))
        msg = SetConnectionReq(action, addr)
        dev.send_msg(msg)
        read_and_explain_msgs(dev, timeout_ms=MID_TIMEOUT)
    elif args.get:
        for feature_name in args.get:
            # argparse `choices` already restricted the names to BTFeatureGet
            # members, so lookup by name cannot fail here.
            feature = BTFeatureGet[feature_name]
            dev.send_msg(GetFeatureReq(feature))
            for msg in dev.read_msgs(timeout_ms=SHORT_TIMEOUT):
                if isinstance(msg, GetFeatureMsg):
                    if msg.feature == feature:
                        print("%s : %s" % (msg.feature.name, msg.value))
                    else:
                        # NOTE(review): `else` bound to the nearest `if`;
                        # original indentation was lost, please confirm.
                        break
    elif args.set:
        for feature_name, value in args.set:
            try:
                feature = BTFeatureSet[feature_name]
            except KeyError:
                print("Feature %r not found, skipping" % feature_name)
                continue
            valuetype = feature.valuetype
            if valuetype is bool:
                value = value.lower() in ('y', 'yes', 't', 'true', 'on', '1')
            elif valuetype is int:
                value = int(value)
            # Other value types are passed to SetFeatureReq as the raw string
            msg = SetFeatureReq(feature, value)
            dev.send_msg(msg)
            print("Setting %s to %r" % (msg.feature, msg.value))
            # read_msgs() is a generator: it must be iterated for the
            # replies to actually be read (and traced in verbose mode).
            for _ in dev.read_msgs(timeout_ms=SHORT_TIMEOUT):
                pass
    else:
        msg = GetStateReq()
        dev.send_msg(msg)
        read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT)
        # NOTE(review): event monitoring placed in the no-argument branch;
        # original indentation was lost, please confirm.
        if verbose:
            print("Printing events from device, ^C to quit")
            try:
                read_and_explain_msgs(dev, timeout_ms=0)
            except KeyboardInterrupt:
                # ^C is the documented way to leave this loop; exit quietly
                pass