worker: push data when clients can receive them

During every iteration of the main worker loop, outgoing data was pushed to
the client. However, there was no guarantee that the loop would be woken up
in every situation. So there were some conditions where the loop would stop
iterating until a new event was sent.

Currently, the events that can wake up the main worker loop include:
 - data from dispatcher (including wakeups from the guest)
 - data from clients
 - timeouts on a stream
 - other timeouts
 - polling

This patch adds a new wakeup event: when we have items that are queued to
be sent to a client, we set up a watch event for writing data to the
client. If no items are waiting to be sent, this watch will be disabled.
This allows us to remove the explicit push from the main worker loop.

This has some advantages:
 - it could lower latency as we don't have to wait for a polling timeout.
   From my experiments using a tight loop (so not really the ideal
   condition to see the improvements) the total time was reduced by 2-3%)
 - helps reduce the possibility of hanging loops
 - avoids having to scan all clients to detect which one can accept data.

Signed-by: Frediano Ziglio <figlio@redhat.com>
Acked-by: Jonathon Jongsma <jjongsma@redhat.com>
This commit is contained in:
Frediano Ziglio 2016-01-21 00:42:30 +00:00
parent 5a8ccd1ac7
commit 5c460de1a3
2 changed files with 18 additions and 34 deletions

View File

@ -1359,6 +1359,11 @@ void red_channel_client_push(RedChannelClient *rcc)
while ((pipe_item = red_channel_client_pipe_item_get(rcc))) {
red_channel_client_send_item(rcc, pipe_item);
}
if (red_channel_client_no_item_being_sent(rcc) && ring_is_empty(&rcc->pipe)
&& rcc->stream->watch) {
rcc->channel->core->watch_update_mask(rcc->stream->watch,
SPICE_WATCH_EVENT_READ);
}
rcc->during_send = FALSE;
red_channel_client_unref(rcc);
}
@ -1659,7 +1664,7 @@ void red_channel_pipe_item_init(RedChannel *channel, PipeItem *item, int type)
item->type = type;
}
static inline int validate_pipe_add(RedChannelClient *rcc, PipeItem *item)
static inline gboolean client_pipe_add(RedChannelClient *rcc, PipeItem *item, RingItem *pos)
{
spice_assert(rcc && item);
if (SPICE_UNLIKELY(!red_channel_client_is_connected(rcc))) {
@ -1667,17 +1672,20 @@ static inline int validate_pipe_add(RedChannelClient *rcc, PipeItem *item)
red_channel_client_release_item(rcc, item, FALSE);
return FALSE;
}
if (ring_is_empty(&rcc->pipe) && rcc->stream->watch) {
rcc->channel->core->watch_update_mask(rcc->stream->watch,
SPICE_WATCH_EVENT_READ |
SPICE_WATCH_EVENT_WRITE);
}
rcc->pipe_size++;
ring_add(pos, &item->link);
return TRUE;
}
void red_channel_client_pipe_add(RedChannelClient *rcc, PipeItem *item)
{
if (!validate_pipe_add(rcc, item)) {
return;
}
rcc->pipe_size++;
ring_add(&rcc->pipe, &item->link);
client_pipe_add(rcc, item, &rcc->pipe);
}
void red_channel_client_pipe_add_push(RedChannelClient *rcc, PipeItem *item)
@ -1690,11 +1698,7 @@ void red_channel_client_pipe_add_after(RedChannelClient *rcc,
PipeItem *item, PipeItem *pos)
{
spice_assert(pos);
if (!validate_pipe_add(rcc, item)) {
return;
}
rcc->pipe_size++;
ring_add_after(&item->link, &pos->link);
client_pipe_add(rcc, item, &pos->link);
}
int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc,
@ -1706,21 +1710,14 @@ int red_channel_client_pipe_item_is_linked(RedChannelClient *rcc,
void red_channel_client_pipe_add_tail_no_push(RedChannelClient *rcc,
PipeItem *item)
{
if (!validate_pipe_add(rcc, item)) {
return;
}
rcc->pipe_size++;
ring_add_before(&item->link, &rcc->pipe);
client_pipe_add(rcc, item, rcc->pipe.prev);
}
void red_channel_client_pipe_add_tail(RedChannelClient *rcc, PipeItem *item)
{
if (!validate_pipe_add(rcc, item)) {
return;
if (client_pipe_add(rcc, item, rcc->pipe.prev)) {
red_channel_client_push(rcc);
}
rcc->pipe_size++;
ring_add_before(&item->link, &rcc->pipe);
red_channel_client_push(rcc);
}
void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type)

View File

@ -321,16 +321,6 @@ static int red_process_display(RedWorker *worker, int *ring_is_empty)
return n;
}
static void red_push(RedWorker *worker)
{
if (worker->cursor_channel) {
red_channel_push(RED_CHANNEL(worker->cursor_channel));
}
if (worker->display_channel) {
red_channel_push(RED_CHANNEL(worker->display_channel));
}
}
static void red_disconnect_display(RedWorker *worker)
{
spice_warning("update timeout");
@ -1458,9 +1448,6 @@ static gboolean worker_source_dispatch(GSource *source, GSourceFunc callback,
red_process_cursor(worker, &ring_is_empty);
red_process_display(worker, &ring_is_empty);
/* TODO: remove me? that should be handled by watch out condition */
red_push(worker);
return TRUE;
}