agent: employ SpiceCharDeviceState for managing reading from the device

This commit is contained in:
Yonit Halperin 2012-06-24 13:22:36 +03:00
parent 80145817ae
commit bf1d9007b4
3 changed files with 229 additions and 125 deletions

View File

@ -81,13 +81,8 @@ typedef struct TokensPipeItem {
int tokens;
} TokensPipeItem;
typedef struct AgentDataPipeItemRefs {
int refs;
} AgentDataPipeItemRefs;
typedef struct AgentDataPipeItem {
PipeItem base;
AgentDataPipeItemRefs *refs;
uint8_t* data;
size_t len;
spice_marshaller_item_free_func free_data;
@ -232,26 +227,18 @@ static PipeItem *main_agent_tokens_item_new(RedChannelClient *rcc, uint32_t num_
return &item->base;
}
typedef struct MainAgentDataItemInfo {
uint8_t* data;
size_t len;
spice_marshaller_item_free_func free_data;
void *opaque;
AgentDataPipeItemRefs *refs;
} MainAgentDataItemInfo;
static PipeItem *main_agent_data_item_new(RedChannelClient *rcc, void *data, int num)
static PipeItem *main_agent_data_item_new(RedChannelClient *rcc, uint8_t* data, size_t len,
spice_marshaller_item_free_func free_data,
void *opaque)
{
MainAgentDataItemInfo *info = data;
AgentDataPipeItem *item = spice_malloc(sizeof(AgentDataPipeItem));
red_channel_pipe_item_init(rcc->channel, &item->base,
SPICE_MSG_MAIN_AGENT_DATA);
item->refs = info->refs;
item->data = info->data;
item->len = info->len;
item->free_data = info->free_data;
item->opaque = info->opaque;
item->data = data;
item->len = len;
item->free_data = free_data;
item->opaque = opaque;
return &item->base;
}
@ -432,20 +419,13 @@ static void main_channel_marshall_tokens(SpiceMarshaller *m, uint32_t num_tokens
spice_marshall_msg_main_agent_token(m, &tokens);
}
void main_channel_push_agent_data(MainChannel *main_chan, uint8_t* data, size_t len,
void main_channel_client_push_agent_data(MainChannelClient *mcc, uint8_t* data, size_t len,
spice_marshaller_item_free_func free_data, void *opaque)
{
MainAgentDataItemInfo info = {
.data = data,
.len = len,
.free_data = free_data,
.opaque = opaque,
.refs = spice_malloc(sizeof(AgentDataPipeItemRefs)),
};
PipeItem *item;
info.refs->refs = main_chan->base.clients_num;
red_channel_pipes_new_add_push(&main_chan->base,
main_agent_data_item_new, &info);
item = main_agent_data_item_new(&mcc->base, data, len, free_data, opaque);
red_channel_client_pipe_add_push(&mcc->base, item);
}
static void main_channel_marshall_agent_data(SpiceMarshaller *m,
@ -760,14 +740,10 @@ static void main_channel_release_pipe_item(RedChannelClient *rcc,
{
switch (base->type) {
case SPICE_MSG_MAIN_AGENT_DATA: {
AgentDataPipeItem *data = (AgentDataPipeItem*)base;
if (!--data->refs->refs) {
spice_debug("SPICE_MSG_MAIN_AGENT_DATA %p %p, %d",
data, data->refs, data->refs->refs);
free(data->refs);
AgentDataPipeItem *data = (AgentDataPipeItem *)base;
data->free_data(data->data, data->opaque);
}
break;
break;
}
default:
break;

View File

@ -73,8 +73,8 @@ void main_channel_push_mouse_mode(MainChannel *main_chan, int current_mode, int
void main_channel_push_agent_connected(MainChannel *main_chan);
void main_channel_push_agent_disconnected(MainChannel *main_chan);
void main_channel_client_push_agent_tokens(MainChannelClient *mcc, uint32_t num_tokens);
void main_channel_push_agent_data(MainChannel *main_chan, uint8_t* data, size_t len,
spice_marshaller_item_free_func free_data, void *opaque);
void main_channel_client_push_agent_data(MainChannelClient *mcc, uint8_t* data, size_t len,
spice_marshaller_item_free_func free_data, void *opaque);
void main_channel_client_start_net_test(MainChannelClient *mcc);
// TODO: huge. Consider making a reds_* interface for these functions
// and calling from main.

View File

@ -146,24 +146,27 @@ typedef struct MonitorMode {
typedef struct VDIReadBuf {
RingItem link;
uint32_t refs;
int len;
uint8_t data[SPICE_AGENT_MAX_DATA_SIZE];
} VDIReadBuf;
static VDIReadBuf *vdi_port_read_buf_get(void);
static VDIReadBuf *vdi_port_read_buf_ref(VDIReadBuf *buf);
static void vdi_port_read_buf_unref(VDIReadBuf *buf);
enum {
VDI_PORT_READ_STATE_READ_HADER,
VDI_PORT_READ_STATE_GET_BUFF,
VDI_PORT_READ_STATE_READ_DATA,
};
void vdagent_char_device_wakeup(SpiceCharDeviceInstance *sin);
struct SpiceCharDeviceState vdagent_char_device_state = {
.wakeup = &vdagent_char_device_wakeup,
};
typedef struct VDIPortState {
SpiceCharDeviceState *base;
uint32_t plug_generation;
/* write to agent */
uint32_t num_tokens;
uint32_t num_client_tokens;
Ring external_bufs;
@ -171,6 +174,7 @@ typedef struct VDIPortState {
Ring write_queue;
AgentMsgFilter write_filter;
/* read from agent */
Ring read_bufs;
uint32_t read_state;
uint32_t message_recive_len;
@ -615,7 +619,7 @@ static void reds_reset_vdp(void)
state->recive_len = sizeof(state->vdi_chunk_header);
state->message_recive_len = 0;
if (state->current_read_buf) {
ring_add(&state->read_bufs, &state->current_read_buf->link);
vdi_port_read_buf_unref(state->current_read_buf);
state->current_read_buf = NULL;
}
/* Reset read filter to start with clean state when the agent reconnects */
@ -625,6 +629,21 @@ static void reds_reset_vdp(void)
state->write_filter.result = AGENT_MSG_FILTER_DISCARD;
state->write_filter.discard_all = TRUE;
/* reseting and not destroying the state as a workaround for a bad
* tokens management in the vdagent protocol:
* The client tokens' are set only once, when the main channel is initialized.
* Instead, it would have been more appropriate to reset them upon AGEN_CONNECT.
* The client tokens are tracked as part of the SpiceCharDeviceClientState. Thus,
* in order to be backwartd compatible with the client, we need to track the tokens
* event when the agent is detached. We don't destroy the the char_device state, and
* instead we just reset it.
* In addition, there is a misshandling of AGENT_TOKENS message in spice-gtk: it
* overrides the amount of tokens, instead of adding the given amount.
*
* TODO: change AGENT_CONNECT msg to contain tokens count.
*/
spice_char_device_reset(state->base);
sif = SPICE_CONTAINEROF(vdagent->base.sif, SpiceCharDeviceInterface, base);
if (sif->state) {
sif->state(vdagent, 0);
@ -658,6 +677,15 @@ void reds_client_disconnect(RedClient *client)
if (mig_client) {
reds_mig_target_client_free(mig_client);
}
if (reds->agent_state.base) {
/* note that vdagent might be NULL, if the vdagent was once
* up and than was removed */
if (spice_char_device_client_exists(reds->agent_state.base, client)) {
spice_char_device_client_remove(reds->agent_state.base, client);
}
}
ring_remove(&client->link);
reds->num_clients--;
red_client_destroy(client);
@ -823,22 +851,19 @@ static int write_to_vdi_port(void)
return total;
}
static int read_from_vdi_port(void);
/*******************************
* Char device state callbacks *
* *****************************/
static void vdi_read_buf_release(uint8_t *data, void *opaque)
static void vdi_port_read_buf_release(uint8_t *data, void *opaque)
{
VDIReadBuf *buf = (VDIReadBuf *)opaque;
ring_add(&reds->agent_state.read_bufs, &buf->link);
/* read_from_vdi_port() may have never completed because the read_bufs
ring was empty. So we call it again so it can complete its work if
necessary. Note since we can be called from read_from_vdi_port ourselves
this can cause recursion, read_from_vdi_port() contains code protecting
it against this. */
while (read_from_vdi_port());
vdi_port_read_buf_unref(buf);
}
static void dispatch_vdi_port_data(int port, VDIReadBuf *buf)
/* returns TRUE if the buffer can be forwarded */
static int vdi_port_read_buf_process(int port, VDIReadBuf *buf)
{
VDIPortState *state = &reds->agent_state;
int res;
@ -849,88 +874,91 @@ static void dispatch_vdi_port_data(int port, VDIReadBuf *buf)
buf->data, buf->len);
switch (res) {
case AGENT_MSG_FILTER_OK:
break;
return TRUE;
case AGENT_MSG_FILTER_DISCARD:
ring_add(&state->read_bufs, &buf->link);
return;
return FALSE;
case AGENT_MSG_FILTER_PROTO_ERROR:
ring_add(&state->read_bufs, &buf->link);
reds_agent_remove();
return;
return FALSE;
}
main_channel_push_agent_data(reds->main_channel, buf->data, buf->len,
vdi_read_buf_release, buf);
break;
}
case VDP_SERVER_PORT:
ring_add(&state->read_bufs, &buf->link);
break;
return FALSE;
default:
ring_add(&state->read_bufs, &buf->link);
spice_printerr("invalid port");
reds_agent_remove();
return FALSE;
}
}
/* Note this function MUST always be called in a while loop until it
returns 0. This is needed because it can cause new data available events
and its recursion protection causes those to get lost. Calling it until
it returns 0 ensures that all data has been consumed. */
static int read_from_vdi_port(void)
static VDIReadBuf *vdi_port_read_buf_get(void)
{
VDIPortState *state = &reds->agent_state;
RingItem *item;
VDIReadBuf *buf;
if (!(item = ring_get_head(&state->read_bufs))) {
return NULL;
}
ring_remove(item);
buf = SPICE_CONTAINEROF(item, VDIReadBuf, link);
buf->refs = 1;
return buf;
}
static VDIReadBuf* vdi_port_read_buf_ref(VDIReadBuf *buf)
{
buf->refs++;
return buf;
}
static void vdi_port_read_buf_unref(VDIReadBuf *buf)
{
if (!--buf->refs) {
ring_add(&reds->agent_state.read_bufs, &buf->link);
/* read_one_msg_from_vdi_port may have never completed because the read_bufs
ring was empty. So we call it again so it can complete its work if
necessary. Note that since we can be called from spice_char_device_wakeup
this can cause recursion, but we have protection for that */
spice_char_device_wakeup(reds->agent_state.base);
}
}
/* reads from the device till completes reading a message that is addressed to the client,
* or otherwise, when reading from the device fails */
static SpiceCharDeviceMsgToClient *vdi_port_read_one_msg_from_device(SpiceCharDeviceInstance *sin,
void *opaque)
{
/* There are 2 scenarios where we can get called recursively:
1) spice-vmc vmc_read triggering flush of throttled data, recalling us
2) the buf we push to the client may be send immediately without
blocking, in which case its free function will recall us
This messes up the state machine, so ignore recursive calls.
This is why we always must be called in a loop. */
static int inside_call = 0;
int quit_loop = 0;
VDIPortState *state = &reds->agent_state;
SpiceCharDeviceInterface *sif;
VDIReadBuf *dispatch_buf;
int total = 0;
int n;
if (inside_call) {
return 0;
}
inside_call = 1;
if (!vdagent) {
// discard data only if we are migrating (?) or vdagent has not been
// initialized.
inside_call = 0;
return 0;
return NULL;
}
spice_assert(vdagent == sin);
sif = SPICE_CONTAINEROF(vdagent->base.sif, SpiceCharDeviceInterface, base);
while (!quit_loop && vdagent) {
while (vdagent) {
switch (state->read_state) {
case VDI_PORT_READ_STATE_READ_HADER:
n = sif->read(vdagent, state->recive_pos, state->recive_len);
if (!n) {
quit_loop = 1;
break;
return NULL;
}
total += n;
if ((state->recive_len -= n)) {
state->recive_pos += n;
quit_loop = 1;
break;
return NULL;
}
state->message_recive_len = state->vdi_chunk_header.size;
state->read_state = VDI_PORT_READ_STATE_GET_BUFF;
case VDI_PORT_READ_STATE_GET_BUFF: {
RingItem *item;
if (!(item = ring_get_head(&state->read_bufs))) {
quit_loop = 1;
break;
if (!(state->current_read_buf = vdi_port_read_buf_get())) {
return NULL;
}
ring_remove(item);
state->current_read_buf = (VDIReadBuf *)item;
state->recive_pos = state->current_read_buf->data;
state->recive_len = MIN(state->message_recive_len,
sizeof(state->current_read_buf->data));
@ -941,10 +969,8 @@ static int read_from_vdi_port(void)
case VDI_PORT_READ_STATE_READ_DATA:
n = sif->read(vdagent, state->recive_pos, state->recive_len);
if (!n) {
quit_loop = 1;
break;
return NULL;
}
total += n;
if ((state->recive_len -= n)) {
state->recive_pos += n;
break;
@ -959,18 +985,64 @@ static int read_from_vdi_port(void)
} else {
state->read_state = VDI_PORT_READ_STATE_GET_BUFF;
}
dispatch_vdi_port_data(state->vdi_chunk_header.port, dispatch_buf);
}
}
inside_call = 0;
return total;
if (vdi_port_read_buf_process(state->vdi_chunk_header.port, dispatch_buf)) {
return dispatch_buf;
} else {
vdi_port_read_buf_unref(dispatch_buf);
}
} /* END switch */
} /* END while */
return NULL;
}
void vdagent_char_device_wakeup(SpiceCharDeviceInstance *sin)
static SpiceCharDeviceMsgToClient *vdi_port_ref_msg_to_client(SpiceCharDeviceMsgToClient *msg,
void *opaque)
{
while (read_from_vdi_port());
return vdi_port_read_buf_ref(msg);
}
static void vdi_port_unref_msg_to_client(SpiceCharDeviceMsgToClient *msg,
void *opaque)
{
vdi_port_read_buf_unref(msg);
}
/* after calling this, we unref the message, and the ref is in the instance side */
static void vdi_port_send_msg_to_client(SpiceCharDeviceMsgToClient *msg,
RedClient *client,
void *opaque)
{
VDIReadBuf *agent_data_buf = msg;
main_channel_client_push_agent_data(red_client_get_main(client),
agent_data_buf->data,
agent_data_buf->len,
vdi_port_read_buf_release,
vdi_port_read_buf_ref(agent_data_buf));
}
static void vdi_port_send_tokens_to_client(RedClient *client, uint32_t tokens, void *opaque)
{
main_channel_client_push_agent_tokens(red_client_get_main(client),
tokens);
}
static void vdi_port_on_free_self_token(void *opaque)
{
if (inputs_inited() && reds->pending_mouse_event) {
spice_debug("pending mouse event");
reds_handle_agent_mouse_event(inputs_get_mouse_state());
}
}
static void vdi_port_remove_client(RedClient *client, void *opaque)
{
reds_client_disconnect(client);
}
/****************************************************************************/
int reds_has_vdagent(void)
{
return !!vdagent;
@ -1066,9 +1138,33 @@ void reds_fill_channels(SpiceMsgChannels *channels_info)
void reds_on_main_agent_start(MainChannelClient *mcc, uint32_t num_tokens)
{
SpiceCharDeviceState *dev_state = reds->agent_state.base;
RedClient *client;
if (!vdagent) {
return;
}
spice_assert(vdagent->st && vdagent->st == dev_state);
client = main_channel_client_get_base(mcc)->client;
/*
* Note that in older releases, send_tokens were set to ~0 on both client
* and server. The server ignored the client given tokens.
* Thanks to that, when an old client is connected to a new server,
* and vice versa, the sending from the server to the client won't have
* flow control, but will have no other problem.
*/
if (!spice_char_device_client_exists(dev_state, client)) {
spice_char_device_client_add(dev_state,
client,
TRUE, /* flow control */
REDS_VDI_PORT_NUM_RECEIVE_BUFFS,
REDS_AGENT_WINDOW_SIZE,
num_tokens);
} else {
spice_char_device_send_to_client_tokens_set(dev_state,
client,
num_tokens);
}
reds->agent_state.write_filter.discard_all = FALSE;
}
@ -1077,7 +1173,10 @@ void reds_on_main_agent_tokens(MainChannelClient *mcc, uint32_t num_tokens)
if (!vdagent) {
return;
}
spice_printerr("to be implemented");
spice_assert(vdagent->st);
spice_char_device_send_to_client_tokens_add(vdagent->st,
main_channel_client_get_base(mcc)->client,
num_tokens);
}
void reds_on_main_agent_data(MainChannelClient *mcc, void *message, size_t size)
@ -1445,7 +1544,6 @@ static void reds_handle_main_link(RedLinkInfo *link)
main_channel_client_start_net_test(mcc);
/* Now that we have a client, forward any pending agent data */
while (read_from_vdi_port());
} else {
reds_mig_target_client_add(client);
}
@ -1548,9 +1646,6 @@ void reds_on_client_migrate_complete(RedClient *client)
}
reds_mig_target_client_free(mig_client);
/* Now that we have a client, forward any pending agent data */
while (read_from_vdi_port());
}
static void reds_handle_other_links(RedLinkInfo *link)
@ -3096,10 +3191,29 @@ static void mm_timer_proc(void *opaque)
core->timer_start(reds->mm_timer, MM_TIMER_GRANULARITY_MS);
}
static void attach_to_red_agent(SpiceCharDeviceInstance *sin)
static SpiceCharDeviceState *attach_to_red_agent(SpiceCharDeviceInstance *sin)
{
VDIPortState *state = &reds->agent_state;
SpiceCharDeviceInterface *sif;
SpiceCharDeviceCallbacks char_dev_state_cbs;
if (!state->base) {
char_dev_state_cbs.read_one_msg_from_device = vdi_port_read_one_msg_from_device;
char_dev_state_cbs.ref_msg_to_client = vdi_port_ref_msg_to_client;
char_dev_state_cbs.unref_msg_to_client = vdi_port_unref_msg_to_client;
char_dev_state_cbs.send_msg_to_client = vdi_port_send_msg_to_client;
char_dev_state_cbs.send_tokens_to_client = vdi_port_send_tokens_to_client;
char_dev_state_cbs.remove_client = vdi_port_remove_client;
char_dev_state_cbs.on_free_self_token = vdi_port_on_free_self_token;
state->base = spice_char_device_state_create(sin,
REDS_TOKENS_TO_SEND,
REDS_NUM_INTERNAL_AGENT_MESSAGES,
&char_dev_state_cbs,
NULL);
} else {
spice_char_device_state_reset_dev_instance(state->base, sin);
}
vdagent = sin;
reds_update_mouse_mode();
@ -3110,18 +3224,25 @@ static void attach_to_red_agent(SpiceCharDeviceInstance *sin)
}
if (!reds_main_channel_connected()) {
return;
return state->base;
}
state->read_filter.discard_all = FALSE;
reds->agent_state.plug_generation++;
/* we will assoicate the client with the char device, upon reds_on_main_agent_start,
* in response to MSGC_AGENT_START */
main_channel_push_agent_connected(reds->main_channel);
return state->base;
}
SPICE_GNUC_VISIBLE void spice_server_char_device_wakeup(SpiceCharDeviceInstance* sin)
{
(*sin->st->wakeup)(sin);
if (sin->st->wakeup) {
sin->st->wakeup(sin);
} else {
spice_char_device_wakeup(sin->st);
}
}
#define SUBTYPE_VDAGENT "vdagent"
@ -3147,6 +3268,7 @@ static int spice_server_char_device_add_interface(SpiceServer *s,
{
SpiceCharDeviceInstance* char_device =
SPICE_CONTAINEROF(sin, SpiceCharDeviceInstance, base);
SpiceCharDeviceState *dev_state = NULL;
spice_printerr("CHAR_DEVICE %s", char_device->subtype);
if (strcmp(char_device->subtype, SUBTYPE_VDAGENT) == 0) {
@ -3154,8 +3276,7 @@ static int spice_server_char_device_add_interface(SpiceServer *s,
spice_printerr("vdagent already attached");
return -1;
}
char_device->st = &vdagent_char_device_state;
attach_to_red_agent(char_device);
dev_state = attach_to_red_agent(char_device);
}
#ifdef USE_SMARTCARD
else if (strcmp(char_device->subtype, SUBTYPE_SMARTCARD) == 0) {
@ -3167,6 +3288,12 @@ static int spice_server_char_device_add_interface(SpiceServer *s,
else if (strcmp(char_device->subtype, SUBTYPE_USBREDIR) == 0) {
spicevmc_device_connect(char_device, SPICE_CHANNEL_USBREDIR);
}
if (dev_state) {
spice_assert(char_device->st);
/* setting the char_device state to "started" for backward compatibily with
* qemu releases that don't call spice api for start/stop (not implemented yet) */
spice_char_device_start(char_device->st);
}
return 0;
}
@ -3189,6 +3316,7 @@ static void spice_server_char_device_remove_interface(SpiceBaseInstance *sin)
else if (strcmp(char_device->subtype, SUBTYPE_USBREDIR) == 0) {
spicevmc_device_disconnect(char_device);
}
char_device->st = NULL;
}
SPICE_GNUC_VISIBLE int spice_server_add_interface(SpiceServer *s,