diff options
author | Javier <dev.git@javispedro.com> | 2014-10-19 18:45:03 +0200 |
---|---|---|
committer | Javier <dev.git@javispedro.com> | 2014-10-19 18:45:03 +0200 |
commit | d8d8fc7a0d139e7b864eee3b573bd208f823ad4f (patch) | |
tree | a9b54d6e6e6941c620f4f10cef4b5def9be86f82 /scripts/test-hfp-ag | |
download | sapd-d8d8fc7a0d139e7b864eee3b573bd208f823ad4f.tar.gz sapd-d8d8fc7a0d139e7b864eee3b573bd208f823ad4f.zip |
initial import, no crypto
Diffstat (limited to 'scripts/test-hfp-ag')
-rwxr-xr-x | scripts/test-hfp-ag | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/scripts/test-hfp-ag b/scripts/test-hfp-ag new file mode 100755 index 0000000..3152339 --- /dev/null +++ b/scripts/test-hfp-ag @@ -0,0 +1,210 @@ +#!/usr/bin/python + +from __future__ import absolute_import, print_function, unicode_literals + +from optparse import OptionParser, make_option +import os +from socket import SOCK_SEQPACKET, socket +import sys +import dbus +import dbus.service +import dbus.mainloop.glib +import glib +try: + from gi.repository import GObject +except ImportError: + import gobject as GObject + +mainloop = None +audio_supported = True + +try: + from socket import AF_BLUETOOTH, BTPROTO_SCO +except: + print("WARNING: python compiled without Bluetooth support" + " - audio will not be available") + audio_supported = False + +BUF_SIZE = 1024 + +BDADDR_ANY = '00:00:00:00:00:00' + +HF_NREC = 0x0001 +HF_3WAY = 0x0002 +HF_CLI = 0x0004 +HF_VOICE_RECOGNITION = 0x0008 +HF_REMOTE_VOL = 0x0010 +HF_ENHANCED_STATUS = 0x0020 +HF_ENHANCED_CONTROL = 0x0040 +HF_CODEC_NEGOTIATION = 0x0080 + +AG_3WAY = 0x0001 +AG_NREC = 0x0002 +AG_VOICE_RECOGNITION = 0x0004 +AG_INBAND_RING = 0x0008 +AG_VOICE_TAG = 0x0010 +AG_REJECT_CALL = 0x0020 +AG_ENHANCED_STATUS = 0x0040 +AG_ENHANCED_CONTROL = 0x0080 +AG_EXTENDED_RESULT = 0x0100 +AG_CODEC_NEGOTIATION = 0x0200 + +HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION | + HF_REMOTE_VOL | HF_ENHANCED_STATUS | + HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION) + +AVAIL_CODECS = "1,2" + +class HfpConnection: + slc_complete = False + fd = None + io_id = 0 + version = 0 + features = 0 + pending = None + + def disconnect(self): + if (self.fd >= 0): + os.close(self.fd) + self.fd = -1 + glib.source_remove(self.io_id) + self.io_id = 0 + + def slc_completed(self): + print("SLC establisment complete") + self.slc_complete = True + + def slc_next_cmd(self, cmd): + print("Unknown SLC command completed: %s" % (cmd)) + + def io_cb(self, fd, cond): + buf = os.read(fd, BUF_SIZE) + buf = buf.strip() + + print("Received: %s" % (buf)) + + return True + + def send_cmd(self, cmd): + if (self.pending): + print("ERROR: Another command is pending") + return + + print("Sending: %s" % (cmd)) + + os.write(self.fd, cmd + "\r\n") + self.pending = cmd + + def __init__(self, fd, version, features): + self.fd = fd + self.version = version + self.features = features + + print("Version 0x%04x Features 0x%04x" % (version, features)) + + self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb) + +class HfpProfile(dbus.service.Object): + sco_socket = None + io_id = 0 + conns = {} + + def sco_cb(self, sock, cond): + (sco, peer) = sock.accept() + print("New SCO connection from %s" % (peer)) + + def init_sco(self, sock): + self.sco_socket = sock + self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb) + + def __init__(self, bus, path, sco): + dbus.service.Object.__init__(self, bus, path) + + if sco: + self.init_sco(sco) + + @dbus.service.method("org.bluez.Profile1", + in_signature="", out_signature="") + def Release(self): + print("Release") + mainloop.quit() + + @dbus.service.method("org.bluez.Profile1", + in_signature="", out_signature="") + def Cancel(self): + print("Cancel") + + @dbus.service.method("org.bluez.Profile1", + in_signature="o", out_signature="") + def RequestDisconnection(self, path): + conn = self.conns.pop(path) + conn.disconnect() + + @dbus.service.method("org.bluez.Profile1", + in_signature="oha{sv}", out_signature="") + def NewConnection(self, path, fd, properties): + fd = fd.take() + version = 0x0105 + features = 0 + print("NewConnection(%s, %d)" % (path, fd)) + for key in properties.keys(): + if key == "Version": + version = properties[key] + elif key == "Features": + features = properties[key] + + #conn = HfpConnection(fd, version, features) + + #self.conns[path] = conn + +if __name__ == '__main__': + dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) + + bus = dbus.SystemBus() + + manager = dbus.Interface(bus.get_object("org.bluez", + "/org/bluez"), "org.bluez.ProfileManager1") + + option_list = [ + make_option("-p", "--path", action="store", + type="string", dest="path", + default="/bluez/test/hfp"), + make_option("-n", "--name", action="store", + type="string", dest="name", + default=None), + make_option("-C", "--channel", action="store", + type="int", dest="channel", + default=None), + ] + + parser = OptionParser(option_list=option_list) + + (options, args) = parser.parse_args() + + mainloop = GObject.MainLoop() + + opts = { + "Version" : dbus.UInt16(0x0106), + "Features" : dbus.UInt16(HF_FEATURES), + } + + if (options.name): + opts["Name"] = options.name + + if (options.channel is not None): + opts["Channel"] = dbus.UInt16(options.channel) + + if audio_supported: + sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO) + sco.bind(BDADDR_ANY) + sco.listen(1) + else: + sco = None + + profile = HfpProfile(bus, options.path, sco) + + manager.RegisterProfile(options.path, "hfp-ag", opts) + + print("Profile registered - waiting for connections") + + mainloop.run() |