From 352dad23c7847d234e11c1034e1354fbd9a8349a Mon Sep 17 00:00:00 2001 From: "Javier S. Pedro" Date: Sat, 31 Dec 2011 17:50:06 +0100 Subject: initial import --- server.c | 542 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 542 insertions(+) create mode 100644 server.c (limited to 'server.c') diff --git a/server.c b/server.c new file mode 100644 index 0000000..442b63b --- /dev/null +++ b/server.c @@ -0,0 +1,542 @@ +/* + * fmrxd - a daemon to enable and multiplex access to the N950/N9 radio tuner + * Copyright (C) 2011 Javier S. Pedro + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#include +#include +#include +#include +#include +#include + +#include "fmrxd.h" + +typedef struct Client { + int fd; + GIOChannel *channel; + guint out_watch; + guint err_watch; + unsigned int read_p; +} Client; + +static DBusConnection *bus; +static GList *clients; +static guint num_clients; + +#if SERVER_ON_DEMAND +static guint server_stop_source; +#endif + +struct { + char data[RING_BUFFER_SIZE]; + unsigned int write_p; +} rbuffer; + +static const char * introspect_data = { + "\n" + "\n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + " \n" + "\n" +}; + +static void server_kill_client(Client *c); + +static gboolean client_out_callback(GIOChannel *source, GIOCondition condition, + gpointer data); +static gboolean client_err_callback(GIOChannel *source, GIOCondition condition, + gpointer data); + +static DBusHandlerResult handle_introspect(DBusConnection *conn, DBusMessage *m) +{ + DBusMessage *reply = dbus_message_new_method_return(m); + dbus_message_append_args(reply, DBUS_TYPE_STRING, &introspect_data, DBUS_TYPE_INVALID); + dbus_connection_send(conn, reply, NULL); + dbus_message_unref(reply); + + return DBUS_HANDLER_RESULT_HANDLED; +} + +static DBusHandlerResult handle_connect(DBusConnection *conn, DBusMessage *m) +{ + int fd = server_new_client(); + + if (fd == -1) { + g_warning("Error while spawning a new pipe"); + return DBUS_HANDLER_RESULT_NEED_MEMORY; + } + + DBusMessage *reply = dbus_message_new_method_return(m); + dbus_message_append_args(reply, DBUS_TYPE_UNIX_FD, &fd, DBUS_TYPE_INVALID); + dbus_connection_send(conn, reply, NULL); + dbus_message_unref(reply); + + close(fd); + + return DBUS_HANDLER_RESULT_HANDLED; +} + +static DBusHandlerResult handle_tune(DBusConnection *conn, DBusMessage *m) +{ + double freq; + DBusError err; + DBusMessage *reply; + dbus_bool_t ret; + + dbus_error_init(&err); + ret = dbus_message_get_args(m, &err, DBUS_TYPE_DOUBLE, &freq, DBUS_TYPE_INVALID); + if (!ret) { + reply = dbus_message_new_error(m, err.name, err.message); + dbus_connection_send(conn, reply, NULL); + dbus_message_unref(reply); + return DBUS_HANDLER_RESULT_HANDLED; + } + + if (!tuner_set_frequency(freq)) { + reply = dbus_message_new_error(m, BUS_INTERFACE ".TunerError", + "Failed to tune to the specified frequency"); + } else { + reply = dbus_message_new_method_return(m); + } + + dbus_connection_send(conn, reply, NULL); + dbus_message_unref(reply); + return DBUS_HANDLER_RESULT_HANDLED; +} + +static DBusHandlerResult handle_search(DBusConnection *conn, DBusMessage *m, bool forward) +{ + DBusMessage *reply; + + if (!tuner_search(forward)) { + reply = dbus_message_new_error(m, BUS_INTERFACE ".TunerError", + "Failed to start a search"); + } else { + reply = dbus_message_new_method_return(m); + } + + dbus_connection_send(conn, reply, NULL); + dbus_message_unref(reply); + return DBUS_HANDLER_RESULT_HANDLED; +} + +static DBusHandlerResult on_message(DBusConnection *connection, DBusMessage *message, void *user_data) +{ + if (dbus_message_is_method_call(message, DBUS_INTERFACE_INTROSPECTABLE, "Introspect")) { + return handle_introspect(connection, message); + } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "Connect")) { + return handle_connect(connection, message); + } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "Tune")) { + return handle_tune(connection, message); + } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "SearchForward")) { + return handle_search(connection, message, true); + } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "SearchBackward")) { + return handle_search(connection, message, false); + } + + return DBUS_HANDLER_RESULT_NOT_YET_HANDLED; +} + +static DBusObjectPathVTable bus_vtable = { + .message_function = on_message +}; + +static inline char * client_data_p(Client *c) +{ + return &rbuffer.data[c->read_p]; +} + +// This does not return all available data, but only +// max contiguous segment. This is by design. +static inline size_t client_data_avail(Client *c) +{ + if (rbuffer.write_p >= c->read_p) { + return rbuffer.write_p - c->read_p; + } else { + return RING_BUFFER_SIZE - c->read_p; + } +} + +static inline void client_data_forget(Client *c, size_t bytes) +{ + c->read_p = (c->read_p + bytes) % RING_BUFFER_SIZE; +} + +static inline void client_watch(Client *c) +{ + g_return_if_fail(c->out_watch == 0); + c->out_watch = g_io_add_watch(c->channel, G_IO_OUT, client_out_callback, c); +} + +static inline void client_unwatch(Client *c) +{ + g_return_if_fail(c->out_watch != 0); + g_source_remove(c->out_watch); + c->out_watch = 0; +} + +static inline bool client_watched(const Client *c) +{ + return c->out_watch != 0; +} + +static bool client_try_send(Client *c) +{ + size_t avail = client_data_avail(c); + char *p = client_data_p(c); + ssize_t written; + + written = write(c->fd, p, avail); + + if (written == -1) { + switch (errno) { + case EAGAIN: + // Client turned out to be busy + // just assume nothing was written. + written = 0; + break; + case EPIPE: + // Broken pipe; kill client. + g_message("Client pipe broken, closing client"); + server_kill_client(c); + return FALSE; + default: + g_warning("Failed to send buffer to client in %d: %s", + c->fd, strerror(errno)); + server_kill_client(c); + return FALSE; + } + } else if (written > 0) { + client_data_forget(c, written); + } + + // Determine if we need to start/stop watching the client + if (client_watched(c) && client_data_avail(c) == 0) { + // No more data to be sent. + client_unwatch(c); + } else if (!client_watched(c) && client_data_avail(c) > 0) { + // Client is not watched but there is still data to be sent + client_watch(c); + } + + return TRUE; +} + +static gboolean client_out_callback(GIOChannel *source, GIOCondition condition, + gpointer data) +{ + Client *c = (Client*) data; + + if (condition & G_IO_OUT) { + return client_try_send(c); + } + + return TRUE; +} + +static gboolean client_err_callback(GIOChannel *source, GIOCondition condition, + gpointer data) +{ + Client *c = (Client*) data; + + if (condition & G_IO_HUP) { + g_message("Client pipe closed, closing client"); + server_kill_client(c); + return FALSE; + } + if (condition & G_IO_ERR) { + g_message("Client pipe broken, closing client"); + server_kill_client(c); + return FALSE; + } + + return TRUE; +} + +int server_start() +{ + DBusError err; + int ret; + dbus_error_init(&err); + + clients = NULL; + num_clients = 0; + + rbuffer.write_p = 0; + + bus = dbus_bus_get(DBUS_BUS_SESSION, &err); + if (dbus_error_is_set(&err)) { + g_critical("Cannot connect to the system message bus: %s", err.message); + return -1; + } + g_assert(bus != NULL); + + ret = dbus_bus_request_name(bus, BUS_NAME, DBUS_NAME_FLAG_REPLACE_EXISTING, &err); + if (dbus_error_is_set(&err)) { + g_critical("Cannot claim ownership of my bus name: %s", err.message); + return -1; + } + g_assert(ret == DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER); + + ret = dbus_connection_register_object_path(bus, BUS_PATH, &bus_vtable, NULL); + if (!ret) { + g_critical("Cannot register fmrxd object in bus"); + return -1; + } + + dbus_connection_setup_with_g_main(bus, g_main_loop_get_context(main_loop)); + +#if SERVER_ON_DEMAND + // Set up the timer for inactivity, in case we do not get any clients. + server_queue_stop(); +#endif + + return 0; +} + +void server_stop() +{ + g_warn_if_fail(num_clients == 0); + dbus_connection_unregister_object_path(bus, BUS_PATH); + dbus_bus_release_name(bus, BUS_NAME, NULL); +} + +#if SERVER_ON_DEMAND +static gboolean server_stop_func(gpointer data) +{ + g_message("Stopping service because of inactivity"); + server_stop(); + g_main_loop_quit(main_loop); + return FALSE; +} + +void server_queue_stop() +{ + if (!server_stop_source) { + server_stop_source = g_timeout_add_seconds(SERVER_LINGER_TIME, + server_stop_func, NULL); + } +} +#endif + +int server_new_client() +{ + int fds[2]; + int ret = pipe(fds); + if (ret < 0) { + g_warning("Could not create a pipe"); + return -1; + } + + Client* c = g_slice_new(Client); + c->fd = fds[1]; + c->channel = g_io_channel_unix_new(c->fd); + c->out_watch = 0; + c->err_watch = g_io_add_watch(c->channel, G_IO_ERR | G_IO_HUP, + client_err_callback, c); + c->read_p = rbuffer.write_p; + + // Configure some flags of the pipe using GLib + GIOFlags flags = g_io_channel_get_flags(c->channel); + if (g_io_channel_set_flags(c->channel, flags | G_IO_FLAG_NONBLOCK, NULL) + != G_IO_STATUS_NORMAL) { + g_warning("Failed to set non-blocking flag for pipe fd"); + } + g_io_channel_set_encoding(c->channel, NULL, NULL); + g_io_channel_set_buffered(c->channel, FALSE); + + g_debug("New pipe, write fd = %d", fds[1]); + + // Add client to list + clients = g_list_append(clients, c); + + if (num_clients == 0) { + // First client, start tuner. + radio_queue_start(); +#if SERVER_ON_DEMAND + if (server_stop_source) { + g_source_remove(server_stop_source); + server_stop_source = 0; + } +#endif + } + num_clients++; + + // Return the other end of the pipe + return fds[0]; +} + +static void server_kill_client(Client *c) +{ + g_source_remove(c->err_watch); + if (c->out_watch) g_source_remove(c->out_watch); + + clients = g_list_remove(clients, c); + + g_io_channel_shutdown(c->channel, TRUE, NULL); + g_io_channel_unref(c->channel); + + g_slice_free(Client, c); + + if (num_clients == 1) { + // Last client leaving, stop tuner. + radio_queue_stop(); +#if SERVER_ON_DEMAND + server_queue_stop(); +#endif + } + num_clients--; +} + +size_t server_get_buffer(size_t size, void **buffer) +{ + g_return_val_if_fail(buffer != NULL, 0); + g_return_val_if_fail(size > 0, 0); + + *buffer = &rbuffer.data[rbuffer.write_p]; + + size_t avail_size = RING_BUFFER_SIZE - rbuffer.write_p; + if (size > avail_size) { + return avail_size; + } else { + return size; + } +} + +static void overflow_check_func(gpointer data, gpointer user_data) +{ + Client * c = (Client*) data; + unsigned int new_write_p = *(unsigned int*)user_data; + unsigned int old_write_p = rbuffer.write_p; + unsigned int data_lost = 0; + + if (new_write_p < rbuffer.write_p) { + // write_p wraparound: Overflow happens if the read_p was + // positioned right after old write_p. + g_warn_if_fail(new_write_p == 0); // Valid simplification here + if (c->read_p > old_write_p) { + data_lost = c->read_p - old_write_p; + g_warn_if_fail(data_lost < RING_BUFFER_SIZE); + } + } else { + if (c->read_p > old_write_p && c->read_p <= new_write_p) { + data_lost = new_write_p - c->read_p; + g_warn_if_fail(data_lost < RING_BUFFER_SIZE); + } + } + + if (data_lost > 0) { + g_message("Overflow detected (%u bytes), glitch!", data_lost); + c->read_p = (new_write_p) % RING_BUFFER_SIZE; + } +} + +static void commit_buffer_func(gpointer data, gpointer user_data) +{ + Client * c = (Client*) data; + + client_try_send(c); +} + +void server_commit_buffer(size_t size) +{ + g_return_if_fail(size > 0); + g_warn_if_fail(size < RING_BUFFER_SIZE); + + unsigned int new_write_p = (rbuffer.write_p + size) % RING_BUFFER_SIZE; + + g_list_foreach(clients, overflow_check_func, &new_write_p); + + rbuffer.write_p = new_write_p; + + g_list_foreach(clients, commit_buffer_func, NULL); +} + +void server_notify_tuned(double mhz) +{ + DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "Tuned"); + g_return_if_fail(m != NULL); + dbus_message_append_args(m, DBUS_TYPE_DOUBLE, &mhz, DBUS_TYPE_INVALID); + dbus_connection_send(bus, m, NULL); + dbus_message_unref(m); +} + +void server_notify_stopped() +{ + DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "Stopped"); + g_return_if_fail(m != NULL); + dbus_connection_send(bus, m, NULL); + dbus_message_unref(m); +} + +void server_notify_pi(uint16_t pi) +{ + DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "PiChanged"); + g_return_if_fail(m != NULL); + dbus_message_append_args(m, DBUS_TYPE_UINT16, &pi, DBUS_TYPE_INVALID); + dbus_connection_send(bus, m, NULL); + dbus_message_unref(m); +} + +void server_notify_ps(const char *ps) +{ + DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "PsChanged"); + g_return_if_fail(m != NULL); + dbus_message_append_args(m, DBUS_TYPE_STRING, &ps, DBUS_TYPE_INVALID); + dbus_connection_send(bus, m, NULL); + dbus_message_unref(m); +} + +void server_notify_rt(const char *rt) +{ + DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "RtChanged"); + g_return_if_fail(m != NULL); + dbus_message_append_args(m, DBUS_TYPE_STRING, &rt, DBUS_TYPE_INVALID); + dbus_connection_send(bus, m, NULL); + dbus_message_unref(m); +} -- cgit v1.2.3