Use generated demarshallers in server

This commit is contained in:
Alexander Larsson 2010-06-18 10:22:57 +02:00
parent ec10a1662f
commit c553fafe4b
3 changed files with 118 additions and 73 deletions

View File

@ -51,6 +51,7 @@
#include "jpeg_encoder.h"
#include "rect.h"
#include "marshaller.h"
#include "demarshallers.h"
#include "generated_marshallers.h"
//#define COMPRESS_STAT
@ -348,11 +349,12 @@ typedef struct RedChannel RedChannel;
typedef void (*disconnect_channel_proc)(RedChannel *channel);
typedef void (*hold_item_proc)(void *item);
typedef void (*release_item_proc)(RedChannel *channel, void *item);
typedef int (*handle_message_proc)(RedChannel *channel, SpiceDataHeader *message);
typedef int (*handle_message_proc)(RedChannel *channel, size_t size, uint32_t type, void *message);
struct RedChannel {
EventListener listener;
uint32_t id;
spice_parse_channel_func_t parser;
struct RedWorker *worker;
RedsStreamContext *peer;
int migrate;
@ -9888,15 +9890,11 @@ static void on_new_display_channel(RedWorker *worker)
}
}
static int channel_handle_message(RedChannel *channel, SpiceDataHeader *message)
static int channel_handle_message(RedChannel *channel, size_t size, uint32_t type, void *message)
{
switch (message->type) {
switch (type) {
case SPICE_MSGC_ACK_SYNC:
if (message->size != sizeof(uint32_t)) {
red_printf("bad message size");
return FALSE;
}
channel->client_ack_generation = *(uint32_t *)(message + 1);
channel->client_ack_generation = *(uint32_t *)message;
break;
case SPICE_MSGC_ACK:
if (channel->client_ack_generation == channel->ack_generation) {
@ -9906,7 +9904,7 @@ static int channel_handle_message(RedChannel *channel, SpiceDataHeader *message)
case SPICE_MSGC_DISCONNECTING:
break;
default:
red_printf("invalid message type %u", message->type);
red_printf("invalid message type %u", type);
return FALSE;
}
return TRUE;
@ -10146,7 +10144,7 @@ static int display_channel_handle_migrate_mark(DisplayChannel *channel)
return TRUE;
}
static int display_channel_handle_migrate_data(DisplayChannel *channel, SpiceDataHeader *message)
static int display_channel_handle_migrate_data(DisplayChannel *channel, size_t size, void *message)
{
DisplayChannelMigrateData *migrate_data;
int i;
@ -10156,11 +10154,11 @@ static int display_channel_handle_migrate_data(DisplayChannel *channel, SpiceDat
return FALSE;
}
channel->expect_migrate_data = FALSE;
if (message->size < sizeof(*migrate_data)) {
if (size < sizeof(*migrate_data)) {
red_printf("bad message size");
return FALSE;
}
migrate_data = (DisplayChannelMigrateData *)(message + 1);
migrate_data = (DisplayChannelMigrateData *)message;
if (migrate_data->magic != DISPLAY_MIGRATE_DATA_MAGIC ||
migrate_data->version != DISPLAY_MIGRATE_DATA_VERSION) {
red_printf("invalid content");
@ -10197,26 +10195,22 @@ static int display_channel_handle_migrate_data(DisplayChannel *channel, SpiceDat
return TRUE;
}
static int display_channel_handle_message(RedChannel *channel, SpiceDataHeader *message)
static int display_channel_handle_message(RedChannel *channel, size_t size, uint32_t type, void *message)
{
switch (message->type) {
switch (type) {
case SPICE_MSGC_DISPLAY_INIT:
if (message->size != sizeof(SpiceMsgcDisplayInit)) {
red_printf("bad message size");
return FALSE;
}
if (!((DisplayChannel *)channel)->expect_init) {
red_printf("unexpected SPICE_MSGC_DISPLAY_INIT");
return FALSE;
}
((DisplayChannel *)channel)->expect_init = FALSE;
return display_channel_init((DisplayChannel *)channel, (SpiceMsgcDisplayInit *)(message + 1));
return display_channel_init((DisplayChannel *)channel, (SpiceMsgcDisplayInit *)message);
case SPICE_MSGC_MIGRATE_FLUSH_MARK:
return display_channel_handle_migrate_mark((DisplayChannel *)channel);
case SPICE_MSGC_MIGRATE_DATA:
return display_channel_handle_migrate_data((DisplayChannel *)channel, message);
return display_channel_handle_migrate_data((DisplayChannel *)channel, size, message);
default:
return channel_handle_message(channel, message);
return channel_handle_message(channel, size, type, message);
}
}
@ -10249,19 +10243,34 @@ static void red_receive(RedChannel *channel)
} else {
channel->recive_data.now += n;
for (;;) {
SpiceDataHeader *message = channel->recive_data.message;
n = channel->recive_data.now - (uint8_t *)message;
SpiceDataHeader *header = channel->recive_data.message;
uint8_t *data = (uint8_t *)(header+1);
size_t parsed_size;
uint8_t *parsed;
n = channel->recive_data.now - (uint8_t *)header;
if (n < sizeof(SpiceDataHeader) ||
n < sizeof(SpiceDataHeader) + message->size) {
n < sizeof(SpiceDataHeader) + header->size) {
break;
}
if (!channel->handle_message(channel, message)) {
parsed = channel->parser((void *)data, data + header->size, header->type,
SPICE_VERSION_MINOR, &parsed_size);
if (parsed == NULL) {
red_printf("failed to parse message type %d", header->type);
channel->disconnect(channel);
return;
}
channel->recive_data.message = (SpiceDataHeader *)((uint8_t *)message +
sizeof(SpiceDataHeader) +
message->size);
if (!channel->handle_message(channel, parsed_size, header->type, parsed)) {
free(parsed);
channel->disconnect(channel);
return;
}
free(parsed);
channel->recive_data.message = (SpiceDataHeader *)((uint8_t *)header +
sizeof(SpiceDataHeader) +
header->size);
}
if (channel->recive_data.now == (uint8_t *)channel->recive_data.message) {
@ -10276,7 +10285,8 @@ static void red_receive(RedChannel *channel)
}
}
static RedChannel *__new_channel(RedWorker *worker, int size, RedsStreamContext *peer, int migrate,
static RedChannel *__new_channel(RedWorker *worker, int size, uint32_t channel_id,
RedsStreamContext *peer, int migrate,
event_listener_action_proc handler,
disconnect_channel_proc disconnect,
hold_item_proc hold_item,
@ -10306,6 +10316,7 @@ static RedChannel *__new_channel(RedWorker *worker, int size, RedsStreamContext
ASSERT(size >= sizeof(*channel));
channel = spice_malloc0(size);
channel->id = worker->id;
channel->parser = spice_get_client_channel_parser(channel_id, NULL);
channel->listener.refs = 1;
channel->listener.action = handler;
channel->disconnect = disconnect;
@ -10408,7 +10419,8 @@ static void handle_new_display_channel(RedWorker *worker, RedsStreamContext *pee
red_disconnect_display((RedChannel *)worker->display_channel);
if (!(display_channel = (DisplayChannel *)__new_channel(worker, sizeof(*display_channel), peer,
if (!(display_channel = (DisplayChannel *)__new_channel(worker, sizeof(*display_channel),
SPICE_CHANNEL_DISPLAY, peer,
migrate, handle_channel_events,
red_disconnect_display,
display_channel_hold_item,
@ -10508,7 +10520,8 @@ static void red_connect_cursor(RedWorker *worker, RedsStreamContext *peer, int m
red_disconnect_cursor((RedChannel *)worker->cursor_channel);
if (!(channel = (CursorChannel *)__new_channel(worker, sizeof(*channel), peer, migrate,
if (!(channel = (CursorChannel *)__new_channel(worker, sizeof(*channel),
SPICE_CHANNEL_CURSOR, peer, migrate,
handle_channel_events,
red_disconnect_cursor,
cursor_channel_hold_item,

View File

@ -52,6 +52,7 @@
#include "stat.h"
#include "ring.h"
#include "config.h"
#include "demarshallers.h"
#include "marshaller.h"
#include "generated_marshallers.h"
#ifdef HAVE_SLIRP
@ -113,11 +114,12 @@ static void openssl_init();
#define CAPS_LOCK_SCAN_CODE 0x3a
typedef struct IncomingHandler {
spice_parse_channel_func_t parser;
void *opaque;
int shut;
uint8_t buf[RECIVE_BUF_SIZE];
uint32_t end_pos;
void (*handle_message)(void *opaque, SpiceDataHeader *message);
void (*handle_message)(void *opaque, size_t size, uint32_t type, void *message);
} IncomingHandler;
typedef struct OutgoingHandler {
@ -792,9 +794,19 @@ static int handle_incoming(RedsStreamContext *peer, IncomingHandler *handler)
end = buf + pos;
while (buf + sizeof(SpiceDataHeader) <= end &&
buf + sizeof(SpiceDataHeader) + (header = (SpiceDataHeader *)buf)->size <= end) {
buf += sizeof(SpiceDataHeader) + header->size;
handler->handle_message(handler->opaque, header);
uint8_t *data = (uint8_t *)(header+1);
size_t parsed_size;
uint8_t *parsed;
buf += sizeof(SpiceDataHeader) + header->size;
parsed = handler->parser(data, data + header->size, header->type,
SPICE_VERSION_MINOR, &parsed_size);
if (parsed == NULL) {
red_printf("failed to parse message type %d", header->type);
return -1;
}
handler->handle_message(handler->opaque, parsed_size, header->type, parsed);
free(parsed);
if (handler->shut) {
return -1;
}
@ -1586,9 +1598,9 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
ASSERT(state->num_client_tokens + state->num_tokens == REDS_AGENT_WINDOW_SIZE);
}
static void reds_main_handle_message(void *opaque, SpiceDataHeader *message)
static void reds_main_handle_message(void *opaque, size_t size, uint32_t type, void *message)
{
switch (message->type) {
switch (type) {
case SPICE_MSGC_MAIN_AGENT_START: {
SpiceMsgcMainAgentTokens *agent_start;
@ -1596,7 +1608,7 @@ static void reds_main_handle_message(void *opaque, SpiceDataHeader *message)
if (!reds->peer) {
return;
}
agent_start = (SpiceMsgcMainAgentTokens *)(message + 1);
agent_start = (SpiceMsgcMainAgentTokens *)message;
reds->agent_state.client_agent_started = TRUE;
reds->agent_state.send_tokens = agent_start->num_tokens;
read_from_vdi_port();
@ -1624,7 +1636,7 @@ static void reds_main_handle_message(void *opaque, SpiceDataHeader *message)
break;
}
if (message->size > SPICE_AGENT_MAX_DATA_SIZE) {
if (size > SPICE_AGENT_MAX_DATA_SIZE) {
red_printf("invalid agent message");
reds_disconnect();
break;
@ -1638,9 +1650,9 @@ static void reds_main_handle_message(void *opaque, SpiceDataHeader *message)
ring_remove(ring_item);
buf = (VDAgentExtBuf *)ring_item;
buf->base.now = (uint8_t *)&buf->base.chunk_header.port;
buf->base.write_len = message->size + sizeof(VDIChunkHeader);
buf->base.chunk_header.size = message->size;
memcpy(buf->buf, message + 1, message->size);
buf->base.write_len = size + sizeof(VDIChunkHeader);
buf->base.chunk_header.size = size;
memcpy(buf->buf, message, size);
ring_add(&reds->agent_state.write_queue, ring_item);
write_to_vdi_port();
break;
@ -1653,7 +1665,7 @@ static void reds_main_handle_message(void *opaque, SpiceDataHeader *message)
break;
}
token = (SpiceMsgcMainAgentTokens *)(message + 1);
token = (SpiceMsgcMainAgentTokens *)message;
reds->agent_state.send_tokens += token->num_tokens;
read_from_vdi_port();
break;
@ -1674,7 +1686,7 @@ static void reds_main_handle_message(void *opaque, SpiceDataHeader *message)
}
break;
case SPICE_MSGC_MAIN_MOUSE_MODE_REQUEST: {
switch (((SpiceMsgcMainMouseModeRequest *)(message + 1))->mode) {
switch (((SpiceMsgcMainMouseModeRequest *)message)->mode) {
case SPICE_MOUSE_MODE_CLIENT:
if (reds->is_client_mouse_allowed) {
reds_set_mouse_mode(SPICE_MOUSE_MODE_CLIENT);
@ -1691,7 +1703,7 @@ static void reds_main_handle_message(void *opaque, SpiceDataHeader *message)
break;
}
case SPICE_MSGC_PONG: {
SpiceMsgPing *ping = (SpiceMsgPing *)(message + 1);
SpiceMsgPing *ping = (SpiceMsgPing *)message;
uint64_t roundtrip;
struct timespec ts;
@ -1743,15 +1755,15 @@ static void reds_main_handle_message(void *opaque, SpiceDataHeader *message)
main_channel_push_migrate_data_item();
break;
case SPICE_MSGC_MIGRATE_DATA:
main_channel_recive_migrate_data((MainMigrateData *)(message + 1),
(uint8_t *)(message + 1) + message->size);
main_channel_recive_migrate_data((MainMigrateData *)message,
((uint8_t *)message) + size);
reds->mig_target = FALSE;
while (write_to_vdi_port() || read_from_vdi_port());
break;
case SPICE_MSGC_DISCONNECTING:
break;
default:
red_printf("unexpected type %d", message->type);
red_printf("unexpected type %d", type);
}
}
@ -2058,12 +2070,12 @@ static uint8_t kbd_get_leds(SpiceKbdInstance *sin)
return sif->get_leds(sin);
}
static void inputs_handle_input(void *opaque, SpiceDataHeader *header)
static void inputs_handle_input(void *opaque, size_t size, uint32_t type, void *message)
{
InputsState *state = (InputsState *)opaque;
uint8_t *buf = (uint8_t *)(header + 1);
uint8_t *buf = (uint8_t *)message;
switch (header->type) {
switch (type) {
case SPICE_MSGC_INPUTS_KEY_DOWN: {
SpiceMsgcKeyDown *key_up = (SpiceMsgcKeyDown *)buf;
if (key_up->code == CAPS_LOCK_SCAN_CODE || key_up->code == NUM_LOCK_SCAN_CODE ||
@ -2216,7 +2228,7 @@ static void inputs_handle_input(void *opaque, SpiceDataHeader *header)
case SPICE_MSGC_DISCONNECTING:
break;
default:
red_printf("unexpected type %d", header->type);
red_printf("unexpected type %d", type);
}
}
@ -2344,6 +2356,7 @@ static void inputs_link(Channel *channel, RedsStreamContext *peer, int migration
inputs_state->peer = peer;
inputs_state->end_pos = 0;
inputs_state->channel = channel;
inputs_state->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_INPUTS, NULL);
inputs_state->in_handler.opaque = inputs_state;
inputs_state->in_handler.handle_message = inputs_handle_input;
inputs_state->out_handler.length = 0;
@ -3571,6 +3584,7 @@ static void do_spice_init(SpiceCoreInterface *core_interface)
reds->listen_socket = -1;
reds->secure_listen_socket = -1;
reds->peer = NULL;
reds->in_handler.parser = spice_get_client_channel_parser(SPICE_CHANNEL_MAIN, NULL);
reds->in_handler.handle_message = reds_main_handle_message;
ring_init(&reds->outgoing.pipe);
reds->outgoing.vec = reds->outgoing.vec_buf;

View File

@ -30,6 +30,7 @@
#include "snd_worker.h"
#include "marshaller.h"
#include "generated_marshallers.h"
#include "demarshallers.h"
#define MAX_SEND_VEC 100
@ -65,7 +66,7 @@ enum RecordCommand {
typedef struct SndChannel SndChannel;
typedef void (*send_messages_proc)(void *in_channel);
typedef int (*handle_message_proc)(SndChannel *channel, SpiceDataHeader *message);
typedef int (*handle_message_proc)(SndChannel *channel, size_t size, uint32_t type, void *message);
typedef void (*on_message_done_proc)(SndChannel *channel);
typedef void (*cleanup_channel_proc)(SndChannel *channel);
@ -74,6 +75,7 @@ typedef struct SndWorker SndWorker;
struct SndChannel {
RedsStreamContext *peer;
SndWorker *worker;
spice_parse_channel_func_t parser;
int active;
int client_active;
@ -266,12 +268,11 @@ static int snd_send_data(SndChannel *channel)
return TRUE;
}
static int snd_record_handle_write(RecordChannel *record_channel, SpiceDataHeader *message)
static int snd_record_handle_write(RecordChannel *record_channel, size_t size, void *message)
{
SpiceMsgcRecordPacket *packet;
uint32_t write_pos;
uint32_t* data;
uint32_t size;
uint32_t len;
uint32_t now;
@ -279,8 +280,8 @@ static int snd_record_handle_write(RecordChannel *record_channel, SpiceDataHeade
return FALSE;
}
packet = (SpiceMsgcRecordPacket *)(message + 1);
size = message->size - sizeof(*packet);
packet = (SpiceMsgcRecordPacket *)message;
size = size - sizeof(*packet);
if (record_channel->mode == SPICE_AUDIO_DATA_MODE_CELT_0_5_1) {
int celt_err = celt051_decode(record_channel->celt_decoder, packet->data, size,
@ -316,34 +317,34 @@ static int snd_record_handle_write(RecordChannel *record_channel, SpiceDataHeade
return TRUE;
}
static int snd_playback_handle_message(SndChannel *channel, SpiceDataHeader *message)
static int snd_playback_handle_message(SndChannel *channel, size_t size, uint32_t type, void *message)
{
if (!channel) {
return FALSE;
}
switch (message->type) {
switch (type) {
case SPICE_MSGC_DISCONNECTING:
break;
default:
red_printf("invalid message type %u", message->type);
red_printf("invalid message type %u", type);
return FALSE;
}
return TRUE;
}
static int snd_record_handle_message(SndChannel *channel, SpiceDataHeader *message)
static int snd_record_handle_message(SndChannel *channel, size_t size, uint32_t type, void *message)
{
RecordChannel *record_channel = (RecordChannel *)channel;
if (!channel) {
return FALSE;
}
switch (message->type) {
switch (type) {
case SPICE_MSGC_RECORD_DATA:
return snd_record_handle_write((RecordChannel *)channel, message);
return snd_record_handle_write((RecordChannel *)channel, size, message);
case SPICE_MSGC_RECORD_MODE: {
SpiceMsgcRecordMode *mode = (SpiceMsgcRecordMode *)(message + 1);
SpiceMsgcRecordMode *mode = (SpiceMsgcRecordMode *)message;
record_channel->mode = mode->mode;
record_channel->mode_time = mode->time;
if (record_channel->mode != SPICE_AUDIO_DATA_MODE_CELT_0_5_1 &&
@ -353,14 +354,14 @@ static int snd_record_handle_message(SndChannel *channel, SpiceDataHeader *messa
break;
}
case SPICE_MSGC_RECORD_START_MARK: {
SpiceMsgcRecordStartMark *mark = (SpiceMsgcRecordStartMark *)(message + 1);
SpiceMsgcRecordStartMark *mark = (SpiceMsgcRecordStartMark *)message;
record_channel->start_time = mark->time;
break;
}
case SPICE_MSGC_DISCONNECTING:
break;
case SPICE_MSGC_MIGRATE_DATA: {
RecordMigrateData* mig_data = (RecordMigrateData *)(message + 1);
RecordMigrateData* mig_data = (RecordMigrateData *)message;
if (mig_data->version != RECORD_MIG_VERSION) {
red_printf("invalid mig version");
break;
@ -371,7 +372,7 @@ static int snd_record_handle_message(SndChannel *channel, SpiceDataHeader *messa
break;
}
default:
red_printf("invalid message type %u", message->type);
red_printf("invalid message type %u", type);
return FALSE;
}
return TRUE;
@ -410,18 +411,31 @@ static void snd_receive(void* data)
} else {
channel->recive_data.now += n;
for (;;) {
SpiceDataHeader *message = channel->recive_data.message;
n = channel->recive_data.now - (uint8_t *)message;
if (n < sizeof(SpiceDataHeader) || n < sizeof(SpiceDataHeader) + message->size) {
SpiceDataHeader *header = channel->recive_data.message;
uint8_t *data = (uint8_t *)(header+1);
size_t parsed_size;
uint8_t *parsed;
n = channel->recive_data.now - (uint8_t *)header;
if (n < sizeof(SpiceDataHeader) || n < sizeof(SpiceDataHeader) + header->size) {
break;
}
if (!channel->handle_message(channel, message)) {
parsed = channel->parser((void *)data, data + header->size, header->type,
SPICE_VERSION_MINOR, &parsed_size);
if (parsed == NULL) {
red_printf("failed to parse message type %d", header->type);
snd_disconnect_channel(channel);
return;
}
channel->recive_data.message = (SpiceDataHeader *)((uint8_t *)message +
if (!channel->handle_message(channel, parsed_size, header->type, parsed)) {
free(parsed);
snd_disconnect_channel(channel);
return;
}
free(parsed);
channel->recive_data.message = (SpiceDataHeader *)((uint8_t *)header +
sizeof(SpiceDataHeader) +
message->size);
header->size);
}
if (channel->recive_data.now == (uint8_t *)channel->recive_data.message) {
channel->recive_data.now = channel->recive_data.buf;
@ -718,7 +732,8 @@ static void snd_record_send(void* data)
}
}
static SndChannel *__new_channel(SndWorker *worker, int size, RedsStreamContext *peer,
static SndChannel *__new_channel(SndWorker *worker, int size, uint32_t channel_id,
RedsStreamContext *peer,
int migrate, send_messages_proc send_messages,
handle_message_proc handle_message,
on_message_done_proc on_message_done,
@ -758,6 +773,7 @@ static SndChannel *__new_channel(SndWorker *worker, int size, RedsStreamContext
ASSERT(size >= sizeof(*channel));
channel = spice_malloc0(size);
channel->parser = spice_get_client_channel_parser(channel_id, NULL);
channel->peer = peer;
channel->worker = worker;
channel->recive_data.message = (SpiceDataHeader *)channel->recive_data.buf;
@ -941,6 +957,7 @@ static void snd_set_playback_peer(Channel *channel, RedsStreamContext *peer, int
if (!(playback_channel = (PlaybackChannel *)__new_channel(worker,
sizeof(*playback_channel),
SPICE_CHANNEL_PLAYBACK,
peer,
migration,
snd_playback_send,
@ -1106,6 +1123,7 @@ static void snd_set_record_peer(Channel *channel, RedsStreamContext *peer, int m
if (!(record_channel = (RecordChannel *)__new_channel(worker,
sizeof(*record_channel),
SPICE_CHANNEL_RECORD,
peer,
migration,
snd_record_send,