server/red_channel: introduce client ring in RedChannel

Also adds Drawable pipes and glz rings.

main_channel and red_worker had several locations that still accessed rcc
directly, so they had to be touched too, but the changes are minimal.

Most changes are in red_channel: drop the single client reference in RedChannel
and add a ring of channels.

Things missing / not done right in red_worker:
 * StreamAgents are in DCC - right/wrong?
 * GlzDict is multiplied - multiple compressions.

We still are missing:
 * remove the disconnect calls on new connections
This commit is contained in:
Alon Levy 2011-04-13 09:24:31 +03:00
parent 6a1d657373
commit 4db9f1d1a9
4 changed files with 814 additions and 330 deletions

View File

@ -165,15 +165,13 @@ static void main_disconnect(MainChannel *main_chan)
red_channel_destroy(&main_chan->base);
}
#define MAIN_FOREACH(_link, _main, _mcc) \
if ((_main) && ((_mcc) = \
SPICE_CONTAINEROF((_main)->base.rcc, MainChannelClient, base)))
RedClient *main_channel_get_client_by_link_id(MainChannel *main_chan, uint32_t connection_id)
{
RingItem *link;
MainChannelClient *mcc;
MAIN_FOREACH(link, main_chan, mcc) {
RING_FOREACH(link, &main_chan->base.clients) {
mcc = SPICE_CONTAINEROF(link, MainChannelClient, base.channel_link);
if (mcc->connection_id == connection_id) {
return mcc->base.client;
}

View File

@ -29,6 +29,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#include "ring.h"
#include "stat.h"
#include "red_channel.h"
#include "generated_marshallers.h"
@ -159,7 +160,14 @@ void red_channel_client_receive(RedChannelClient *rcc)
void red_channel_receive(RedChannel *channel)
{
red_channel_client_receive(channel->rcc);
RingItem *link;
RingItem *next;
RedChannelClient *rcc;
RING_FOREACH_SAFE(link, next, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
red_channel_client_receive(rcc);
}
}
static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
@ -358,8 +366,8 @@ static void red_channel_client_pipe_remove(RedChannelClient *rcc, PipeItem *item
static void red_channel_add_client(RedChannel *channel, RedChannelClient *rcc)
{
ASSERT(rcc && !channel->rcc);
channel->rcc = rcc;
ASSERT(rcc);
ring_add(&channel->clients, &rcc->channel_link);
channel->clients_num++;
}
@ -442,6 +450,7 @@ RedChannel *red_channel_create(int size,
channel->core = core;
channel->migrate = migrate;
ring_init(&channel->clients);
channel->incoming_cb.alloc_msg_buf = (alloc_msg_recv_buf_proc)alloc_recv_buf;
channel->incoming_cb.release_msg_buf = (release_msg_recv_buf_proc)release_recv_buf;
@ -511,12 +520,16 @@ void red_channel_client_destroy(RedChannelClient *rcc)
void red_channel_destroy(RedChannel *channel)
{
RingItem *link;
RingItem *next;
if (!channel) {
return;
}
red_channel_pipes_clear(channel);
if (channel->rcc) {
red_channel_client_destroy(channel->rcc);
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_destroy(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
free(channel);
}
@ -535,8 +548,12 @@ void red_channel_client_shutdown(RedChannelClient *rcc)
void red_channel_shutdown(RedChannel *channel)
{
if (channel->rcc) {
red_channel_client_shutdown(channel->rcc);
RingItem *link;
RingItem *next;
red_printf("%d", channel->clients_num);
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_shutdown(SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
red_channel_pipes_clear(channel);
}
@ -548,8 +565,11 @@ void red_channel_client_send(RedChannelClient *rcc)
void red_channel_send(RedChannel *channel)
{
if (channel->rcc) {
red_channel_client_send(channel->rcc);
RingItem *link;
RingItem *next;
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_send(SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
}
@ -586,7 +606,7 @@ void red_channel_client_push(RedChannelClient *rcc)
red_channel_client_send(rcc);
}
if (rcc->send_data.item && !rcc->send_data.blocked) {
if (!red_channel_client_no_item_being_sent(rcc) && !rcc->send_data.blocked) {
rcc->send_data.blocked = TRUE;
red_printf("ERROR: an item waiting to be sent and not blocked");
}
@ -599,10 +619,21 @@ void red_channel_client_push(RedChannelClient *rcc)
void red_channel_push(RedChannel *channel)
{
if (!channel || !channel->rcc) {
RingItem *link;
RingItem *next;
RedChannelClient *rcc;
if (!channel) {
return;
}
red_channel_client_push(channel->rcc);
RING_FOREACH_SAFE(link, next, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
if (rcc->stream == NULL) {
rcc->channel->disconnect(rcc);
} else {
red_channel_client_push(rcc);
}
}
}
static void red_channel_client_init_outgoing_messages_window(RedChannelClient *rcc)
@ -615,8 +646,12 @@ static void red_channel_client_init_outgoing_messages_window(RedChannelClient *r
// specific
void red_channel_init_outgoing_messages_window(RedChannel *channel)
{
if (channel->rcc) {
red_channel_client_init_outgoing_messages_window(channel->rcc);
RingItem *link;
RingItem *next;
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_init_outgoing_messages_window(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
}
@ -790,8 +825,12 @@ void red_channel_client_pipe_add_type(RedChannelClient *rcc, int pipe_item_type)
void red_channel_pipes_add_type(RedChannel *channel, int pipe_item_type)
{
if (channel->rcc) {
red_channel_client_pipe_add_type(channel->rcc, pipe_item_type);
RingItem *link;
RING_FOREACH(link, &channel->clients) {
red_channel_client_pipe_add_type(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link),
pipe_item_type);
}
}
@ -807,7 +846,18 @@ void red_channel_pipe_item_remove(RedChannel *channel, PipeItem *item)
int red_channel_is_connected(RedChannel *channel)
{
return (channel->rcc != NULL) && red_channel_client_is_connected(channel->rcc);
RingItem *link;
if (!channel || channel->clients_num == 0) {
return FALSE;
}
RING_FOREACH(link, &channel->clients) {
if (red_channel_client_is_connected(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link))) {
return TRUE;
}
}
return FALSE;
}
void red_channel_client_clear_sent_item(RedChannelClient *rcc)
@ -836,10 +886,17 @@ void red_channel_client_pipe_clear(RedChannelClient *rcc)
void red_channel_pipes_clear(RedChannel *channel)
{
if (!channel || !channel->rcc) {
RingItem *link;
RingItem *next;
RedChannelClient *rcc;
if (!channel) {
return;
}
red_channel_client_pipe_clear(channel->rcc);
RING_FOREACH_SAFE(link, next, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
red_channel_client_pipe_clear(rcc);
}
}
void red_channel_client_ack_zero_messages_window(RedChannelClient *rcc)
@ -856,8 +913,7 @@ static void red_channel_client_remove(RedChannelClient *rcc)
{
ring_remove(&rcc->client_link);
rcc->client->channels_num--;
ASSERT(rcc->channel->rcc == rcc);
rcc->channel->rcc = NULL;
ring_remove(&rcc->channel_link);
rcc->channel->clients_num--;
}
@ -878,46 +934,94 @@ void red_channel_client_disconnect(RedChannelClient *rcc)
void red_channel_disconnect(RedChannel *channel)
{
RingItem *link;
RingItem *next;
red_channel_pipes_clear(channel);
if (channel->rcc) {
red_channel_client_disconnect(channel->rcc);
RING_FOREACH_SAFE(link, next, &channel->clients) {
red_channel_client_disconnect(
SPICE_CONTAINEROF(link, RedChannelClient, channel_link));
}
}
int red_channel_all_clients_serials_are_zero(RedChannel *channel)
{
return (!channel->rcc || channel->rcc->send_data.serial == 0);
RingItem *link;
RedChannelClient *rcc;
RING_FOREACH(link, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
if (rcc->send_data.serial != 0) {
return FALSE;
}
}
return TRUE;
}
void red_channel_apply_clients(RedChannel *channel, channel_client_callback cb)
{
if (channel->rcc) {
cb(channel->rcc);
RingItem *link;
RingItem *next;
RedChannelClient *rcc;
RING_FOREACH_SAFE(link, next, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
cb(rcc);
}
}
void red_channel_apply_clients_data(RedChannel *channel, channel_client_callback_data cb, void *data)
{
if (channel->rcc) {
cb(channel->rcc, data);
RingItem *link;
RingItem *next;
RedChannelClient *rcc;
RING_FOREACH_SAFE(link, next, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
cb(rcc, data);
}
}
void red_channel_set_shut(RedChannel *channel)
{
if (channel->rcc) {
channel->rcc->incoming.shut = TRUE;
RingItem *link;
RedChannelClient *rcc;
RING_FOREACH(link, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
rcc->incoming.shut = TRUE;
}
}
int red_channel_all_blocked(RedChannel *channel)
{
return !channel || !channel->rcc || channel->rcc->send_data.blocked;
RingItem *link;
RedChannelClient *rcc;
if (!channel || channel->clients_num == 0) {
return FALSE;
}
RING_FOREACH(link, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
if (!rcc->send_data.blocked) {
return FALSE;
}
}
return TRUE;
}
int red_channel_any_blocked(RedChannel *channel)
{
return !channel || !channel->rcc || channel->rcc->send_data.blocked;
RingItem *link;
RedChannelClient *rcc;
RING_FOREACH(link, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
if (rcc->send_data.blocked) {
return TRUE;
}
}
return FALSE;
}
int red_channel_client_blocked(RedChannelClient *rcc)
@ -954,10 +1058,11 @@ SpiceDataHeader *red_channel_client_get_header(RedChannelClient *rcc)
int red_channel_get_first_socket(RedChannel *channel)
{
if (!channel->rcc || !channel->rcc->stream) {
if (!channel || channel->clients_num == 0) {
return -1;
}
return channel->rcc->stream->socket;
return SPICE_CONTAINEROF(ring_get_head(&channel->clients),
RedChannelClient, channel_link)->stream->socket;
}
int red_channel_client_item_being_sent(RedChannelClient *rcc, PipeItem *item)
@ -967,12 +1072,30 @@ int red_channel_client_item_being_sent(RedChannelClient *rcc, PipeItem *item)
int red_channel_item_being_sent(RedChannel *channel, PipeItem *item)
{
return channel->rcc && red_channel_client_item_being_sent(channel->rcc, item);
RingItem *link;
RedChannelClient *rcc;
RING_FOREACH(link, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
if (rcc->send_data.item == item) {
return TRUE;
}
}
return FALSE;
}
int red_channel_no_item_being_sent(RedChannel *channel)
{
return !channel->rcc || red_channel_client_no_item_being_sent(channel->rcc);
RingItem *link;
RedChannelClient *rcc;
RING_FOREACH(link, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
if (!red_channel_client_no_item_being_sent(rcc)) {
return FALSE;
}
}
return TRUE;
}
int red_channel_client_no_item_being_sent(RedChannelClient *rcc)
@ -1073,16 +1196,17 @@ static void red_channel_pipes_create_batch(RedChannel *channel,
new_pipe_item_t creator, void *data,
rcc_item_t callback)
{
RingItem *link;
RedChannelClient *rcc;
PipeItem *item;
int num = 0;
if (!(rcc = channel->rcc)) {
return;
}
item = (*creator)(rcc, data, num++);
if (callback) {
(*callback)(rcc, item);
RING_FOREACH(link, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
item = (*creator)(rcc, data, num++);
if (callback) {
(*callback)(rcc, item);
}
}
}
@ -1108,15 +1232,39 @@ void red_channel_pipes_new_add_tail(RedChannel *channel, new_pipe_item_t creator
uint32_t red_channel_max_pipe_size(RedChannel *channel)
{
return channel->rcc ? channel->rcc->pipe_size : 0;
RingItem *link;
RedChannelClient *rcc;
uint32_t pipe_size = 0;
RING_FOREACH(link, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
pipe_size = pipe_size > rcc->pipe_size ? pipe_size : rcc->pipe_size;
}
return pipe_size;
}
uint32_t red_channel_min_pipe_size(RedChannel *channel)
{
return channel->rcc ? channel->rcc->pipe_size : 0;
RingItem *link;
RedChannelClient *rcc;
uint32_t pipe_size = ~0;
RING_FOREACH(link, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
pipe_size = pipe_size < rcc->pipe_size ? pipe_size : rcc->pipe_size;
}
return pipe_size == ~0 ? 0 : pipe_size;
}
uint32_t red_channel_sum_pipes_size(RedChannel *channel)
{
return channel->rcc ? channel->rcc->pipe_size : 0;
RingItem *link;
RedChannelClient *rcc;
uint32_t sum = 0;
RING_FOREACH(link, &channel->clients) {
rcc = SPICE_CONTAINEROF(link, RedChannelClient, channel_link);
sum += rcc->pipe_size;
}
return sum;
}

View File

@ -173,7 +173,7 @@ struct RedChannel {
int migrate;
int handle_acks;
RedChannelClient *rcc;
Ring clients;
uint32_t clients_num;
OutgoingHandlerInterface outgoing_cb;
@ -200,7 +200,7 @@ struct RedChannel {
};
/* if one of the callbacks should cause disconnect, use red_channel_shutdown and don't
explicitly destroy the channel */
* explicitly destroy the channel */
RedChannel *red_channel_create(int size,
SpiceCoreInterface *core,
int migrate, int handle_acks,
@ -235,6 +235,7 @@ RedChannel *red_channel_create_parser(int size,
channel_handle_migrate_flush_mark_proc handle_migrate_flush_mark,
channel_handle_migrate_data_proc handle_migrate_data,
channel_handle_migrate_data_get_serial_proc handle_migrate_data_get_serial);
RedChannelClient *red_channel_client_create(int size, RedChannel *channel, RedClient *client,
RedsStream *stream);
@ -314,8 +315,10 @@ int red_channel_client_send_message_pending(RedChannelClient *rcc);
/* returns TRUE if item is being sent by one of the channel clients. This will
* be true if someone called init_send_data but send has not completed (or perhaps
* hasn't even begun, i.e. no one called begin_send_)
* */
* hasn't even begun, i.e. no one called begin_send_).
* However, note that red_channel_client_init_send_data can also be called with
* item==NULL, thus not all pipe items can be tracked.
*/
int red_channel_item_being_sent(RedChannel *channel, PipeItem *item);
int red_channel_client_item_being_sent(RedChannelClient *rcc, PipeItem *item);
@ -387,8 +390,10 @@ struct RedClient {
};
RedClient *red_client_new();
void red_client_destroy(RedClient *client);
void red_client_set_main(RedClient *client, MainChannelClient *mcc);
MainChannelClient *red_client_get_main(RedClient *client);
void red_client_set_main(RedClient *client, MainChannelClient *mcc);
void red_client_destroy(RedClient *client);
void red_client_disconnect(RedClient *client);
void red_client_remove_channel(RedClient *client, RedChannelClient *rcc);
#endif

File diff suppressed because it is too large Load Diff