#include #include "rodisc.h" /** Buffer size for a single read() call. * Any request larger than this will be served in chunks. */ #define MAX_BUFFER_SIZE (32*1024*1024UL) #define RESPONSE_SERVER_HEADER "RODisc/1.0" typedef struct { RODisc *disc; gsize start; gsize length; gpointer buffer; SoupMessage *msg; GInputStream *stream; GCancellable *cancel; } RODiscReadOp; static SoupServer *server; static unsigned int server_port = 0; static void server_disc_cb(SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, gpointer user_data); static inline RODiscReadOp* server_disc_read_op_new() { RODiscReadOp *op = g_slice_new(RODiscReadOp); return op; } static void server_disc_read_op_free(RODiscReadOp *op) { if (op->stream) g_object_unref(op->stream); if (op->buffer) g_free(op->buffer); if (op->cancel) g_object_unref(op->cancel); g_slice_free(RODiscReadOp, op); } static void server_disc_finished_cb(SoupMessage *msg, gpointer user_data) { RODiscReadOp *op = (RODiscReadOp*) user_data; g_warn_if_fail(op->msg == msg); g_debug(" request finished"); if (op->cancel) { // Cancel pending operation if any g_cancellable_cancel(op->cancel); g_object_unref(op->cancel); op->cancel = NULL; } server_disc_read_op_free(op); } static void server_disc_read_cb(GInputStream *stream, GAsyncResult *res, gpointer user_data) { RODiscReadOp *op = (RODiscReadOp*) user_data; GError *error = NULL; gssize size = g_input_stream_read_finish(stream, res, &error); if (size == -1) { if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { g_debug(" request canceled"); return; } g_warning("Failed to read from %s: %s", op->disc->file_path, error->message); soup_message_set_status(op->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); soup_server_unpause_message(server, op->msg); g_error_free(error); return; } g_debug(" read %ld bytes of %lu bytes", size, op->length); g_warn_if_fail(size > 0); if (size >= op->length || size == 0) { // We have read everything we needed, so this finishes the request! g_debug(" finishing request"); g_object_unref(op->cancel); op->cancel = NULL; op->length = 0; soup_message_body_append(op->msg->response_body, SOUP_MEMORY_TAKE, op->buffer, size); soup_message_body_complete(op->msg->response_body); soup_server_unpause_message(server, op->msg); op->buffer = NULL; // Passed ownership of buffer to libsoup // 'op' will be freed when 'finished' signal @msg is triggered } else { soup_message_body_append(op->msg->response_body, SOUP_MEMORY_COPY, op->buffer, size); soup_server_unpause_message(server, op->msg); // Launch the read for the remaining bytes op->start += size; op->length -= size; gsize read_size = MIN(op->length, MAX_BUFFER_SIZE); g_debug(" going to read %lu bytes", read_size); g_return_if_fail(read_size > 0); g_input_stream_read_async(op->stream, op->buffer, read_size, G_PRIORITY_DEFAULT, op->cancel, (GAsyncReadyCallback) server_disc_read_cb, op); } } static void server_disc_perform_read_range(SoupMessage *msg, RODisc *disc, goffset start, goffset end) { GError *error = NULL; g_debug("Opening %s", disc->file_path); // File is opened on every read because of asynchronous reads GFileInputStream * stream = g_file_read(disc->file, NULL, &error); if (!stream) { g_warning("Could not open file '%s' for reading: %s", disc->file_path, error->message); soup_message_set_status(msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); g_error_free(error); return; } gsize size = (end - start) + 1; g_debug(" reading range %lu-%lu size %lu", start, end, size); g_warn_if_fail(size > 0); if (start != 0 && !g_seekable_seek(G_SEEKABLE(stream), start, G_SEEK_SET, NULL, &error)) { g_warning("Could not seek in file '%s': %s", disc->file_path, error->message); soup_message_set_status(msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); g_error_free(error); return; } gsize read_size = MIN(size, MAX_BUFFER_SIZE); g_debug(" going to read %lu bytes", read_size); RODiscReadOp *op = server_disc_read_op_new(); op->disc = disc; op->start = start; op->length = size; op->buffer = g_malloc(read_size); op->msg = msg; op->stream = G_INPUT_STREAM(stream); op->cancel = g_cancellable_new(); g_signal_connect(msg, "finished", G_CALLBACK(server_disc_finished_cb), op); g_input_stream_read_async(op->stream, op->buffer, read_size, G_PRIORITY_DEFAULT, op->cancel, (GAsyncReadyCallback) server_disc_read_cb, op); } static void server_disc_perform_read(SoupMessage *msg, RODisc *disc) { server_disc_perform_read_range(msg, disc, 0, disc->size - 1); } static void server_disc_cb(SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, gpointer user_data) { RODisc *disc = (RODisc*) user_data; if (!g_str_has_suffix(path,".dmg")) { g_debug("Not found (%s)", path); soup_message_set_status(msg, SOUP_STATUS_NOT_FOUND); return; } if (msg->method == SOUP_METHOD_HEAD) { g_debug("Head on %s", path); soup_message_set_status(msg, SOUP_STATUS_OK); soup_message_headers_set_content_length(msg->response_headers, disc->size); soup_message_headers_set_content_type(msg->response_headers, RODISC_IMAGE_MIME, NULL); soup_message_headers_replace(msg->response_headers, "Accept-Ranges", "bytes"); } else if (msg->method == SOUP_METHOD_GET) { g_debug("Get on %s", path); soup_message_headers_set_content_type(msg->response_headers, RODISC_IMAGE_MIME, NULL); soup_message_headers_replace(msg->response_headers, "Accept-Ranges", "bytes"); soup_message_body_set_accumulate(msg->response_body, FALSE); SoupRange *ranges; int length; if (soup_message_headers_get_ranges(msg->request_headers, disc->size, &ranges, &length)) { if (length != 1) { g_warning("Multi-range not yet supported"); soup_message_set_status(msg, SOUP_STATUS_INVALID_RANGE); return; } goffset start = ranges[0].start, end = ranges[0].end; soup_message_set_status(msg, SOUP_STATUS_PARTIAL_CONTENT); soup_message_headers_set_content_range(msg->response_headers, start, end, disc->size); soup_message_headers_free_ranges(msg->request_headers, ranges); soup_server_pause_message(server, msg); server_disc_perform_read_range(msg, disc, start, end); } else { soup_message_headers_set_content_length(msg->response_headers, disc->size); soup_message_set_status(msg, SOUP_STATUS_OK); soup_server_pause_message(server, msg); server_disc_perform_read(msg, disc); } } else { soup_message_set_status(msg, SOUP_STATUS_NOT_IMPLEMENTED); g_warning("Unimplemented method on %s", path); } } static void server_cb(SoupServer *server, SoupMessage *msg, const char *path, GHashTable *query, SoupClientContext *client, gpointer user_data) { g_debug("Unknown path requested: %s", path); soup_message_set_status(msg, SOUP_STATUS_NOT_FOUND); } bool server_start() { GError *error = NULL; server = soup_server_new(SOUP_SERVER_SERVER_HEADER, RESPONSE_SERVER_HEADER, NULL); if (!server) { g_printerr("Could not create HTTP server\n"); return false; } soup_server_add_handler(server, NULL, server_cb, NULL, NULL); if (!soup_server_listen_all(server, 0, SOUP_SERVER_LISTEN_IPV4_ONLY, &error)) { g_printerr("Could not start HTTP server: %s\n", error->message); g_error_free(error); return false; } // Fetch the server port GSList *uris = soup_server_get_uris(server); g_warn_if_fail(g_slist_length(uris) == 1); for (GSList *l = uris; l; l = l->next) { SoupURI *uri = l->data; char *str = soup_uri_to_string(uri, FALSE); g_message("Listening on %s", str); g_free(str); server_port = soup_uri_get_port(uri); } g_slist_free_full(uris, (GDestroyNotify)soup_uri_free); if (server_port == 0) { g_printerr("Could not get HTTP server port\n"); return false; } return true; } void server_stop() { g_object_unref(server); } void server_register(RODisc *disc) { soup_server_add_handler(server, disc->uri, server_disc_cb, disc, NULL); } void server_unregister(RODisc *disc) { soup_server_remove_handler(server, disc->uri); } unsigned int server_get_port() { return server_port; }