From 5b34010d8f1c319711b9808716b998345eb3d7d9 Mon Sep 17 00:00:00 2001 From: Javier Date: Tue, 5 Aug 2025 23:06:28 +0200 Subject: initial import --- ctbtw.py | 725 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 725 insertions(+) create mode 100755 ctbtw.py (limited to 'ctbtw.py') diff --git a/ctbtw.py b/ctbtw.py new file mode 100755 index 0000000..9738ec8 --- /dev/null +++ b/ctbtw.py @@ -0,0 +1,725 @@ +#!/usr/bin/env python3 + +# ctbtw -- a small program to control BT-W* devices (like BT-W6) +# Copyright (C) 2025 Javier S. Pedro + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +from enum import Enum, Flag +from argparse import ArgumentParser +import struct +import hid +import sys, os + +class Commands(int, Enum): + Ack = 2 + DevInfo = 7 + BTCommand = 0x6B + +class BTState(int, Enum): + Initializing = 0 + PoweredOff = 1 + TestMode = 2 + Idle = 3 + Connectable = 4 + Discoverable = 5 + Connecting = 6 + Inquiring = 7 + Connected = 8 + Paging = 9 + Disconnected = 10 + A2DPMode = 11 + HFPMode = 12 + PagingFailed = 13 + LEAudioUnicast = 14 + LEAudioUnicastWithVoiceChat = 15 + PublicAuracast = 16 + PrivateAuracast = 17 + +class BTDualMode(int, Flag): + Classic = 0 + LEAudio = 2 + +class BTAudioMode(int, Flag): + A2DP_HFP = 1 # "Enable Mic Input" + A2DP = 2 # "Disable Mic Input" + PublicAuracast = 8 + PrivateAuracast = 0x10 + +class BTProfile(int, Flag): + SBC = 1 + HFP = 2 + A2DP = 4 + AVRCP = 8 + LEAudio = 0x4000 + +class BTCodec(int, Flag): + SBC = 1 + Faststream = 2 + aptXClassic = 4 + aptXLowLatency = 8 + aptXHD = 0x10 + aptXAdaptiveLowLatency = 0x20 + aptXAdaptiveHighQuality = 0x40 + aptXLossless = 0x80 + aptXLite = 0x100 + LC3 = 0x200 + +class BTAddr: + def __init__(self, value=None): + if isinstance(value, str): + self.bytes = bytes.fromhex(value.replace(":", " ")) + elif isinstance(value, bytes): + assert len(value) == 6 + self.bytes = bytes(value[4:6] + value[0:2] + value[2:4]) + elif value is None: + self.bytes = bytes() + else: + raise TypeError("do know how to cast %r into BTAddr"%value) + + @staticmethod + def unpack(data): + self = BTAddr() + # TODO Likely something wrong here + assert len(data) == 6 + self.bytes = bytes(data[4:6] + data[0:2] + data[2:4]) + return self + + def pack(self): + return bytes(self.bytes[2:4] + self.bytes[4:6] + self.bytes[0:2]) + + def __repr__(self): + return "BTAddr('%s')"%self.bytes.hex(sep=':') + + def __str__(self): + return self.bytes.hex(sep=':') + + def __bytes__(self): + return self.pack() + +class BTOperation(int, Enum): + GetNumPairedDevices = 1 + GetPairedDeviceDetails = 2 + SetConnectionAction = 3 + SetInquiryMode = 4 + GetInquiryData = 5 + GetState = 6 + ForgetDevice = 7 + SetDeviceFriendlyName = 8 + GetFeature = 9 + SetFeature = 10 + StopInquiryMode = 11 + +class BTFeatureGet(int, Enum): + """Enum with types of feature bytes that can be read from device, and their datatype""" + def __new__(cls, value, valuetype): + obj = int.__new__(cls, value) + obj._value_ = value + obj.valuetype = valuetype + return obj + + AutoReconnect = 0, bool + MaxPairedDeviceSupported = 1, int + A2DPCodecSupported = 2, BTCodec + A2DPCodecCurrent = 3, BTCodec + LocalBluetoothAddress = 4, BTAddr + SupportedDeviceAudioMode = 5, BTAudioMode + CurrentDeviceAudioMode = 6, BTAudioMode + AutoMicActivation = 7, bool + PreferredAptXCodecVariant = 8, BTCodec + AuracastName = 9, str + AuracastCode = 10, str + ExtendedCodecsSupported = 11, BTCodec + ExtendedCurrentCodec = 12, BTCodec + PreferredDualMode = 13, BTDualMode + ConnectedDeviceSupportedBTProfile = 14, BTProfile + GamingModeState = 15, bool + BroadcastEncryptedState = 16, bool + +class BTFeatureSet(int, Enum): + """Enum with types of feature bytes that be sent to the device, and their datatype""" + def __new__(cls, value, valuetype): + obj = int.__new__(cls, value) + obj._value_ = value + obj.valuetype = valuetype + return obj + + AutoReconnect = 0, bool + CurrentDeviceAudioMode = 1, BTAudioMode + AutoMicActivation = 2, bool + PreferredAptXCodecVariant = 3, BTCodec + AuracastName = 4, str + AuracastCode = 5, str + PreferredDualMode = 6, BTDualMode + GamingMode = 7, bool + BroadcastEncryptedState = 8, bool + +class BTConnectionAction(int, Enum): + Connect = 0 + Disconnect = 1 + +class BTInquiryAction(int, Enum): + InquiryWithPair = 0 + InquiryGetSinksOnly = 1 + InquiryPairSpecificSink = 2 + +class Msg: + """Base class for messages.""" + prefixId = None + commandId = None + + @classmethod + def is_mine(cls, data): + """Called by message receiver. Should return true if passed data corresponds to a message of this class type.""" + assert cls.prefixId + assert cls.commandId + return data[0] == cls.prefixId and data[1] == cls.commandId.value + +class AckMsg(Msg): + prefixId = 0x5a # TODO Unclear what this prefix actually means, so hardcoding it + commandId = Commands.Ack + + @classmethod + def unpack(cls, data): + self = cls() + # Ignoring arguments + return self + + def __repr__(self): + return "AckMsg()" + +class BTMsg(Msg): + """Base class for all bluetooth-related messages.""" + prefixId = 0x5a # TODO + commandId = Commands.BTCommand + operationId = None + + @classmethod + def is_mine(cls, data): + assert cls.operationId is not None + return data[0] == cls.prefixId and data[1] == cls.commandId.value and data[3] == cls.operationId.value + +class GetNumPairedDevicesMsg(BTMsg): + operationId = BTOperation.GetNumPairedDevices + + def __init__(self): + self.value = None + + @classmethod + def unpack(cls, data): + self = cls() + prefix, cmd, msglen, op, value = struct.unpack_from(" ', data.hex(sep=' ')) + dev.write(data) + +def read_msgs(dev, timeout_ms=1000): + global _verbose + while True: + data = dev.read(1 + REPORT_SIZE, timeout_ms=timeout_ms) + if len(data) == 0: + break + if _verbose: print('< ', bytes(data).hex(sep=' ')) + if len(data) == 1 + REPORT_SIZE and data[0] == REPORT_ID: + msg = parse_msg(bytes(data[1:])) + if msg: + if _verbose: print(' ', repr(msg)) + yield msg + else: + if _verbose: print(" Unknown message with prefix %x command %x " % (data[1], data[2])) + + +def find_device(args): + for device in hid.enumerate(vendor_id=CT_VENDOR_ID): + if device['vendor_id'] == CT_VENDOR_ID and device['product_id'] in CT_PRODUCT_IDS: + return device + return None + +def read_and_explain_msgs(dev, timeout_ms=1000): + for msg in read_msgs(dev, timeout_ms=timeout_ms): + if isinstance(msg, GetStateMsg): + print("Currently %s to device '%s'" % (msg.state.name, msg.displayName)) + elif isinstance(msg, GetFeatureMsg): + if msg.feature == BTFeatureGet.ExtendedCurrentCodec: + print("Currently using codec '%s'" % codec_name(msg.value)) + elif msg.feature == BTFeatureGet.ExtendedCodecsSupported: + print("Supported codecs: ", " ".join([codec_name(val) for val in msg.value])) + +if __name__ == '__main__': + parser = ArgumentParser(prog='ctbtw', description="Control CT BT-W* devices", epilog="") + parser.add_argument('--verbose', '-v', action='store_true', help="print protocol traces") + actions = parser.add_mutually_exclusive_group() + actions.add_argument('--aptxll', '-l', action='store_true', help="for aptX, prefer low-latency mode") + actions.add_argument('--aptxhq', '-q', action='store_true', help="for aptX, prefer high-quality mode (enables lossless)") + actions.add_argument('--disconnect', '-t', metavar="ADDR", action='store', help="disconnect from device with given address") + actions.add_argument('--connect', '-c', metavar="ADDR", action='store', help="connect to device with given address") + actions.add_argument('--devices', '-d', action='store_true', help="view currently paired devices") + actions.add_argument('--pair', '-p', nargs='?', metavar="ADDR", const="*", action='store', help='pair with specific bluetooth address, or any device if no ADDR specified') + actions.add_argument('--forget', '-f', action='store_true', help='forget all paired devices') + actions.add_argument('--get', action='append', choices=[val.name for val in BTFeatureGet], help="get the current value of a setting") + actions.add_argument('--set', nargs=2, action='append', metavar=("KEY", "VALUE"), help="set a setting") + + args = parser.parse_args() + + _verbose = args.verbose + _use_color = os.getenv("NO_COLOR", "") == "" and os.getenv("TERM") != "dumb" and sys.stdout.isatty() + + devinfo = find_device(args) + if not devinfo: + print("Could not find device with valid vendor/product IDs") + sys.exit(1) + + print("Using device '%s' at %s" % (devinfo['product_string'], os.fsdecode(devinfo['path']))) + + dev = hid.device() + dev.open_path(devinfo['path']) + + if _verbose: print("Opened device") + + if args.aptxll or args.aptxhq: + codec = BTCodec.aptXAdaptiveHighQuality if args.aptxhq else BTCodec.aptXAdaptiveLowLatency + print("Switching preferred aptX codec to %s" % codec.name) + msg = SetFeatureReq(BTFeatureSet.PreferredAptXCodecVariant, codec) + send_msg(dev, msg) + read_and_explain_msgs(dev, timeout_ms=1000) # Use larger timeout to give time to print final codec used, if any + elif args.devices: + num_devices = None + msg = GetNumPairedDevicesReq() + send_msg(dev, msg) + for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT): + if isinstance(msg, GetNumPairedDevicesMsg): + num_devices = int(msg.value) + break + if num_devices: + print("List of paired devices:") + for dev_index in range(num_devices): + msg = GetPairedDeviceDetailsReq(index=dev_index) + send_msg(dev, msg) + for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT): + if isinstance(msg, GetPairedDeviceDetailsMsg): + if msg.valid: + print("#%s : %r (%s)" % (msg.index, msg.displayName, msg.addr)) + break + elif num_devices == 0: + print("No paired devices") + else: + print("Cannot get list of paired devices") + elif args.pair is not None: + if args.pair=="*": + # Pair with any device + msg = SetInquiryModeReq(BTInquiryAction.InquiryWithPair) + send_msg(dev, msg) + print("Entering pairing mode") + else: + addr = BTAddr(args.pair) + msg = SetInquiryModeReq(BTInquiryAction.InquiryPairSpecificSink, addr) + send_msg(dev, msg) + print("Pairing with %s"%addr) + + try: + for msg in read_msgs(dev, timeout_ms=PAIR_TIMEOUT): + if isinstance(msg, GetInquiryDataMsg): + print("Found device %r (%s)" % (msg.displayName, msg.addr)) + elif isinstance(msg, GetStateMsg): + if msg.state == BTState.Connected: + print("Connected to device %r"%msg.displayName) + break + except OSError: # Received on SIGINT... + pass + finally: + try: + print("Leaving pairing mode") + except KeyboardInterrupt: + pass + finally: + msg = StopInquiryModeReq() + send_msg(dev, msg) + elif args.forget: + # TODO I couldn't get forgetting about just one device to work; it is not exercised by app + msg = ForgetDeviceReq(index=255) + send_msg(dev, msg) + print("Erased all paired devices") + # No point reading messages as device will self-reset + elif args.connect or args.disconnect: + addr = BTAddr(args.connect if args.connect else args.disconnect) + action = BTConnectionAction.Connect if args.connect else BTConnectionAction.Disconnect + msg = SetConnectionReq(action, addr) + send_msg(dev, msg) + read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT) + elif args.get: + features = [val.name for val in BTFeatureGet] + for feature in args.get: + feature = BTFeatureGet(features.index(feature)) + msg = GetFeatureReq(feature) + send_msg(dev, msg) + for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT): + if isinstance(msg, GetFeatureMsg): + if msg.feature == feature: + print("%s : %s" % (msg.feature.name, msg.value)) + else: + break + elif args.set: + features = [val.name for val in BTFeatureSet] + for feature, value in args.set: + try: + feature = BTFeatureSet(features.index(feature)) + except ValueError: + print("Feature %r not found, skipping" % feature) + continue + valuetype = feature.valuetype + if valuetype is bool: + value = value.lower() in ('y', 'yes', 't', 'true', 'on', '1') + elif valuetype is int: + value = int(value) + msg = SetFeatureReq(feature, value) + send_msg(dev, msg) + print("Setting %s to %r"%(msg.feature, msg.value)) + read_msgs(dev, timeout_ms=SHORT_TIMEOUT) + else: + msg = GetStateReq() + send_msg(dev, msg) + read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT) + if _verbose: + print("Printing events from device, ^C to quit") + read_and_explain_msgs(dev, timeout_ms=0) -- cgit v1.2.3