summaryrefslogtreecommitdiff
path: root/scripts
diff options
context:
space:
mode:
authorJavier <dev.git@javispedro.com>2014-10-19 18:45:03 +0200
committerJavier <dev.git@javispedro.com>2014-10-19 18:45:03 +0200
commitd8d8fc7a0d139e7b864eee3b573bd208f823ad4f (patch)
treea9b54d6e6e6941c620f4f10cef4b5def9be86f82 /scripts
downloadsapd-d8d8fc7a0d139e7b864eee3b573bd208f823ad4f.tar.gz
sapd-d8d8fc7a0d139e7b864eee3b573bd208f823ad4f.zip
initial import, no crypto
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/test-hfp-ag210
1 files changed, 210 insertions, 0 deletions
diff --git a/scripts/test-hfp-ag b/scripts/test-hfp-ag
new file mode 100755
index 0000000..3152339
--- /dev/null
+++ b/scripts/test-hfp-ag
@@ -0,0 +1,210 @@
+#!/usr/bin/python
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+from optparse import OptionParser, make_option
+import os
+from socket import SOCK_SEQPACKET, socket
+import sys
+import dbus
+import dbus.service
+import dbus.mainloop.glib
+import glib
+try:
+ from gi.repository import GObject
+except ImportError:
+ import gobject as GObject
+
+mainloop = None
+audio_supported = True
+
+try:
+ from socket import AF_BLUETOOTH, BTPROTO_SCO
+except:
+ print("WARNING: python compiled without Bluetooth support"
+ " - audio will not be available")
+ audio_supported = False
+
+BUF_SIZE = 1024
+
+BDADDR_ANY = '00:00:00:00:00:00'
+
+HF_NREC = 0x0001
+HF_3WAY = 0x0002
+HF_CLI = 0x0004
+HF_VOICE_RECOGNITION = 0x0008
+HF_REMOTE_VOL = 0x0010
+HF_ENHANCED_STATUS = 0x0020
+HF_ENHANCED_CONTROL = 0x0040
+HF_CODEC_NEGOTIATION = 0x0080
+
+AG_3WAY = 0x0001
+AG_NREC = 0x0002
+AG_VOICE_RECOGNITION = 0x0004
+AG_INBAND_RING = 0x0008
+AG_VOICE_TAG = 0x0010
+AG_REJECT_CALL = 0x0020
+AG_ENHANCED_STATUS = 0x0040
+AG_ENHANCED_CONTROL = 0x0080
+AG_EXTENDED_RESULT = 0x0100
+AG_CODEC_NEGOTIATION = 0x0200
+
+HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
+ HF_REMOTE_VOL | HF_ENHANCED_STATUS |
+ HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)
+
+AVAIL_CODECS = "1,2"
+
+class HfpConnection:
+ slc_complete = False
+ fd = None
+ io_id = 0
+ version = 0
+ features = 0
+ pending = None
+
+ def disconnect(self):
+ if (self.fd >= 0):
+ os.close(self.fd)
+ self.fd = -1
+ glib.source_remove(self.io_id)
+ self.io_id = 0
+
+ def slc_completed(self):
+ print("SLC establisment complete")
+ self.slc_complete = True
+
+ def slc_next_cmd(self, cmd):
+ print("Unknown SLC command completed: %s" % (cmd))
+
+ def io_cb(self, fd, cond):
+ buf = os.read(fd, BUF_SIZE)
+ buf = buf.strip()
+
+ print("Received: %s" % (buf))
+
+ return True
+
+ def send_cmd(self, cmd):
+ if (self.pending):
+ print("ERROR: Another command is pending")
+ return
+
+ print("Sending: %s" % (cmd))
+
+ os.write(self.fd, cmd + "\r\n")
+ self.pending = cmd
+
+ def __init__(self, fd, version, features):
+ self.fd = fd
+ self.version = version
+ self.features = features
+
+ print("Version 0x%04x Features 0x%04x" % (version, features))
+
+ self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)
+
+class HfpProfile(dbus.service.Object):
+ sco_socket = None
+ io_id = 0
+ conns = {}
+
+ def sco_cb(self, sock, cond):
+ (sco, peer) = sock.accept()
+ print("New SCO connection from %s" % (peer))
+
+ def init_sco(self, sock):
+ self.sco_socket = sock
+ self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)
+
+ def __init__(self, bus, path, sco):
+ dbus.service.Object.__init__(self, bus, path)
+
+ if sco:
+ self.init_sco(sco)
+
+ @dbus.service.method("org.bluez.Profile1",
+ in_signature="", out_signature="")
+ def Release(self):
+ print("Release")
+ mainloop.quit()
+
+ @dbus.service.method("org.bluez.Profile1",
+ in_signature="", out_signature="")
+ def Cancel(self):
+ print("Cancel")
+
+ @dbus.service.method("org.bluez.Profile1",
+ in_signature="o", out_signature="")
+ def RequestDisconnection(self, path):
+ conn = self.conns.pop(path)
+ conn.disconnect()
+
+ @dbus.service.method("org.bluez.Profile1",
+ in_signature="oha{sv}", out_signature="")
+ def NewConnection(self, path, fd, properties):
+ fd = fd.take()
+ version = 0x0105
+ features = 0
+ print("NewConnection(%s, %d)" % (path, fd))
+ for key in properties.keys():
+ if key == "Version":
+ version = properties[key]
+ elif key == "Features":
+ features = properties[key]
+
+ #conn = HfpConnection(fd, version, features)
+
+ #self.conns[path] = conn
+
+if __name__ == '__main__':
+ dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+
+ bus = dbus.SystemBus()
+
+ manager = dbus.Interface(bus.get_object("org.bluez",
+ "/org/bluez"), "org.bluez.ProfileManager1")
+
+ option_list = [
+ make_option("-p", "--path", action="store",
+ type="string", dest="path",
+ default="/bluez/test/hfp"),
+ make_option("-n", "--name", action="store",
+ type="string", dest="name",
+ default=None),
+ make_option("-C", "--channel", action="store",
+ type="int", dest="channel",
+ default=None),
+ ]
+
+ parser = OptionParser(option_list=option_list)
+
+ (options, args) = parser.parse_args()
+
+ mainloop = GObject.MainLoop()
+
+ opts = {
+ "Version" : dbus.UInt16(0x0106),
+ "Features" : dbus.UInt16(HF_FEATURES),
+ }
+
+ if (options.name):
+ opts["Name"] = options.name
+
+ if (options.channel is not None):
+ opts["Channel"] = dbus.UInt16(options.channel)
+
+ if audio_supported:
+ sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
+ sco.bind(BDADDR_ANY)
+ sco.listen(1)
+ else:
+ sco = None
+
+ profile = HfpProfile(bus, options.path, sco)
+
+ manager.RegisterProfile(options.path, "hfp-ag", opts)
+
+ print("Profile registered - waiting for connections")
+
+ mainloop.run()