/* * fmrxd - a daemon to enable and multiplex access to the N950/N9 radio tuner * Copyright (C) 2011 Javier S. Pedro * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * You should have received a copy of the GNU General Public License along * with this program; if not, write to the Free Software Foundation, Inc., * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ #include #include #include #include #include #include #include "fmrxd.h" typedef struct Client { int fd; GIOChannel *channel; guint out_watch; guint err_watch; unsigned int read_p; } Client; static DBusConnection *bus; static GList *clients; static guint num_clients; #if SERVER_ON_DEMAND static guint server_stop_source; #endif struct { char data[RING_BUFFER_SIZE]; unsigned int write_p; } rbuffer; static const char * introspect_data = { "\n" "\n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" " \n" "\n" }; static void server_kill_client(Client *c); static gboolean client_out_callback(GIOChannel *source, GIOCondition condition, gpointer data); static gboolean client_err_callback(GIOChannel *source, GIOCondition condition, gpointer data); static DBusHandlerResult handle_introspect(DBusConnection *conn, DBusMessage *m) { DBusMessage *reply = dbus_message_new_method_return(m); dbus_message_append_args(reply, DBUS_TYPE_STRING, &introspect_data, DBUS_TYPE_INVALID); dbus_connection_send(conn, reply, NULL); dbus_message_unref(reply); return DBUS_HANDLER_RESULT_HANDLED; } static DBusHandlerResult handle_connect(DBusConnection *conn, DBusMessage *m) { int fd = server_new_client(); if (fd == -1) { g_warning("Error while spawning a new pipe"); return DBUS_HANDLER_RESULT_NEED_MEMORY; } DBusMessage *reply = dbus_message_new_method_return(m); dbus_message_append_args(reply, DBUS_TYPE_UNIX_FD, &fd, DBUS_TYPE_INVALID); dbus_connection_send(conn, reply, NULL); dbus_message_unref(reply); close(fd); return DBUS_HANDLER_RESULT_HANDLED; } static DBusHandlerResult handle_tune(DBusConnection *conn, DBusMessage *m) { double freq; DBusError err; DBusMessage *reply; dbus_bool_t ret; dbus_error_init(&err); ret = dbus_message_get_args(m, &err, DBUS_TYPE_DOUBLE, &freq, DBUS_TYPE_INVALID); if (!ret) { reply = dbus_message_new_error(m, err.name, err.message); dbus_connection_send(conn, reply, NULL); dbus_message_unref(reply); return DBUS_HANDLER_RESULT_HANDLED; } if (!tuner_set_frequency(freq)) { reply = dbus_message_new_error(m, BUS_INTERFACE ".TunerError", "Failed to tune to the specified frequency"); } else { reply = dbus_message_new_method_return(m); } dbus_connection_send(conn, reply, NULL); dbus_message_unref(reply); return DBUS_HANDLER_RESULT_HANDLED; } static DBusHandlerResult handle_search(DBusConnection *conn, DBusMessage *m, bool forward) { DBusMessage *reply; if (!tuner_search(forward)) { reply = dbus_message_new_error(m, BUS_INTERFACE ".TunerError", "Failed to start a search"); } else { reply = dbus_message_new_method_return(m); } dbus_connection_send(conn, reply, NULL); dbus_message_unref(reply); return DBUS_HANDLER_RESULT_HANDLED; } static DBusHandlerResult on_message(DBusConnection *connection, DBusMessage *message, void *user_data) { if (dbus_message_is_method_call(message, DBUS_INTERFACE_INTROSPECTABLE, "Introspect")) { return handle_introspect(connection, message); } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "Connect")) { return handle_connect(connection, message); } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "Tune")) { return handle_tune(connection, message); } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "SearchForward")) { return handle_search(connection, message, true); } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "SearchBackward")) { return handle_search(connection, message, false); } return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; } static DBusObjectPathVTable bus_vtable = { .message_function = on_message }; static inline char * client_data_p(Client *c) { return &rbuffer.data[c->read_p]; } // This does not return all available data, but only // max contiguous segment. This is by design. static inline size_t client_data_avail(Client *c) { if (rbuffer.write_p >= c->read_p) { return rbuffer.write_p - c->read_p; } else { return RING_BUFFER_SIZE - c->read_p; } } static inline void client_data_forget(Client *c, size_t bytes) { c->read_p = (c->read_p + bytes) % RING_BUFFER_SIZE; } static inline void client_watch(Client *c) { g_return_if_fail(c->out_watch == 0); c->out_watch = g_io_add_watch(c->channel, G_IO_OUT, client_out_callback, c); } static inline void client_unwatch(Client *c) { g_return_if_fail(c->out_watch != 0); g_source_remove(c->out_watch); c->out_watch = 0; } static inline bool client_watched(const Client *c) { return c->out_watch != 0; } static bool client_try_send(Client *c) { size_t avail = client_data_avail(c); char *p = client_data_p(c); ssize_t written; written = write(c->fd, p, avail); if (written == -1) { switch (errno) { case EAGAIN: // Client turned out to be busy // just assume nothing was written. written = 0; break; case EPIPE: // Broken pipe; kill client. g_message("Client pipe broken, closing client"); server_kill_client(c); return FALSE; default: g_warning("Failed to send buffer to client in %d: %s", c->fd, strerror(errno)); server_kill_client(c); return FALSE; } } else if (written > 0) { client_data_forget(c, written); } // Determine if we need to start/stop watching the client if (client_watched(c) && client_data_avail(c) == 0) { // No more data to be sent. client_unwatch(c); } else if (!client_watched(c) && client_data_avail(c) > 0) { // Client is not watched but there is still data to be sent client_watch(c); } return TRUE; } static gboolean client_out_callback(GIOChannel *source, GIOCondition condition, gpointer data) { Client *c = (Client*) data; if (condition & G_IO_OUT) { return client_try_send(c); } return TRUE; } static gboolean client_err_callback(GIOChannel *source, GIOCondition condition, gpointer data) { Client *c = (Client*) data; if (condition & G_IO_HUP) { g_message("Client pipe closed, closing client"); server_kill_client(c); return FALSE; } if (condition & G_IO_ERR) { g_message("Client pipe broken, closing client"); server_kill_client(c); return FALSE; } return TRUE; } int server_start() { DBusError err; int ret; dbus_error_init(&err); clients = NULL; num_clients = 0; rbuffer.write_p = 0; bus = dbus_bus_get(DBUS_BUS_SYSTEM, &err); if (dbus_error_is_set(&err)) { g_critical("Cannot connect to the system message bus: %s", err.message); return -1; } g_assert(bus != NULL); ret = dbus_bus_request_name(bus, BUS_NAME, DBUS_NAME_FLAG_REPLACE_EXISTING, &err); if (dbus_error_is_set(&err)) { g_critical("Cannot claim ownership of my bus name: %s", err.message); return -1; } g_assert(ret == DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER); ret = dbus_connection_register_object_path(bus, BUS_PATH, &bus_vtable, NULL); if (!ret) { g_critical("Cannot register fmrxd object in bus"); return -1; } dbus_connection_setup_with_g_main(bus, g_main_loop_get_context(main_loop)); #if SERVER_ON_DEMAND // Set up the timer for inactivity, in case we do not get any clients. server_queue_stop(); #endif return 0; } void server_stop() { g_warn_if_fail(num_clients == 0); dbus_connection_unregister_object_path(bus, BUS_PATH); dbus_bus_release_name(bus, BUS_NAME, NULL); } #if SERVER_ON_DEMAND static gboolean server_stop_func(gpointer data) { g_message("Stopping service because of inactivity"); server_stop(); g_main_loop_quit(main_loop); return FALSE; } void server_queue_stop() { if (!server_stop_source) { server_stop_source = g_timeout_add_seconds(SERVER_LINGER_TIME, server_stop_func, NULL); } } #endif int server_new_client() { int fds[2]; int ret = pipe(fds); if (ret < 0) { g_warning("Could not create a pipe"); return -1; } Client* c = g_slice_new(Client); c->fd = fds[1]; c->channel = g_io_channel_unix_new(c->fd); c->out_watch = 0; c->err_watch = g_io_add_watch(c->channel, G_IO_ERR | G_IO_HUP, client_err_callback, c); c->read_p = rbuffer.write_p; // Configure some flags of the pipe using GLib GIOFlags flags = g_io_channel_get_flags(c->channel); if (g_io_channel_set_flags(c->channel, flags | G_IO_FLAG_NONBLOCK, NULL) != G_IO_STATUS_NORMAL) { g_warning("Failed to set non-blocking flag for pipe fd"); } g_io_channel_set_encoding(c->channel, NULL, NULL); g_io_channel_set_buffered(c->channel, FALSE); g_debug("New pipe, write fd = %d", fds[1]); // Add client to list clients = g_list_append(clients, c); if (num_clients == 0) { // First client, start tuner. radio_queue_start(); #if SERVER_ON_DEMAND if (server_stop_source) { g_source_remove(server_stop_source); server_stop_source = 0; } #endif } num_clients++; // Return the other end of the pipe return fds[0]; } static void server_kill_client(Client *c) { g_source_remove(c->err_watch); if (c->out_watch) g_source_remove(c->out_watch); clients = g_list_remove(clients, c); g_io_channel_shutdown(c->channel, TRUE, NULL); g_io_channel_unref(c->channel); g_slice_free(Client, c); if (num_clients == 1) { // Last client leaving, stop tuner. radio_queue_stop(); #if SERVER_ON_DEMAND server_queue_stop(); #endif } num_clients--; } size_t server_get_buffer(size_t size, void **buffer) { g_return_val_if_fail(buffer != NULL, 0); g_return_val_if_fail(size > 0, 0); *buffer = &rbuffer.data[rbuffer.write_p]; size_t avail_size = RING_BUFFER_SIZE - rbuffer.write_p; if (size > avail_size) { return avail_size; } else { return size; } } static void overflow_check_func(gpointer data, gpointer user_data) { Client * c = (Client*) data; unsigned int new_write_p = *(unsigned int*)user_data; unsigned int old_write_p = rbuffer.write_p; unsigned int data_lost = 0; if (new_write_p < rbuffer.write_p) { // write_p wraparound: Overflow happens if the read_p was // positioned right after old write_p. g_warn_if_fail(new_write_p == 0); // Valid simplification here if (c->read_p > old_write_p) { data_lost = c->read_p - old_write_p; g_warn_if_fail(data_lost < RING_BUFFER_SIZE); } } else { if (c->read_p > old_write_p && c->read_p <= new_write_p) { data_lost = new_write_p - c->read_p; g_warn_if_fail(data_lost < RING_BUFFER_SIZE); } } if (data_lost > 0) { g_message("Overflow detected (%u bytes), glitch!", data_lost); c->read_p = (new_write_p) % RING_BUFFER_SIZE; } } static void commit_buffer_func(gpointer data, gpointer user_data) { Client * c = (Client*) data; client_try_send(c); } void server_commit_buffer(size_t size) { g_return_if_fail(size > 0); g_warn_if_fail(size < RING_BUFFER_SIZE); unsigned int new_write_p = (rbuffer.write_p + size) % RING_BUFFER_SIZE; g_list_foreach(clients, overflow_check_func, &new_write_p); rbuffer.write_p = new_write_p; g_list_foreach(clients, commit_buffer_func, NULL); } void server_notify_tuned(double mhz) { DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "Tuned"); g_return_if_fail(m != NULL); dbus_message_append_args(m, DBUS_TYPE_DOUBLE, &mhz, DBUS_TYPE_INVALID); dbus_connection_send(bus, m, NULL); dbus_message_unref(m); } void server_notify_stopped() { DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "Stopped"); g_return_if_fail(m != NULL); dbus_connection_send(bus, m, NULL); dbus_message_unref(m); } void server_notify_signal(uint16_t signal) { DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "SignalLevelChanged"); g_return_if_fail(m != NULL); dbus_message_append_args(m, DBUS_TYPE_UINT16, &signal, DBUS_TYPE_INVALID); dbus_connection_send(bus, m, NULL); dbus_message_unref(m); } void server_notify_pi(uint16_t pi) { DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "PiChanged"); g_return_if_fail(m != NULL); dbus_message_append_args(m, DBUS_TYPE_UINT16, &pi, DBUS_TYPE_INVALID); dbus_connection_send(bus, m, NULL); dbus_message_unref(m); } void server_notify_ps(const char *ps) { DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "PsChanged"); g_return_if_fail(m != NULL); dbus_message_append_args(m, DBUS_TYPE_STRING, &ps, DBUS_TYPE_INVALID); dbus_connection_send(bus, m, NULL); dbus_message_unref(m); } void server_notify_rt(const char *rt) { DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "RtChanged"); g_return_if_fail(m != NULL); dbus_message_append_args(m, DBUS_TYPE_STRING, &rt, DBUS_TYPE_INVALID); dbus_connection_send(bus, m, NULL); dbus_message_unref(m); }