diff options
author | Javier <dev.git@javispedro.com> | 2025-08-05 23:06:28 +0200 |
---|---|---|
committer | Javier <dev.git@javispedro.com> | 2025-08-05 23:15:54 +0200 |
commit | 5b34010d8f1c319711b9808716b998345eb3d7d9 (patch) | |
tree | e00358c1d4b0a738057106d8b7a07fd2e9b48a35 | |
download | ctbtw-5b34010d8f1c319711b9808716b998345eb3d7d9.tar.gz ctbtw-5b34010d8f1c319711b9808716b998345eb3d7d9.zip |
initial import
-rwxr-xr-x | ctbtw.py | 725 |
1 files changed, 725 insertions, 0 deletions
diff --git a/ctbtw.py b/ctbtw.py new file mode 100755 index 0000000..9738ec8 --- /dev/null +++ b/ctbtw.py @@ -0,0 +1,725 @@ +#!/usr/bin/env python3 + +# ctbtw -- a small program to control BT-W* devices (like BT-W6) +# Copyright (C) 2025 Javier S. Pedro + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. + +from enum import Enum, Flag +from argparse import ArgumentParser +import struct +import hid +import sys, os + +class Commands(int, Enum): + Ack = 2 + DevInfo = 7 + BTCommand = 0x6B + +class BTState(int, Enum): + Initializing = 0 + PoweredOff = 1 + TestMode = 2 + Idle = 3 + Connectable = 4 + Discoverable = 5 + Connecting = 6 + Inquiring = 7 + Connected = 8 + Paging = 9 + Disconnected = 10 + A2DPMode = 11 + HFPMode = 12 + PagingFailed = 13 + LEAudioUnicast = 14 + LEAudioUnicastWithVoiceChat = 15 + PublicAuracast = 16 + PrivateAuracast = 17 + +class BTDualMode(int, Flag): + Classic = 0 + LEAudio = 2 + +class BTAudioMode(int, Flag): + A2DP_HFP = 1 # "Enable Mic Input" + A2DP = 2 # "Disable Mic Input" + PublicAuracast = 8 + PrivateAuracast = 0x10 + +class BTProfile(int, Flag): + SBC = 1 + HFP = 2 + A2DP = 4 + AVRCP = 8 + LEAudio = 0x4000 + +class BTCodec(int, Flag): + SBC = 1 + Faststream = 2 + aptXClassic = 4 + aptXLowLatency = 8 + aptXHD = 0x10 + aptXAdaptiveLowLatency = 0x20 + aptXAdaptiveHighQuality = 0x40 + aptXLossless = 0x80 + aptXLite = 0x100 + LC3 = 0x200 + +class BTAddr: + def __init__(self, value=None): + if isinstance(value, str): + self.bytes = bytes.fromhex(value.replace(":", " ")) + elif isinstance(value, bytes): + assert len(value) == 6 + self.bytes = bytes(value[4:6] + value[0:2] + value[2:4]) + elif value is None: + self.bytes = bytes() + else: + raise TypeError("do know how to cast %r into BTAddr"%value) + + @staticmethod + def unpack(data): + self = BTAddr() + # TODO Likely something wrong here + assert len(data) == 6 + self.bytes = bytes(data[4:6] + data[0:2] + data[2:4]) + return self + + def pack(self): + return bytes(self.bytes[2:4] + self.bytes[4:6] + self.bytes[0:2]) + + def __repr__(self): + return "BTAddr('%s')"%self.bytes.hex(sep=':') + + def __str__(self): + return self.bytes.hex(sep=':') + + def __bytes__(self): + return self.pack() + +class BTOperation(int, Enum): + GetNumPairedDevices = 1 + GetPairedDeviceDetails = 2 + SetConnectionAction = 3 + SetInquiryMode = 4 + GetInquiryData = 5 + GetState = 6 + ForgetDevice = 7 + SetDeviceFriendlyName = 8 + GetFeature = 9 + SetFeature = 10 + StopInquiryMode = 11 + +class BTFeatureGet(int, Enum): + """Enum with types of feature bytes that can be read from device, and their datatype""" + def __new__(cls, value, valuetype): + obj = int.__new__(cls, value) + obj._value_ = value + obj.valuetype = valuetype + return obj + + AutoReconnect = 0, bool + MaxPairedDeviceSupported = 1, int + A2DPCodecSupported = 2, BTCodec + A2DPCodecCurrent = 3, BTCodec + LocalBluetoothAddress = 4, BTAddr + SupportedDeviceAudioMode = 5, BTAudioMode + CurrentDeviceAudioMode = 6, BTAudioMode + AutoMicActivation = 7, bool + PreferredAptXCodecVariant = 8, BTCodec + AuracastName = 9, str + AuracastCode = 10, str + ExtendedCodecsSupported = 11, BTCodec + ExtendedCurrentCodec = 12, BTCodec + PreferredDualMode = 13, BTDualMode + ConnectedDeviceSupportedBTProfile = 14, BTProfile + GamingModeState = 15, bool + BroadcastEncryptedState = 16, bool + +class BTFeatureSet(int, Enum): + """Enum with types of feature bytes that be sent to the device, and their datatype""" + def __new__(cls, value, valuetype): + obj = int.__new__(cls, value) + obj._value_ = value + obj.valuetype = valuetype + return obj + + AutoReconnect = 0, bool + CurrentDeviceAudioMode = 1, BTAudioMode + AutoMicActivation = 2, bool + PreferredAptXCodecVariant = 3, BTCodec + AuracastName = 4, str + AuracastCode = 5, str + PreferredDualMode = 6, BTDualMode + GamingMode = 7, bool + BroadcastEncryptedState = 8, bool + +class BTConnectionAction(int, Enum): + Connect = 0 + Disconnect = 1 + +class BTInquiryAction(int, Enum): + InquiryWithPair = 0 + InquiryGetSinksOnly = 1 + InquiryPairSpecificSink = 2 + +class Msg: + """Base class for messages.""" + prefixId = None + commandId = None + + @classmethod + def is_mine(cls, data): + """Called by message receiver. Should return true if passed data corresponds to a message of this class type.""" + assert cls.prefixId + assert cls.commandId + return data[0] == cls.prefixId and data[1] == cls.commandId.value + +class AckMsg(Msg): + prefixId = 0x5a # TODO Unclear what this prefix actually means, so hardcoding it + commandId = Commands.Ack + + @classmethod + def unpack(cls, data): + self = cls() + # Ignoring arguments + return self + + def __repr__(self): + return "AckMsg()" + +class BTMsg(Msg): + """Base class for all bluetooth-related messages.""" + prefixId = 0x5a # TODO + commandId = Commands.BTCommand + operationId = None + + @classmethod + def is_mine(cls, data): + assert cls.operationId is not None + return data[0] == cls.prefixId and data[1] == cls.commandId.value and data[3] == cls.operationId.value + +class GetNumPairedDevicesMsg(BTMsg): + operationId = BTOperation.GetNumPairedDevices + + def __init__(self): + self.value = None + + @classmethod + def unpack(cls, data): + self = cls() + prefix, cmd, msglen, op, value = struct.unpack_from("<BBBB B", data) + assert prefix == cls.prefixId + assert cmd == cls.commandId + self.value = value + return self + + def pack(self): + raise NotImplementedError + + def __repr__(self): + return "GetNumPairedDevicesMsg(value=%s)"%self.value + +class GetPairedDeviceDetailsMsg(BTMsg): + operationId = BTOperation.GetPairedDeviceDetails + + def __init__(self): + self.index = None + self.valid = False + self.addr = bytes() + self.displayName = str() + + @classmethod + def unpack(cls, data): + self = cls() + prefix, cmd, msglen, op, index, valid, addr, displayName = struct.unpack_from("<BBBB BB6s40p", data) + assert prefix == cls.prefixId + assert cmd == cls.commandId + self.index = int(index) + self.valid = bool(valid) + self.addr = BTAddr.unpack(addr) + self.displayName = displayName.decode("utf-8").rstrip('\x00') + return self + + def pack(self): + raise NotImplementedError + + def __repr__(self): + return "GetPairedDeviceDetailsMsg(index=%s,valid=%s,addr=%r,displayName=%r)"%(self.index,self.valid,self.addr,self.displayName) + +class GetInquiryDataMsg(BTMsg): + operationId = BTOperation.GetInquiryData + + def __init__(self): + self.addr = bytes() + self.displayName = str() + + @classmethod + def unpack(cls, data): + self = cls() + prefix, cmd, msglen, op, addr, displayName = struct.unpack_from("<BBBB 6s40p", data) + assert prefix == cls.prefixId + assert cmd == cls.commandId + self.addr = BTAddr.unpack(addr) + self.displayName = displayName.decode("utf-8").rstrip('\x00') + return self + + def pack(self): + raise NotImplementedError + + def __repr__(self): + return "GetInquiryDataMsg(addr=%r,displayName=%r)"%(self.addr,self.displayName) + +class GetStateMsg(BTMsg): + operationId = BTOperation.GetState + + def __init__(self): + self.state = BTState.Initializing + self.addr = bytes() + self.displayName = str() + + @classmethod + def unpack(cls, data): + self = cls() + prefix, cmd, msglen, op, state, addr, displayName = struct.unpack_from("<BBBB B6s40p", data) + assert prefix == cls.prefixId + assert cmd == cls.commandId + self.state = BTState(state) + self.addr = BTAddr.unpack(addr) + self.displayName = displayName.decode("utf-8").rstrip('\x00') + return self + + def pack(self): + raise NotImplementedError + + def __repr__(self): + return "GetStateMsg(state=%s,addr=%r,displayName=%r)"%(self.state,self.addr,self.displayName) + +class GetFeatureMsg(BTMsg): + operationId = BTOperation.GetFeature + + def __init__(self): + self.feature = None + self.value = None + + @classmethod + def unpack(cls, data): + self = cls() + prefix, cmd, msglen, op, feature = struct.unpack_from("<BBBB B", data) + assert prefix == cls.prefixId + assert cmd == cls.commandId + offset = struct.calcsize("<BBBB B") + valuelen = msglen - struct.calcsize("<B B") + value = data[offset:offset+valuelen] + self.feature = BTFeatureGet(feature) + valuetype = self.feature.valuetype + if valuetype is bytes or valuetype is str: + # multi-byte value, pascal string + valuelen = value[0] + self.value = bytes(value[1:1+valuelen]) + if valuetype is str: + self.value = self.value.decode("utf-8") + elif valuetype is BTAddr: + # multi-byte value fixed length + self.value = valuetype(value[0:6]) + else: + # single-byte value + self.value = valuetype(value[0]) + + return self + + def pack(self): + raise NotImplementedError + + def __repr__(self): + return "GetFeatureMsg(feature=%s,value=%r)"%(self.feature,self.value) + +class BTReq(BTMsg): + pass + +class GetNumPairedDevicesReq(BTReq): + operationId = BTOperation.GetNumPairedDevices + + def __init__(self): + pass # No arguments + + def pack(self): + msglen = struct.calcsize("<B ") + return struct.pack("<BBBB ", self.prefixId, self.commandId, msglen, self.operationId) + + def __repr__(self): + return "GetNumPairedDevicesReq()" + +class GetPairedDeviceDetailsReq(BTReq): + operationId = BTOperation.GetPairedDeviceDetails + + def __init__(self, index = None): + self.index = index + + def pack(self): + assert self.index is not None + msglen = struct.calcsize("<B B") + return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.index) + + def __repr__(self): + return "GetPairedDeviceDetailsReq(index=%s)"%self.index + +class SetConnectionReq(BTReq): + operationId = BTOperation.SetConnectionAction + + def __init__(self, action=None, addr=None): + assert action in BTConnectionAction + self.action = action + self.addr = addr + + def pack(self): + if self.addr: + msglen = struct.calcsize("<B B6s") + return struct.pack("<BBBB B6s", self.prefixId, self.commandId, msglen, self.operationId, self.action, self.addr.pack()) + else: + msglen = struct.calcsize("<B B") + return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.action) + + def __repr__(self): + return "SetConnectionReq(action=%s,addr=%s)"%(self.action.name, self.addr) + +class SetInquiryModeReq(BTReq): + operationId = BTOperation.SetInquiryMode + + def __init__(self, action=None, addr=None): + assert action is None or action in BTInquiryAction + self.action = action + if addr: + assert action == BTInquiryAction.InquiryPairSpecificSink + self.addr = addr + else: + self.addr = None + + def pack(self): + if self.addr: + msglen = struct.calcsize("<B B6s") + return struct.pack("<BBBB B6s", self.prefixId, self.commandId, msglen, self.operationId, self.action, self.addr.pack()) + else: + msglen = struct.calcsize("<B B") + return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.action) + + def __repr__(self): + return "SetInquiryModeReq(action=%s,addr=%s)"%(self.action.name, self.addr) + +class StopInquiryModeReq(BTReq): + operationId = BTOperation.StopInquiryMode + + def __init__(self): + pass + + def pack(self): + msglen = struct.calcsize("<B ") + return struct.pack("<BBBB ", self.prefixId, self.commandId, msglen, self.operationId) + + def __repr__(self): + return "StopInquiryMode()" + +class GetStateReq(BTReq): + operationId = BTOperation.GetState + + def __init__(self): + pass + + def pack(self): + msglen = struct.calcsize("<B ") + return struct.pack("<BBBB ", self.prefixId, self.commandId, msglen, self.operationId) + + def __repr__(self): + return "GetStateReq()" + +class ForgetDeviceReq(BTReq): + operationId = BTOperation.ForgetDevice + + def __init__(self, index = None): + # -1 may also mean "all" + self.index = index + + def pack(self): + msglen = struct.calcsize("<B B") + return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.index) + + def __repr__(self): + return "ForgetDeviceReq(index=%s)"%self.index + +class GetFeatureReq(BTReq): + operationId = BTOperation.GetFeature + + def __init__(self, feature): + assert feature in BTFeatureGet + self.feature = feature + + def pack(self): + msglen = struct.calcsize("<B B") + return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.feature.value) + + def __repr__(self): + return "GetFeatureReq(feature=%s)"%(self.feature) + +class SetFeatureReq(BTReq): + operationId = BTOperation.SetFeature + + def __init__(self, feature, value): + assert feature in BTFeatureSet + self.feature = feature + self.value = feature.valuetype(value) + + def pack(self): + valuetype = self.feature.valuetype + if valuetype is str: + # Prefix with length + value = struct.pack("<40p", self.value.encode("utf-8")) + elif valuetype is BTAddr: + value = struct.pack("<6B", self.value) + else: + value = struct.pack("<B", self.value) + msglen = struct.calcsize("<B B") + len(value) + return struct.pack("<BBBB B", self.prefixId, self.commandId, msglen, self.operationId, self.feature.value) + value + + def __repr__(self): + return "SetFeatureReq(feature=%s,value=%r)"%(self.feature,self.value) + +# List of messages that can be received +msgs = [AckMsg, GetNumPairedDevicesMsg, GetPairedDeviceDetailsMsg, GetInquiryDataMsg, GetStateMsg, GetFeatureMsg] +# HID report number used to both receive/send messages +REPORT_ID = 3 +# Size of this report (hardcoded...) +REPORT_SIZE = 64 +# Vendor ID to search for +CT_VENDOR_ID = 0x041e +# Product IDs to search for +CT_PRODUCT_IDS = [0x3130, 0x3132] +# Timeout (in msec) to use for short messages +SHORT_TIMEOUT=50 +# Timeout (in msec) to use for messages related to pairing +PAIR_TIMEOUT=30000 + +_verbose = False + +bt_codec_colors = { + BTCodec.SBC : (0,102,179), + BTCodec.aptXClassic : (139,198,64), + BTCodec.aptXLowLatency : (0,173,239), + BTCodec.aptXHD : (246,148,31), + BTCodec.aptXAdaptiveLowLatency : (0,173,239), + BTCodec.aptXAdaptiveHighQuality : (110,44,145), + BTCodec.aptXLossless : (255,241,0), + BTCodec.aptXLite : (0,173,239), + BTCodec.LC3 : (255,255,255) + } + +def colored_dot(color): + colorstr = "\033[38;2;%d;%d;%dm"%color + dot = "●" + reset = "\033[0m" + return colorstr + dot + reset + +def codec_name(codec): + global _use_color, codec_color + assert codec in BTCodec + codec_color = bt_codec_colors.get(codec) + if _use_color and codec_color: + return colored_dot(codec_color) + " " + codec.name + else: + return codec.name + +def parse_msg(data): + assert len(data) == REPORT_SIZE + for msg in msgs: + if msg.is_mine(data): + return msg.unpack(data) + return None + +def send_msg(dev, msg): + global _verbose + if _verbose: print(repr(msg)) + data = msg.pack() + assert len(data) <= REPORT_SIZE + if len(data) < REPORT_SIZE: # pad to report size + data += b"\0" * (REPORT_SIZE - len(data)) + data = struct.pack("B", REPORT_ID) + data + assert len(data) == 1 + REPORT_SIZE + if _verbose: print('> ', data.hex(sep=' ')) + dev.write(data) + +def read_msgs(dev, timeout_ms=1000): + global _verbose + while True: + data = dev.read(1 + REPORT_SIZE, timeout_ms=timeout_ms) + if len(data) == 0: + break + if _verbose: print('< ', bytes(data).hex(sep=' ')) + if len(data) == 1 + REPORT_SIZE and data[0] == REPORT_ID: + msg = parse_msg(bytes(data[1:])) + if msg: + if _verbose: print(' ', repr(msg)) + yield msg + else: + if _verbose: print(" Unknown message with prefix %x command %x " % (data[1], data[2])) + + +def find_device(args): + for device in hid.enumerate(vendor_id=CT_VENDOR_ID): + if device['vendor_id'] == CT_VENDOR_ID and device['product_id'] in CT_PRODUCT_IDS: + return device + return None + +def read_and_explain_msgs(dev, timeout_ms=1000): + for msg in read_msgs(dev, timeout_ms=timeout_ms): + if isinstance(msg, GetStateMsg): + print("Currently %s to device '%s'" % (msg.state.name, msg.displayName)) + elif isinstance(msg, GetFeatureMsg): + if msg.feature == BTFeatureGet.ExtendedCurrentCodec: + print("Currently using codec '%s'" % codec_name(msg.value)) + elif msg.feature == BTFeatureGet.ExtendedCodecsSupported: + print("Supported codecs: ", " ".join([codec_name(val) for val in msg.value])) + +if __name__ == '__main__': + parser = ArgumentParser(prog='ctbtw', description="Control CT BT-W* devices", epilog="") + parser.add_argument('--verbose', '-v', action='store_true', help="print protocol traces") + actions = parser.add_mutually_exclusive_group() + actions.add_argument('--aptxll', '-l', action='store_true', help="for aptX, prefer low-latency mode") + actions.add_argument('--aptxhq', '-q', action='store_true', help="for aptX, prefer high-quality mode (enables lossless)") + actions.add_argument('--disconnect', '-t', metavar="ADDR", action='store', help="disconnect from device with given address") + actions.add_argument('--connect', '-c', metavar="ADDR", action='store', help="connect to device with given address") + actions.add_argument('--devices', '-d', action='store_true', help="view currently paired devices") + actions.add_argument('--pair', '-p', nargs='?', metavar="ADDR", const="*", action='store', help='pair with specific bluetooth address, or any device if no ADDR specified') + actions.add_argument('--forget', '-f', action='store_true', help='forget all paired devices') + actions.add_argument('--get', action='append', choices=[val.name for val in BTFeatureGet], help="get the current value of a setting") + actions.add_argument('--set', nargs=2, action='append', metavar=("KEY", "VALUE"), help="set a setting") + + args = parser.parse_args() + + _verbose = args.verbose + _use_color = os.getenv("NO_COLOR", "") == "" and os.getenv("TERM") != "dumb" and sys.stdout.isatty() + + devinfo = find_device(args) + if not devinfo: + print("Could not find device with valid vendor/product IDs") + sys.exit(1) + + print("Using device '%s' at %s" % (devinfo['product_string'], os.fsdecode(devinfo['path']))) + + dev = hid.device() + dev.open_path(devinfo['path']) + + if _verbose: print("Opened device") + + if args.aptxll or args.aptxhq: + codec = BTCodec.aptXAdaptiveHighQuality if args.aptxhq else BTCodec.aptXAdaptiveLowLatency + print("Switching preferred aptX codec to %s" % codec.name) + msg = SetFeatureReq(BTFeatureSet.PreferredAptXCodecVariant, codec) + send_msg(dev, msg) + read_and_explain_msgs(dev, timeout_ms=1000) # Use larger timeout to give time to print final codec used, if any + elif args.devices: + num_devices = None + msg = GetNumPairedDevicesReq() + send_msg(dev, msg) + for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT): + if isinstance(msg, GetNumPairedDevicesMsg): + num_devices = int(msg.value) + break + if num_devices: + print("List of paired devices:") + for dev_index in range(num_devices): + msg = GetPairedDeviceDetailsReq(index=dev_index) + send_msg(dev, msg) + for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT): + if isinstance(msg, GetPairedDeviceDetailsMsg): + if msg.valid: + print("#%s : %r (%s)" % (msg.index, msg.displayName, msg.addr)) + break + elif num_devices == 0: + print("No paired devices") + else: + print("Cannot get list of paired devices") + elif args.pair is not None: + if args.pair=="*": + # Pair with any device + msg = SetInquiryModeReq(BTInquiryAction.InquiryWithPair) + send_msg(dev, msg) + print("Entering pairing mode") + else: + addr = BTAddr(args.pair) + msg = SetInquiryModeReq(BTInquiryAction.InquiryPairSpecificSink, addr) + send_msg(dev, msg) + print("Pairing with %s"%addr) + + try: + for msg in read_msgs(dev, timeout_ms=PAIR_TIMEOUT): + if isinstance(msg, GetInquiryDataMsg): + print("Found device %r (%s)" % (msg.displayName, msg.addr)) + elif isinstance(msg, GetStateMsg): + if msg.state == BTState.Connected: + print("Connected to device %r"%msg.displayName) + break + except OSError: # Received on SIGINT... + pass + finally: + try: + print("Leaving pairing mode") + except KeyboardInterrupt: + pass + finally: + msg = StopInquiryModeReq() + send_msg(dev, msg) + elif args.forget: + # TODO I couldn't get forgetting about just one device to work; it is not exercised by app + msg = ForgetDeviceReq(index=255) + send_msg(dev, msg) + print("Erased all paired devices") + # No point reading messages as device will self-reset + elif args.connect or args.disconnect: + addr = BTAddr(args.connect if args.connect else args.disconnect) + action = BTConnectionAction.Connect if args.connect else BTConnectionAction.Disconnect + msg = SetConnectionReq(action, addr) + send_msg(dev, msg) + read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT) + elif args.get: + features = [val.name for val in BTFeatureGet] + for feature in args.get: + feature = BTFeatureGet(features.index(feature)) + msg = GetFeatureReq(feature) + send_msg(dev, msg) + for msg in read_msgs(dev, timeout_ms=SHORT_TIMEOUT): + if isinstance(msg, GetFeatureMsg): + if msg.feature == feature: + print("%s : %s" % (msg.feature.name, msg.value)) + else: + break + elif args.set: + features = [val.name for val in BTFeatureSet] + for feature, value in args.set: + try: + feature = BTFeatureSet(features.index(feature)) + except ValueError: + print("Feature %r not found, skipping" % feature) + continue + valuetype = feature.valuetype + if valuetype is bool: + value = value.lower() in ('y', 'yes', 't', 'true', 'on', '1') + elif valuetype is int: + value = int(value) + msg = SetFeatureReq(feature, value) + send_msg(dev, msg) + print("Setting %s to %r"%(msg.feature, msg.value)) + read_msgs(dev, timeout_ms=SHORT_TIMEOUT) + else: + msg = GetStateReq() + send_msg(dev, msg) + read_and_explain_msgs(dev, timeout_ms=SHORT_TIMEOUT) + if _verbose: + print("Printing events from device, ^C to quit") + read_and_explain_msgs(dev, timeout_ms=0) |