From 3cdb9f590563b1d8be9263f2db210c48351e0894 Mon Sep 17 00:00:00 2001 From: "Javier S. Pedro" Date: Sat, 28 Jul 2012 18:57:15 +0200 Subject: better support for large ranges --- .gitignore | 7 +++ Makefile | 4 +- rodisc.c | 169 ++++++++++++++++++++++++++++++++++++++++++++++++------------- 3 files changed, 142 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index 0f00f50..17ada95 100644 --- a/.gitignore +++ b/.gitignore @@ -11,6 +11,13 @@ *.la *.a +# QtCreator stuff +rodisc.config +rodisc.creator +rodisc.creator.* +rodisc.files +rodisc.includes + # Final executables rodisc rodiscd diff --git a/Makefile b/Makefile index 37b2fd6..b79947c 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ -CFLAGS?=-Wall -Os +CFLAGS?=-O0 -g RODISC_PKGCONFIG:=glib-2.0 gobject-2.0 gio-unix-2.0 libsoup-2.4 avahi-gobject -RODISC_CFLAGS:=$(shell pkg-config --cflags $(RODISC_PKGCONFIG)) +RODISC_CFLAGS:=-Wall $(shell pkg-config --cflags $(RODISC_PKGCONFIG)) RODISC_LIBS:=$(shell pkg-config --libs $(RODISC_PKGCONFIG)) all: rodiscd diff --git a/rodisc.c b/rodisc.c index 748d401..a563cf1 100644 --- a/rodisc.c +++ b/rodisc.c @@ -13,11 +13,17 @@ #define RODISC_MDNS_SERVICE "_odisk._tcp" +#define RODISC_IMAGE_MIME "application/octet-stream" + #define RODISC_TYPE_GENERIC "public.optical-storage-media" #define RODISC_TYPE_CD "public.cd-media" #define RODISC_TYPE_DVD "public.dvd-media" #define RODISC_TYPE_BD "public.optical-storage-media" +/** Buffer size for a single read() call. + * Any request larger than this will be served in chunks. */ +#define MAX_BUFFER_SIZE (32*1024*1024UL) + static GMainLoop *main_loop; static SoupServer *server; @@ -52,8 +58,11 @@ typedef struct { typedef struct { RODisc *disc; gsize start; + gsize length; gpointer buffer; SoupMessage *msg; + GInputStream *stream; + GCancellable *cancel; } RODiscReadOp; static void mdns_service_freeze() @@ -95,6 +104,8 @@ static void mdns_service_update() g_warning("Could not update main TXT record: %s", error->message); g_error_free(error); } + + g_free(record); } static void mdns_service_update_disc(RODisc *disc) @@ -103,7 +114,7 @@ static void mdns_service_update_disc(RODisc *disc) g_return_if_fail(mdns_service); - // "CdRom0=adVN=TS3EP05,adVT=public.cd-media" + // "CdRom0=adVN=DiscLabel,adVT=public.cd-media" gchar *record = g_strdup_printf("adVN=%s,adVT=%s", disc->label, disc->type); const gchar *uri_basename = &disc->uri[1]; // Skip first '/' @@ -113,6 +124,8 @@ static void mdns_service_update_disc(RODisc *disc) disc->uri, error->message); g_error_free(error); } + + g_free(record); } static void mdns_service_remove_disc(RODisc *disc) @@ -191,7 +204,6 @@ static void mdns_client_state_changed_cb(GaClient *client, GaClientState state, ga_entry_group_reset(mdns_group, NULL); mdns_service = 0; } - break; default: // Do nothing @@ -199,40 +211,103 @@ static void mdns_client_state_changed_cb(GaClient *client, GaClientState state, } } +static void server_disc_cb(SoupServer *server, SoupMessage *msg, const char *path, + GHashTable *query, SoupClientContext *client, gpointer user_data); + +static inline RODiscReadOp* server_disc_read_op_new() +{ + RODiscReadOp *op = g_slice_new(RODiscReadOp); + return op; +} + +static void server_disc_read_op_free(RODiscReadOp *op) +{ + if (op->stream) g_object_unref(op->stream); + if (op->buffer) g_free(op->buffer); + if (op->cancel) g_object_unref(op->cancel); + g_slice_free(RODiscReadOp, op); +} + +static void server_disc_finished_cb(SoupMessage *msg, gpointer user_data) +{ + RODiscReadOp *op = (RODiscReadOp*) user_data; + g_warn_if_fail(op->msg == msg); + + g_debug(" request finished"); + + if (op->cancel) { + // Cancel pending operation if any + g_cancellable_cancel(op->cancel); + g_object_unref(op->cancel); + op->cancel = NULL; + } + + server_disc_read_op_free(op); +} + static void server_disc_read_cb(GInputStream *stream, GAsyncResult *res, gpointer user_data) { RODiscReadOp *op = (RODiscReadOp*) user_data; GError *error = NULL; gssize size = g_input_stream_read_finish(stream, res, &error); if (size == -1) { + if (g_error_matches(error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) { + g_debug(" request canceled"); + return; + } + g_warning("Failed to read from %s: %s", op->disc->file_path, error->message); soup_message_set_status(op->msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); soup_server_unpause_message(server, op->msg); g_error_free(error); - g_free(op->buffer); - g_slice_free(RODiscReadOp, op); + server_disc_read_op_free(op); return; } - g_debug(" read a total of %ld bytes", size); + g_debug(" read %ld bytes of %lu bytes", size, op->length); + g_warn_if_fail(size > 0); - soup_message_set_status(op->msg, SOUP_STATUS_PARTIAL_CONTENT); - soup_message_headers_set_content_range(op->msg->response_headers, - op->start, op->start + size - 1, - op->disc->size); - soup_message_body_append_take(op->msg->response_body, op->buffer, size); - soup_server_unpause_message(server, op->msg); + if (size >= op->length || size == 0) { + // We have read everything we needed, so this finishes the request! + g_debug(" finishing request"); + g_object_unref(op->cancel); + op->cancel = NULL; + op->length = 0; - g_slice_free(RODiscReadOp, op); + soup_message_body_append(op->msg->response_body, SOUP_MEMORY_TAKE, + op->buffer, size); + soup_message_body_complete(op->msg->response_body); + soup_server_unpause_message(server, op->msg); + + op->buffer = NULL; // Passed ownership of buffer to libsoup + // 'op' will be freed when 'finished' signal @msg is triggered + } else { + soup_message_body_append(op->msg->response_body, SOUP_MEMORY_COPY, + op->buffer, size); + soup_server_unpause_message(server, op->msg); + + // Launch the read for the remaining bytes + op->start += size; + op->length -= size; + + gsize read_size = MIN(op->length, MAX_BUFFER_SIZE); + g_debug(" going to read %lu bytes", read_size); + g_return_if_fail(read_size > 0); + + g_input_stream_read_async(op->stream, op->buffer, read_size, + G_PRIORITY_DEFAULT, op->cancel, + (GAsyncReadyCallback) server_disc_read_cb, op); + } } -static void server_disc_perform_read(SoupMessage *msg, RODisc *disc, goffset start, goffset end) +static void server_disc_perform_read_range(SoupMessage *msg, RODisc *disc, goffset start, goffset end) { GError *error = NULL; g_debug("Opening %s", disc->file_path); + // File is opened on every read because of asynchronous reads GFileInputStream * stream = g_file_read(disc->file, NULL, &error); if (!stream) { @@ -245,27 +320,40 @@ static void server_disc_perform_read(SoupMessage *msg, RODisc *disc, goffset sta gsize size = (end - start) + 1; g_debug(" reading range %lu-%lu size %lu", start, end, size); + g_warn_if_fail(size > 0); - if (!g_seekable_seek(G_SEEKABLE(stream), start, G_SEEK_SET, NULL, &error)) { + if (start != 0 && + !g_seekable_seek(G_SEEKABLE(stream), start, G_SEEK_SET, NULL, &error)) { g_warning("Could not seek in file '%s': %s", - disc->file_path, error->message); + disc->file_path, error->message); soup_message_set_status(msg, SOUP_STATUS_INTERNAL_SERVER_ERROR); g_error_free(error); return; } - RODiscReadOp *op = g_slice_new(RODiscReadOp); + gsize read_size = MIN(size, MAX_BUFFER_SIZE); + g_debug(" going to read %lu bytes", read_size); + + RODiscReadOp *op = server_disc_read_op_new(); op->disc = disc; op->start = start; - op->buffer = g_malloc(size); + op->length = size; + op->buffer = g_malloc(read_size); op->msg = msg; + op->stream = G_INPUT_STREAM(stream); + op->cancel = g_cancellable_new(); - soup_server_pause_message(server, msg); + g_signal_connect(msg, "finished", + G_CALLBACK(server_disc_finished_cb), op); - g_input_stream_read_async(G_INPUT_STREAM(stream), op->buffer, size, - G_PRIORITY_DEFAULT, NULL, + g_input_stream_read_async(op->stream, op->buffer, read_size, + G_PRIORITY_DEFAULT, op->cancel, (GAsyncReadyCallback) server_disc_read_cb, op); - g_object_unref(stream); +} + +static void server_disc_perform_read(SoupMessage *msg, RODisc *disc) +{ + server_disc_perform_read_range(msg, disc, 0, disc->size - 1); } static void server_disc_cb(SoupServer *server, SoupMessage *msg, const char *path, @@ -283,12 +371,13 @@ static void server_disc_cb(SoupServer *server, SoupMessage *msg, const char *pat g_debug("Head on %s", path); soup_message_set_status(msg, SOUP_STATUS_OK); soup_message_headers_set_content_length(msg->response_headers, disc->size); - soup_message_headers_set_content_type(msg->response_headers, "application/octet-stream", NULL); + soup_message_headers_set_content_type(msg->response_headers, RODISC_IMAGE_MIME, NULL); soup_message_headers_replace(msg->response_headers, "Accept-Ranges", "bytes"); } else if (msg->method == SOUP_METHOD_GET) { g_debug("Get on %s", path); - soup_message_headers_set_content_type(msg->response_headers, "application/octet-stream", NULL); + soup_message_headers_set_content_type(msg->response_headers, RODISC_IMAGE_MIME, NULL); soup_message_headers_replace(msg->response_headers, "Accept-Ranges", "bytes"); + soup_message_body_set_accumulate(msg->response_body, FALSE); SoupRange *ranges; int length; if (soup_message_headers_get_ranges(msg->request_headers, disc->size, &ranges, &length)) { @@ -298,11 +387,17 @@ static void server_disc_cb(SoupServer *server, SoupMessage *msg, const char *pat return; } goffset start = ranges[0].start, end = ranges[0].end; - server_disc_perform_read(msg, disc, start, end); + soup_message_set_status(msg, SOUP_STATUS_PARTIAL_CONTENT); + soup_message_headers_set_content_range(msg->response_headers, + start, end, + disc->size); + soup_server_pause_message(server, msg); + server_disc_perform_read_range(msg, disc, start, end); } else { soup_message_headers_set_content_length(msg->response_headers, disc->size); - soup_message_set_status(msg, SOUP_STATUS_REQUEST_ENTITY_TOO_LARGE); - g_warning("Must specify a range"); + soup_message_set_status(msg, SOUP_STATUS_OK); + soup_server_pause_message(server, msg); + server_disc_perform_read(msg, disc); } } else { soup_message_set_status(msg, SOUP_STATUS_NOT_IMPLEMENTED); @@ -310,6 +405,13 @@ static void server_disc_cb(SoupServer *server, SoupMessage *msg, const char *pat } } +static void server_cb(SoupServer *server, SoupMessage *msg, const char *path, + GHashTable *query, SoupClientContext *client, gpointer user_data) +{ + g_message("Unknown path requested: %s", path); + soup_message_set_status(msg, SOUP_STATUS_NOT_FOUND); +} + static inline RODisc *rodisc_lookup(const gchar *id) { return g_hash_table_lookup(discs, id); @@ -506,7 +608,9 @@ static void file_add_disc(const gchar *path) { static int num = 0; GFile *file = g_file_new_for_path(path); - GFileInfo *info = g_file_query_info(file, G_FILE_ATTRIBUTE_STANDARD_SIZE, + GFileInfo *info = g_file_query_info(file, + G_FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME "," + G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, NULL, NULL); g_return_if_fail(info); const int my_num = ++num; @@ -515,7 +619,7 @@ static void file_add_disc(const gchar *path) disc->uri = g_strdup_printf("/file%d", my_num); disc->file_path = g_file_get_path(file); disc->file = file; - disc->label = g_file_get_basename(file); + disc->label = g_strdup(g_file_info_get_display_name(info)); disc->type = RODISC_TYPE_GENERIC; disc->size = g_file_info_get_size(info); rodisc_export(disc); @@ -531,13 +635,6 @@ static void files_add() } } -static void server_cb(SoupServer *server, SoupMessage *msg, const char *path, - GHashTable *query, SoupClientContext *client, gpointer user_data) -{ - g_message("Unknown path requested: %s", path); - soup_message_set_status(msg, SOUP_STATUS_NOT_FOUND); -} - int main(int argc, char * argv[]) { GError *error = NULL; @@ -604,6 +701,6 @@ int main(int argc, char * argv[]) g_object_unref(mdns_client); g_object_unref(server); g_main_loop_unref(main_loop); + return EXIT_SUCCESS; } - -- cgit v1.2.3