diff --git a/server/Makefile.am b/server/Makefile.am index 92b716f5..411a0d94 100644 --- a/server/Makefile.am +++ b/server/Makefile.am @@ -121,8 +121,6 @@ libserver_la_SOURCES = \ sound.h \ stat.h \ spicevmc.c \ - spice_timer_queue.c \ - spice_timer_queue.h \ zlib-encoder.c \ zlib-encoder.h \ image-cache.h \ diff --git a/server/red-worker.c b/server/red-worker.c index eb87813a..0d412199 100644 --- a/server/red-worker.c +++ b/server/red-worker.c @@ -49,22 +49,14 @@ #include "spice.h" #include "red-worker.h" -#include "spice_timer_queue.h" #include "cursor-channel.h" #include "tree.h" #define CMD_RING_POLL_TIMEOUT 10 //milli #define CMD_RING_POLL_RETRIES 200 -#define MAX_EVENT_SOURCES 20 #define INF_EVENT_WAIT ~0 -struct SpiceWatch { - struct RedWorker *worker; - SpiceWatchFunc watch_func; - void *watch_func_opaque; -}; - struct RedWorker { pthread_t thread; QXLInstance *qxl; @@ -72,8 +64,7 @@ struct RedWorker { SpiceWatch *dispatch_watch; int running; SpiceCoreInterfaceInternal core; - struct pollfd poll_fds[MAX_EVENT_SOURCES]; - struct SpiceWatch watches[MAX_EVENT_SOURCES]; + unsigned int event_timeout; DisplayChannel *display_channel; @@ -497,84 +488,6 @@ static int common_channel_config_socket(RedChannelClient *rcc) return TRUE; } -static void worker_watch_update_mask(SpiceWatch *watch, int event_mask) -{ - struct RedWorker *worker; - int i; - - if (!watch) { - return; - } - - worker = watch->worker; - i = watch - worker->watches; - - worker->poll_fds[i].events = 0; - if (event_mask & SPICE_WATCH_EVENT_READ) { - worker->poll_fds[i].events |= POLLIN; - } - if (event_mask & SPICE_WATCH_EVENT_WRITE) { - worker->poll_fds[i].events |= POLLOUT; - } -} - -static SpiceWatch *worker_watch_add(const SpiceCoreInterfaceInternal *iface, - int fd, int event_mask, SpiceWatchFunc func, void *opaque) -{ - RedWorker *worker = SPICE_CONTAINEROF(iface, RedWorker, core); - int i; - - /* Search for a free slot in our poll_fds & watches arrays */ - for (i = 0; i < MAX_EVENT_SOURCES; i++) { - if (worker->poll_fds[i].fd == -1) { - break; - } - } - if (i == MAX_EVENT_SOURCES) { - /* Since we are a channel core implementation, we always get called from - red_channel_client_create(), so opaque always is our rcc */ - RedChannelClient *rcc = opaque; - spice_warning("could not add a watch for channel type %u id %u", - rcc->channel->type, rcc->channel->id); - return NULL; - } - - worker->poll_fds[i].fd = fd; - worker->watches[i].worker = worker; - worker->watches[i].watch_func = func; - worker->watches[i].watch_func_opaque = opaque; - worker_watch_update_mask(&worker->watches[i], event_mask); - - return &worker->watches[i]; -} - -static void worker_watch_remove(SpiceWatch *watch) -{ - if (!watch) { - return; - } - - /* Note we don't touch the poll_fd here, to avoid the - poll_fds/watches table entry getting re-used in the same - red_worker_main loop over the fds as it is removed. - - This is done because re-using it while events were pending on - the fd previously occupying the slot would lead to incorrectly - calling the watch_func for the new fd. */ - memset(watch, 0, sizeof(SpiceWatch)); -} - -static const SpiceCoreInterfaceInternal worker_core_initializer = { - .timer_add = spice_timer_queue_add, - .timer_start = spice_timer_set, - .timer_cancel = spice_timer_cancel, - .timer_remove = spice_timer_remove, - - .watch_update_mask = worker_watch_update_mask, - .watch_add = worker_watch_add, - .watch_remove = worker_watch_remove, -}; - CommonChannelClient *common_channel_new_client(CommonChannel *common, int size, RedClient *client, @@ -1518,18 +1431,81 @@ static void handle_dev_input(int fd, int event, void *opaque) dispatcher_handle_recv_read(red_dispatcher_get_dispatcher(worker->red_dispatcher)); } +typedef struct RedWorkerSource { + GSource source; + RedWorker *worker; +} RedWorkerSource; + +static gboolean worker_source_prepare(GSource *source, gint *p_timeout) +{ + RedWorkerSource *wsource = SPICE_CONTAINEROF(source, RedWorkerSource, source); + RedWorker *worker = wsource->worker; + unsigned int timeout; + + timeout = MIN(worker->event_timeout, + display_channel_get_streams_timeout(worker->display_channel)); + + *p_timeout = (timeout == INF_EVENT_WAIT) ? -1 : timeout; + if (*p_timeout == 0) + return TRUE; + + return FALSE; +} + +static gboolean worker_source_check(GSource *source) +{ + RedWorkerSource *wsource = SPICE_CONTAINEROF(source, RedWorkerSource, source); + RedWorker *worker = wsource->worker; + + return worker->running /* TODO && worker->pending_process */; +} + +static gboolean worker_source_dispatch(GSource *source, GSourceFunc callback, + gpointer user_data) +{ + RedWorkerSource *wsource = SPICE_CONTAINEROF(source, RedWorkerSource, source); + RedWorker *worker = wsource->worker; + DisplayChannel *display = worker->display_channel; + int ring_is_empty; + + /* during migration, in the dest, the display channel can be initialized + while the global lz data not since migrate data msg hasn't been + received yet */ + /* TODO: why is this here, and not in display_channel_create */ + display_channel_free_glz_drawables_to_free(display); + + /* TODO: could use its own source */ + stream_timeout(display); + + worker->event_timeout = INF_EVENT_WAIT; + red_process_cursor(worker, &ring_is_empty); + red_process_display(worker, &ring_is_empty); + + /* TODO: remove me? that should be handled by watch out condition */ + red_push(worker); + + return TRUE; +} + +/* cannot be const */ +static GSourceFuncs worker_source_funcs = { + .prepare = worker_source_prepare, + .check = worker_source_check, + .dispatch = worker_source_dispatch, +}; + RedWorker* red_worker_new(QXLInstance *qxl, RedDispatcher *red_dispatcher) { QXLDevInitInfo init_info; RedWorker *worker; Dispatcher *dispatcher; - int i; const char *record_filename; qxl->st->qif->get_init_info(qxl, &init_info); worker = spice_new0(RedWorker, 1); - worker->core = worker_core_initializer; + worker->core = event_loop_core; + worker->core.main_context = g_main_context_new(); record_filename = getenv("SPICE_WORKER_RECORD_FILENAME"); if (record_filename) { @@ -1563,15 +1539,17 @@ RedWorker* red_worker_new(QXLInstance *qxl, RedDispatcher *red_dispatcher) worker->wakeup_counter = stat_add_counter(worker->stat, "wakeups", TRUE); worker->command_counter = stat_add_counter(worker->stat, "commands", TRUE); #endif - for (i = 0; i < MAX_EVENT_SOURCES; i++) { - worker->poll_fds[i].fd = -1; - } worker->dispatch_watch = worker->core.watch_add(&worker->core, dispatcher_get_recv_fd(dispatcher), SPICE_WATCH_EVENT_READ, handle_dev_input, worker); spice_assert(worker->dispatch_watch != NULL); + GSource *source = g_source_new(&worker_source_funcs, sizeof(RedWorkerSource)); + SPICE_CONTAINEROF(source, RedWorkerSource, source)->worker = worker; + g_source_attach(source, worker->core.main_context); + g_source_unref(source); + memslot_info_init(&worker->mem_slots, init_info.num_memslots_groups, init_info.num_memslots, @@ -1599,73 +1577,15 @@ SPICE_GNUC_NORETURN static void *red_worker_main(void *arg) spice_assert(MAX_PIPE_SIZE > WIDE_CLIENT_ACK_WINDOW && MAX_PIPE_SIZE > NARROW_CLIENT_ACK_WINDOW); //ensure wakeup by ack message - if (!spice_timer_queue_create()) { - spice_error("failed to create timer queue"); - } - RED_CHANNEL(worker->cursor_channel)->thread_id = pthread_self(); RED_CHANNEL(worker->display_channel)->thread_id = pthread_self(); - for (;;) { - int i, num_events; - unsigned int timeout; + GMainLoop *loop = g_main_loop_new(worker->core.main_context, FALSE); + g_main_loop_run(loop); + g_main_loop_unref(loop); - timeout = spice_timer_queue_get_timeout_ms(); - worker->event_timeout = MIN(timeout, worker->event_timeout); - timeout = display_channel_get_streams_timeout(worker->display_channel); - worker->event_timeout = MIN(timeout, worker->event_timeout); - num_events = poll(worker->poll_fds, MAX_EVENT_SOURCES, worker->event_timeout); - stream_timeout(worker->display_channel); - spice_timer_queue_cb(); - - if (worker->display_channel) { - /* during migration, in the dest, the display channel can be initialized - while the global lz data not since migrate data msg hasn't been - received yet */ - display_channel_free_glz_drawables_to_free(worker->display_channel); - } - - worker->event_timeout = INF_EVENT_WAIT; - if (num_events == -1) { - if (errno != EINTR) { - spice_error("poll failed, %s", strerror(errno)); - } - } - - for (i = 0; i < MAX_EVENT_SOURCES; i++) { - /* The watch may have been removed by the watch-func from - another fd (ie a disconnect through the dispatcher), - in this case watch_func is NULL. */ - if (worker->poll_fds[i].revents && worker->watches[i].watch_func) { - int events = 0; - if (worker->poll_fds[i].revents & POLLIN) { - events |= SPICE_WATCH_EVENT_READ; - } - if (worker->poll_fds[i].revents & POLLOUT) { - events |= SPICE_WATCH_EVENT_WRITE; - } - worker->watches[i].watch_func(worker->poll_fds[i].fd, events, - worker->watches[i].watch_func_opaque); - } - } - - /* Clear the poll_fd for any removed watches, see the comment in - watch_remove for why we don't do this there. */ - for (i = 0; i < MAX_EVENT_SOURCES; i++) { - if (!worker->watches[i].watch_func) { - worker->poll_fds[i].fd = -1; - } - } - - if (worker->running) { - int ring_is_empty; - red_process_cursor(worker, &ring_is_empty); - red_process_display(worker, &ring_is_empty); - } - red_push(worker); - } - - spice_warn_if_reached(); + /* FIXME: free worker, and join threads */ + exit(0); } bool red_worker_run(RedWorker *worker) diff --git a/server/spice_timer_queue.c b/server/spice_timer_queue.c deleted file mode 100644 index 421b0909..00000000 --- a/server/spice_timer_queue.c +++ /dev/null @@ -1,267 +0,0 @@ -/* -*- Mode: C; c-basic-offset: 4; indent-tabs-mode: nil -*- */ -/* - Copyright (C) 2013 Red Hat, Inc. - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, see . -*/ -#include -#include -#include "red-common.h" -#include "spice_timer_queue.h" -#include "common/ring.h" -#include "utils.h" - -static Ring timer_queue_list; -static int queue_count = 0; -static pthread_mutex_t queue_list_lock = PTHREAD_MUTEX_INITIALIZER; - -static void spice_timer_queue_init(void) -{ - ring_init(&timer_queue_list); -} - -struct SpiceTimer { - RingItem link; - RingItem active_link; - - SpiceTimerFunc func; - void *opaque; - - SpiceTimerQueue *queue; - - int is_active; - uint32_t ms; - uint64_t expiry_time; -}; - -struct SpiceTimerQueue { - RingItem link; - pthread_t thread; - Ring timers; - Ring active_timers; -}; - -static SpiceTimerQueue *spice_timer_queue_find(void) -{ - pthread_t self = pthread_self(); - RingItem *queue_item; - - RING_FOREACH(queue_item, &timer_queue_list) { - SpiceTimerQueue *queue = SPICE_CONTAINEROF(queue_item, SpiceTimerQueue, link); - - if (pthread_equal(self, queue->thread) != 0) { - return queue; - } - } - - return NULL; -} - -static SpiceTimerQueue *spice_timer_queue_find_with_lock(void) -{ - SpiceTimerQueue *queue; - - pthread_mutex_lock(&queue_list_lock); - queue = spice_timer_queue_find(); - pthread_mutex_unlock(&queue_list_lock); - return queue; -} - -int spice_timer_queue_create(void) -{ - SpiceTimerQueue *queue; - - pthread_mutex_lock(&queue_list_lock); - if (queue_count == 0) { - spice_timer_queue_init(); - } - - if (spice_timer_queue_find() != NULL) { - spice_printerr("timer queue was already created for the thread"); - return FALSE; - } - - queue = spice_new0(SpiceTimerQueue, 1); - queue->thread = pthread_self(); - ring_init(&queue->timers); - ring_init(&queue->active_timers); - - ring_add(&timer_queue_list, &queue->link); - queue_count++; - - pthread_mutex_unlock(&queue_list_lock); - - return TRUE; -} - -void spice_timer_queue_destroy(void) -{ - RingItem *item; - SpiceTimerQueue *queue; - - pthread_mutex_lock(&queue_list_lock); - queue = spice_timer_queue_find(); - - spice_assert(queue != NULL); - - while ((item = ring_get_head(&queue->timers))) { - SpiceTimer *timer; - - timer = SPICE_CONTAINEROF(item, SpiceTimer, link); - spice_timer_remove(timer); - } - - ring_remove(&queue->link); - free(queue); - queue_count--; - - pthread_mutex_unlock(&queue_list_lock); -} - -SpiceTimer *spice_timer_queue_add(const SpiceCoreInterfaceInternal *iface, - SpiceTimerFunc func, void *opaque) -{ - SpiceTimer *timer = spice_new0(SpiceTimer, 1); - SpiceTimerQueue *queue = spice_timer_queue_find_with_lock(); - - spice_assert(queue != NULL); - - ring_item_init(&timer->link); - ring_item_init(&timer->active_link); - - timer->opaque = opaque; - timer->func = func; - timer->queue = queue; - - ring_add(&queue->timers, &timer->link); - - return timer; -} - -static void _spice_timer_set(SpiceTimer *timer, uint32_t ms, uint64_t now) -{ - RingItem *next_item; - SpiceTimerQueue *queue; - - if (timer->is_active) { - spice_timer_cancel(timer); - } - - queue = timer->queue; - timer->expiry_time = now + ms; - timer->ms = ms; - - RING_FOREACH(next_item, &queue->active_timers) { - SpiceTimer *next_timer = SPICE_CONTAINEROF(next_item, SpiceTimer, active_link); - - if (timer->expiry_time <= next_timer->expiry_time) { - break; - } - } - - if (next_item) { - ring_add_before(&timer->active_link, next_item); - } else { - ring_add_before(&timer->active_link, &queue->active_timers); - } - timer->is_active = TRUE; -} - -void spice_timer_set(SpiceTimer *timer, uint32_t ms) -{ - spice_assert(pthread_equal(timer->queue->thread, pthread_self()) != 0); - - _spice_timer_set(timer, ms, spice_get_monotonic_time_ms()); -} - -void spice_timer_cancel(SpiceTimer *timer) -{ - spice_assert(pthread_equal(timer->queue->thread, pthread_self()) != 0); - - if (!ring_item_is_linked(&timer->active_link)) { - spice_assert(!timer->is_active); - return; - } - - spice_assert(timer->is_active); - ring_remove(&timer->active_link); - timer->is_active = FALSE; -} - -void spice_timer_remove(SpiceTimer *timer) -{ - spice_assert(timer->queue); - spice_assert(ring_item_is_linked(&timer->link)); - spice_assert(pthread_equal(timer->queue->thread, pthread_self()) != 0); - - if (timer->is_active) { - spice_assert(ring_item_is_linked(&timer->active_link)); - ring_remove(&timer->active_link); - } - ring_remove(&timer->link); - free(timer); -} - -unsigned int spice_timer_queue_get_timeout_ms(void) -{ - int64_t now_ms; - RingItem *head; - SpiceTimer *head_timer; - SpiceTimerQueue *queue = spice_timer_queue_find_with_lock(); - - spice_assert(queue != NULL); - - if (ring_is_empty(&queue->active_timers)) { - return -1; - } - - head = ring_get_head(&queue->active_timers); - head_timer = SPICE_CONTAINEROF(head, SpiceTimer, active_link); - - now_ms = spice_get_monotonic_time_ms(); - - return MAX(0, ((int64_t)head_timer->expiry_time - now_ms)); -} - - -void spice_timer_queue_cb(void) -{ - uint64_t now_ms; - RingItem *head; - SpiceTimerQueue *queue = spice_timer_queue_find_with_lock(); - - spice_assert(queue != NULL); - - if (ring_is_empty(&queue->active_timers)) { - return; - } - - now_ms = spice_get_monotonic_time_ms(); - - while ((head = ring_get_head(&queue->active_timers))) { - SpiceTimer *timer = SPICE_CONTAINEROF(head, SpiceTimer, active_link); - - if (timer->expiry_time > now_ms) { - break; - } else { - /* Remove active timer before calling the timer function. - * Timer function could delete the timer making the timer - * pointer point to freed data. - */ - spice_timer_cancel(timer); - timer->func(timer->opaque); - /* timer could now be invalid ! */ - } - } -} diff --git a/server/spice_timer_queue.h b/server/spice_timer_queue.h deleted file mode 100644 index b17cecff..00000000 --- a/server/spice_timer_queue.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - Copyright (C) 2013 Red Hat, Inc. - - This library is free software; you can redistribute it and/or - modify it under the terms of the GNU Lesser General Public - License as published by the Free Software Foundation; either - version 2.1 of the License, or (at your option) any later version. - - This library is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public - License along with this library; if not, see . -*/ - -#ifndef _H_SPICE_TIMER_QUEUE -#define _H_SPICE_TIMER_QUEUE - -#include -#include "spice.h" - -typedef struct SpiceTimerQueue SpiceTimerQueue; - -/* create/destroy a timer queue for the current thread. - * In order to execute the timers functions, spice_timer_queue_cb should be called - * periodically, according to spice_timer_queue_get_timeout_ms */ -int spice_timer_queue_create(void); -void spice_timer_queue_destroy(void); - -SpiceTimer *spice_timer_queue_add(const SpiceCoreInterfaceInternal *iface, - SpiceTimerFunc func, void *opaque); -void spice_timer_set(SpiceTimer *timer, uint32_t ms); -void spice_timer_cancel(SpiceTimer *timer); -void spice_timer_remove(SpiceTimer *timer); - -/* returns the time left till the earliest timer in the queue expires. - * returns (unsigned)-1 if there are no active timers */ -unsigned int spice_timer_queue_get_timeout_ms(void); -/* call the timeout callbacks of all the expired timers */ -void spice_timer_queue_cb(void); - -#endif