Remove GObject from Dispatcher hierarchy

Signed-off-by: Frediano Ziglio <fziglio@redhat.com>
This commit is contained in:
Frediano Ziglio 2020-03-05 08:27:57 +00:00 committed by Frediano Ziglio
parent f680cc7870
commit f4aefa728e
13 changed files with 416 additions and 706 deletions

View File

@ -36,136 +36,60 @@
* Is also packaged to not leave holes in both 32 and 64 environments
* so memory instrumentation tools should not find uninitialised bytes.
*/
typedef struct DispatcherMessage {
struct DispatcherMessage {
dispatcher_handle_message handler;
uint32_t size;
uint32_t type:31;
uint32_t ack:1;
} DispatcherMessage;
};
struct DispatcherPrivate {
SPICE_CXX_GLIB_ALLOCATOR
DispatcherPrivate(uint32_t max_message_type):
max_message_type(max_message_type)
{
}
~DispatcherPrivate();
int recv_fd;
int send_fd;
pthread_mutex_t lock;
DispatcherMessage *messages;
guint max_message_type;
const guint max_message_type;
void *payload; /* allocated as max of message sizes */
size_t payload_size; /* used to track realloc calls */
void *opaque;
dispatcher_handle_any_message any_handler;
};
G_DEFINE_TYPE_WITH_PRIVATE(Dispatcher, dispatcher, G_TYPE_OBJECT)
enum {
PROP_0,
PROP_MAX_MESSAGE_TYPE
};
static void
dispatcher_get_property(GObject *object,
guint property_id,
GValue *value,
GParamSpec *pspec)
DispatcherPrivate::~DispatcherPrivate()
{
Dispatcher *self = DISPATCHER(object);
switch (property_id)
{
case PROP_MAX_MESSAGE_TYPE:
g_value_set_uint(value, self->priv->max_message_type);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec);
}
g_free(messages);
socket_close(send_fd);
socket_close(recv_fd);
pthread_mutex_destroy(&lock);
g_free(payload);
}
static void
dispatcher_set_property(GObject *object,
guint property_id,
const GValue *value,
GParamSpec *pspec)
Dispatcher::~Dispatcher()
{
Dispatcher *self = DISPATCHER(object);
switch (property_id)
{
case PROP_MAX_MESSAGE_TYPE:
self->priv->max_message_type = g_value_get_uint(value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec);
}
}
static void
dispatcher_finalize(GObject *object)
Dispatcher::Dispatcher(uint32_t max_message_type):
priv(new DispatcherPrivate(max_message_type))
{
Dispatcher *self = DISPATCHER(object);
g_free(self->priv->messages);
socket_close(self->priv->send_fd);
socket_close(self->priv->recv_fd);
pthread_mutex_destroy(&self->priv->lock);
g_free(self->priv->payload);
G_OBJECT_CLASS(dispatcher_parent_class)->finalize(object);
}
static void dispatcher_constructed(GObject *object)
{
Dispatcher *self = DISPATCHER(object);
int channels[2];
G_OBJECT_CLASS(dispatcher_parent_class)->constructed(object);
if (socketpair(AF_LOCAL, SOCK_STREAM, 0, channels) == -1) {
spice_error("socketpair failed %s", strerror(errno));
return;
}
pthread_mutex_init(&self->priv->lock, NULL);
self->priv->recv_fd = channels[0];
self->priv->send_fd = channels[1];
pthread_mutex_init(&priv->lock, NULL);
priv->recv_fd = channels[0];
priv->send_fd = channels[1];
self->priv->messages = g_new0(DispatcherMessage,
self->priv->max_message_type);
priv->messages = g_new0(DispatcherMessage, priv->max_message_type);
}
static void
dispatcher_class_init(DispatcherClass *klass)
{
GObjectClass *object_class = G_OBJECT_CLASS(klass);
object_class->get_property = dispatcher_get_property;
object_class->set_property = dispatcher_set_property;
object_class->constructed = dispatcher_constructed;
object_class->finalize = dispatcher_finalize;
g_object_class_install_property(object_class,
PROP_MAX_MESSAGE_TYPE,
g_param_spec_uint("max-message-type",
"max-message-type",
"Maximum message type",
0, G_MAXUINT, 0,
G_PARAM_STATIC_STRINGS |
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT_ONLY));
}
static void
dispatcher_init(Dispatcher *self)
{
self->priv = (DispatcherPrivate*) dispatcher_get_instance_private(self);
}
Dispatcher *
dispatcher_new(size_t max_message_type)
{
return (Dispatcher*)
g_object_new(TYPE_DISPATCHER,
"max-message-type", (guint) max_message_type,
NULL);
}
#define ACK 0xffffffff
/*
@ -260,7 +184,7 @@ static int write_safe(int fd, uint8_t *buf, size_t size)
return written_size;
}
static int dispatcher_handle_single_read(Dispatcher *dispatcher)
int Dispatcher::handle_single_read(Dispatcher *dispatcher)
{
int ret;
DispatcherMessage msg[1];
@ -304,25 +228,21 @@ static int dispatcher_handle_single_read(Dispatcher *dispatcher)
}
/*
* dispatcher_handle_event
* handle_event
* doesn't handle being in the middle of a message. all reads are blocking.
*/
static void dispatcher_handle_event(int fd, int event, void *opaque)
void Dispatcher::handle_event(int fd, int event, Dispatcher* dispatcher)
{
Dispatcher *dispatcher = (Dispatcher *) opaque;
while (dispatcher_handle_single_read(dispatcher)) {
while (dispatcher->handle_single_read(dispatcher)) {
}
}
static void
dispatcher_send_message_internal(Dispatcher *dispatcher, const DispatcherMessage*msg,
void *payload)
void Dispatcher::send_message_internal(const DispatcherMessage* msg, void *payload)
{
uint32_t ack;
int send_fd = dispatcher->priv->send_fd;
int send_fd = priv->send_fd;
pthread_mutex_lock(&dispatcher->priv->lock);
pthread_mutex_lock(&priv->lock);
if (write_safe(send_fd, (uint8_t*)msg, sizeof(*msg)) == -1) {
g_warning("error: failed to send message header for message %d",
msg->type);
@ -343,22 +263,21 @@ dispatcher_send_message_internal(Dispatcher *dispatcher, const DispatcherMessage
}
}
unlock:
pthread_mutex_unlock(&dispatcher->priv->lock);
pthread_mutex_unlock(&priv->lock);
}
void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
void *payload)
void Dispatcher::send_message(uint32_t message_type, void *payload)
{
DispatcherMessage *msg;
assert(dispatcher->priv->max_message_type > message_type);
assert(dispatcher->priv->messages[message_type].handler);
msg = &dispatcher->priv->messages[message_type];
dispatcher_send_message_internal(dispatcher, msg, payload);
assert(priv->max_message_type > message_type);
assert(priv->messages[message_type].handler);
msg = &priv->messages[message_type];
send_message_internal(msg, payload);
}
void dispatcher_send_message_custom(Dispatcher *dispatcher, dispatcher_handle_message handler,
void *payload, uint32_t payload_size, bool ack)
void Dispatcher::send_message_custom(dispatcher_handle_message handler,
void *payload, uint32_t payload_size, bool ack)
{
DispatcherMessage msg = {
.handler = handler,
@ -366,42 +285,40 @@ void dispatcher_send_message_custom(Dispatcher *dispatcher, dispatcher_handle_me
.type = DISPATCHER_MESSAGE_TYPE_CUSTOM,
.ack = ack,
};
dispatcher_send_message_internal(dispatcher, &msg, payload);
send_message_internal(&msg, payload);
}
void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
dispatcher_handle_message handler,
size_t size, bool ack)
void Dispatcher::register_handler(uint32_t message_type,
dispatcher_handle_message handler,
size_t size, bool ack)
{
DispatcherMessage *msg;
assert(message_type < dispatcher->priv->max_message_type);
assert(dispatcher->priv->messages[message_type].handler == NULL);
msg = &dispatcher->priv->messages[message_type];
assert(message_type < priv->max_message_type);
assert(priv->messages[message_type].handler == NULL);
msg = &priv->messages[message_type];
msg->handler = handler;
msg->size = size;
msg->type = message_type;
msg->ack = ack;
if (msg->size > dispatcher->priv->payload_size) {
dispatcher->priv->payload = g_realloc(dispatcher->priv->payload, msg->size);
dispatcher->priv->payload_size = msg->size;
if (msg->size > priv->payload_size) {
priv->payload = g_realloc(priv->payload, msg->size);
priv->payload_size = msg->size;
}
}
void dispatcher_register_universal_handler(
Dispatcher *dispatcher,
dispatcher_handle_any_message any_handler)
void Dispatcher::register_universal_handler(dispatcher_handle_any_message any_handler)
{
dispatcher->priv->any_handler = any_handler;
priv->any_handler = any_handler;
}
SpiceWatch *dispatcher_create_watch(Dispatcher *dispatcher, SpiceCoreInterfaceInternal *core)
SpiceWatch *Dispatcher::create_watch(SpiceCoreInterfaceInternal *core)
{
return core->watch_add(core, dispatcher->priv->recv_fd,
SPICE_WATCH_EVENT_READ, dispatcher_handle_event, dispatcher);
return core->watch_new(priv->recv_fd,
SPICE_WATCH_EVENT_READ, handle_event, this);
}
void dispatcher_set_opaque(Dispatcher *self, void *opaque)
void Dispatcher::set_opaque(void *opaque)
{
self->priv->opaque = opaque;
priv->opaque = opaque;
}

View File

@ -20,61 +20,15 @@
#define DISPATCHER_H_
#include <pthread.h>
#include <glib-object.h>
#include "red-common.h"
#include "utils.hpp"
SPICE_BEGIN_DECLS
#define TYPE_DISPATCHER dispatcher_get_type()
#define DISPATCHER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), TYPE_DISPATCHER, Dispatcher))
#define DISPATCHER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), TYPE_DISPATCHER, DispatcherClass))
#define IS_DISPATCHER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), TYPE_DISPATCHER))
#define IS_DISPATCHER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), TYPE_DISPATCHER))
#define DISPATCHER_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), TYPE_DISPATCHER, DispatcherClass))
typedef struct Dispatcher Dispatcher;
typedef struct DispatcherClass DispatcherClass;
typedef struct DispatcherPrivate DispatcherPrivate;
/* A Dispatcher provides inter-thread communication by serializing messages.
* Currently the Dispatcher uses a unix socket (socketpair) for dispatching the
* messages.
*
* Message types are identified by a unique integer value and must first be
* registered with the class (see dispatcher_register_handler()) before they
* can be sent. Sending threads can send a message using the
* dispatcher_send_message() function. The receiving thread can monitor the
* dispatcher's 'receive' file descriptor (see dispatcher_get_recv_fd()) for
* activity and should call dispatcher_handle_recv_read() to process incoming
* messages.
*/
struct Dispatcher: public GObject
{
DispatcherPrivate *priv;
void ref() { g_object_ref(this); }
void unref() { g_object_unref(this); }
};
struct DispatcherClass
{
GObjectClass parent_class;
};
GType dispatcher_get_type(void) G_GNUC_CONST;
/* dispatcher_new
*
* Create a new Dispatcher object
*
* @max_message_type: indicates the number of unique message types that can
* be handled by this dispatcher. Each message type is
* identified by an integer value between 0 and
* max_message_type-1.
*/
Dispatcher *dispatcher_new(size_t max_message_type);
#include "push-visibility.h"
struct Dispatcher;
struct DispatcherPrivate;
struct DispatcherMessage;
/* The function signature for handlers of a specific message type */
typedef void (*dispatcher_handle_message)(void *opaque,
@ -86,100 +40,130 @@ typedef void (*dispatcher_handle_any_message)(void *opaque,
uint32_t message_type,
void *payload);
/* dispatcher_send_message
/* A Dispatcher provides inter-thread communication by serializing messages.
* Currently the Dispatcher uses a unix socket (socketpair) for dispatching the
* messages.
*
* Sends a message to the receiving thread. The message type must have been
* registered first (see dispatcher_register_handler()). @payload must be a
* buffer of the same size as the size registered for @message_type
*
* If the sent message is a message type requires an ACK, this function will
* block until it receives an ACK from the receiving thread.
*
* @message_type: message type
* @payload: payload
* Message types are identified by a unique integer value and must first be
* registered with the class (see register_handler()) before they
* can be sent. Sending threads can send a message using the
* send_message() function. The receiving thread can monitor the
* dispatcher's 'receive' file descriptor (see dispatcher_get_recv_fd()) for
* activity and should call dispatcher_handle_recv_read() to process incoming
* messages.
*/
void dispatcher_send_message(Dispatcher *dispatcher, uint32_t message_type,
void *payload);
/* dispatcher_send_message_custom
*
* Sends a message to the receiving thread.
*
* If the sent message requires an ACK, this function will block until it
* receives an ACK from the receiving thread.
*
* @handler: callback to handle message
* @payload: payload
* @payload_size: size of payload
* @ack: acknowledge required. Make message synchronous
*/
void dispatcher_send_message_custom(Dispatcher *dispatcher, dispatcher_handle_message handler,
void *payload, uint32_t payload_size, bool ack);
/* dispatcher_register_handler
*
* This function registers a message type with the dispatcher, and registers
* @handler as the function that will handle incoming messages of this type.
* If @ack is true, the dispatcher will also send an ACK in response to the
* message after the message has been passed to the handler. You can only
* register a given message type once. For example, you cannot register two
* different handlers for the same message type with different @ack values.
*
* @dispatcher: dispatcher
* @messsage_type: message type
* @handler: message handler
* @size: message size. Each type has a fixed associated size.
* @ack: whether the dispatcher should send an ACK to the sender
*/
void dispatcher_register_handler(Dispatcher *dispatcher, uint32_t message_type,
dispatcher_handle_message handler, size_t size,
bool ack);
/* dispatcher_register_universal_handler
*
* Register a universal handler that will be called when *any* message is
* received by the dispatcher. When a message is received, this handler will be
* called first. If the received message type was registered via
* dispatcher_register_handler(), the message-specific handler will then be
* called. Only one universal handler can be registered. This feature can be
* used to record all messages to a file for replay and debugging.
*
* @dispatcher: dispatcher
* @handler: a handler function
*/
void dispatcher_register_universal_handler(Dispatcher *dispatcher,
dispatcher_handle_any_message handler);
/* dispatcher_create_watch
*
* Create a new watch to handle events for the dispatcher.
* You should release it before releasing the dispatcher.
*
* @return: newly created watch
*/
SpiceWatch *dispatcher_create_watch(Dispatcher *dispatcher, SpiceCoreInterfaceInternal *core);
/* dispatcher_set_opaque
*
* This @opaque pointer is user-defined data that will be passed as the first
* argument to all handler functions.
*
* @dispatcher: Dispatcher instance
* @opaque: opaque to use for callbacks
*/
void dispatcher_set_opaque(Dispatcher *dispatcher, void *opaque);
SPICE_END_DECLS
#ifdef __cplusplus
template <typename T>
inline void
dispatcher_send_message_custom(Dispatcher *dispatcher, void (*handler)(void *, T*),
T *payload, bool ack)
class Dispatcher
{
dispatcher_send_message_custom(dispatcher, (dispatcher_handle_message) handler,
payload, sizeof(*payload), ack);
}
#endif
public:
SPICE_CXX_GLIB_ALLOCATOR
/* Create a new Dispatcher object
*
* @max_message_type: indicates the number of unique message types that can
* be handled by this dispatcher. Each message type is
* identified by an integer value between 0 and
* max_message_type-1.
*/
Dispatcher(uint32_t max_message_type);
/* send_message
*
* Sends a message to the receiving thread. The message type must have been
* registered first (see register_handler()). @payload must be a
* buffer of the same size as the size registered for @message_type
*
* If the sent message is a message type requires an ACK, this function will
* block until it receives an ACK from the receiving thread.
*
* @message_type: message type
* @payload: payload
*/
void send_message(uint32_t message_type, void *payload);
/* send_message_custom
*
* Sends a message to the receiving thread.
*
* If the sent message requires an ACK, this function will block until it
* receives an ACK from the receiving thread.
*
* @handler: callback to handle message
* @payload: payload
* @payload_size: size of payload
* @ack: acknowledge required. Make message synchronous
*/
void send_message_custom(dispatcher_handle_message handler,
void *payload, uint32_t payload_size, bool ack);
template <typename T> inline void
send_message_custom(void (*handler)(void *, T*), T *payload, bool ack)
{
send_message_custom((dispatcher_handle_message) handler,
payload, sizeof(*payload), ack);
}
/* register_handler
*
* This function registers a message type with the dispatcher, and registers
* @handler as the function that will handle incoming messages of this type.
* If @ack is true, the dispatcher will also send an ACK in response to the
* message after the message has been passed to the handler. You can only
* register a given message type once. For example, you cannot register two
* different handlers for the same message type with different @ack values.
*
* @messsage_type: message type
* @handler: message handler
* @size: message size. Each type has a fixed associated size.
* @ack: whether the dispatcher should send an ACK to the sender
*/
void register_handler(uint32_t message_type,
dispatcher_handle_message handler, size_t size,
bool ack);
/* register_universal_handler
*
* Register a universal handler that will be called when *any* message is
* received by the dispatcher. When a message is received, this handler will be
* called first. If the received message type was registered via
* register_handler(), the message-specific handler will then be
* called. Only one universal handler can be registered. This feature can be
* used to record all messages to a file for replay and debugging.
*
* @handler: a handler function
*/
void register_universal_handler(dispatcher_handle_any_message handler);
/* create_watch
*
* Create a new watch to handle events for the dispatcher.
* You should release it before releasing the dispatcher.
*
* @return: newly created watch
*/
SpiceWatch *create_watch(SpiceCoreInterfaceInternal *core);
/* set_opaque
*
* This @opaque pointer is user-defined data that will be passed as the first
* argument to all handler functions.
*
* @opaque: opaque to use for callbacks
*/
void set_opaque(void *opaque);
void ref() { g_atomic_int_inc(&_ref); }
void unref() { if (g_atomic_int_dec_and_test(&_ref)) delete this; }
protected:
virtual ~Dispatcher();
private:
static int handle_single_read(Dispatcher *dispatcher);
static void handle_event(int fd, int event, Dispatcher* dispatcher);
void send_message_internal(const DispatcherMessage*msg, void *payload);
gint _ref = 1;
red::unique_link<DispatcherPrivate> priv;
};
#include "pop-visibility.h"
#endif /* DISPATCHER_H_ */

View File

@ -152,8 +152,7 @@ void MainChannelClient::release_recv_buf(uint16_t type, uint32_t size, uint8_t *
void MainChannelClient::on_disconnect()
{
RedsState *reds = get_channel()->get_server();
main_dispatcher_client_disconnect(reds_get_main_dispatcher(reds),
get_client());
reds_get_main_dispatcher(reds)->client_disconnect(get_client());
}
static void main_channel_client_push_ping(MainChannelClient *mcc, int size);

View File

@ -46,82 +46,6 @@
* main_dispatcher_handle_<event_name> - handler for callback from main thread
* seperate from self because it may send an ack or do other work in the future.
*/
struct MainDispatcherPrivate
{
RedsState *reds; /* weak */
SpiceWatch *watch;
pthread_t thread_id;
};
G_DEFINE_TYPE_WITH_PRIVATE(MainDispatcher, main_dispatcher, TYPE_DISPATCHER)
enum {
PROP0,
PROP_SPICE_SERVER,
};
static void
main_dispatcher_get_property(GObject *object,
guint property_id,
GValue *value,
GParamSpec *pspec)
{
MainDispatcher *self = MAIN_DISPATCHER(object);
switch (property_id) {
case PROP_SPICE_SERVER:
g_value_set_pointer(value, self->priv->reds);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec);
}
}
static void
main_dispatcher_set_property(GObject *object,
guint property_id,
const GValue *value,
GParamSpec *pspec)
{
MainDispatcher *self = MAIN_DISPATCHER(object);
switch (property_id) {
case PROP_SPICE_SERVER:
self->priv->reds = (RedsState*) g_value_get_pointer(value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID(object, property_id, pspec);
}
}
static void main_dispatcher_constructed(GObject *object);
static void main_dispatcher_finalize(GObject *object);
static void
main_dispatcher_class_init(MainDispatcherClass *klass)
{
GObjectClass *object_class = G_OBJECT_CLASS(klass);
object_class->constructed = main_dispatcher_constructed;
object_class->finalize = main_dispatcher_finalize;
object_class->get_property = main_dispatcher_get_property;
object_class->set_property = main_dispatcher_set_property;
g_object_class_install_property(object_class,
PROP_SPICE_SERVER,
g_param_spec_pointer("spice-server",
"spice-server",
"The spice server associated with this dispatcher",
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT_ONLY));
}
static void
main_dispatcher_init(MainDispatcher *self)
{
self->priv = (MainDispatcherPrivate*) main_dispatcher_get_instance_private(self);
self->priv->thread_id = pthread_self();
}
enum {
MAIN_DISPATCHER_CHANNEL_EVENT = 0,
@ -160,18 +84,17 @@ static void main_dispatcher_handle_channel_event(void *opaque,
reds_handle_channel_event(reds, channel_event->event, channel_event->info);
}
void main_dispatcher_channel_event(MainDispatcher *self, int event, SpiceChannelEventInfo *info)
void MainDispatcher::channel_event(int event, SpiceChannelEventInfo *info)
{
MainDispatcherChannelEventMessage msg = {0,};
if (pthread_self() == self->priv->thread_id) {
reds_handle_channel_event(self->priv->reds, event, info);
if (pthread_self() == thread_id) {
reds_handle_channel_event(reds, event, info);
return;
}
msg.event = event;
msg.info = info;
dispatcher_send_message(DISPATCHER(self), MAIN_DISPATCHER_CHANNEL_EVENT,
&msg);
send_message(MAIN_DISPATCHER_CHANNEL_EVENT, &msg);
}
@ -205,45 +128,41 @@ static void main_dispatcher_handle_client_disconnect(void *opaque,
msg->client->unref();
}
void main_dispatcher_seamless_migrate_dst_complete(MainDispatcher *self,
RedClient *client)
void MainDispatcher::seamless_migrate_dst_complete(RedClient *client)
{
MainDispatcherMigrateSeamlessDstCompleteMessage msg;
if (pthread_self() == self->priv->thread_id) {
reds_on_client_seamless_migrate_complete(self->priv->reds, client);
if (pthread_self() == thread_id) {
reds_on_client_seamless_migrate_complete(reds, client);
return;
}
msg.client = red::add_ref(client);
dispatcher_send_message(DISPATCHER(self), MAIN_DISPATCHER_MIGRATE_SEAMLESS_DST_COMPLETE,
&msg);
send_message(MAIN_DISPATCHER_MIGRATE_SEAMLESS_DST_COMPLETE, &msg);
}
void main_dispatcher_set_mm_time_latency(MainDispatcher *self, RedClient *client, uint32_t latency)
void MainDispatcher::set_mm_time_latency(RedClient *client, uint32_t latency)
{
MainDispatcherMmTimeLatencyMessage msg;
if (pthread_self() == self->priv->thread_id) {
reds_set_client_mm_time_latency(self->priv->reds, client, latency);
if (pthread_self() == thread_id) {
reds_set_client_mm_time_latency(reds, client, latency);
return;
}
msg.client = red::add_ref(client);
msg.latency = latency;
dispatcher_send_message(DISPATCHER(self), MAIN_DISPATCHER_SET_MM_TIME_LATENCY,
&msg);
send_message(MAIN_DISPATCHER_SET_MM_TIME_LATENCY, &msg);
}
void main_dispatcher_client_disconnect(MainDispatcher *self, RedClient *client)
void MainDispatcher::client_disconnect(RedClient *client)
{
MainDispatcherClientDisconnectMessage msg;
if (!red_client_is_disconnecting(client)) {
spice_debug("client %p", client);
msg.client = red::add_ref(client);
dispatcher_send_message(DISPATCHER(self), MAIN_DISPATCHER_CLIENT_DISCONNECT,
&msg);
send_message(MAIN_DISPATCHER_CLIENT_DISCONNECT, &msg);
} else {
spice_debug("client %p already during disconnection", client);
}
@ -254,43 +173,29 @@ void main_dispatcher_client_disconnect(MainDispatcher *self, RedClient *client)
* Reds routines shouldn't be exposed. Instead reds.c should register the callbacks,
* and the corresponding operations should be made only via main_dispatcher.
*/
MainDispatcher* main_dispatcher_new(RedsState *reds)
MainDispatcher::MainDispatcher(RedsState *reds):
Dispatcher(MAIN_DISPATCHER_NUM_MESSAGES),
reds(reds),
thread_id(pthread_self())
{
MainDispatcher *self = (MainDispatcher*) g_object_new(TYPE_MAIN_DISPATCHER,
"spice-server", reds,
"max-message-type", MAIN_DISPATCHER_NUM_MESSAGES,
NULL);
return self;
set_opaque(reds);
watch = create_watch(reds_get_core_interface(reds));
register_handler(MAIN_DISPATCHER_CHANNEL_EVENT,
main_dispatcher_handle_channel_event,
sizeof(MainDispatcherChannelEventMessage), false);
register_handler(MAIN_DISPATCHER_MIGRATE_SEAMLESS_DST_COMPLETE,
main_dispatcher_handle_migrate_complete,
sizeof(MainDispatcherMigrateSeamlessDstCompleteMessage), false);
register_handler(MAIN_DISPATCHER_SET_MM_TIME_LATENCY,
main_dispatcher_handle_mm_time_latency,
sizeof(MainDispatcherMmTimeLatencyMessage), false);
register_handler(MAIN_DISPATCHER_CLIENT_DISCONNECT,
main_dispatcher_handle_client_disconnect,
sizeof(MainDispatcherClientDisconnectMessage), false);
}
void main_dispatcher_constructed(GObject *object)
MainDispatcher::~MainDispatcher()
{
MainDispatcher *self = MAIN_DISPATCHER(object);
G_OBJECT_CLASS(main_dispatcher_parent_class)->constructed(object);
dispatcher_set_opaque(DISPATCHER(self), self->priv->reds);
self->priv->watch =
dispatcher_create_watch(DISPATCHER(self), reds_get_core_interface(self->priv->reds));
dispatcher_register_handler(DISPATCHER(self), MAIN_DISPATCHER_CHANNEL_EVENT,
main_dispatcher_handle_channel_event,
sizeof(MainDispatcherChannelEventMessage), false);
dispatcher_register_handler(DISPATCHER(self), MAIN_DISPATCHER_MIGRATE_SEAMLESS_DST_COMPLETE,
main_dispatcher_handle_migrate_complete,
sizeof(MainDispatcherMigrateSeamlessDstCompleteMessage), false);
dispatcher_register_handler(DISPATCHER(self), MAIN_DISPATCHER_SET_MM_TIME_LATENCY,
main_dispatcher_handle_mm_time_latency,
sizeof(MainDispatcherMmTimeLatencyMessage), false);
dispatcher_register_handler(DISPATCHER(self), MAIN_DISPATCHER_CLIENT_DISCONNECT,
main_dispatcher_handle_client_disconnect,
sizeof(MainDispatcherClientDisconnectMessage), false);
}
static void main_dispatcher_finalize(GObject *object)
{
MainDispatcher *self = MAIN_DISPATCHER(object);
red_watch_remove(self->priv->watch);
self->priv->watch = NULL;
G_OBJECT_CLASS(main_dispatcher_parent_class)->finalize(object);
red_watch_remove(watch);
}

View File

@ -23,44 +23,30 @@
#include "dispatcher.h"
#include "red-channel.h"
SPICE_BEGIN_DECLS
#include "push-visibility.h"
#define TYPE_MAIN_DISPATCHER main_dispatcher_get_type()
#define MAIN_DISPATCHER(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), TYPE_MAIN_DISPATCHER, MainDispatcher))
#define MAIN_DISPATCHER_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), TYPE_MAIN_DISPATCHER, MainDispatcherClass))
#define IS_MAIN_DISPATCHER(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), TYPE_MAIN_DISPATCHER))
#define IS_MAIN_DISPATCHER_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), TYPE_MAIN_DISPATCHER))
#define MAIN_DISPATCHER_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS((obj), TYPE_MAIN_DISPATCHER, MainDispatcherClass))
typedef struct MainDispatcher MainDispatcher;
typedef struct MainDispatcherClass MainDispatcherClass;
typedef struct MainDispatcherPrivate MainDispatcherPrivate;
struct MainDispatcher: public Dispatcher
class MainDispatcher final: public Dispatcher
{
MainDispatcherPrivate *priv;
public:
MainDispatcher(RedsState *reds);
void channel_event(int event, SpiceChannelEventInfo *info);
void seamless_migrate_dst_complete(RedClient *client);
void set_mm_time_latency(RedClient *client, uint32_t latency);
/*
* Disconnecting the client is always executed asynchronously,
* in order to protect from expired references in the routines
* that triggered the client destruction.
*/
void client_disconnect(RedClient *client);
protected:
~MainDispatcher();
private:
RedsState *const reds;
SpiceWatch *watch = nullptr;
pthread_t thread_id;
};
struct MainDispatcherClass
{
DispatcherClass parent_class;
};
GType main_dispatcher_get_type(void) G_GNUC_CONST;
void main_dispatcher_channel_event(MainDispatcher *self, int event, SpiceChannelEventInfo *info);
void main_dispatcher_seamless_migrate_dst_complete(MainDispatcher *self, RedClient *client);
void main_dispatcher_set_mm_time_latency(MainDispatcher *self, RedClient *client, uint32_t latency);
/*
* Disconnecting the client is always executed asynchronously,
* in order to protect from expired references in the routines
* that triggered the client destruction.
*/
void main_dispatcher_client_disconnect(MainDispatcher *self, RedClient *client);
MainDispatcher* main_dispatcher_new(RedsState *reds);
SPICE_END_DECLS
#include "pop-visibility.h"
#endif /* MAIN_DISPATCHER_H_ */

View File

@ -78,8 +78,11 @@ struct RedChannelPrivate
thread_id = pthread_self();
}
~RedChannelPrivate() {
g_clear_object(&dispatcher);
~RedChannelPrivate()
{
if (dispatcher) {
dispatcher->unref();
}
red_channel_capabilities_reset(&local_caps);
}
@ -365,8 +368,7 @@ void RedChannel::connect(RedClient *client, RedStream *stream, int migration,
};
red_channel_capabilities_init(&payload.caps, caps);
dispatcher_send_message_custom(dispatcher, handle_dispatcher_connect,
&payload, false);
dispatcher->send_message_custom(handle_dispatcher_connect, &payload, false);
}
GList *RedChannel::get_clients()
@ -551,10 +553,8 @@ typedef struct RedMessageMigrate {
RedChannelClient *rcc;
} RedMessageMigrate;
static void handle_dispatcher_migrate(void *opaque, void *payload)
static void handle_dispatcher_migrate(void *opaque, RedMessageMigrate *msg)
{
RedMessageMigrate *msg = (RedMessageMigrate*) payload;
msg->rcc->migrate();
msg->rcc->unref();
}
@ -568,19 +568,18 @@ void RedChannel::migrate_client(RedChannelClient *rcc)
}
RedMessageMigrate payload = { .rcc = red::add_ref(rcc) };
dispatcher_send_message_custom(priv->dispatcher, handle_dispatcher_migrate,
&payload, sizeof(payload), false);
priv->dispatcher->send_message_custom(handle_dispatcher_migrate,
&payload, false);
}
typedef struct RedMessageDisconnect {
RedChannelClient *rcc;
} RedMessageDisconnect;
static void handle_dispatcher_disconnect(void *opaque, void *payload)
static void handle_dispatcher_disconnect(void *opaque, RedMessageDisconnect *msg)
{
RedMessageDisconnect *msg = (RedMessageDisconnect*) payload;
msg->rcc->disconnect();
msg->rcc->unref();
}
void RedChannel::disconnect_client(RedChannelClient *rcc)
@ -593,7 +592,7 @@ void RedChannel::disconnect_client(RedChannelClient *rcc)
// TODO: we turned it to be sync, due to client_destroy . Should we support async? - for this we will need ref count
// for channels
RedMessageDisconnect payload = { .rcc = rcc };
dispatcher_send_message_custom(priv->dispatcher, handle_dispatcher_disconnect,
&payload, sizeof(payload), true);
RedMessageDisconnect payload = { .rcc = red::add_ref(rcc) };
priv->dispatcher->send_message_custom(handle_dispatcher_disconnect,
&payload, true);
}

View File

@ -35,14 +35,13 @@
#include "red-channel-capabilities.h"
#include "utils.hpp"
struct Dispatcher;
#include "push-visibility.h"
class RedChannel;
struct RedChannelPrivate;
struct RedChannelClient;
struct RedClient;
struct MainChannelClient;
struct Dispatcher;
static inline gboolean test_capability(const uint32_t *caps, int num_caps, uint32_t cap)
{

View File

@ -350,8 +350,7 @@ gboolean red_client_seamless_migration_done_for_channel(RedClient *client)
client->seamless_migrate = FALSE;
/* migration completion might have been triggered from a different thread
* than the main thread */
main_dispatcher_seamless_migrate_dst_complete(reds_get_main_dispatcher(client->reds),
client);
reds_get_main_dispatcher(client->reds)->seamless_migrate_dst_complete(client);
ret = TRUE;
}
pthread_mutex_unlock(&client->lock);

View File

@ -98,9 +98,7 @@ void spice_qxl_update_area(QXLInstance *instance, uint32_t surface_id,
payload.qxl_dirty_rects = qxl_dirty_rects;
payload.num_dirty_rects = num_dirty_rects;
payload.clear_dirty_region = clear_dirty_region;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_UPDATE,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_UPDATE, &payload);
}
gboolean red_qxl_client_monitors_config(QXLInstance *qxl,
@ -122,9 +120,7 @@ void spice_qxl_update_area_async(QXLInstance *instance, uint32_t surface_id, QXL
payload.surface_id = surface_id;
payload.qxl_area = *qxl_area;
payload.clear_dirty_region = clear_dirty_region;
dispatcher_send_message(instance->st->dispatcher,
message,
&payload);
instance->st->dispatcher->send_message(message, &payload);
}
SPICE_GNUC_VISIBLE
@ -133,9 +129,7 @@ void spice_qxl_add_memslot(QXLInstance *instance, QXLDevMemSlot *mem_slot)
RedWorkerMessageAddMemslot payload;
payload.mem_slot = *mem_slot;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_ADD_MEMSLOT,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_ADD_MEMSLOT, &payload);
}
SPICE_GNUC_VISIBLE
@ -146,7 +140,7 @@ void spice_qxl_add_memslot_async(QXLInstance *instance, QXLDevMemSlot *mem_slot,
payload.base.cookie = cookie;
payload.mem_slot = *mem_slot;
dispatcher_send_message(instance->st->dispatcher, message, &payload);
instance->st->dispatcher->send_message(message, &payload);
}
SPICE_GNUC_VISIBLE
@ -157,7 +151,7 @@ void spice_qxl_del_memslot(QXLInstance *instance, uint32_t slot_group_id, uint32
payload.slot_group_id = slot_group_id;
payload.slot_id = slot_id;
dispatcher_send_message(instance->st->dispatcher, message, &payload);
instance->st->dispatcher->send_message(message, &payload);
}
SPICE_GNUC_VISIBLE
@ -165,9 +159,7 @@ void spice_qxl_destroy_surfaces(QXLInstance *instance)
{
RedWorkerMessageDestroySurfaces payload;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_DESTROY_SURFACES,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_DESTROY_SURFACES, &payload);
}
SPICE_GNUC_VISIBLE
@ -177,7 +169,7 @@ void spice_qxl_destroy_surfaces_async(QXLInstance *instance, uint64_t cookie)
RedWorkerMessage message = RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC;
payload.base.cookie = cookie;
dispatcher_send_message(instance->st->dispatcher, message, &payload);
instance->st->dispatcher->send_message(message, &payload);
}
/* used by RedWorker */
@ -196,9 +188,7 @@ void spice_qxl_destroy_primary_surface(QXLInstance *instance, uint32_t surface_i
{
RedWorkerMessageDestroyPrimarySurface payload;
payload.surface_id = surface_id;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE, &payload);
red_qxl_destroy_primary_surface_complete(instance->st);
}
@ -211,7 +201,7 @@ void spice_qxl_destroy_primary_surface_async(QXLInstance *instance,
payload.base.cookie = cookie;
payload.surface_id = surface_id;
dispatcher_send_message(instance->st->dispatcher, message, &payload);
instance->st->dispatcher->send_message(message, &payload);
}
/* used by RedWorker */
@ -236,7 +226,7 @@ void spice_qxl_create_primary_surface_async(QXLInstance *instance, uint32_t surf
payload.base.cookie = cookie;
payload.surface_id = surface_id;
payload.surface = *surface;
dispatcher_send_message(instance->st->dispatcher, message, &payload);
instance->st->dispatcher->send_message(message, &payload);
}
SPICE_GNUC_VISIBLE
@ -247,9 +237,7 @@ void spice_qxl_create_primary_surface(QXLInstance *instance, uint32_t surface_id
payload.surface_id = surface_id;
payload.surface = *surface;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE, &payload);
red_qxl_create_primary_surface_complete(instance->st, surface);
}
@ -258,9 +246,7 @@ void spice_qxl_reset_image_cache(QXLInstance *instance)
{
RedWorkerMessageResetImageCache payload;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_RESET_IMAGE_CACHE,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_RESET_IMAGE_CACHE, &payload);
}
SPICE_GNUC_VISIBLE
@ -268,9 +254,7 @@ void spice_qxl_reset_cursor(QXLInstance *instance)
{
RedWorkerMessageResetCursor payload;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_RESET_CURSOR,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_RESET_CURSOR, &payload);
}
SPICE_GNUC_VISIBLE
@ -279,9 +263,7 @@ void spice_qxl_destroy_surface_wait(QXLInstance *instance, uint32_t surface_id)
RedWorkerMessageDestroySurfaceWait payload;
payload.surface_id = surface_id;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT, &payload);
}
SPICE_GNUC_VISIBLE
@ -292,7 +274,7 @@ void spice_qxl_destroy_surface_async(QXLInstance *instance, uint32_t surface_id,
payload.base.cookie = cookie;
payload.surface_id = surface_id;
dispatcher_send_message(instance->st->dispatcher, message, &payload);
instance->st->dispatcher->send_message(message, &payload);
}
SPICE_GNUC_VISIBLE
@ -300,9 +282,7 @@ void spice_qxl_reset_memslots(QXLInstance *instance)
{
RedWorkerMessageResetMemslots payload;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_RESET_MEMSLOTS,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_RESET_MEMSLOTS, &payload);
}
static bool red_qxl_set_pending(QXLState *qxl_state, int pending)
@ -324,9 +304,7 @@ void spice_qxl_wakeup(QXLInstance *instance)
if (red_qxl_set_pending(instance->st, RED_DISPATCHER_PENDING_WAKEUP))
return;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_WAKEUP,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_WAKEUP, &payload);
}
SPICE_GNUC_VISIBLE
@ -337,18 +315,14 @@ void spice_qxl_oom(QXLInstance *instance)
if (red_qxl_set_pending(instance->st, RED_DISPATCHER_PENDING_OOM))
return;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_OOM,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_OOM, &payload);
}
void red_qxl_start(QXLInstance *qxl)
{
RedWorkerMessageStart payload;
dispatcher_send_message(qxl->st->dispatcher,
RED_WORKER_MESSAGE_START,
&payload);
qxl->st->dispatcher->send_message(RED_WORKER_MESSAGE_START, &payload);
}
SPICE_GNUC_VISIBLE
@ -358,7 +332,7 @@ void spice_qxl_flush_surfaces_async(QXLInstance *instance, uint64_t cookie)
RedWorkerMessage message = RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC;
payload.base.cookie = cookie;
dispatcher_send_message(instance->st->dispatcher, message, &payload);
instance->st->dispatcher->send_message(message, &payload);
}
SPICE_GNUC_VISIBLE
@ -373,7 +347,7 @@ void spice_qxl_monitors_config_async(QXLInstance *instance, QXLPHYSICAL monitors
payload.group_id = group_id;
payload.max_monitors = instance->st->max_monitors;
dispatcher_send_message(instance->st->dispatcher, message, &payload);
instance->st->dispatcher->send_message(message, &payload);
}
SPICE_GNUC_VISIBLE
@ -381,18 +355,14 @@ void spice_qxl_driver_unload(QXLInstance *instance)
{
RedWorkerMessageDriverUnload payload;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_DRIVER_UNLOAD,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_DRIVER_UNLOAD, &payload);
}
void red_qxl_stop(QXLInstance *qxl)
{
RedWorkerMessageStop payload;
dispatcher_send_message(qxl->st->dispatcher,
RED_WORKER_MESSAGE_STOP,
&payload);
qxl->st->dispatcher->send_message(RED_WORKER_MESSAGE_STOP, &payload);
}
SPICE_GNUC_VISIBLE
@ -402,9 +372,7 @@ void spice_qxl_loadvm_commands(QXLInstance *instance, struct QXLCommandExt *ext,
payload.count = count;
payload.ext = ext;
dispatcher_send_message(instance->st->dispatcher,
RED_WORKER_MESSAGE_LOADVM_COMMANDS,
&payload);
instance->st->dispatcher->send_message(RED_WORKER_MESSAGE_LOADVM_COMMANDS, &payload);
}
uint32_t red_qxl_get_ram_size(QXLInstance *qxl)
@ -482,9 +450,7 @@ void spice_qxl_gl_scanout(QXLInstance *qxl,
pthread_mutex_unlock(&qxl_state->scanout_mutex);
/* FIXME: find a way to coallesce all pending SCANOUTs */
dispatcher_send_message(qxl_state->dispatcher,
RED_WORKER_MESSAGE_GL_SCANOUT, &payload);
qxl_state->dispatcher->send_message(RED_WORKER_MESSAGE_GL_SCANOUT, &payload);
reds_update_client_mouse_allowed(qxl_state->reds);
}
@ -515,7 +481,7 @@ void spice_qxl_gl_draw_async(QXLInstance *qxl,
spice_return_if_fail(qxl_state->gl_draw_cookie == GL_DRAW_COOKIE_INVALID);
qxl_state->gl_draw_cookie = cookie;
dispatcher_send_message(qxl_state->dispatcher, message, &draw);
qxl_state->dispatcher->send_message(message, &draw);
}
void red_qxl_gl_draw_async_complete(QXLInstance *qxl)
@ -606,7 +572,7 @@ void red_qxl_init(RedsState *reds, QXLInstance *qxl)
pthread_mutex_init(&qxl_state->scanout_mutex, NULL);
qxl_state->scanout.drm_dma_buf_fd = -1;
qxl_state->gl_draw_cookie = GL_DRAW_COOKIE_INVALID;
qxl_state->dispatcher = dispatcher_new(RED_WORKER_MESSAGE_COUNT);
qxl_state->dispatcher = new Dispatcher(RED_WORKER_MESSAGE_COUNT);
qxl_state->max_monitors = UINT_MAX;
qxl->st = qxl_state;
@ -624,9 +590,7 @@ void red_qxl_destroy(QXLInstance *qxl)
/* send message to close thread */
RedWorkerMessageClose message;
dispatcher_send_message(qxl_state->dispatcher,
RED_WORKER_MESSAGE_CLOSE_WORKER,
&message);
qxl_state->dispatcher->send_message(RED_WORKER_MESSAGE_CLOSE_WORKER, &message);
red_worker_free(qxl_state->worker);
qxl_state->dispatcher->unref();
/* this must be done after calling red_worker_free */
@ -676,36 +640,28 @@ void red_qxl_on_ic_change(QXLInstance *qxl, SpiceImageCompression ic)
{
RedWorkerMessageSetCompression payload;
payload.image_compression = ic;
dispatcher_send_message(qxl->st->dispatcher,
RED_WORKER_MESSAGE_SET_COMPRESSION,
&payload);
qxl->st->dispatcher->send_message(RED_WORKER_MESSAGE_SET_COMPRESSION, &payload);
}
void red_qxl_on_sv_change(QXLInstance *qxl, int sv)
{
RedWorkerMessageSetStreamingVideo payload;
payload.streaming_video = sv;
dispatcher_send_message(qxl->st->dispatcher,
RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
&payload);
qxl->st->dispatcher->send_message(RED_WORKER_MESSAGE_SET_STREAMING_VIDEO, &payload);
}
void red_qxl_on_vc_change(QXLInstance *qxl, GArray *video_codecs)
{
RedWorkerMessageSetVideoCodecs payload;
payload.video_codecs = g_array_ref(video_codecs);
dispatcher_send_message(qxl->st->dispatcher,
RED_WORKER_MESSAGE_SET_VIDEO_CODECS,
&payload);
qxl->st->dispatcher->send_message(RED_WORKER_MESSAGE_SET_VIDEO_CODECS, &payload);
}
void red_qxl_set_mouse_mode(QXLInstance *qxl, uint32_t mode)
{
RedWorkerMessageSetMouseMode payload;
payload.mode = mode;
dispatcher_send_message(qxl->st->dispatcher,
RED_WORKER_MESSAGE_SET_MOUSE_MODE,
&payload);
qxl->st->dispatcher->send_message(RED_WORKER_MESSAGE_SET_MOUSE_MODE, &payload);
}
RedsState* red_qxl_get_server(QXLState *qxl_state)

