summaryrefslogtreecommitdiff
path: root/server.c
diff options
context:
space:
mode:
authorJavier S. Pedro <maemo@javispedro.com>2011-12-31 17:50:06 +0100
committerJavier S. Pedro <maemo@javispedro.com>2011-12-31 17:50:06 +0100
commit352dad23c7847d234e11c1034e1354fbd9a8349a (patch)
tree6caab2315dcb20882a05453412788578acc119e5 /server.c
downloadfmrxd-352dad23c7847d234e11c1034e1354fbd9a8349a.tar.gz
fmrxd-352dad23c7847d234e11c1034e1354fbd9a8349a.zip
initial import
Diffstat (limited to 'server.c')
-rw-r--r--server.c542
1 files changed, 542 insertions, 0 deletions
diff --git a/server.c b/server.c
new file mode 100644
index 0000000..442b63b
--- /dev/null
+++ b/server.c
@@ -0,0 +1,542 @@
+/*
+ * fmrxd - a daemon to enable and multiplex access to the N950/N9 radio tuner
+ * Copyright (C) 2011 Javier S. Pedro <maemo@javispedro.com>
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+*/
+
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <dbus/dbus.h>
+#include <dbus/dbus-glib-lowlevel.h>
+
+#include "fmrxd.h"
+
+typedef struct Client {
+ int fd;
+ GIOChannel *channel;
+ guint out_watch;
+ guint err_watch;
+ unsigned int read_p;
+} Client;
+
+static DBusConnection *bus;
+static GList *clients;
+static guint num_clients;
+
+#if SERVER_ON_DEMAND
+static guint server_stop_source;
+#endif
+
+struct {
+ char data[RING_BUFFER_SIZE];
+ unsigned int write_p;
+} rbuffer;
+
+static const char * introspect_data = {
+ "<!DOCTYPE node PUBLIC \"-//freedesktop//DTD D-BUS Object Introspection 1.0//EN\"\n"
+ "\"http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd\">\n"
+ "<node>\n"
+ " <interface name=\"org.freedesktop.DBus.Introspectable\">\n"
+ " <method name=\"Introspect\">\n"
+ " <arg type=\"s\" name=\"data\" direction=\"out\" />\n"
+ " </method>\n"
+ " </interface>\n"
+ " <interface name=\"" BUS_INTERFACE "\">\n"
+ " <method name=\"Connect\">\n"
+ " <arg type=\"h\" name=\"pipe\" direction=\"out\" />\n"
+ " </method>\n"
+ " <method name=\"Tune\">\n"
+ " <arg type=\"d\" name=\"frequency\" direction=\"in\" />\n"
+ " </method>\n"
+ " <method name=\"SearchForward\">\n"
+ " </method>\n"
+ " <method name=\"SearchBackward\">\n"
+ " </method>\n"
+ " <signal name=\"Tuned\">\n"
+ " <arg type=\"d\" name=\"frequency\" />\n"
+ " </signal>\n"
+ " <signal name=\"Stopped\">\n"
+ " </signal>\n"
+ " <signal name=\"PiReceived\">\n"
+ " <arg type=\"q\" name=\"pi\" />\n"
+ " </signal>\n"
+ " <signal name=\"PsReceived\">\n"
+ " <arg type=\"s\" name=\"ps\" />\n"
+ " </signal>\n"
+ " <signal name=\"RtReceived\">\n"
+ " <arg type=\"s\" name=\"rt\" />\n"
+ " </signal>\n"
+ " </interface>\n"
+ "</node>\n"
+};
+
+static void server_kill_client(Client *c);
+
+static gboolean client_out_callback(GIOChannel *source, GIOCondition condition,
+ gpointer data);
+static gboolean client_err_callback(GIOChannel *source, GIOCondition condition,
+ gpointer data);
+
+static DBusHandlerResult handle_introspect(DBusConnection *conn, DBusMessage *m)
+{
+ DBusMessage *reply = dbus_message_new_method_return(m);
+ dbus_message_append_args(reply, DBUS_TYPE_STRING, &introspect_data, DBUS_TYPE_INVALID);
+ dbus_connection_send(conn, reply, NULL);
+ dbus_message_unref(reply);
+
+ return DBUS_HANDLER_RESULT_HANDLED;
+}
+
+static DBusHandlerResult handle_connect(DBusConnection *conn, DBusMessage *m)
+{
+ int fd = server_new_client();
+
+ if (fd == -1) {
+ g_warning("Error while spawning a new pipe");
+ return DBUS_HANDLER_RESULT_NEED_MEMORY;
+ }
+
+ DBusMessage *reply = dbus_message_new_method_return(m);
+ dbus_message_append_args(reply, DBUS_TYPE_UNIX_FD, &fd, DBUS_TYPE_INVALID);
+ dbus_connection_send(conn, reply, NULL);
+ dbus_message_unref(reply);
+
+ close(fd);
+
+ return DBUS_HANDLER_RESULT_HANDLED;
+}
+
+static DBusHandlerResult handle_tune(DBusConnection *conn, DBusMessage *m)
+{
+ double freq;
+ DBusError err;
+ DBusMessage *reply;
+ dbus_bool_t ret;
+
+ dbus_error_init(&err);
+ ret = dbus_message_get_args(m, &err, DBUS_TYPE_DOUBLE, &freq, DBUS_TYPE_INVALID);
+ if (!ret) {
+ reply = dbus_message_new_error(m, err.name, err.message);
+ dbus_connection_send(conn, reply, NULL);
+ dbus_message_unref(reply);
+ return DBUS_HANDLER_RESULT_HANDLED;
+ }
+
+ if (!tuner_set_frequency(freq)) {
+ reply = dbus_message_new_error(m, BUS_INTERFACE ".TunerError",
+ "Failed to tune to the specified frequency");
+ } else {
+ reply = dbus_message_new_method_return(m);
+ }
+
+ dbus_connection_send(conn, reply, NULL);
+ dbus_message_unref(reply);
+ return DBUS_HANDLER_RESULT_HANDLED;
+}
+
+static DBusHandlerResult handle_search(DBusConnection *conn, DBusMessage *m, bool forward)
+{
+ DBusMessage *reply;
+
+ if (!tuner_search(forward)) {
+ reply = dbus_message_new_error(m, BUS_INTERFACE ".TunerError",
+ "Failed to start a search");
+ } else {
+ reply = dbus_message_new_method_return(m);
+ }
+
+ dbus_connection_send(conn, reply, NULL);
+ dbus_message_unref(reply);
+ return DBUS_HANDLER_RESULT_HANDLED;
+}
+
+static DBusHandlerResult on_message(DBusConnection *connection, DBusMessage *message, void *user_data)
+{
+ if (dbus_message_is_method_call(message, DBUS_INTERFACE_INTROSPECTABLE, "Introspect")) {
+ return handle_introspect(connection, message);
+ } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "Connect")) {
+ return handle_connect(connection, message);
+ } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "Tune")) {
+ return handle_tune(connection, message);
+ } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "SearchForward")) {
+ return handle_search(connection, message, true);
+ } else if (dbus_message_is_method_call(message, BUS_INTERFACE, "SearchBackward")) {
+ return handle_search(connection, message, false);
+ }
+
+ return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
+}
+
+static DBusObjectPathVTable bus_vtable = {
+ .message_function = on_message
+};
+
+static inline char * client_data_p(Client *c)
+{
+ return &rbuffer.data[c->read_p];
+}
+
+// This does not return all available data, but only
+// max contiguous segment. This is by design.
+static inline size_t client_data_avail(Client *c)
+{
+ if (rbuffer.write_p >= c->read_p) {
+ return rbuffer.write_p - c->read_p;
+ } else {
+ return RING_BUFFER_SIZE - c->read_p;
+ }
+}
+
+static inline void client_data_forget(Client *c, size_t bytes)
+{
+ c->read_p = (c->read_p + bytes) % RING_BUFFER_SIZE;
+}
+
+static inline void client_watch(Client *c)
+{
+ g_return_if_fail(c->out_watch == 0);
+ c->out_watch = g_io_add_watch(c->channel, G_IO_OUT, client_out_callback, c);
+}
+
+static inline void client_unwatch(Client *c)
+{
+ g_return_if_fail(c->out_watch != 0);
+ g_source_remove(c->out_watch);
+ c->out_watch = 0;
+}
+
+static inline bool client_watched(const Client *c)
+{
+ return c->out_watch != 0;
+}
+
+static bool client_try_send(Client *c)
+{
+ size_t avail = client_data_avail(c);
+ char *p = client_data_p(c);
+ ssize_t written;
+
+ written = write(c->fd, p, avail);
+
+ if (written == -1) {
+ switch (errno) {
+ case EAGAIN:
+ // Client turned out to be busy
+ // just assume nothing was written.
+ written = 0;
+ break;
+ case EPIPE:
+ // Broken pipe; kill client.
+ g_message("Client pipe broken, closing client");
+ server_kill_client(c);
+ return FALSE;
+ default:
+ g_warning("Failed to send buffer to client in %d: %s",
+ c->fd, strerror(errno));
+ server_kill_client(c);
+ return FALSE;
+ }
+ } else if (written > 0) {
+ client_data_forget(c, written);
+ }
+
+ // Determine if we need to start/stop watching the client
+ if (client_watched(c) && client_data_avail(c) == 0) {
+ // No more data to be sent.
+ client_unwatch(c);
+ } else if (!client_watched(c) && client_data_avail(c) > 0) {
+ // Client is not watched but there is still data to be sent
+ client_watch(c);
+ }
+
+ return TRUE;
+}
+
+static gboolean client_out_callback(GIOChannel *source, GIOCondition condition,
+ gpointer data)
+{
+ Client *c = (Client*) data;
+
+ if (condition & G_IO_OUT) {
+ return client_try_send(c);
+ }
+
+ return TRUE;
+}
+
+static gboolean client_err_callback(GIOChannel *source, GIOCondition condition,
+ gpointer data)
+{
+ Client *c = (Client*) data;
+
+ if (condition & G_IO_HUP) {
+ g_message("Client pipe closed, closing client");
+ server_kill_client(c);
+ return FALSE;
+ }
+ if (condition & G_IO_ERR) {
+ g_message("Client pipe broken, closing client");
+ server_kill_client(c);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+int server_start()
+{
+ DBusError err;
+ int ret;
+ dbus_error_init(&err);
+
+ clients = NULL;
+ num_clients = 0;
+
+ rbuffer.write_p = 0;
+
+ bus = dbus_bus_get(DBUS_BUS_SESSION, &err);
+ if (dbus_error_is_set(&err)) {
+ g_critical("Cannot connect to the system message bus: %s", err.message);
+ return -1;
+ }
+ g_assert(bus != NULL);
+
+ ret = dbus_bus_request_name(bus, BUS_NAME, DBUS_NAME_FLAG_REPLACE_EXISTING, &err);
+ if (dbus_error_is_set(&err)) {
+ g_critical("Cannot claim ownership of my bus name: %s", err.message);
+ return -1;
+ }
+ g_assert(ret == DBUS_REQUEST_NAME_REPLY_PRIMARY_OWNER);
+
+ ret = dbus_connection_register_object_path(bus, BUS_PATH, &bus_vtable, NULL);
+ if (!ret) {
+ g_critical("Cannot register fmrxd object in bus");
+ return -1;
+ }
+
+ dbus_connection_setup_with_g_main(bus, g_main_loop_get_context(main_loop));
+
+#if SERVER_ON_DEMAND
+ // Set up the timer for inactivity, in case we do not get any clients.
+ server_queue_stop();
+#endif
+
+ return 0;
+}
+
+void server_stop()
+{
+ g_warn_if_fail(num_clients == 0);
+ dbus_connection_unregister_object_path(bus, BUS_PATH);
+ dbus_bus_release_name(bus, BUS_NAME, NULL);
+}
+
+#if SERVER_ON_DEMAND
+static gboolean server_stop_func(gpointer data)
+{
+ g_message("Stopping service because of inactivity");
+ server_stop();
+ g_main_loop_quit(main_loop);
+ return FALSE;
+}
+
+void server_queue_stop()
+{
+ if (!server_stop_source) {
+ server_stop_source = g_timeout_add_seconds(SERVER_LINGER_TIME,
+ server_stop_func, NULL);
+ }
+}
+#endif
+
+int server_new_client()
+{
+ int fds[2];
+ int ret = pipe(fds);
+ if (ret < 0) {
+ g_warning("Could not create a pipe");
+ return -1;
+ }
+
+ Client* c = g_slice_new(Client);
+ c->fd = fds[1];
+ c->channel = g_io_channel_unix_new(c->fd);
+ c->out_watch = 0;
+ c->err_watch = g_io_add_watch(c->channel, G_IO_ERR | G_IO_HUP,
+ client_err_callback, c);
+ c->read_p = rbuffer.write_p;
+
+ // Configure some flags of the pipe using GLib
+ GIOFlags flags = g_io_channel_get_flags(c->channel);
+ if (g_io_channel_set_flags(c->channel, flags | G_IO_FLAG_NONBLOCK, NULL)
+ != G_IO_STATUS_NORMAL) {
+ g_warning("Failed to set non-blocking flag for pipe fd");
+ }
+ g_io_channel_set_encoding(c->channel, NULL, NULL);
+ g_io_channel_set_buffered(c->channel, FALSE);
+
+ g_debug("New pipe, write fd = %d", fds[1]);
+
+ // Add client to list
+ clients = g_list_append(clients, c);
+
+ if (num_clients == 0) {
+ // First client, start tuner.
+ radio_queue_start();
+#if SERVER_ON_DEMAND
+ if (server_stop_source) {
+ g_source_remove(server_stop_source);
+ server_stop_source = 0;
+ }
+#endif
+ }
+ num_clients++;
+
+ // Return the other end of the pipe
+ return fds[0];
+}
+
+static void server_kill_client(Client *c)
+{
+ g_source_remove(c->err_watch);
+ if (c->out_watch) g_source_remove(c->out_watch);
+
+ clients = g_list_remove(clients, c);
+
+ g_io_channel_shutdown(c->channel, TRUE, NULL);
+ g_io_channel_unref(c->channel);
+
+ g_slice_free(Client, c);
+
+ if (num_clients == 1) {
+ // Last client leaving, stop tuner.
+ radio_queue_stop();
+#if SERVER_ON_DEMAND
+ server_queue_stop();
+#endif
+ }
+ num_clients--;
+}
+
+size_t server_get_buffer(size_t size, void **buffer)
+{
+ g_return_val_if_fail(buffer != NULL, 0);
+ g_return_val_if_fail(size > 0, 0);
+
+ *buffer = &rbuffer.data[rbuffer.write_p];
+
+ size_t avail_size = RING_BUFFER_SIZE - rbuffer.write_p;
+ if (size > avail_size) {
+ return avail_size;
+ } else {
+ return size;
+ }
+}
+
+static void overflow_check_func(gpointer data, gpointer user_data)
+{
+ Client * c = (Client*) data;
+ unsigned int new_write_p = *(unsigned int*)user_data;
+ unsigned int old_write_p = rbuffer.write_p;
+ unsigned int data_lost = 0;
+
+ if (new_write_p < rbuffer.write_p) {
+ // write_p wraparound: Overflow happens if the read_p was
+ // positioned right after old write_p.
+ g_warn_if_fail(new_write_p == 0); // Valid simplification here
+ if (c->read_p > old_write_p) {
+ data_lost = c->read_p - old_write_p;
+ g_warn_if_fail(data_lost < RING_BUFFER_SIZE);
+ }
+ } else {
+ if (c->read_p > old_write_p && c->read_p <= new_write_p) {
+ data_lost = new_write_p - c->read_p;
+ g_warn_if_fail(data_lost < RING_BUFFER_SIZE);
+ }
+ }
+
+ if (data_lost > 0) {
+ g_message("Overflow detected (%u bytes), glitch!", data_lost);
+ c->read_p = (new_write_p) % RING_BUFFER_SIZE;
+ }
+}
+
+static void commit_buffer_func(gpointer data, gpointer user_data)
+{
+ Client * c = (Client*) data;
+
+ client_try_send(c);
+}
+
+void server_commit_buffer(size_t size)
+{
+ g_return_if_fail(size > 0);
+ g_warn_if_fail(size < RING_BUFFER_SIZE);
+
+ unsigned int new_write_p = (rbuffer.write_p + size) % RING_BUFFER_SIZE;
+
+ g_list_foreach(clients, overflow_check_func, &new_write_p);
+
+ rbuffer.write_p = new_write_p;
+
+ g_list_foreach(clients, commit_buffer_func, NULL);
+}
+
+void server_notify_tuned(double mhz)
+{
+ DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "Tuned");
+ g_return_if_fail(m != NULL);
+ dbus_message_append_args(m, DBUS_TYPE_DOUBLE, &mhz, DBUS_TYPE_INVALID);
+ dbus_connection_send(bus, m, NULL);
+ dbus_message_unref(m);
+}
+
+void server_notify_stopped()
+{
+ DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "Stopped");
+ g_return_if_fail(m != NULL);
+ dbus_connection_send(bus, m, NULL);
+ dbus_message_unref(m);
+}
+
+void server_notify_pi(uint16_t pi)
+{
+ DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "PiChanged");
+ g_return_if_fail(m != NULL);
+ dbus_message_append_args(m, DBUS_TYPE_UINT16, &pi, DBUS_TYPE_INVALID);
+ dbus_connection_send(bus, m, NULL);
+ dbus_message_unref(m);
+}
+
+void server_notify_ps(const char *ps)
+{
+ DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "PsChanged");
+ g_return_if_fail(m != NULL);
+ dbus_message_append_args(m, DBUS_TYPE_STRING, &ps, DBUS_TYPE_INVALID);
+ dbus_connection_send(bus, m, NULL);
+ dbus_message_unref(m);
+}
+
+void server_notify_rt(const char *rt)
+{
+ DBusMessage *m = dbus_message_new_signal(BUS_PATH, BUS_INTERFACE, "RtChanged");
+ g_return_if_fail(m != NULL);
+ dbus_message_append_args(m, DBUS_TYPE_STRING, &rt, DBUS_TYPE_INVALID);
+ dbus_connection_send(bus, m, NULL);
+ dbus_message_unref(m);
+}