server: hookup agent-msg-filter discard-all functionality

This ensures that if the client or agent connects to the client-agent
"tunnel" while the other side is halfway through sending a multi part
message, the rest of the message gets discarded, and the connecting
party starts getting data at the beginning of the next message.
This commit is contained in:
Hans de Goede 2011-04-01 16:31:37 +02:00
parent 5d7cf4c002
commit 2df49c8a67

View File

@ -205,8 +205,6 @@ typedef struct VDIPortState {
AgentMsgFilter read_filter;
VDIChunkHeader vdi_chunk_header;
int client_agent_started;
} VDIPortState;
typedef struct InputsState {
@ -713,8 +711,12 @@ static void reds_reset_vdp()
ring_add(&state->read_bufs, &state->current_read_buf->link);
state->current_read_buf = NULL;
}
agent_msg_filter_init(&state->read_filter, agent_copypaste, FALSE);
state->client_agent_started = FALSE;
/* Reset read filter to start with clean state when the agent reconnects */
agent_msg_filter_init(&state->read_filter, agent_copypaste, TRUE);
/* Throw away pending chunks from the current (if any) and future
messages written by the client */
state->write_filter.result = AGENT_MSG_FILTER_DISCARD;
state->write_filter.discard_all = TRUE;
}
static void reds_reset_outgoing()
@ -744,8 +746,13 @@ static void reds_disconnect()
red_printf("");
reds->disconnecting = TRUE;
reds_reset_outgoing();
/* Reset write filter to start with clean state on client reconnect */
agent_msg_filter_init(&reds->agent_state.write_filter, agent_copypaste,
FALSE);
TRUE);
/* Throw away pending chunks from the current (if any) and future
messages read from the agent */
reds->agent_state.read_filter.result = AGENT_MSG_FILTER_DISCARD;
reds->agent_state.read_filter.discard_all = TRUE;
if (reds->agent_state.connected) {
SpiceCharDeviceInterface *sif;
@ -754,7 +761,6 @@ static void reds_disconnect()
if (sif->state) {
sif->state(vdagent, reds->agent_state.connected);
}
reds->agent_state.client_agent_started = FALSE;
}
reds_shatdown_channels();
@ -1239,17 +1245,10 @@ static void dispatch_vdi_port_data(int port, VDIReadBuf *buf)
reds_agent_remove();
return;
}
if (reds->agent_state.connected) {
item = new_out_item(SPICE_MSG_MAIN_AGENT_DATA);
spice_marshaller_add_ref_full(item->m, buf->data, buf->len,
vdi_read_buf_release, buf);
reds_push_pipe_item(item);
} else {
red_printf("throwing away, no client: %d", buf->len);
vdi_read_buf_release(buf->data, buf);
}
item = new_out_item(SPICE_MSG_MAIN_AGENT_DATA);
spice_marshaller_add_ref_full(item->m, buf->data, buf->len,
vdi_read_buf_release, buf);
reds_push_pipe_item(item);
break;
}
case VDP_SERVER_PORT:
@ -1436,7 +1435,7 @@ static void main_channel_push_migrate_data_item()
data->ping_id = reds->ping_id;
data->agent_connected = !!state->connected;
data->client_agent_started = state->client_agent_started;
data->client_agent_started = !state->write_filter.discard_all;
data->num_client_tokens = state->num_client_tokens;
data->send_tokens = ~0;
@ -1676,7 +1675,7 @@ static void main_channel_recive_migrate_data(MainMigrateData *data, uint8_t *end
return;
}
state->client_agent_started = data->client_agent_started;
state->write_filter.discard_all = !data->client_agent_started;
pos = (uint8_t *)(data + 1);
@ -1698,7 +1697,7 @@ static void reds_main_handle_message(void *opaque, size_t size, uint32_t type, v
if (!reds->peer) {
return;
}
reds->agent_state.client_agent_started = TRUE;
reds->agent_state.write_filter.discard_all = FALSE;
break;
case SPICE_MSGC_MAIN_AGENT_DATA: {
RingItem *ring_item;
@ -1724,17 +1723,6 @@ static void reds_main_handle_message(void *opaque, size_t size, uint32_t type, v
return;
}
if (!vdagent) {
add_token();
break;
}
if (!reds->agent_state.client_agent_started) {
red_printf("SPICE_MSGC_MAIN_AGENT_DATA race");
add_token();
break;
}
if (!(ring_item = ring_get_head(&reds->agent_state.external_bufs))) {
red_printf("no agent free bufs");
reds_disconnect();
@ -2088,6 +2076,7 @@ static void reds_handle_main_link(RedLinkInfo *link)
SpiceCharDeviceInterface *sif;
sif = SPICE_CONTAINEROF(vdagent->base.sif, SpiceCharDeviceInterface, base);
reds->agent_state.connected = 1;
reds->agent_state.read_filter.discard_all = FALSE;
if (sif->state) {
sif->state(vdagent, reds->agent_state.connected);
}
@ -3497,6 +3486,7 @@ static void attach_to_red_agent(SpiceCharDeviceInstance *sin)
}
sif = SPICE_CONTAINEROF(vdagent->base.sif, SpiceCharDeviceInterface, base);
state->connected = 1;
state->read_filter.discard_all = FALSE;
if (sif->state) {
sif->state(vdagent, state->connected);
}
@ -3751,8 +3741,8 @@ static void init_vd_agent_resources()
ring_init(&state->internal_bufs);
ring_init(&state->write_queue);
ring_init(&state->read_bufs);
agent_msg_filter_init(&state->write_filter, agent_copypaste, FALSE);
agent_msg_filter_init(&state->read_filter, agent_copypaste, FALSE);
agent_msg_filter_init(&state->write_filter, agent_copypaste, TRUE);
agent_msg_filter_init(&state->read_filter, agent_copypaste, TRUE);
state->read_state = VDI_PORT_READ_STATE_READ_HADER;
state->recive_pos = (uint8_t *)&state->vdi_chunk_header;