mirror of
https://gitlab.uni-freiburg.de/opensourcevdi/spice
synced 2025-12-31 20:04:09 +00:00
server: introduce dispatcher
used for main_dispatcher only in this patch. Dispatcher is meant to be used for Main<->any low frequency messages. It's interface is meant to include the red_dispatcher usage: fixed size messages per message type some messages require an ack Some methods are added to be used by RedDispatcher later: dispatcher_handle_read - to be called directly by RedDispatcher epoll based loop dispatcher_set_opaque - to be set from red_worker pthread dispatcher_init - allow NULL core as used by red_worker Read and Write behavior: Sender: blocking write, blocking read for ack (if any). Reader: poll for any data, if such then blocking read for a message_type and following message. repeat until poll returns with no pending data to read. FDO Bugzilla: 42463
This commit is contained in:
parent
9174b67160
commit
776bdd6c95
@ -78,6 +78,8 @@ libspice_server_la_SOURCES = \
|
||||
red_client_cache.h \
|
||||
red_client_shared_cache.h \
|
||||
red_common.h \
|
||||
dispatcher.c \
|
||||
dispatcher.h \
|
||||
red_dispatcher.c \
|
||||
red_dispatcher.h \
|
||||
main_dispatcher.c \
|
||||
|
||||
248
server/dispatcher.c
Normal file
248
server/dispatcher.c
Normal file
@ -0,0 +1,248 @@
|
||||
#include <unistd.h>
|
||||
#include <errno.h>
|
||||
#include <assert.h>
|
||||
#include <string.h>
|
||||
#include <pthread.h>
|
||||
#include <fcntl.h>
|
||||
#include <poll.h>
|
||||
|
||||
#include "mem.h"
|
||||
#include "spice_common.h"
|
||||
#include "dispatcher.h"
|
||||
|
||||
#define DISPATCHER_DEBUG_PRINTF(level, ...) \
|
||||
red_printf_debug(level, "DISP", ##__VA_ARGS__)
|
||||
|
||||
//#define DEBUG_DISPATCHER
|
||||
|
||||
#ifdef DEBUG_DISPATCHER
|
||||
#include <signal.h>
|
||||
#endif
|
||||
|
||||
#define ACK 0xffffffff
|
||||
|
||||
/*
|
||||
* read_safe
|
||||
* helper. reads until size bytes accumulated in buf, if an error other then
|
||||
* EINTR is encountered returns -1, otherwise returns 0.
|
||||
* @block if 1 the read will block (the fd is always blocking).
|
||||
* if 0 poll first, return immediately if no bytes available, otherwise
|
||||
* read size in blocking mode.
|
||||
*/
|
||||
static int read_safe(int fd, void *buf, size_t size, int block)
|
||||
{
|
||||
int read_size = 0;
|
||||
int ret;
|
||||
struct pollfd pollfd = {.fd = fd, .events = POLLIN, .revents = 0};
|
||||
|
||||
if (size == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (!block) {
|
||||
while ((ret = poll(&pollfd, 1, 0)) == -1) {
|
||||
if (errno == EINTR) {
|
||||
DISPATCHER_DEBUG_PRINTF(3, "EINTR in poll");
|
||||
continue;
|
||||
}
|
||||
red_error("poll failed");
|
||||
return -1;
|
||||
}
|
||||
if (!(pollfd.revents & POLLIN)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
while (read_size < size) {
|
||||
ret = read(fd, buf + read_size, size - read_size);
|
||||
if (ret == -1) {
|
||||
if (errno == EINTR) {
|
||||
DISPATCHER_DEBUG_PRINTF(3, "EINTR in read");
|
||||
continue;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
if (ret == 0) {
|
||||
red_error("broken pipe on read");
|
||||
return -1;
|
||||
}
|
||||
read_size += ret;
|
||||
}
|
||||
return read_size;
|
||||
}
|
||||
|
||||
/*
|
||||
* write_safe
|
||||
* @return -1 for error, otherwise number of written bytes. may be zero.
|
||||
*/
|
||||
static int write_safe(int fd, void *buf, size_t size)
|
||||
{
|
||||
int written_size = 0;
|
||||
int ret;
|
||||
|
||||
while (written_size < size) {
|
||||
ret = write(fd, buf + written_size, size - written_size);
|
||||
if (ret == -1) {
|
||||
if (errno != EINTR) {
|
||||
DISPATCHER_DEBUG_PRINTF(3, "EINTR in write\n");
|
||||
return -1;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
written_size += ret;
|
||||
}
|
||||
return written_size;
|
||||
}
|
||||
|
||||
static int dispatcher_handle_single_read(Dispatcher *dispatcher)
|
||||
{
|
||||
int ret;
|
||||
uint32_t type;
|
||||
DispatcherMessage *msg = NULL;
|
||||
uint8_t *payload = dispatcher->payload;
|
||||
uint32_t ack = ACK;
|
||||
|
||||
if ((ret = read_safe(dispatcher->recv_fd, &type, sizeof(type), 0)) == -1) {
|
||||
red_printf("error reading from dispatcher: %d\n", errno);
|
||||
return 0;
|
||||
}
|
||||
if (ret == 0) {
|
||||
/* no messsage */
|
||||
return 0;
|
||||
}
|
||||
msg = &dispatcher->messages[type];
|
||||
if (read_safe(dispatcher->recv_fd, payload, msg->size, 1) == -1) {
|
||||
red_printf("error reading from dispatcher: %d\n", errno);
|
||||
/* TODO: close socketpair? */
|
||||
return 0;
|
||||
}
|
||||
if (msg->handler) {
|
||||
msg->handler(dispatcher->opaque, (void *)payload);
|
||||
} else {
|
||||
red_printf("error: no handler for message type %d\n", type);
|
||||
}
|
||||
if (msg->ack && write_safe(dispatcher->recv_fd,
|
||||
&ack, sizeof(ack)) == -1) {
|
||||
red_printf("error writing ack for message %d\n", type);
|
||||
/* TODO: close socketpair? */
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
/*
|
||||
* dispatcher_handle_recv_read
|
||||
* doesn't handle being in the middle of a message. all reads are blocking.
|
||||
*/
|
||||
void dispatcher_handle_recv_read(Dispatcher *dispatcher)
|
||||
{
|
||||
while (dispatcher_handle_single_read(dispatcher)) {
|
||||
}
|
||||
}
|
||||
|
||||
void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
|
||||
void *payload)
|
||||
{
|
||||
DispatcherMessage *msg;
|
||||
uint32_t ack;
|
||||
int send_fd = dispatcher->send_fd;
|
||||
|
||||
assert(dispatcher->max_message_type > message_type);
|
||||
assert(dispatcher->messages[message_type].handler);
|
||||
msg = &dispatcher->messages[message_type];
|
||||
pthread_mutex_lock(&dispatcher->lock);
|
||||
if (write_safe(send_fd, &message_type, sizeof(message_type)) == -1) {
|
||||
red_printf("error: failed to send message type for message %d\n",
|
||||
message_type);
|
||||
goto unlock;
|
||||
}
|
||||
if (write_safe(send_fd, payload, msg->size) == -1) {
|
||||
red_printf("error: failed to send message body for message %d\n",
|
||||
message_type);
|
||||
goto unlock;
|
||||
}
|
||||
if (msg->ack) {
|
||||
if (read_safe(send_fd, &ack, sizeof(ack), 1) == -1) {
|
||||
red_printf("error: failed to read ack");
|
||||
} else if (ack != ACK) {
|
||||
red_printf("error: got wrong ack value in dispatcher "
|
||||
"for message %d\n", message_type);
|
||||
/* TODO handling error? */
|
||||
}
|
||||
}
|
||||
unlock:
|
||||
pthread_mutex_unlock(&dispatcher->lock);
|
||||
}
|
||||
|
||||
void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
|
||||
dispatcher_handle_message handler, size_t size,
|
||||
int ack)
|
||||
{
|
||||
DispatcherMessage *msg;
|
||||
|
||||
assert(message_type < dispatcher->max_message_type);
|
||||
assert(dispatcher->messages[message_type].handler == 0);
|
||||
msg = &dispatcher->messages[message_type];
|
||||
msg->handler = handler;
|
||||
msg->size = size;
|
||||
msg->ack = ack;
|
||||
if (msg->size > dispatcher->payload_size) {
|
||||
dispatcher->payload = realloc(dispatcher->payload, msg->size);
|
||||
dispatcher->payload_size = msg->size;
|
||||
}
|
||||
}
|
||||
|
||||
#ifdef DEBUG_DISPATCHER
|
||||
static void dummy_handler(int bla)
|
||||
{
|
||||
}
|
||||
|
||||
static void setup_dummy_signal_handler(void)
|
||||
{
|
||||
static int inited = 0;
|
||||
struct sigaction act = {
|
||||
.sa_handler = &dummy_handler,
|
||||
};
|
||||
if (inited) {
|
||||
return;
|
||||
}
|
||||
inited = 1;
|
||||
/* handle SIGRTMIN+10 in order to test the loops for EINTR */
|
||||
if (sigaction(SIGRTMIN + 10, &act, NULL) == -1) {
|
||||
fprintf(stderr,
|
||||
"failed to set dummy sigaction for DEBUG_DISPATCHER\n");
|
||||
exit(1);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
|
||||
void *opaque)
|
||||
{
|
||||
int channels[2];
|
||||
|
||||
#ifdef DEBUG_DISPATCHER
|
||||
setup_dummy_signal_handler();
|
||||
#endif
|
||||
dispatcher->opaque = opaque;
|
||||
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
|
||||
red_error("socketpair failed %s", strerror(errno));
|
||||
return;
|
||||
}
|
||||
pthread_mutex_init(&dispatcher->lock, NULL);
|
||||
dispatcher->recv_fd = channels[0];
|
||||
dispatcher->send_fd = channels[1];
|
||||
dispatcher->self = pthread_self();
|
||||
|
||||
dispatcher->messages = spice_malloc0_n(max_message_type,
|
||||
sizeof(dispatcher->messages[0]));
|
||||
dispatcher->max_message_type = max_message_type;
|
||||
}
|
||||
|
||||
void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque)
|
||||
{
|
||||
dispatcher->opaque = opaque;
|
||||
}
|
||||
|
||||
int dispatcher_get_recv_fd(Dispatcher *dispatcher)
|
||||
{
|
||||
return dispatcher->recv_fd;
|
||||
}
|
||||
81
server/dispatcher.h
Normal file
81
server/dispatcher.h
Normal file
@ -0,0 +1,81 @@
|
||||
#ifndef MAIN_DISPATCHER_H
|
||||
#define MAIN_DISPATCHER_H
|
||||
|
||||
#include <spice.h>
|
||||
|
||||
typedef struct Dispatcher Dispatcher;
|
||||
|
||||
typedef void (*dispatcher_handle_message)(void *opaque,
|
||||
void *payload);
|
||||
|
||||
typedef struct DispatcherMessage {
|
||||
size_t size;
|
||||
int ack;
|
||||
dispatcher_handle_message handler;
|
||||
} DispatcherMessage;
|
||||
|
||||
struct Dispatcher {
|
||||
SpiceCoreInterface *recv_core;
|
||||
int recv_fd;
|
||||
int send_fd;
|
||||
pthread_t self;
|
||||
pthread_mutex_t lock;
|
||||
DispatcherMessage *messages;
|
||||
int stage; /* message parser stage - sender has no stages */
|
||||
size_t max_message_type;
|
||||
void *payload; /* allocated as max of message sizes */
|
||||
size_t payload_size; /* used to track realloc calls */
|
||||
void *opaque;
|
||||
};
|
||||
|
||||
/*
|
||||
* dispatcher_send_message
|
||||
* @message_type: message type
|
||||
* @payload: payload
|
||||
*/
|
||||
void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
|
||||
void *payload);
|
||||
|
||||
/*
|
||||
* dispatcher_init
|
||||
* @max_message_type: number of message types. Allows upfront allocation
|
||||
* of a DispatcherMessage list.
|
||||
* up front, and registration in any order wanted.
|
||||
*/
|
||||
void dispatcher_init(Dispatcher *dispatcher, size_t max_message_type,
|
||||
void *opaque);
|
||||
|
||||
/*
|
||||
* dispatcher_register_handler
|
||||
* @dispatcher: dispatcher
|
||||
* @messsage_type: message type
|
||||
* @handler: message handler
|
||||
* @size: message size. Each type has a fixed associated size.
|
||||
* @ack: send an ack. This is per message type - you can't send the
|
||||
* same message type with and without. Register two different
|
||||
* messages if that is what you want.
|
||||
*/
|
||||
void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
|
||||
dispatcher_handle_message handler, size_t size,
|
||||
int ack);
|
||||
|
||||
/*
|
||||
* dispatcher_handle_recv_read
|
||||
* @dispatcher: Dispatcher instance
|
||||
*/
|
||||
void dispatcher_handle_recv_read(Dispatcher *);
|
||||
|
||||
/*
|
||||
* dispatcher_get_recv_fd
|
||||
* @return: receive file descriptor of the dispatcher
|
||||
*/
|
||||
int dispatcher_get_recv_fd(Dispatcher *);
|
||||
|
||||
/*
|
||||
* dispatcher_set_opaque
|
||||
* @dispatcher: Dispatcher instance
|
||||
* @opaque: opaque to use for callbacks
|
||||
*/
|
||||
void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque);
|
||||
|
||||
#endif //MAIN_DISPATCHER_H
|
||||
@ -5,6 +5,7 @@
|
||||
#include <assert.h>
|
||||
|
||||
#include "red_common.h"
|
||||
#include "dispatcher.h"
|
||||
#include "main_dispatcher.h"
|
||||
|
||||
/*
|
||||
@ -28,11 +29,8 @@
|
||||
*/
|
||||
|
||||
typedef struct {
|
||||
Dispatcher base;
|
||||
SpiceCoreInterface *core;
|
||||
int main_fd;
|
||||
int other_fd;
|
||||
pthread_t self;
|
||||
pthread_mutex_t lock;
|
||||
} MainDispatcher;
|
||||
|
||||
MainDispatcher main_dispatcher;
|
||||
@ -43,15 +41,10 @@ enum {
|
||||
MAIN_DISPATCHER_NUM_MESSAGES
|
||||
};
|
||||
|
||||
typedef struct MainDispatcherMessage {
|
||||
uint32_t type;
|
||||
union {
|
||||
struct {
|
||||
int event;
|
||||
SpiceChannelEventInfo *info;
|
||||
} channel_event;
|
||||
} data;
|
||||
} MainDispatcherMessage;
|
||||
typedef struct MainDispatcherChannelEventMessage {
|
||||
int event;
|
||||
SpiceChannelEventInfo *info;
|
||||
} MainDispatcherChannelEventMessage;
|
||||
|
||||
/* channel_event - calls core->channel_event, must be done in main thread */
|
||||
static void main_dispatcher_self_handle_channel_event(
|
||||
@ -61,85 +54,44 @@ static void main_dispatcher_self_handle_channel_event(
|
||||
main_dispatcher.core->channel_event(event, info);
|
||||
}
|
||||
|
||||
static void main_dispatcher_handle_channel_event(MainDispatcherMessage *msg)
|
||||
static void main_dispatcher_handle_channel_event(void *opaque,
|
||||
void *payload)
|
||||
{
|
||||
main_dispatcher_self_handle_channel_event(msg->data.channel_event.event,
|
||||
msg->data.channel_event.info);
|
||||
MainDispatcherChannelEventMessage *channel_event = payload;
|
||||
|
||||
main_dispatcher_self_handle_channel_event(channel_event->event,
|
||||
channel_event->info);
|
||||
}
|
||||
|
||||
void main_dispatcher_channel_event(int event, SpiceChannelEventInfo *info)
|
||||
{
|
||||
MainDispatcherMessage msg;
|
||||
ssize_t written = 0;
|
||||
ssize_t ret;
|
||||
MainDispatcherChannelEventMessage msg;
|
||||
|
||||
if (pthread_self() == main_dispatcher.self) {
|
||||
if (pthread_self() == main_dispatcher.base.self) {
|
||||
main_dispatcher_self_handle_channel_event(event, info);
|
||||
return;
|
||||
}
|
||||
msg.type = MAIN_DISPATCHER_CHANNEL_EVENT;
|
||||
msg.data.channel_event.event = event;
|
||||
msg.data.channel_event.info = info;
|
||||
pthread_mutex_lock(&main_dispatcher.lock);
|
||||
while (written < sizeof(msg)) {
|
||||
ret = write(main_dispatcher.other_fd, &msg + written,
|
||||
sizeof(msg) - written);
|
||||
if (ret == -1) {
|
||||
assert(errno == EINTR);
|
||||
continue;
|
||||
}
|
||||
written += ret;
|
||||
}
|
||||
pthread_mutex_unlock(&main_dispatcher.lock);
|
||||
msg.event = event;
|
||||
msg.info = info;
|
||||
dispatcher_send_message(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
|
||||
&msg);
|
||||
}
|
||||
|
||||
|
||||
static void main_dispatcher_handle_read(int fd, int event, void *opaque)
|
||||
static void dispatcher_handle_read(int fd, int event, void *opaque)
|
||||
{
|
||||
int ret;
|
||||
MainDispatcher *md = opaque;
|
||||
MainDispatcherMessage msg;
|
||||
int read_size = 0;
|
||||
Dispatcher *dispatcher = opaque;
|
||||
|
||||
while (read_size < sizeof(msg)) {
|
||||
/* blocks until sizeof(msg) is read */
|
||||
ret = read(md->main_fd, &msg + read_size, sizeof(msg) - read_size);
|
||||
if (ret == -1) {
|
||||
if (errno != EINTR) {
|
||||
red_printf("error reading from main dispatcher: %d\n", errno);
|
||||
/* TODO: close channel? */
|
||||
return;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
read_size += ret;
|
||||
}
|
||||
switch (msg.type) {
|
||||
case MAIN_DISPATCHER_CHANNEL_EVENT:
|
||||
main_dispatcher_handle_channel_event(&msg);
|
||||
break;
|
||||
default:
|
||||
red_printf("error: unhandled main dispatcher message type %d\n",
|
||||
msg.type);
|
||||
}
|
||||
dispatcher_handle_recv_read(dispatcher);
|
||||
}
|
||||
|
||||
void main_dispatcher_init(SpiceCoreInterface *core)
|
||||
{
|
||||
int channels[2];
|
||||
|
||||
memset(&main_dispatcher, 0, sizeof(main_dispatcher));
|
||||
main_dispatcher.core = core;
|
||||
|
||||
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
|
||||
red_error("socketpair failed %s", strerror(errno));
|
||||
return;
|
||||
}
|
||||
pthread_mutex_init(&main_dispatcher.lock, NULL);
|
||||
main_dispatcher.main_fd = channels[0];
|
||||
main_dispatcher.other_fd = channels[1];
|
||||
main_dispatcher.self = pthread_self();
|
||||
|
||||
core->watch_add(main_dispatcher.main_fd, SPICE_WATCH_EVENT_READ,
|
||||
main_dispatcher_handle_read, &main_dispatcher);
|
||||
dispatcher_init(&main_dispatcher.base, MAIN_DISPATCHER_NUM_MESSAGES, &main_dispatcher.base);
|
||||
core->watch_add(main_dispatcher.base.recv_fd, SPICE_WATCH_EVENT_READ,
|
||||
dispatcher_handle_read, &main_dispatcher.base);
|
||||
dispatcher_register_handler(&main_dispatcher.base, MAIN_DISPATCHER_CHANNEL_EVENT,
|
||||
main_dispatcher_handle_channel_event,
|
||||
sizeof(MainDispatcherChannelEventMessage), 0 /* no ack */);
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user