#!/usr/bin/python
#
# Registers an org.bluez.Profile1 object with the org.bluez ProfileManager1
# under the "hfp-ag" UUID name and prints the profile callbacks it receives.
# When the Python build has Bluetooth socket support, a listening SCO socket
# is also created so that incoming SCO connections are reported.

from __future__ import absolute_import, print_function, unicode_literals

from optparse import OptionParser, make_option
import os
from socket import SOCK_SEQPACKET, socket
import sys

import dbus
import dbus.service
import dbus.mainloop.glib
import glib

try:
    from gi.repository import GObject
except ImportError:
    import gobject as GObject

# Assigned in the __main__ block; HfpProfile.Release() uses it to stop the
# main loop.
mainloop = None
audio_supported = True

try:
    from socket import AF_BLUETOOTH, BTPROTO_SCO
except ImportError:
    # Only a missing symbol should disable audio; anything else (for example
    # KeyboardInterrupt) must not be swallowed here.
    print("WARNING: python compiled without Bluetooth support"
          " - audio will not be available")
    audio_supported = False

# Maximum number of bytes read from a connection fd per IO callback.
BUF_SIZE = 1024
BDADDR_ANY = '00:00:00:00:00:00'

# Feature bit flags.  Presumably the HFP "supported features" bitmaps for the
# hands-free (HF_*) and audio-gateway (AG_*) roles - confirm against the spec.
HF_NREC = 0x0001
HF_3WAY = 0x0002
HF_CLI = 0x0004
HF_VOICE_RECOGNITION = 0x0008
HF_REMOTE_VOL = 0x0010
HF_ENHANCED_STATUS = 0x0020
HF_ENHANCED_CONTROL = 0x0040
HF_CODEC_NEGOTIATION = 0x0080

AG_3WAY = 0x0001
AG_NREC = 0x0002
AG_VOICE_RECOGNITION = 0x0004
AG_INBAND_RING = 0x0008
AG_VOICE_TAG = 0x0010
AG_REJECT_CALL = 0x0020
AG_ENHANCED_STATUS = 0x0040
AG_ENHANCED_CONTROL = 0x0080
AG_EXTENDED_RESULT = 0x0100
AG_CODEC_NEGOTIATION = 0x0200

# Features advertised in the "Features" registration option (every HF_* bit
# except HF_NREC).
HF_FEATURES = (HF_3WAY | HF_CLI | HF_VOICE_RECOGNITION |
               HF_REMOTE_VOL | HF_ENHANCED_STATUS |
               HF_ENHANCED_CONTROL | HF_CODEC_NEGOTIATION)

AVAIL_CODECS = "1,2"


class HfpConnection(object):
    """State for a single connection file descriptor.

    Data arriving on the fd is read and printed from a glib IO watch that is
    installed by the constructor.
    """

    slc_complete = False
    fd = None
    io_id = 0
    version = 0
    features = 0
    # Last command written by send_cmd().  Nothing in this file clears it, so
    # only one command can currently be sent per connection.
    pending = None

    def __init__(self, fd, version, features):
        """Store the connection parameters and start watching *fd* for input.

        fd       -- integer file descriptor of the connection
        version  -- profile version, printed as a 16-bit hex value
        features -- remote feature bitmask, printed as a 16-bit hex value
        """
        self.fd = fd
        self.version = version
        self.features = features

        print("Version 0x%04x Features 0x%04x" % (version, features))

        self.io_id = glib.io_add_watch(fd, glib.IO_IN, self.io_cb)

    def disconnect(self):
        """Stop watching the fd and close it.  Safe to call more than once."""
        # Drop the watch first so the callback cannot fire on a closed fd,
        # and skip it when there is no live watch (io_id == 0).
        if self.io_id:
            glib.source_remove(self.io_id)
            self.io_id = 0

        # fd defaults to None at class level; comparing None with an int
        # raises TypeError on Python 3, so test for None explicitly.
        if self.fd is not None and self.fd >= 0:
            os.close(self.fd)
            self.fd = -1

    def slc_completed(self):
        """Mark the service level connection as established."""
        print("SLC establisment complete")
        self.slc_complete = True

    def slc_next_cmd(self, cmd):
        """Report a completed SLC command; no command is handled yet."""
        print("Unknown SLC command completed: %s" % (cmd))

    def io_cb(self, fd, cond):
        """glib IO callback: read and print pending data from *fd*.

        Returns True to keep the watch installed, or False once the peer has
        closed the connection.
        """
        buf = os.read(fd, BUF_SIZE)
        if not buf:
            # An empty read means EOF.  Without this the watch would keep
            # firing forever on the readable-at-EOF descriptor.  Returning
            # False makes glib remove the source, so forget its id too.
            self.io_id = 0
            return False

        buf = buf.strip()
        print("Received: %s" % (buf))
        return True

    def send_cmd(self, cmd):
        """Write *cmd* followed by CR LF to the connection.

        The command is refused (with an error message) while another one is
        recorded as pending.
        """
        if self.pending:
            print("ERROR: Another command is pending")
            return

        print("Sending: %s" % (cmd))

        data = cmd + "\r\n"
        # unicode_literals makes the terminator a text string; os.write()
        # needs bytes on Python 3.
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        os.write(self.fd, data)

        self.pending = cmd


