summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJavier <dev.git@javispedro.com>2025-08-05 23:06:28 +0200
committerJavier <dev.git@javispedro.com>2025-08-05 23:15:54 +0200
commit5b34010d8f1c319711b9808716b998345eb3d7d9 (patch)
treee00358c1d4b0a738057106d8b7a07fd2e9b48a35
downloadctbtw-5b34010d8f1c319711b9808716b998345eb3d7d9.tar.gz
ctbtw-5b34010d8f1c319711b9808716b998345eb3d7d9.zip
initial import
-rwxr-xr-xctbtw.py725
1 files changed, 725 insertions, 0 deletions
diff --git a/ctbtw.py b/ctbtw.py
new file mode 100755
index 0000000..9738ec8
--- /dev/null
+++ b/ctbtw.py
@@ -0,0 +1,725 @@
+#!/usr/bin/env python3
+
+# ctbtw -- a small program to control BT-W* devices (like BT-W6)
+# Copyright (C) 2025 Javier S. Pedro
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+
+from enum import Enum, Flag
+from argparse import ArgumentParser
+import struct
+import hid
+import sys, os
+
+class Commands(int, Enum):
+ Ack = 2
+ DevInfo = 7
+ BTCommand = 0x6B
+
+class BTState(int, Enum):
+ Initializing = 0
+ PoweredOff = 1
+ TestMode = 2
+ Idle = 3
+ Connectable = 4
+ Discoverable = 5
+ Connecting = 6
+ Inquiring = 7
+ Connected = 8
+ Paging = 9
+ Disconnected = 10
+ A2DPMode = 11
+ HFPMode = 12
+ PagingFailed = 13
+ LEAudioUnicast = 14
+ LEAudioUnicastWithVoiceChat = 15
+ PublicAuracast = 16
+ PrivateAuracast = 17
+
+class BTDualMode(int, Flag):
+ Classic = 0
+ LEAudio = 2
+
+class BTAudioMode(int, Flag):
+ A2DP_HFP = 1 # "Enable Mic Input"
+ A2DP = 2 # "Disable Mic Input"
+ PublicAuracast = 8
+ PrivateAuracast = 0x10
+
+class BTProfile(int, Flag):
+ SBC = 1
+ HFP = 2
+ A2DP = 4
+ AVRCP = 8
+ LEAudio = 0x4000
+
+class BTCodec(int, Flag):
+ SBC = 1
+ Faststream = 2
+ aptXClassic = 4
+ aptXLowLatency = 8
+ aptXHD = 0x10
+ aptXAdaptiveLowLatency = 0x20
+ aptXAdaptiveHighQuality = 0x40
+ aptXLossless = 0x80
+ aptXLite = 0x100
+ LC3 = 0x200
+
+class BTAddr:
+ def __init__(self, value=None):
+ if isinstance(value, str):
+ self.bytes = bytes.fromhex(value.replace(":", " "))
+ elif isinstance(value, bytes):
+ assert len(value) == 6
+ self.bytes = bytes(value[4:6] + value[0:2] + value[2:4])
+ elif value is None:
+ self.bytes = bytes()
+ else:
+ raise TypeError("do know how to cast %r into BTAddr"%value)
+
+ @staticmethod
+ def unpack(data):
+ self = BTAddr()
+ # TODO Likely something wrong here
+ assert len(data) == 6
+ self.bytes = bytes(data[4:6] + data[0:2] + data[2:4])
+ return self
+
+ def pack(self):
+ return bytes(self.bytes[2:4] + self.bytes[4:6] + self.bytes[0:2])
+
+ def __repr__(self):
+ return "BTAddr('%s')"%self.bytes.hex(sep=':')
+
+ def __str__(self):
+ return self.bytes.hex(sep=':')
+
+ def __bytes__(self):
+ return self.pack()
+
+class BTOperation(int, Enum):
+ GetNumPairedDevices = 1
+ GetPairedDeviceDetails = 2
+ SetConnectionAction = 3
+ SetInquiryMode = 4
+ GetInquiryData = 5
+ GetState = 6
+ ForgetDevice = 7
+ SetDeviceFriendlyName = 8
+ GetFeature = 9
+ SetFeature = 10
+ StopInquiryMode = 11
+
+class BTFeatureGet(int, Enum):
+ """Enum with types of feature bytes that can be read from device, and their datatype"""
+ def __new__(cls, value, valuetype):
+ obj = int.__new__(cls, value)
+ obj._value_ = value
+ obj.valuetype = valuetype
+ return obj
+
+ AutoReconnect = 0, bool
+ MaxPairedDeviceSupported = 1, int
+ A2DPCodecSupported = 2, BTCodec
+ A2DPCodecCurrent = 3, BTCodec
+ LocalBluetoothAddress = 4, BTAddr
+ SupportedDeviceAudioMode = 5, BTAudioMode
+ CurrentDeviceAudioMode = 6, BTAudioMode
+ AutoMicActivation = 7, bool
+ PreferredAptXCodecVariant = 8, BTCodec
+ AuracastName = 9, str
+ AuracastCode = 10, str
+ ExtendedCodecsSupported = 11, BTCodec
+ ExtendedCurrentCodec = 12, BTCodec
+ PreferredDualMode = 13, BTDualMode
+ ConnectedDeviceSupportedBTProfile = 14, BTProfile
+ GamingModeState = 15, bool
+ BroadcastEncryptedState = 16, bool
+
+class BTFeatureSet(int, Enum):
+ """Enum with types of feature bytes that be sent to the device, and their datatype"""
+ def __new__(cls, value, valuetype):
+ obj = int.__new__(cls, value)
+ obj._value_ = value
+ obj.valuetype = valuetype
+ return obj
+
+ AutoReconnect = 0, bool
+ CurrentDeviceAudioMode = 1, BTAudioMode
+ AutoMicActivation = 2, bool
+ PreferredAptXCodecVariant = 3, BTCodec
+ AuracastName = 4, str
+ AuracastCode = 5, str
+ PreferredDualMode = 6, BTDualMode
+ GamingMode = 7, bool
+ BroadcastEncryptedState = 8, bool
+
+class BTConnectionAction(int, Enum):
+ Connect = 0
+ Disconnect = 1
+
+class BTInquiryAction(int, Enum):
+ InquiryWithPair = 0
+ InquiryGetSinksOnly = 1
+ InquiryPairSpecificSink = 2
+
+class Msg:
+ """Base class for messages."""
+ prefixId = None
+ commandId = None
+
+ @classmethod
+ def is_mine(cls, data):
+ """Called by message receiver. Should return true if passed data corresponds to a message of this class type."""
+ assert cls.prefixId
+ assert cls.commandId
+ return data[0] == cls.prefixId and data[1] == cls.commandId.value
+
+class AckMsg(Msg):
+ prefixId = 0x5a # TODO Unclear what this prefix actually means, so hardcoding it
+ commandId = Commands.Ack
+
+ @classmethod
+ def unpack(cls, data):
+ self = cls()
+ # Ignoring arguments
+ return self
+
+ def __repr__(self):
+ return "AckMsg()"
+
+class BTMsg(Msg):
+ """Base class for all bluetooth-related messages."""
+ prefixId = 0x5a # TODO
+ commandId = Commands.BTCommand
+ operationId = None
+
+ @classmethod
+ def is_mine(cls, data):
+ assert cls.operationId is not None
+ return data[0] == cls.prefixId and data[1] == cls.commandId.value and data[3] == cls.operationId.value
+
+class GetNumPairedDevicesMsg(BTMsg):
+ operationId = BTOperation.GetNumPairedDevices
+
+ def __init__(self):
+ self.value = None
+
+ @classmethod
+ def unpack(cls, data):
+ self = cls()
+ prefix, cmd, msglen, op, value = struct.unpack_from("<BBBB B", data)
+ assert prefix == cls.prefixId
+ assert cmd == cls.commandId
+ self.value = value
+ return self
+
+ def pack(self):
+ raise NotImplementedError
+
+ def __repr__(self):
+ return "GetNumPairedDevicesMsg(value=%s)"%self.value
+
+class GetPairedDeviceDetailsMsg(BTMsg):
+ operationId = BTOperation.GetPairedDeviceDetails
+
+ def __init__(self):
+ self.index = None
+ self.valid = False
+ self.addr = bytes()
+ self.displayName = str()
+
+ @classmethod
+ def unpack(cls, data):
+ self = cls()
+ prefix, cmd, msglen, op, index, valid, addr, displayName = struct.unpack_from("<BBBB BB6s40p", data)
+ assert prefix == cls.prefixId
+ assert cmd == cls.commandId
+ self.index = int(index)
+ self.valid = bool(valid)
+ self.addr = BTAddr.unpack(addr)
+ self.displayName = displayName.decode("utf-8").rstrip('\x00')
+ return self
+
+ def pack(self):
+ raise NotImplementedError
+
+ def __repr__(self):
+ return "GetPairedDeviceDetailsMsg(index=%s,valid=%s,addr=%r,displayName=%r)"%(self.index,self.valid,self.addr,self.displayName)
+
+class GetInquiryDataMsg(BTMsg):
+ operationId = BTOperation.GetInquiryData
+
+ def __init__(self):
+ self.addr = bytes()
+ self.displayName = str()
+
+ @classmethod
+ def unpack(cls, data):
+ self = cls()
+ prefix, cmd, msglen, op, addr, displayName = struct.unpack_from("<BBBB 6s40p", data)
+ assert prefix == cls.prefixId
+ assert cmd == cls.commandId
+ self.addr = BTAddr.unpack(addr)
+ self.displayName = displayName.decode("utf-8").rstrip('\x00')
+ return self
+
+ def pack(self):
+ raise NotImplementedError
+
+ def __repr__(self):
+ return "GetInquiryDataMsg(addr=%r,displayName=%r)"%(self.addr,self.displayName)
+
+class GetStateMsg(BTMsg):
+ operationId = BTOperation.GetState
+
+ def __init__(self):
+ self.state = BTState.Initializing
+ self.addr = bytes()
+ self.displayName = str()
+
+ @classmethod
+ def unpack(cls, data):
+ self = cls()
+ prefix, cmd, msglen, op, state, addr, displayName = struct.unpack_from("<BBBB B6s40p", data)
+ assert prefix == cls.prefixId
+ assert cmd == cls.commandId
+ self.state = BTState(state)
+ self.addr = BTAddr.unpack(addr)
+ self.displayName = displayName.decode("utf-8").rstrip('\x00')
+ return self
+
+ def pack(self):
+ raise NotImplementedError
+
+ def __repr__(self):
+ return "GetStateMsg(state=%s,addr=%r,displayName=%r)"%(self.state,self.addr,self.displayName)
+
+class GetFeatureMsg(BTMsg):
+ operationId = BTOperation.GetFeature
+
+ def __init__(self):
+ self.feature = None
+ self.value = None
+
+ @classmethod
+ def unpack(cls, data):
+ self = cls()
+ prefix, cmd, msglen, op, feature = struct.unpack_from("<BBBB B", data)
+ assert prefix == cls.prefixId
+ assert cmd == cls.commandId
+ offset = struct.calcsize("<BBBB B")
+ valuelen = msglen - struct.calcsize("<B B")
+ value = data[offset:offset+valuelen]
+ self.feature = BTFeatureGet(feature)
+ valuetype = self.feature.valuetype
+ if valuetype is bytes or valuetype is str:
+ # multi-byte value, pascal string
+ valuelen = value[0]
+ self.value = bytes(value[1:1+valuelen])
+ if valuetype is str:
+ self.value = self.value.decode("utf-8")
+ elif valuetype is BTAddr:
+ # multi-byte value fixed length
+ self.value = valuetype(value[0:6])
+ else:
+ # single-byte value
+ self.value = valuetype(value[0])
+
+ return self
+
+ def pack(self):
+ raise NotImplementedError
+
+ def __repr__(self):
+ return "GetFeatureMsg(feature=%s,value=%r)"%(self.feature,self.value)
+
+class BTReq(BTMsg):
+ pass
+
+class GetNumPairedDevicesReq(BTReq):
+ operationId = BTOperation.GetNumPairedDevices
+
+ def __init__(self):
+ pass # No arguments
+
+ def pack(self):
+ msglen = struct.calcsize("<B ")
+ return struct.pack("<BBBB ", self.prefixId, self.commandId, msglen, self.operationId)
+
+ def __repr__(self):
+ return "GetNumPairedDevicesReq()"
+
+class GetPairedDeviceDetailsReq(BTReq):
+ operationId = BTOperation.GetPairedDeviceDetails
+
+ def __init__(self, index = None):
+ self.index = index
+
+ def pack(self):
+ assert self.index is not None
+ msglen = struct.calcsize("<B B")
+ return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.index)
+
+ def __repr__(self):
+ return "GetPairedDeviceDetailsReq(index=%s)"%self.index
+
+class SetConnectionReq(BTReq):
+ operationId = BTOperation.SetConnectionAction
+
+ def __init__(self, action=None, addr=None):
+ assert action in BTConnectionAction
+ self.action = action
+ self.addr = addr
+
+ def pack(self):
+ if self.addr:
+ msglen = struct.calcsize("<B B6s")
+ return struct.pack("<BBBB B6s", self.prefixId, self.commandId, msglen, self.operationId, self.action, self.addr.pack())
+ else:
+ msglen = struct.calcsize("<B B")
+ return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.action)
+
+ def __repr__(self):
+ return "SetConnectionReq(action=%s,addr=%s)"%(self.action.name, self.addr)
+
+class SetInquiryModeReq(BTReq):
+ operationId = BTOperation.SetInquiryMode
+
+ def __init__(self, action=None, addr=None):
+ assert action is None or action in BTInquiryAction
+ self.action = action
+ if addr:
+ assert action == BTInquiryAction.InquiryPairSpecificSink
+ self.addr = addr
+ else:
+ self.addr = None
+
+ def pack(self):
+ if self.addr:
+ msglen = struct.calcsize("<B B6s")
+ return struct.pack("<BBBB B6s", self.prefixId, self.commandId, msglen, self.operationId, self.action, self.addr.pack())
+ else:
+ msglen = struct.calcsize("<B B")
+ return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.action)
+
+ def __repr__(self):
+ return "SetInquiryModeReq(action=%s,addr=%s)"%(self.action.name, self.addr)
+
+class StopInquiryModeReq(BTReq):
+ operationId = BTOperation.StopInquiryMode
+
+ def __init__(self):
+ pass
+
+ def pack(self):
+ msglen = struct.calcsize("<B ")
+ return struct.pack("<BBBB ", self.prefixId, self.commandId, msglen, self.operationId)
+
+ def __repr__(self):
+ return "StopInquiryMode()"
+
+class GetStateReq(BTReq):
+ operationId = BTOperation.GetState
+
+ def __init__(self):
+ pass
+
+ def pack(self):
+ msglen = struct.calcsize("<B ")
+ return struct.pack("<BBBB ", self.prefixId, self.commandId, msglen, self.operationId)
+
+ def __repr__(self):
+ return "GetStateReq()"
+
+class ForgetDeviceReq(BTReq):
+ operationId = BTOperation.ForgetDevice
+
+ def __init__(self, index = None):
+ # -1 may also mean "all"
+ self.index = index
+
+ def pack(self):
+ msglen = struct.calcsize("<B B")
+ return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.index)
+
+ def __repr__(self):
+ return "ForgetDeviceReq(index=%s)"%self.index
+
+class GetFeatureReq(BTReq):
+ operationId = BTOperation.GetFeature
+
+ def __init__(self, feature):
+ assert feature in BTFeatureGet
+ self.feature = feature
+
+ def pack(self):
+ msglen = struct.calcsize("<B B")
+ return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.feature.value)
+
+ def __repr__(self):
+ return "GetFeatureReq(feature=%s)"%(self.feature)
+
+class SetFeatureReq(BTReq):
+ operationId = BTOperation.SetFeature
+
+ def __init__(self, feature, value):
+ assert feature in BTFeatureSet
+ self.feature = feature
+ self.value = feature.valuetype(value)
+
+ def pack(self):
+ valuetype = self.feature.valuetype
+ if valuetype is str:
+ # Prefix with length
+ value = struct.pack("<40p", self.value.encode("utf-8"))
+ elif valuetype is BTAddr:
+ value = struct.pack("<6B", self.value)
+ else:
+ value = struct.pack("<B", self.value)
+ msglen = struct.calcsize("<B B") + len(value)
+ return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.feature.value) + value
+
+ def __repr__(self):
+ return "SetFeatureReq(feature=%s,value=%r)"%(self.feature,self.value)
+
+# List of messages that can be received
+msgs = [AckMsg, GetNumPairedDevicesMsg, GetPairedDeviceDetailsMsg, GetInquiryDataMsg, GetStateMsg, GetFeatureMsg]
+# HID report number used to both receive/send messages
+REPORT_ID = 3
+# Size of this report (hardcoded...)
+REPORT_SIZE = 64
+# Vendor ID to search for
+CT_VENDOR_ID = 0x041e
+# Product IDs to search for
+CT_PRODUCT_IDS = [0x3130, 0x3132]
+# Timeout (in msec) to use for short messages
+SHORT_TIMEOUT=50
+# Timeout (in msec) to use for messages related to pairing
+PAIR_TIMEOUT=30000
+
+_verbose = False
+
+bt_codec_colors = {
+ BTCodec.SBC : (0,102,179),
+ BTCodec.aptXClassic : (139,198,64),
+ BTCodec.aptXLowLatency : (0,173,239),
+ BTCodec.aptXHD : (246,148,31),
+ BTCodec.aptXAdaptiveLowLatency : (0,173,239),
+ BTCodec.aptXAdaptiveHighQuality : (110,44,145),
+ BTCodec.aptXLossless : (255,241,0),
+ BTCodec.aptXLite : (0,173,239),
+ BTCodec.LC3 : (255,255,255)
+ }
+
+def colored_dot(color):
+ colorstr = "\033[38;2;%d;%d;%dm"%color
+ dot = "●"
+ reset = "\033[0m"
+ return colorstr + dot + reset
+
+def codec_name(codec):
+ global _use_color, codec_color
+ assert codec in BTCodec
+ codec_color = bt_codec_colors.get(codec)
+ if _use_color and codec_color:
+ return colored_dot(codec_color) + " " + codec.name
+ else:
+ return codec.name
+
+def parse_msg(data):
+ assert len(data) == REPORT_SIZE
+ for msg in msgs:
+ if msg.is_mine(data):
+ return msg.unpack(data)
+ return None
+
+def send_msg(dev, msg):
+ global _verbose
+ if _verbose: print(repr(msg))
+ data = msg.pack()
+ assert len(data) <= REPORT_SIZE
+ if len(data) < REPORT_SIZE: # pad to report size
+ data += b"\0" * (REPORT_SIZE - len(data))
+ data = struct.pack("B", REPORT_ID) + data
+ assert len(data) == 1 + REPORT_SIZE
+ if _verbose: print('> ', data.hex(sep=' '))
+ dev.write(data)
+
+def read_msgs(dev, timeout_ms=1000):
+ global _verbose
+ while True:
+ data = dev.read(1 + REPORT_SIZE, timeout_ms=timeout_ms)
+ if len(data) == 0:
+ break
+ if _verbose: print('< ', bytes(data).hex(sep=' '))
+ if len(data) == 1 + REPORT_SIZE and data[0] == REPORT_ID:
+ msg = parse_msg(bytes(data[1:]))
+ if msg:
+ if _verbose: print(' ', repr(msg))
+ yield msg
+ else:
+ if _verbose: print(" Unknown message with prefix %x command %x " % (data[1], data[2]))
+
+
+def find_device(args):
+ for device in hid.enumerate(vendor_id=CT_VENDOR_ID):
+ if device['vendor_id'] == CT_VENDOR_ID and device['product_id'] in CT_PRODUCT_IDS:
+ return device
+ return None
+
+def read_and_explain_msgs(dev, timeout_ms=1000):
+ for msg in read_msgs(dev, timeout_ms=timeout_ms):
+ if isinstance(msg, GetStateMsg):
+ print("Currently %s to device '%s'" % (msg.state.name, msg.displayName))
+ elif isinstance(msg, GetFeatureMsg):
+ if msg.feature == BTFeatureGet.ExtendedCurrentCodec:
+ print("Currently using codec '%s'" % codec_name(msg.value))
+ elif msg.feature == BTFeatureGet.ExtendedCodecsSupported:
+ print("Supported codecs: ", " ".join([codec_name(val) for val in msg.value]))
+
+if __name__ == '__main__':
+ parser = ArgumentParser(prog='ctbtw', description="Control CT BT-W* devices", epilog="")
+ parser.add_argument('--verbose', '-v', action='store_true', help="print protocol traces")
+ actions = parser.add_mutually_exclusive_group()
+ actions.add_argument('--aptxll', '-l', action='store_true', help="for aptX, prefer low-latency mode")
+ actions.add_argument('--aptxhq', '-q', action='store_true', help="for aptX, prefer high-quality mode (enables lossless)")
+ actions.add_argument('--disconnect', '-t', metavar="ADDR", action='store', help="disconnect from device with given address")
+ actions.add_argument('--connect', '-c', metavar="ADDR", action='store', help="connect to device with given address")
+ actions.add_argument('--devices', '-d', action='store_true', help="view currently paired devices")
+ actions.add_argument('--pair', '-p', nargs='?', metavar="ADDR", const="*", action='store', help='pair with specific bluetooth address, or any device if no ADDR specified')
+ actions.add_argument('--forget', '-f', action='store_true', help='forget all paired devices')
+ actions.add_argument('--get', action='append', choices=[val.name for val in BTFeatureGet], help="get the current value of a setting")
+ actions.add_argument('--set', nargs=2, action='append', metavar=("KEY", "VALUE"), help="set a setting")
+
+ args = parser.parse_args()
+
+ _verbose = args.verbose
+ _use_color = os.getenv("NO_COLOR", "") == "" and os.getenv("TERM") != "dumb" and sys.stdout.isatty()
+
+ devinfo = find_device(args)
+ if not devinfo:
+ print("Could not find device with valid vendor/product IDs")
+ sys.exit(1)
+
+ print("Using device '%s' at %s" % (devinfo['product_string'], os.fsdecode(devinfo['path'])))
+
+ dev = hid.device()
+ dev.open_path(devinfo['path'])
+
+ if _verbose: print("Opened device")
+
+ if args.aptxll or args.aptxhq:
+ codec = BTCodec.aptXAdaptiveHighQuality if args.aptxhq else BTCodec.aptXAdaptiveLowLatency
+ print("Switching preferred aptX codec to %s" % codec.name)
+ msg = SetFeatureReq(BTFeatureSet.PreferredAptXCodecVariant, codec)
+ send_msg(dev, msg)
+ read_and_explain_msgs(dev, timeout_ms=1000) # Use larger timeout to give time to print final codec used, if any
+ elif args.devices:
+ num_devices = None
+ msg = GetNumPairedDevicesReq()
+ send_msg(dev, msg)
+ for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT):
+ if isinstance(msg, GetNumPairedDevicesMsg):
+ num_devices = int(msg.value)
+ break
+ if num_devices:
+ print("List of paired devices:")
+ for dev_index in range(num_devices):
+ msg = GetPairedDeviceDetailsReq(index=dev_index)
+ send_msg(dev, msg)
+ for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT):
+ if isinstance(msg, GetPairedDeviceDetailsMsg):
+ if msg.valid:
+ print("#%s : %r (%s)" % (msg.index, msg.displayName, msg.addr))
+ break
+ elif num_devices == 0:
+ print("No paired devices")
+ else:
+ print("Cannot get list of paired devices")
+ elif args.pair is not None:
+ if args.pair=="*":
+ # Pair with any device
+ msg = SetInquiryModeReq(BTInquiryAction.InquiryWithPair)
+ send_msg(dev, msg)
+ print("Entering pairing mode")
+ else:
+ addr = BTAddr(args.pair)
+ msg = SetInquiryModeReq(BTInquiryAction.InquiryPairSpecificSink, addr)
+ send_msg(dev, msg)
+ print("Pairing with %s"%addr)
+
+ try:
+ for msg in read_msgs(dev, timeout_ms=PAIR_TIMEOUT):
+ if isinstance(msg, GetInquiryDataMsg):
+ print("Found device %r (%s)" % (msg.displayName, msg.addr))
+ elif isinstance(msg, GetStateMsg):
+ if msg.state == BTState.Connected:
+ print("Connected to device %r"%msg.displayName)
+ break
+ except OSError: # Received on SIGINT...
+ pass
+ finally:
+ try:
+ print("Leaving pairing mode")
+ except KeyboardInterrupt:
+ pass
+ finally:
+ msg = StopInquiryModeReq()
+ send_msg(dev, msg)
+ elif args.forget:
+ # TODO I couldn't get forgetting about just one device to work; it is not exercised by app
+ msg = ForgetDeviceReq(index=255)
+ send_msg(dev, msg)
+ print("Erased all paired devices")
+ # No point reading messages as device will self-reset
+ elif args.connect or args.disconnect:
+ addr = BTAddr(args.connect if args.connect else args.disconnect)
+ action = BTConnectionAction.Connect if args.connect else BTConnectionAction.Disconnect
+ msg = SetConnectionReq(action, addr)
+ send_msg(dev, msg)
+ read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT)
+ elif args.get:
+ features = [val.name for val in BTFeatureGet]
+ for feature in args.get:
+ feature = BTFeatureGet(features.index(feature))
+ msg = GetFeatureReq(feature)
+ send_msg(dev, msg)
+ for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT):
+ if isinstance(msg, GetFeatureMsg):
+ if msg.feature == feature:
+ print("%s : %s" % (msg.feature.name, msg.value))
+ else:
+ break
+ elif args.set:
+ features = [val.name for val in BTFeatureSet]
+ for feature, value in args.set:
+ try:
+ feature = BTFeatureSet(features.index(feature))
+ except ValueError:
+ print("Feature %r not found, skipping" % feature)
+ continue
+ valuetype = feature.valuetype
+ if valuetype is bool:
+ value = value.lower() in ('y', 'yes', 't', 'true', 'on', '1')
+ elif valuetype is int:
+ value = int(value)
+ msg = SetFeatureReq(feature, value)
+ send_msg(dev, msg)
+ print("Setting %s to %r"%(msg.feature, msg.value))
+ read_msgs(dev, timeout_ms=SHORT_TIMEOUT)
+ else:
+ msg = GetStateReq()
+ send_msg(dev, msg)
+ read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT)
+ if _verbose:
+ print("Printing events from device, ^C to quit")
+ read_and_explain_msgs(dev, timeout_ms=0)