summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJavier <dev.git@javispedro.com>2025-08-24 16:33:47 +0200
committerJavier <dev.git@javispedro.com>2025-08-24 16:33:47 +0200
commitd98b141421a2f9325d748ad93361211e65603fce (patch)
treef7f5937e7de1a25954de7d307e53f998cf92607f
parent5b34010d8f1c319711b9808716b998345eb3d7d9 (diff)
downloadctbtw-d98b141421a2f9325d748ad93361211e65603fce.tar.gz
ctbtw-d98b141421a2f9325d748ad93361211e65603fce.zip
refactor UI, introduce connecting/disconnecting by name
-rwxr-xr-xctbtw.py270
1 files changed, 161 insertions, 109 deletions
diff --git a/ctbtw.py b/ctbtw.py
index 9738ec8..a8ae77e 100755
--- a/ctbtw.py
+++ b/ctbtw.py
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
-# ctbtw -- a small program to control BT-W* devices (like BT-W6)
+# ctbtw -- a small program to control BT-W* Bluetooth audio devices
+# Tested mostly on CT BT-W6
# Copyright (C) 2025 Javier S. Pedro
# This program is free software: you can redistribute it and/or modify
@@ -14,6 +15,7 @@
# GNU General Public License for more details.
from enum import Enum, Flag
+from typing import NamedTuple
from argparse import ArgumentParser
import struct
import hid
@@ -76,9 +78,12 @@ class BTCodec(int, Flag):
class BTAddr:
def __init__(self, value=None):
if isinstance(value, str):
+ if value.count(":") != 5:
+ raise ValueError("BTAddr must be xx:xx:xx:xx:xx:xx string")
self.bytes = bytes.fromhex(value.replace(":", " "))
elif isinstance(value, bytes):
- assert len(value) == 6
+ if len(value) != 6:
+ raise ValueError("BTAddr must be 6 bytes")
self.bytes = bytes(value[4:6] + value[0:2] + value[2:4])
elif value is None:
self.bytes = bytes()
@@ -246,7 +251,7 @@ class GetPairedDeviceDetailsMsg(BTMsg):
self.index = int(index)
self.valid = bool(valid)
self.addr = BTAddr.unpack(addr)
- self.displayName = displayName.decode("utf-8").rstrip('\x00')
+ self.displayName = displayName.rstrip(b'\x00').decode("utf-8")
return self
def pack(self):
@@ -269,7 +274,7 @@ class GetInquiryDataMsg(BTMsg):
assert prefix == cls.prefixId
assert cmd == cls.commandId
self.addr = BTAddr.unpack(addr)
- self.displayName = displayName.decode("utf-8").rstrip('\x00')
+ self.displayName = displayName.rstrip(b'\x00').decode("utf-8")
return self
def pack(self):
@@ -294,7 +299,7 @@ class GetStateMsg(BTMsg):
assert cmd == cls.commandId
self.state = BTState(state)
self.addr = BTAddr.unpack(addr)
- self.displayName = displayName.decode("utf-8").rstrip('\x00')
+ self.displayName = displayName.rstrip(b'\x00').decode("utf-8")
return self
def pack(self):
@@ -491,23 +496,67 @@ class SetFeatureReq(BTReq):
def __repr__(self):
return "SetFeatureReq(feature=%s,value=%r)"%(self.feature,self.value)
-# List of messages that can be received
-msgs = [AckMsg, GetNumPairedDevicesMsg, GetPairedDeviceDetailsMsg, GetInquiryDataMsg, GetStateMsg, GetFeatureMsg]
-# HID report number used to both receive/send messages
-REPORT_ID = 3
-# Size of this report (hardcoded...)
-REPORT_SIZE = 64
-# Vendor ID to search for
-CT_VENDOR_ID = 0x041e
-# Product IDs to search for
-CT_PRODUCT_IDS = [0x3130, 0x3132]
+class PairedDevice(NamedTuple):
+ name: str
+ addr: BTAddr
+
+class CTBTWDevice:
+ # List of messages that can be received
+ MSG_TYPES = [AckMsg, GetNumPairedDevicesMsg, GetPairedDeviceDetailsMsg, GetInquiryDataMsg, GetStateMsg, GetFeatureMsg]
+ # HID report number used to both receive/send messages
+ REPORT_ID = 3
+ # Size of this report (hardcoded...)
+ REPORT_SIZE = 64
+ # Vendor ID to search for
+ CT_VENDOR_ID = 0x041e
+ # Product IDs to search for. Hardcoded to [BT-W4, BT-W5, BT-W6]
+ CT_PRODUCT_IDS = [0x312B, 0x3130, 0x3132]
+
+ def __init__(self, devpath, verbose=False):
+ self._hid = hid.device()
+ self._hid.open_path(devpath)
+ self._verbose = verbose
+
+ def parse_msg(self, data):
+ assert len(data) == self.REPORT_SIZE
+ for msg in self.MSG_TYPES:
+ if msg.is_mine(data):
+ return msg.unpack(data)
+ return None
+
+ def send_msg(self, msg):
+ if self._verbose: print(repr(msg))
+ data = msg.pack()
+ assert len(data) <= self.REPORT_SIZE
+ if len(data) < self.REPORT_SIZE: # pad to report size
+ data += b"\0" * (self.REPORT_SIZE - len(data))
+ data = struct.pack("B", self.REPORT_ID) + data
+ assert len(data) == 1 + self.REPORT_SIZE
+ if self._verbose: print('> ', data.hex(sep=' '))
+ self._hid.write(data)
+
+ def read_msgs(self, timeout_ms=50):
+ while True:
+ data = self._hid.read(1 + self.REPORT_SIZE, timeout_ms=timeout_ms)
+ if len(data) == 0:
+ break
+ if self._verbose: print('< ', bytes(data).hex(sep=' '))
+ if len(data) == 1 + self.REPORT_SIZE and data[0] == self.REPORT_ID:
+ msg = dev.parse_msg(bytes(data[1:]))
+ if msg:
+ if self._verbose: print(' ', repr(msg))
+ yield msg
+ else:
+ if self._verbose: print(" Unknown message with prefix %x command %x " % (data[1], data[2]))
+
+
# Timeout (in msec) to use for short messages
SHORT_TIMEOUT=50
+# Timeout (in msec) to use for e.g. connection messages
+MID_TIMEOUT=1500
# Timeout (in msec) to use for messages related to pairing
PAIR_TIMEOUT=30000
-_verbose = False
-
bt_codec_colors = {
BTCodec.SBC : (0,102,179),
BTCodec.aptXClassic : (139,198,64),
@@ -527,57 +576,22 @@ def colored_dot(color):
return colorstr + dot + reset
def codec_name(codec):
- global _use_color, codec_color
+ global use_color
assert codec in BTCodec
codec_color = bt_codec_colors.get(codec)
- if _use_color and codec_color:
+ if use_color and codec_color:
return colored_dot(codec_color) + " " + codec.name
else:
return codec.name
-def parse_msg(data):
- assert len(data) == REPORT_SIZE
- for msg in msgs:
- if msg.is_mine(data):
- return msg.unpack(data)
- return None
-
-def send_msg(dev, msg):
- global _verbose
- if _verbose: print(repr(msg))
- data = msg.pack()
- assert len(data) <= REPORT_SIZE
- if len(data) < REPORT_SIZE: # pad to report size
- data += b"\0" * (REPORT_SIZE - len(data))
- data = struct.pack("B", REPORT_ID) + data
- assert len(data) == 1 + REPORT_SIZE
- if _verbose: print('> ', data.hex(sep=' '))
- dev.write(data)
-
-def read_msgs(dev, timeout_ms=1000):
- global _verbose
- while True:
- data = dev.read(1 + REPORT_SIZE, timeout_ms=timeout_ms)
- if len(data) == 0:
- break
- if _verbose: print('< ', bytes(data).hex(sep=' '))
- if len(data) == 1 + REPORT_SIZE and data[0] == REPORT_ID:
- msg = parse_msg(bytes(data[1:]))
- if msg:
- if _verbose: print(' ', repr(msg))
- yield msg
- else:
- if _verbose: print(" Unknown message with prefix %x command %x " % (data[1], data[2]))
-
-
-def find_device(args):
- for device in hid.enumerate(vendor_id=CT_VENDOR_ID):
- if device['vendor_id'] == CT_VENDOR_ID and device['product_id'] in CT_PRODUCT_IDS:
+def find_device():
+ for device in hid.enumerate(vendor_id=CTBTWDevice.CT_VENDOR_ID):
+ if device['vendor_id'] == CTBTWDevice.CT_VENDOR_ID and device['product_id'] in CTBTWDevice.CT_PRODUCT_IDS:
return device
return None
-def read_and_explain_msgs(dev, timeout_ms=1000):
- for msg in read_msgs(dev, timeout_ms=timeout_ms):
+def read_and_explain_msgs(dev: CTBTWDevice, timeout_ms=SHORT_TIMEOUT):
+ for msg in dev.read_msgs(timeout_ms=timeout_ms):
if isinstance(msg, GetStateMsg):
print("Currently %s to device '%s'" % (msg.state.name, msg.displayName))
elif isinstance(msg, GetFeatureMsg):
@@ -586,79 +600,99 @@ def read_and_explain_msgs(dev, timeout_ms=1000):
elif msg.feature == BTFeatureGet.ExtendedCodecsSupported:
print("Supported codecs: ", " ".join([codec_name(val) for val in msg.value]))
+def get_paired_devices(dev: CTBTWDevice, timeout_ms=SHORT_TIMEOUT):
+ num_devices = None
+ msg = GetNumPairedDevicesReq()
+ dev.send_msg(msg)
+ for msg in dev.read_msgs(timeout_ms=timeout_ms):
+ if isinstance(msg, GetNumPairedDevicesMsg):
+ num_devices = int(msg.value)
+ break
+ if num_devices is None:
+ raise IOError("Could not get number of paired devices")
+ if num_devices:
+ # Have to send a GetDeviceDetails msg for every potential index
+ for dev_index in range(num_devices):
+ msg = GetPairedDeviceDetailsReq(index=dev_index)
+ dev.send_msg(msg)
+ for msg in dev.read_msgs(timeout_ms=SHORT_TIMEOUT):
+ if isinstance(msg, GetPairedDeviceDetailsMsg):
+ if msg.valid:
+ yield PairedDevice(msg.displayName, msg.addr)
+ break
+
+def get_paired_device_address(dev: CTBTWDevice, dev_name: str, timeout_ms=SHORT_TIMEOUT):
+ fdev_name = dev_name.casefold()
+ for paired_dev in get_paired_devices(dev, timeout_ms=timeout_ms):
+ if paired_dev.name.casefold() == fdev_name:
+ return paired_dev.addr
+ return None
+
+def get_currently_connected_device(dev: CTBTWDevice, timeout_ms=SHORT_TIMEOUT):
+ paired_dev = None
+ msg = GetStateReq()
+ dev.send_msg(msg)
+ for msg in dev.read_msgs(timeout_ms=SHORT_TIMEOUT):
+ if isinstance(msg, GetStateMsg):
+ if msg.state in (BTState.Connected, BTState.Connecting):
+ paired_dev = PairedDevice(msg.displayName, msg.addr)
+ # Continue through the loop to flush all state messages and avoid spurious ones later one
+ return paired_dev
+
if __name__ == '__main__':
- parser = ArgumentParser(prog='ctbtw', description="Control CT BT-W* devices", epilog="")
+ parser = ArgumentParser(prog='ctbtw', description="Control CT BT-W* Bluetooth Audio Trasmitter devices", epilog="With no arguments, show current device state")
parser.add_argument('--verbose', '-v', action='store_true', help="print protocol traces")
actions = parser.add_mutually_exclusive_group()
actions.add_argument('--aptxll', '-l', action='store_true', help="for aptX, prefer low-latency mode")
- actions.add_argument('--aptxhq', '-q', action='store_true', help="for aptX, prefer high-quality mode (enables lossless)")
- actions.add_argument('--disconnect', '-t', metavar="ADDR", action='store', help="disconnect from device with given address")
- actions.add_argument('--connect', '-c', metavar="ADDR", action='store', help="connect to device with given address")
- actions.add_argument('--devices', '-d', action='store_true', help="view currently paired devices")
- actions.add_argument('--pair', '-p', nargs='?', metavar="ADDR", const="*", action='store', help='pair with specific bluetooth address, or any device if no ADDR specified')
- actions.add_argument('--forget', '-f', action='store_true', help='forget all paired devices')
- actions.add_argument('--get', action='append', choices=[val.name for val in BTFeatureGet], help="get the current value of a setting")
- actions.add_argument('--set', nargs=2, action='append', metavar=("KEY", "VALUE"), help="set a setting")
+ actions.add_argument('--aptxhq', '-q', action='store_true', help="for aptX, prefer high-quality mode (also enables aptX lossless)")
+ actions.add_argument('--devices', '-s', action='store_true', help="list currently paired devices")
+ actions.add_argument('--connect', '-c', metavar="DEV", action='store', help="connect to (already paired) device with given name/address")
+ actions.add_argument('--disconnect', '-d', metavar="D", action='store', nargs='?', const='*', help="disconnect from device with given name/address, or current device if no %(metavar)s")
+ actions.add_argument('--pair', '-p', nargs='?', metavar="ADDR", const="*", action='store', help='pair with specific bluetooth address, or any device if no %(metavar)s specified')
+ actions.add_argument('--forget', action='store_true', help='forget all paired devices')
+ actions.add_argument('--get', action='append', choices=[val.name for val in BTFeatureGet], metavar="SETTING", help="get the current value of a setting from device. valid %(metavar)ss: %(choices)s")
+ actions.add_argument('--set', nargs=2, action='append', metavar=("SETTING", "VALUE"), help="set a device setting to a VALUE. valid SETTINGs: %s" % ", ".join(val.name for val in BTFeatureSet))
args = parser.parse_args()
- _verbose = args.verbose
- _use_color = os.getenv("NO_COLOR", "") == "" and os.getenv("TERM") != "dumb" and sys.stdout.isatty()
+ verbose = args.verbose
+ use_color = os.getenv("NO_COLOR", "") == "" and os.getenv("TERM") != "dumb" and sys.stdout.isatty()
- devinfo = find_device(args)
+ devinfo = find_device()
if not devinfo:
print("Could not find device with valid vendor/product IDs")
sys.exit(1)
print("Using device '%s' at %s" % (devinfo['product_string'], os.fsdecode(devinfo['path'])))
- dev = hid.device()
- dev.open_path(devinfo['path'])
+ dev = CTBTWDevice(devinfo['path'], verbose=verbose)
- if _verbose: print("Opened device")
+ if verbose: print("Opened device '%s'" % devinfo['product_string'])
if args.aptxll or args.aptxhq:
codec = BTCodec.aptXAdaptiveHighQuality if args.aptxhq else BTCodec.aptXAdaptiveLowLatency
print("Switching preferred aptX codec to %s" % codec.name)
msg = SetFeatureReq(BTFeatureSet.PreferredAptXCodecVariant, codec)
- send_msg(dev, msg)
+ dev.send_msg(msg)
read_and_explain_msgs(dev, timeout_ms=1000) # Use larger timeout to give time to print final codec used, if any
elif args.devices:
- num_devices = None
- msg = GetNumPairedDevicesReq()
- send_msg(dev, msg)
- for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT):
- if isinstance(msg, GetNumPairedDevicesMsg):
- num_devices = int(msg.value)
- break
- if num_devices:
- print("List of paired devices:")
- for dev_index in range(num_devices):
- msg = GetPairedDeviceDetailsReq(index=dev_index)
- send_msg(dev, msg)
- for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT):
- if isinstance(msg, GetPairedDeviceDetailsMsg):
- if msg.valid:
- print("#%s : %r (%s)" % (msg.index, msg.displayName, msg.addr))
- break
- elif num_devices == 0:
- print("No paired devices")
- else:
- print("Cannot get list of paired devices")
+ print("List of paired devices:")
+ for paired_dev in get_paired_devices(dev, timeout_ms=SHORT_TIMEOUT):
+ print("%r (%s)" % (paired_dev.name, paired_dev.addr))
elif args.pair is not None:
if args.pair=="*":
# Pair with any device
msg = SetInquiryModeReq(BTInquiryAction.InquiryWithPair)
- send_msg(dev, msg)
+ dev.send_msg(msg)
print("Entering pairing mode")
else:
addr = BTAddr(args.pair)
msg = SetInquiryModeReq(BTInquiryAction.InquiryPairSpecificSink, addr)
- send_msg(dev, msg)
+ dev.send_msg(msg)
print("Pairing with %s"%addr)
try:
- for msg in read_msgs(dev, timeout_ms=PAIR_TIMEOUT):
+ for msg in dev.read_msgs(timeout_ms=PAIR_TIMEOUT):
if isinstance(msg, GetInquiryDataMsg):
print("Found device %r (%s)" % (msg.displayName, msg.addr))
elif isinstance(msg, GetStateMsg):
@@ -678,22 +712,40 @@ if __name__ == '__main__':
elif args.forget:
# TODO I couldn't get forgetting about just one device to work; it is not exercised by app
msg = ForgetDeviceReq(index=255)
- send_msg(dev, msg)
+ dev.send_msg(msg)
print("Erased all paired devices")
# No point reading messages as device will self-reset
elif args.connect or args.disconnect:
- addr = BTAddr(args.connect if args.connect else args.disconnect)
action = BTConnectionAction.Connect if args.connect else BTConnectionAction.Disconnect
+ name = args.connect if args.connect else args.disconnect
+ if args.disconnect == '*':
+ # Disconnect from current device
+ curdev = get_currently_connected_device(dev, timeout_ms=SHORT_TIMEOUT)
+ if curdev:
+ addr = curdev.addr
+ else:
+ print("Not connected to any device")
+ sys.exit(1)
+ else:
+ try:
+ addr = BTAddr(name)
+ except ValueError:
+ # Not a bluetooth address!
+ addr = get_paired_device_address(dev, name, timeout_ms=SHORT_TIMEOUT)
+ if not addr:
+ print("No such device: %s" % name)
+ sys.exit(1)
+ print("%s device '%s'..." % ("Connecting to" if args.connect else "Disconnecting from", addr))
msg = SetConnectionReq(action, addr)
- send_msg(dev, msg)
- read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT)
+ dev.send_msg(msg)
+ read_and_explain_msgs(dev, timeout_ms=MID_TIMEOUT)
elif args.get:
features = [val.name for val in BTFeatureGet]
for feature in args.get:
feature = BTFeatureGet(features.index(feature))
msg = GetFeatureReq(feature)
- send_msg(dev, msg)
- for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT):
+ dev.send_msg(msg)
+ for msg in dev.read_msgs(timeout_ms=SHORT_TIMEOUT):
if isinstance(msg, GetFeatureMsg):
if msg.feature == feature:
print("%s : %s" % (msg.feature.name, msg.value))
@@ -713,13 +765,13 @@ if __name__ == '__main__':
elif valuetype is int:
value = int(value)
msg = SetFeatureReq(feature, value)
- send_msg(dev, msg)
+ dev.send_msg(msg)
print("Setting %s to %r"%(msg.feature, msg.value))
- read_msgs(dev, timeout_ms=SHORT_TIMEOUT)
+ dev.read_msgs(timeout_ms=SHORT_TIMEOUT)
else:
msg = GetStateReq()
- send_msg(dev, msg)
+ dev.send_msg(msg)
read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT)
- if _verbose:
+ if verbose:
print("Printing events from device, ^C to quit")
read_and_explain_msgs(dev, timeout_ms=0)