class HfpProfile(dbus.service.Object):
    """D-Bus object implementing the org.bluez.Profile1 methods."""

    sco_socket = None
    io_id = 0

    def __init__(self, bus, path, sco):
        """Export the object on *bus* at *path*.

        sco -- listening SCO socket to watch for incoming connections, or
               None when audio is not available
        """
        # Per-instance mapping of device object path -> HfpConnection (a
        # class-level dict would be shared between all instances).
        self.conns = {}

        dbus.service.Object.__init__(self, bus, path)

        if sco:
            self.init_sco(sco)

    def sco_cb(self, sock, cond):
        """glib IO callback: accept and report an incoming SCO connection."""
        # The accepted socket is not stored anywhere, so this script only
        # reports the connection.
        (sco, peer) = sock.accept()
        print("New SCO connection from %s" % (peer))
        # Keep the watch installed; returning None would make glib remove it
        # after the first connection.
        return True

    def init_sco(self, sock):
        """Remember the listening SCO socket and watch it for connections."""
        self.sco_socket = sock
        self.io_id = glib.io_add_watch(sock, glib.IO_IN, self.sco_cb)

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="", out_signature="")
    def Release(self):
        """Profile1.Release: stop the main loop."""
        print("Release")
        mainloop.quit()

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="", out_signature="")
    def Cancel(self):
        """Profile1.Cancel: only logged."""
        print("Cancel")

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="o", out_signature="")
    def RequestDisconnection(self, path):
        """Profile1.RequestDisconnection: tear down the connection for *path*.

        Unknown paths are ignored instead of raising KeyError back to the
        D-Bus caller.
        """
        conn = self.conns.pop(path, None)
        if conn is not None:
            conn.disconnect()

    @dbus.service.method("org.bluez.Profile1",
                         in_signature="oha{sv}", out_signature="")
    def NewConnection(self, path, fd, properties):
        """Profile1.NewConnection: log the new connection for *path*.

        fd         -- D-Bus unix fd object; ownership is taken via take()
        properties -- dictionary that may carry "Version" and "Features"
        """
        fd = fd.take()

        print("NewConnection(%s, %d)" % (path, fd))

        # Fall back to version 0x0105 / no features when not supplied.
        version = properties.get("Version", 0x0105)
        features = properties.get("Features", 0)

        # NOTE(review): connection handling is disabled below, so the fd
        # taken above is neither watched nor closed by this script and
        # version/features are currently unused - confirm this is intended.
        #conn = HfpConnection(fd, version, features)
        #self.conns[path] = conn


if __name__ == '__main__':
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SystemBus()

    manager = dbus.Interface(bus.get_object("org.bluez", "/org/bluez"),
                             "org.bluez.ProfileManager1")

    option_list = [
        make_option("-p", "--path", action="store",
                    type="string", dest="path",
                    default="/bluez/test/hfp"),
        make_option("-n", "--name", action="store",
                    type="string", dest="name",
                    default=None),
        make_option("-C", "--channel", action="store",
                    type="int", dest="channel",
                    default=None),
    ]

    parser = OptionParser(option_list=option_list)

    (options, args) = parser.parse_args()

    mainloop = GObject.MainLoop()

    opts = {
        "Version": dbus.UInt16(0x0106),
        "Features": dbus.UInt16(HF_FEATURES),
    }

    if options.name:
        opts["Name"] = options.name

    if options.channel is not None:
        opts["Channel"] = dbus.UInt16(options.channel)

    if audio_supported:
        sco = socket(AF_BLUETOOTH, SOCK_SEQPACKET, BTPROTO_SCO)
        sco.bind(BDADDR_ANY)
        sco.listen(1)
    else:
        sco = None

    # Keep a module-level reference so the exported object stays alive while
    # the main loop runs.
    profile = HfpProfile(bus, options.path, sco)

    manager.RegisterProfile(options.path, "hfp-ag", opts)

    print("Profile registered - waiting for connections")

    mainloop.run()