View File

@ -446,7 +446,7 @@ void red_stream_push_channel_event(RedStream *s, int event)
{
RedsState *reds = s->priv->reds;
MainDispatcher *md = reds_get_main_dispatcher(reds);
main_dispatcher_channel_event(md, event, s->priv->info);
md->channel_event(event, s->priv->info);
}
static void red_stream_set_socket(RedStream *stream, int socket)

View File

@ -817,161 +817,130 @@ static void worker_dispatcher_record(void *opaque, uint32_t message_type, void *
static void register_callbacks(Dispatcher *dispatcher)
{
/* TODO: register cursor & display specific msg in respective channel files */
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_UPDATE,
handle_dev_update,
sizeof(RedWorkerMessageUpdate),
true);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_UPDATE_ASYNC,
handle_dev_update_async,
sizeof(RedWorkerMessageUpdateAsync),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_ADD_MEMSLOT,
handle_dev_add_memslot,
sizeof(RedWorkerMessageAddMemslot),
true);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC,
handle_dev_add_memslot_async,
sizeof(RedWorkerMessageAddMemslotAsync),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_DEL_MEMSLOT,
handle_dev_del_memslot,
sizeof(RedWorkerMessageDelMemslot),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_DESTROY_SURFACES,
handle_dev_destroy_surfaces,
sizeof(RedWorkerMessageDestroySurfaces),
true);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC,
handle_dev_destroy_surfaces_async,
sizeof(RedWorkerMessageDestroySurfacesAsync),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE,
handle_dev_destroy_primary_surface,
sizeof(RedWorkerMessageDestroyPrimarySurface),
true);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC,
handle_dev_destroy_primary_surface_async,
sizeof(RedWorkerMessageDestroyPrimarySurfaceAsync),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC,
handle_dev_create_primary_surface_async,
sizeof(RedWorkerMessageCreatePrimarySurfaceAsync),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE,
handle_dev_create_primary_surface,
sizeof(RedWorkerMessageCreatePrimarySurface),
true);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_RESET_IMAGE_CACHE,
handle_dev_reset_image_cache,
sizeof(RedWorkerMessageResetImageCache),
true);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_RESET_CURSOR,
handle_dev_reset_cursor,
sizeof(RedWorkerMessageResetCursor),
true);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_WAKEUP,
handle_dev_wakeup,
sizeof(RedWorkerMessageWakeup),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_OOM,
handle_dev_oom,
sizeof(RedWorkerMessageOom),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_START,
handle_dev_start,
sizeof(RedWorkerMessageStart),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC,
handle_dev_flush_surfaces_async,
sizeof(RedWorkerMessageFlushSurfacesAsync),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_STOP,
handle_dev_stop,
sizeof(RedWorkerMessageStop),
true);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_LOADVM_COMMANDS,
handle_dev_loadvm_commands,
sizeof(RedWorkerMessageLoadvmCommands),
true);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_SET_COMPRESSION,
handle_dev_set_compression,
sizeof(RedWorkerMessageSetCompression),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
handle_dev_set_streaming_video,
sizeof(RedWorkerMessageSetStreamingVideo),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_SET_VIDEO_CODECS,
handle_dev_set_video_codecs,
sizeof(RedWorkerMessageSetVideoCodecs),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_SET_MOUSE_MODE,
handle_dev_set_mouse_mode,
sizeof(RedWorkerMessageSetMouseMode),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT,
handle_dev_destroy_surface_wait,
sizeof(RedWorkerMessageDestroySurfaceWait),
true);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC,
handle_dev_destroy_surface_wait_async,
sizeof(RedWorkerMessageDestroySurfaceWaitAsync),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_RESET_MEMSLOTS,
handle_dev_reset_memslots,
sizeof(RedWorkerMessageResetMemslots),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_MONITORS_CONFIG_ASYNC,
handle_dev_monitors_config_async,
sizeof(RedWorkerMessageMonitorsConfigAsync),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_DRIVER_UNLOAD,
handle_dev_driver_unload,
sizeof(RedWorkerMessageDriverUnload),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_GL_SCANOUT,
handle_dev_gl_scanout,
sizeof(RedWorkerMessageGlScanout),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_GL_DRAW_ASYNC,
handle_dev_gl_draw_async,
sizeof(RedWorkerMessageGlDraw),
false);
dispatcher_register_handler(dispatcher,
RED_WORKER_MESSAGE_CLOSE_WORKER,
handle_dev_close,
sizeof(RedWorkerMessageClose),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_UPDATE,
handle_dev_update,
sizeof(RedWorkerMessageUpdate),
true);
dispatcher->register_handler(RED_WORKER_MESSAGE_UPDATE_ASYNC,
handle_dev_update_async,
sizeof(RedWorkerMessageUpdateAsync),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_ADD_MEMSLOT,
handle_dev_add_memslot,
sizeof(RedWorkerMessageAddMemslot),
true);
dispatcher->register_handler(RED_WORKER_MESSAGE_ADD_MEMSLOT_ASYNC,
handle_dev_add_memslot_async,
sizeof(RedWorkerMessageAddMemslotAsync),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_DEL_MEMSLOT,
handle_dev_del_memslot,
sizeof(RedWorkerMessageDelMemslot),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_DESTROY_SURFACES,
handle_dev_destroy_surfaces,
sizeof(RedWorkerMessageDestroySurfaces),
true);
dispatcher->register_handler(RED_WORKER_MESSAGE_DESTROY_SURFACES_ASYNC,
handle_dev_destroy_surfaces_async,
sizeof(RedWorkerMessageDestroySurfacesAsync),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE,
handle_dev_destroy_primary_surface,
sizeof(RedWorkerMessageDestroyPrimarySurface),
true);
dispatcher->register_handler(RED_WORKER_MESSAGE_DESTROY_PRIMARY_SURFACE_ASYNC,
handle_dev_destroy_primary_surface_async,
sizeof(RedWorkerMessageDestroyPrimarySurfaceAsync),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE_ASYNC,
handle_dev_create_primary_surface_async,
sizeof(RedWorkerMessageCreatePrimarySurfaceAsync),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_CREATE_PRIMARY_SURFACE,
handle_dev_create_primary_surface,
sizeof(RedWorkerMessageCreatePrimarySurface),
true);
dispatcher->register_handler(RED_WORKER_MESSAGE_RESET_IMAGE_CACHE,
handle_dev_reset_image_cache,
sizeof(RedWorkerMessageResetImageCache),
true);
dispatcher->register_handler(RED_WORKER_MESSAGE_RESET_CURSOR,
handle_dev_reset_cursor,
sizeof(RedWorkerMessageResetCursor),
true);
dispatcher->register_handler(RED_WORKER_MESSAGE_WAKEUP,
handle_dev_wakeup,
sizeof(RedWorkerMessageWakeup),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_OOM,
handle_dev_oom,
sizeof(RedWorkerMessageOom),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_START,
handle_dev_start,
sizeof(RedWorkerMessageStart),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_FLUSH_SURFACES_ASYNC,
handle_dev_flush_surfaces_async,
sizeof(RedWorkerMessageFlushSurfacesAsync),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_STOP,
handle_dev_stop,
sizeof(RedWorkerMessageStop),
true);
dispatcher->register_handler(RED_WORKER_MESSAGE_LOADVM_COMMANDS,
handle_dev_loadvm_commands,
sizeof(RedWorkerMessageLoadvmCommands),
true);
dispatcher->register_handler(RED_WORKER_MESSAGE_SET_COMPRESSION,
handle_dev_set_compression,
sizeof(RedWorkerMessageSetCompression),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_SET_STREAMING_VIDEO,
handle_dev_set_streaming_video,
sizeof(RedWorkerMessageSetStreamingVideo),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_SET_VIDEO_CODECS,
handle_dev_set_video_codecs,
sizeof(RedWorkerMessageSetVideoCodecs),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_SET_MOUSE_MODE,
handle_dev_set_mouse_mode,
sizeof(RedWorkerMessageSetMouseMode),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT,
handle_dev_destroy_surface_wait,
sizeof(RedWorkerMessageDestroySurfaceWait),
true);
dispatcher->register_handler(RED_WORKER_MESSAGE_DESTROY_SURFACE_WAIT_ASYNC,
handle_dev_destroy_surface_wait_async,
sizeof(RedWorkerMessageDestroySurfaceWaitAsync),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_RESET_MEMSLOTS,
handle_dev_reset_memslots,
sizeof(RedWorkerMessageResetMemslots),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_MONITORS_CONFIG_ASYNC,
handle_dev_monitors_config_async,
sizeof(RedWorkerMessageMonitorsConfigAsync),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_DRIVER_UNLOAD,
handle_dev_driver_unload,
sizeof(RedWorkerMessageDriverUnload),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_GL_SCANOUT,
handle_dev_gl_scanout,
sizeof(RedWorkerMessageGlScanout),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_GL_DRAW_ASYNC,
handle_dev_gl_draw_async,
sizeof(RedWorkerMessageGlDraw),
false);
dispatcher->register_handler(RED_WORKER_MESSAGE_CLOSE_WORKER,
handle_dev_close,
sizeof(RedWorkerMessageClose),
false);
}
@ -1057,12 +1026,12 @@ RedWorker* red_worker_new(QXLInstance *qxl)
worker->record = reds_get_record(reds);
dispatcher = red_qxl_get_dispatcher(qxl);
dispatcher_set_opaque(dispatcher, worker);
dispatcher->set_opaque(worker);
worker->qxl = qxl;
register_callbacks(dispatcher);
if (worker->record) {
dispatcher_register_universal_handler(dispatcher, worker_dispatcher_record);
dispatcher->register_universal_handler(worker_dispatcher_record);
}
worker->driver_cap_monitors_config = false;
@ -1074,8 +1043,7 @@ RedWorker* red_worker_new(QXLInstance *qxl)
stat_init_counter(&worker->full_loop_counter, reds, &worker->stat, "full_loops", TRUE);
stat_init_counter(&worker->total_loop_counter, reds, &worker->stat, "total_loops", TRUE);
worker->dispatch_watch =
dispatcher_create_watch(dispatcher, &worker->core);
worker->dispatch_watch = dispatcher->create_watch(&worker->core);
spice_assert(worker->dispatch_watch != NULL);
GSource *source = g_source_new(&worker_source_funcs, sizeof(RedWorkerSource));

View File

@ -3467,7 +3467,7 @@ static int do_spice_init(RedsState *reds, SpiceCoreInterface *core_interface)
reds->agent_dev = red_char_device_vdi_port_new(reds);
reds_update_agent_properties(reds);
reds->clients = NULL;
reds->main_dispatcher = main_dispatcher_new(reds);
reds->main_dispatcher = new MainDispatcher(reds);
reds->channels = NULL;
reds->mig_target_clients = NULL;
reds->char_devices = NULL;

View File

@ -707,9 +707,7 @@ static void update_client_playback_delay(void *opaque, uint32_t delay_ms)
dcc_set_max_stream_latency(dcc, delay_ms);
}
spice_debug("resetting client latency: %u", dcc_get_max_stream_latency(dcc));
main_dispatcher_set_mm_time_latency(reds_get_main_dispatcher(reds),
client,
dcc_get_max_stream_latency(dcc));
reds_get_main_dispatcher(reds)->set_mm_time_latency(client, dcc_get_max_stream_latency(dcc));
}
static void bitmap_ref(gpointer data)