diff --git a/server/spicevmc.c b/server/spicevmc.c index fea5be11..619f08cb 100644 --- a/server/spicevmc.c +++ b/server/spicevmc.c @@ -34,14 +34,14 @@ #include "reds.h" #include "migration-protocol.h" -/* todo: add flow control. i.e., - * (a) limit the tokens available for the client - * (b) limit the tokens available for the server - */ /* 64K should be enough for all but the largest writes + 32 bytes hdr */ #define BUF_SIZE (64 * 1024 + 32) #define COMPRESS_THRESHOLD 1000 +// limit of the queued data, at this limit we stop reading from device to +// avoid DoS +#define QUEUED_DATA_LIMIT (1024*1024) + SPICE_DECLARE_TYPE(RedCharDeviceSpiceVmc, red_char_device_spicevmc, CHAR_DEVICE_SPICEVMC); #define RED_TYPE_CHAR_DEVICE_SPICEVMC red_char_device_spicevmc_get_type() @@ -83,6 +83,7 @@ struct RedCharDeviceSpiceVmcClass static RedCharDevice *red_char_device_spicevmc_new(SpiceCharDeviceInstance *sin, RedsState *reds, RedVmcChannel *channel); +static void spicevmc_red_channel_queue_data(RedVmcChannel *channel, RedVmcPipeItem *item); G_DEFINE_TYPE(RedCharDeviceSpiceVmc, red_char_device_spicevmc, RED_TYPE_CHAR_DEVICE) @@ -96,6 +97,7 @@ struct RedVmcChannel RedVmcPipeItem *pipe_item; RedCharDeviceWriteBuffer *recv_from_client_buf; uint8_t port_opened; + uint32_t queued_data; RedStatCounter in_data; RedStatCounter in_compressed; RedStatCounter in_decompressed; @@ -337,7 +339,7 @@ static RedPipeItem *spicevmc_chardev_read_msg_from_dev(RedCharDevice *self, sif = spice_char_device_get_interface(sin); - if (!channel->rcc) { + if (!channel->rcc || channel->queued_data >= QUEUED_DATA_LIMIT) { return NULL; } @@ -360,14 +362,14 @@ static RedPipeItem *spicevmc_chardev_read_msg_from_dev(RedCharDevice *self, msg_item_compressed = try_compress_lz4(channel, n, msg_item); if (msg_item_compressed != NULL) { - red_channel_client_pipe_add_push(channel->rcc, &msg_item_compressed->base); + spicevmc_red_channel_queue_data(channel, msg_item_compressed); return NULL; } #endif stat_inc_counter(channel->out_data, n); msg_item->uncompressed_data_size = n; msg_item->buf_used = n; - red_channel_client_pipe_add_push(channel->rcc, &msg_item->base); + spicevmc_red_channel_queue_data(channel, msg_item); return NULL; } channel->pipe_item = msg_item; @@ -609,11 +611,19 @@ static void spicevmc_red_channel_release_msg_rcv_buf(RedChannelClient *rcc, } } +static void +spicevmc_red_channel_queue_data(RedVmcChannel *channel, RedVmcPipeItem *item) +{ + channel->queued_data += item->buf_used; + red_channel_client_pipe_add_push(channel->rcc, &item->base); +} + static void spicevmc_red_channel_send_data(RedChannelClient *rcc, SpiceMarshaller *m, RedPipeItem *item) { RedVmcPipeItem *i = SPICE_UPCAST(RedVmcPipeItem, item); + RedVmcChannel *channel = RED_VMC_CHANNEL(red_channel_client_get_channel(rcc)); /* for compatibility send using not compressed data message */ if (i->type == SPICE_DATA_COMPRESSION_TYPE_NONE) { @@ -630,6 +640,14 @@ static void spicevmc_red_channel_send_data(RedChannelClient *rcc, red_pipe_item_ref(item); spice_marshaller_add_by_ref_full(m, i->buf, i->buf_used, marshaller_unref_pipe_item, item); + + // account for sent data and wake up device if was blocked + uint32_t old_queued_data = channel->queued_data; + channel->queued_data -= i->buf_used; + if (channel->chardev && + old_queued_data >= QUEUED_DATA_LIMIT && channel->queued_data < QUEUED_DATA_LIMIT) { + red_char_device_wakeup(channel->chardev); + } } static void spicevmc_red_channel_send_migrate_data(RedChannelClient *rcc, @@ -768,6 +786,7 @@ static void spicevmc_connect(RedChannel *channel, RedClient *client, return; } vmc_channel->rcc = rcc; + vmc_channel->queued_data = 0; red_channel_client_ack_zero_messages_window(rcc); if (strcmp(sin->subtype, "port") == 0) {