mirror of
https://gitlab.uni-freiburg.de/opensourcevdi/spice
synced 2026-01-08 21:14:11 +00:00
server/red_worker: introduce red_peer_handle_outgoing and OutgoingHandler
From red_channel.
This commit is contained in:
parent
29a7bcd596
commit
73858b93dc
@ -354,6 +354,31 @@ typedef void (*channel_send_pipe_item_proc)(RedChannel *channel, PipeItem *item)
|
||||
typedef void (*channel_release_pipe_item_proc)(RedChannel *channel, PipeItem *item, int item_pushed);
|
||||
typedef int (*channel_handle_parsed_proc)(RedChannel *channel, uint32_t size, uint16_t type, void *message);
|
||||
|
||||
#define MAX_SEND_VEC 100
|
||||
|
||||
typedef int (*get_outgoing_msg_size_proc)(void *opaque);
|
||||
typedef void (*prepare_outgoing_proc)(void *opaque, struct iovec *vec, int *vec_size, int pos);
|
||||
typedef void (*on_outgoing_error_proc)(void *opaque);
|
||||
typedef void (*on_outgoing_block_proc)(void *opaque);
|
||||
typedef void (*on_outgoing_msg_done_proc)(void *opaque);
|
||||
|
||||
typedef struct OutgoingHandler {
|
||||
void *opaque;
|
||||
struct iovec vec_buf[MAX_SEND_VEC];
|
||||
int vec_size;
|
||||
struct iovec *vec;
|
||||
int pos;
|
||||
int size;
|
||||
get_outgoing_msg_size_proc get_msg_size;
|
||||
prepare_outgoing_proc prepare;
|
||||
on_outgoing_error_proc on_error;
|
||||
on_outgoing_block_proc on_block;
|
||||
on_outgoing_msg_done_proc on_msg_done;
|
||||
#ifdef RED_STATISTICS
|
||||
uint64_t *out_bytes_counter;
|
||||
#endif
|
||||
} OutgoingHandler;
|
||||
|
||||
struct RedChannel {
|
||||
spice_parse_channel_func_t parser;
|
||||
RedsStream *stream;
|
||||
@ -386,6 +411,8 @@ struct RedChannel {
|
||||
uint8_t *end;
|
||||
} incoming;
|
||||
|
||||
OutgoingHandler outgoing;
|
||||
|
||||
channel_disconnect_proc disconnect;
|
||||
channel_hold_pipe_item_proc hold_item;
|
||||
channel_release_pipe_item_proc release_item;
|
||||
@ -393,12 +420,6 @@ struct RedChannel {
|
||||
channel_send_pipe_item_proc send_item;
|
||||
|
||||
int during_send;
|
||||
|
||||
#ifdef RED_STATISTICS
|
||||
struct {
|
||||
uint64_t *out_bytes_counter;
|
||||
} outgoing;
|
||||
#endif
|
||||
};
|
||||
|
||||
typedef struct ImageItem {
|
||||
@ -7277,8 +7298,6 @@ static inline void red_send_qxl_drawable(RedWorker *worker, DisplayChannel *disp
|
||||
display_begin_send_message(display_channel);
|
||||
}
|
||||
|
||||
#define MAX_SEND_VEC 100
|
||||
|
||||
static void inline channel_release_res(RedChannel *channel)
|
||||
{
|
||||
if (!channel->send_data.item) {
|
||||
@ -7288,47 +7307,83 @@ static void inline channel_release_res(RedChannel *channel)
|
||||
channel->send_data.item = NULL;
|
||||
}
|
||||
|
||||
static void red_channel_send(RedChannel *channel)
|
||||
static void red_channel_peer_prepare_out_msg(void *opaque, struct iovec *vec,
|
||||
int *vec_size, int pos)
|
||||
{
|
||||
for (;;) {
|
||||
ssize_t n = channel->send_data.size - channel->send_data.pos;
|
||||
struct iovec vec[MAX_SEND_VEC];
|
||||
size_t vec_size;
|
||||
RedChannel *channel = (RedChannel *)opaque;
|
||||
|
||||
if (!n) {
|
||||
channel->send_data.blocked = FALSE;
|
||||
if (channel->send_data.item) {
|
||||
channel->release_item(channel, channel->send_data.item, FALSE);
|
||||
channel->send_data.item = NULL;
|
||||
}
|
||||
break;
|
||||
*vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
|
||||
vec, MAX_SEND_VEC, pos);
|
||||
}
|
||||
|
||||
static void red_channel_peer_on_out_block(void *opaque)
|
||||
{
|
||||
RedChannel *channel = (RedChannel *)opaque;
|
||||
|
||||
channel->send_data.blocked = TRUE;
|
||||
}
|
||||
|
||||
static void red_channel_peer_on_out_msg_done(void *opaque)
|
||||
{
|
||||
RedChannel *channel = (RedChannel *)opaque;
|
||||
|
||||
channel->send_data.size = 0;
|
||||
if (channel->send_data.item) {
|
||||
channel->release_item(channel, channel->send_data.item, TRUE);
|
||||
channel->send_data.item = NULL;
|
||||
}
|
||||
channel->send_data.blocked = FALSE;
|
||||
}
|
||||
|
||||
static void red_peer_handle_outgoing(RedsStream *stream, OutgoingHandler *handler)
|
||||
{
|
||||
int n;
|
||||
|
||||
ASSERT(stream);
|
||||
if (handler->size == 0) {
|
||||
handler->vec = handler->vec_buf;
|
||||
handler->size = handler->get_msg_size(handler->opaque);
|
||||
if (!handler->size) { // nothing to be sent
|
||||
return;
|
||||
}
|
||||
vec_size = spice_marshaller_fill_iovec(channel->send_data.marshaller,
|
||||
vec, MAX_SEND_VEC, channel->send_data.pos);
|
||||
ASSERT(channel->stream);
|
||||
n = reds_stream_writev(channel->stream, vec, vec_size);
|
||||
}
|
||||
for (;;) {
|
||||
handler->prepare(handler->opaque, handler->vec, &handler->vec_size, handler->pos);
|
||||
n = reds_stream_writev(stream, handler->vec, handler->vec_size);
|
||||
if (n == -1) {
|
||||
switch (errno) {
|
||||
case EAGAIN:
|
||||
channel->send_data.blocked = TRUE;
|
||||
handler->on_block(handler->opaque);
|
||||
return;
|
||||
case EINTR:
|
||||
break;
|
||||
case EPIPE:
|
||||
channel->disconnect(channel);
|
||||
handler->on_error(handler->opaque);
|
||||
return;
|
||||
default:
|
||||
red_printf("%s", strerror(errno));
|
||||
channel->disconnect(channel);
|
||||
handler->on_error(handler->opaque);
|
||||
return;
|
||||
}
|
||||
} else {
|
||||
channel->send_data.pos += n;
|
||||
stat_inc_counter(channel->outgoing.out_bytes_counter, n);
|
||||
handler->pos += n;
|
||||
stat_inc_counter(handler->out_bytes_counter, n);
|
||||
if (handler->pos == handler->size) { // finished writing data
|
||||
handler->on_msg_done(handler->opaque);
|
||||
handler->vec = handler->vec_buf;
|
||||
handler->pos = 0;
|
||||
handler->size = 0;
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void red_channel_send(RedChannel *channel)
|
||||
{
|
||||
red_peer_handle_outgoing(channel->stream, &channel->outgoing);
|
||||
}
|
||||
|
||||
static void display_channel_push_release(DisplayChannel *channel, uint8_t type, uint64_t id,
|
||||
uint64_t* sync_data)
|
||||
{
|
||||
@ -8244,16 +8299,17 @@ static void red_send_surface_destroy(DisplayChannel *display, uint32_t surface_i
|
||||
red_channel_begin_send_message(channel);
|
||||
}
|
||||
|
||||
static inline int red_channel_waiting_for_ack(RedChannel *channel)
|
||||
{
|
||||
return (channel->ack_data.messages_window > channel->ack_data.client_window * 2);
|
||||
}
|
||||
|
||||
static inline PipeItem *red_channel_pipe_get(RedChannel *channel)
|
||||
{
|
||||
PipeItem *item;
|
||||
if (!channel || channel->send_data.blocked ||
|
||||
!(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (channel->ack_data.messages_window > channel->ack_data.client_window * 2) {
|
||||
channel->send_data.blocked = TRUE;
|
||||
red_channel_waiting_for_ack(channel) ||
|
||||
!(item = (PipeItem *)ring_get_tail(&channel->pipe))) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -9336,6 +9392,18 @@ static void free_common_channel_from_listener(EventListener *ctx)
|
||||
free(common);
|
||||
}
|
||||
|
||||
static void red_channel_default_peer_on_error(RedChannel *channel)
|
||||
{
|
||||
channel->disconnect(channel);
|
||||
}
|
||||
|
||||
static int red_channel_peer_get_out_msg_size(void *opaque)
|
||||
{
|
||||
RedChannel *channel = (RedChannel *)opaque;
|
||||
|
||||
return channel->send_data.size;
|
||||
}
|
||||
|
||||
static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
|
||||
RedsStream *stream, int migrate,
|
||||
event_listener_action_proc handler,
|
||||
@ -9380,6 +9448,17 @@ static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_i
|
||||
ring_init(&channel->pipe);
|
||||
channel->send_data.marshaller = spice_marshaller_new();
|
||||
|
||||
channel->outgoing.opaque = channel;
|
||||
channel->outgoing.pos = 0;
|
||||
channel->outgoing.size = 0;
|
||||
channel->outgoing.out_bytes_counter = 0;
|
||||
|
||||
channel->outgoing.get_msg_size = red_channel_peer_get_out_msg_size;
|
||||
channel->outgoing.prepare = red_channel_peer_prepare_out_msg;
|
||||
channel->outgoing.on_block = red_channel_peer_on_out_block;
|
||||
channel->outgoing.on_error = (on_outgoing_error_proc)red_channel_default_peer_on_error;
|
||||
channel->outgoing.on_msg_done = red_channel_peer_on_out_msg_done;
|
||||
|
||||
event.events = EPOLLIN | EPOLLOUT | EPOLLET;
|
||||
event.data.ptr = &common->listener;
|
||||
if (epoll_ctl(worker->epoll, EPOLL_CTL_ADD, stream->socket, &event) == -1) {
|
||||
|
||||
Loading…
Reference in New Issue
Block a user