From d8e583805449a627470f005ea7886a23fef8b9dd Mon Sep 17 00:00:00 2001 From: Frediano Ziglio Date: Mon, 20 May 2019 15:39:15 +0100 Subject: [PATCH] red-channel-client: Move all functions to methods Improve incapsulation. The only not mechanical change is the addition of timer_add to make timer settings a bit more type safe. Signed-off-by: Frediano Ziglio --- server/red-channel-client.cpp | 525 +++++++++++++++++----------------- server/red-channel-client.h | 24 +- 2 files changed, 285 insertions(+), 264 deletions(-) diff --git a/server/red-channel-client.cpp b/server/red-channel-client.cpp index 5b10a7ff..ae2c1fcb 100644 --- a/server/red-channel-client.cpp +++ b/server/red-channel-client.cpp @@ -165,6 +165,27 @@ struct RedChannelClientPrivate RedStatCounter out_messages; RedStatCounter out_bytes; + + inline RedPipeItem *pipe_item_get(); + inline bool pipe_remove(RedPipeItem *item); + void handle_pong(SpiceMsgPing *ping); + inline void set_message_serial(uint64_t serial); + void pipe_clear(); + void data_sent(int n); + void data_read(int n); + inline int get_out_msg_size(); + inline int prepare_out_msg(struct iovec *vec, int vec_size, int pos); + inline void set_blocked(); + void reset_send_data(); + void seamless_migration_done(); + void clear_sent_item(); + void restart_ping_timer(); + void start_ping_timer(uint32_t timeout); + void cancel_ping_timer(); + inline int urgent_marshaller_is_active(); + inline int waiting_for_ack(); + inline void restore_main_sender(); + void watch_update_mask(int event_mask); }; static void full_header_set_msg_type(SpiceDataHeaderOpaque *header, uint16_t type); @@ -197,9 +218,6 @@ static const SpiceDataHeaderOpaque mini_header_wrapper = {NULL, sizeof(SpiceMini mini_header_get_msg_type, mini_header_get_msg_size}; -static void red_channel_client_clear_sent_item(RedChannelClient *rcc); -static void red_channel_client_set_message_serial(RedChannelClient *channel, uint64_t); - /* * When an error occurs over a channel, we treat it as a warning * for spice-server and shutdown the channel. @@ -224,46 +242,46 @@ typedef struct MarkerPipeItem { bool item_sent; } MarkerPipeItem; -static void red_channel_client_start_ping_timer(RedChannelClient *rcc, uint32_t timeout) +void RedChannelClientPrivate::start_ping_timer(uint32_t timeout) { - if (!rcc->priv->latency_monitor.timer) { + if (!latency_monitor.timer) { return; } - if (rcc->priv->latency_monitor.state != PING_STATE_NONE) { + if (latency_monitor.state != PING_STATE_NONE) { return; } - rcc->priv->latency_monitor.state = PING_STATE_TIMER; + latency_monitor.state = PING_STATE_TIMER; - red_timer_start(rcc->priv->latency_monitor.timer, timeout); + red_timer_start(latency_monitor.timer, timeout); } -static void red_channel_client_cancel_ping_timer(RedChannelClient *rcc) +void RedChannelClientPrivate::cancel_ping_timer() { - if (!rcc->priv->latency_monitor.timer) { + if (!latency_monitor.timer) { return; } - if (rcc->priv->latency_monitor.state != PING_STATE_TIMER) { + if (latency_monitor.state != PING_STATE_TIMER) { return; } - red_timer_cancel(rcc->priv->latency_monitor.timer); - rcc->priv->latency_monitor.state = PING_STATE_NONE; + red_timer_cancel(latency_monitor.timer); + latency_monitor.state = PING_STATE_NONE; } -static void red_channel_client_restart_ping_timer(RedChannelClient *rcc) +void RedChannelClientPrivate::restart_ping_timer() { uint64_t passed, timeout; - if (!rcc->priv->latency_monitor.timer) { + if (!latency_monitor.timer) { return; } - passed = (spice_get_monotonic_time_ns() - rcc->priv->latency_monitor.last_pong_time) / NSEC_PER_MILLISEC; + passed = (spice_get_monotonic_time_ns() - latency_monitor.last_pong_time) / NSEC_PER_MILLISEC; timeout = PING_TEST_IDLE_NET_TIMEOUT_MS; - if (passed < rcc->priv->latency_monitor.timeout) { - timeout += rcc->priv->latency_monitor.timeout - passed; + if (passed < latency_monitor.timeout) { + timeout += latency_monitor.timeout - passed; } - red_channel_client_start_ping_timer(rcc, timeout); + start_ping_timer(timeout); } RedChannelClient::~RedChannelClient() @@ -346,168 +364,165 @@ RedChannel* RedChannelClient::get_channel() return priv->channel; } -static void red_channel_client_data_sent(RedChannelClient *rcc, int n) +void RedChannelClientPrivate::data_sent(int n) { - if (rcc->priv->connectivity_monitor.timer) { - rcc->priv->connectivity_monitor.sent_bytes = true; + if (connectivity_monitor.timer) { + connectivity_monitor.sent_bytes = true; } - stat_inc_counter(rcc->priv->out_bytes, n); + stat_inc_counter(out_bytes, n); } -static void red_channel_client_data_read(RedChannelClient *rcc, int n) +void RedChannelClientPrivate::data_read(int n) { - if (rcc->priv->connectivity_monitor.timer) { - rcc->priv->connectivity_monitor.received_bytes = true; + if (connectivity_monitor.timer) { + connectivity_monitor.received_bytes = true; } } -static int red_channel_client_get_out_msg_size(RedChannelClient *rcc) +inline int RedChannelClientPrivate::get_out_msg_size() { - return rcc->priv->send_data.size; + return send_data.size; } -static int red_channel_client_prepare_out_msg(RedChannelClient *rcc, - struct iovec *vec, int vec_size, - int pos) +inline int RedChannelClientPrivate::prepare_out_msg(struct iovec *vec, int vec_size, int pos) { - return spice_marshaller_fill_iovec(rcc->priv->send_data.marshaller, + return spice_marshaller_fill_iovec(send_data.marshaller, vec, vec_size, pos); } -static void red_channel_client_set_blocked(RedChannelClient *rcc) +inline void RedChannelClientPrivate::set_blocked() { - rcc->priv->send_data.blocked = TRUE; + send_data.blocked = true; } -static inline int red_channel_client_urgent_marshaller_is_active(RedChannelClient *rcc) +inline int RedChannelClientPrivate::urgent_marshaller_is_active() { - return (rcc->priv->send_data.marshaller == rcc->priv->send_data.urgent.marshaller); + return send_data.marshaller == send_data.urgent.marshaller; } -static void red_channel_client_reset_send_data(RedChannelClient *rcc) +void RedChannelClientPrivate::reset_send_data() { - spice_marshaller_reset(rcc->priv->send_data.marshaller); - rcc->priv->send_data.header.data = spice_marshaller_reserve_space(rcc->priv->send_data.marshaller, - rcc->priv->send_data.header.header_size); - spice_marshaller_set_base(rcc->priv->send_data.marshaller, rcc->priv->send_data.header.header_size); - rcc->priv->send_data.header.set_msg_type(&rcc->priv->send_data.header, 0); - rcc->priv->send_data.header.set_msg_size(&rcc->priv->send_data.header, 0); + spice_marshaller_reset(send_data.marshaller); + send_data.header.data = spice_marshaller_reserve_space(send_data.marshaller, + send_data.header.header_size); + spice_marshaller_set_base(send_data.marshaller, send_data.header.header_size); + send_data.header.set_msg_type(&send_data.header, 0); + send_data.header.set_msg_size(&send_data.header, 0); - if (!rcc->priv->is_mini_header) { - spice_assert(rcc->priv->send_data.marshaller != rcc->priv->send_data.urgent.marshaller); - rcc->priv->send_data.header.set_msg_sub_list(&rcc->priv->send_data.header, 0); + if (!is_mini_header) { + spice_assert(send_data.marshaller != send_data.urgent.marshaller); + send_data.header.set_msg_sub_list(&send_data.header, 0); } } -static void red_channel_client_send_set_ack(RedChannelClient *rcc) +void RedChannelClient::send_set_ack() { SpiceMsgSetAck ack; - spice_assert(rcc); - rcc->init_send_data(SPICE_MSG_SET_ACK); - ack.generation = ++rcc->priv->ack_data.generation; - ack.window = rcc->priv->ack_data.client_window; - rcc->priv->ack_data.messages_window = 0; + init_send_data(SPICE_MSG_SET_ACK); + ack.generation = ++priv->ack_data.generation; + ack.window = priv->ack_data.client_window; + priv->ack_data.messages_window = 0; - spice_marshall_msg_set_ack(rcc->priv->send_data.marshaller, &ack); + spice_marshall_msg_set_ack(priv->send_data.marshaller, &ack); - rcc->begin_send_message(); + begin_send_message(); } -static void red_channel_client_send_migrate(RedChannelClient *rcc) +void RedChannelClient::send_migrate() { SpiceMsgMigrate migrate; - rcc->init_send_data(SPICE_MSG_MIGRATE); - g_object_get(rcc->priv->channel, "migration-flags", &migrate.flags, NULL); - spice_marshall_msg_migrate(rcc->priv->send_data.marshaller, &migrate); + init_send_data(SPICE_MSG_MIGRATE); + g_object_get(priv->channel, "migration-flags", &migrate.flags, NULL); + spice_marshall_msg_migrate(priv->send_data.marshaller, &migrate); if (migrate.flags & SPICE_MIGRATE_NEED_FLUSH) { - rcc->priv->wait_migrate_flush_mark = TRUE; + priv->wait_migrate_flush_mark = TRUE; } - rcc->begin_send_message(); + begin_send_message(); } -static void red_channel_client_send_ping(RedChannelClient *rcc) +void RedChannelClient::send_ping() { SpiceMsgPing ping; - if (!rcc->priv->latency_monitor.warmup_was_sent) { // latency test start + if (!priv->latency_monitor.warmup_was_sent) { // latency test start int delay_val; - rcc->priv->latency_monitor.warmup_was_sent = true; + priv->latency_monitor.warmup_was_sent = true; /* * When testing latency, TCP_NODELAY must be switched on, otherwise, * sending the ping message is delayed by Nagle algorithm, and the * roundtrip measurement is less accurate (bigger). */ - rcc->priv->latency_monitor.tcp_nodelay = true; - delay_val = red_stream_get_no_delay(rcc->priv->stream); + priv->latency_monitor.tcp_nodelay = true; + delay_val = red_stream_get_no_delay(priv->stream); if (delay_val != -1) { - rcc->priv->latency_monitor.tcp_nodelay = delay_val; + priv->latency_monitor.tcp_nodelay = delay_val; if (!delay_val) { - red_stream_set_no_delay(rcc->priv->stream, TRUE); + red_stream_set_no_delay(priv->stream, TRUE); } } } - rcc->init_send_data(SPICE_MSG_PING); - ping.id = rcc->priv->latency_monitor.id; + init_send_data(SPICE_MSG_PING); + ping.id = priv->latency_monitor.id; ping.timestamp = spice_get_monotonic_time_ns(); - spice_marshall_msg_ping(rcc->priv->send_data.marshaller, &ping); - rcc->begin_send_message(); + spice_marshall_msg_ping(priv->send_data.marshaller, &ping); + begin_send_message(); } -static void red_channel_client_send_empty_msg(RedChannelClient *rcc, RedPipeItem *base) +void RedChannelClient::send_empty_msg(RedPipeItem *base) { RedEmptyMsgPipeItem *msg_pipe_item = SPICE_UPCAST(RedEmptyMsgPipeItem, base); - rcc->init_send_data(msg_pipe_item->msg); - rcc->begin_send_message(); + init_send_data(msg_pipe_item->msg); + begin_send_message(); } -static void red_channel_client_send_item(RedChannelClient *rcc, RedPipeItem *item) +void RedChannelClient::send_item(RedPipeItem *item) { - spice_assert(rcc->no_item_being_sent()); - red_channel_client_reset_send_data(rcc); + spice_assert(no_item_being_sent()); + priv->reset_send_data(); switch (item->type) { case RED_PIPE_ITEM_TYPE_SET_ACK: - red_channel_client_send_set_ack(rcc); + send_set_ack(); break; case RED_PIPE_ITEM_TYPE_MIGRATE: - red_channel_client_send_migrate(rcc); + send_migrate(); break; case RED_PIPE_ITEM_TYPE_EMPTY_MSG: - red_channel_client_send_empty_msg(rcc, item); + send_empty_msg(item); break; case RED_PIPE_ITEM_TYPE_PING: - red_channel_client_send_ping(rcc); + send_ping(); break; case RED_PIPE_ITEM_TYPE_MARKER: SPICE_UPCAST(MarkerPipeItem, item)->item_sent = true; break; default: - rcc->priv->channel->send_item(rcc, item); + priv->channel->send_item(this, item); break; } red_pipe_item_unref(item); } -static void red_channel_client_restore_main_sender(RedChannelClient *rcc) +inline void RedChannelClientPrivate::restore_main_sender() { - rcc->priv->send_data.marshaller = rcc->priv->send_data.main.marshaller; - rcc->priv->send_data.header.data = rcc->priv->send_data.main.header_data; + send_data.marshaller = send_data.main.marshaller; + send_data.header.data = send_data.main.header_data; } -static void red_channel_client_msg_sent(RedChannelClient *rcc) +void RedChannelClient::msg_sent() { #ifndef _WIN32 int fd; - if (spice_marshaller_get_fd(rcc->priv->send_data.marshaller, &fd)) { - if (red_stream_send_msgfd(rcc->priv->stream, fd) < 0) { + if (spice_marshaller_get_fd(priv->send_data.marshaller, &fd)) { + if (red_stream_send_msgfd(priv->stream, fd) < 0) { perror("sendfd"); - rcc->disconnect(); + disconnect(); if (fd != -1) close(fd); return; @@ -517,24 +532,24 @@ static void red_channel_client_msg_sent(RedChannelClient *rcc) } #endif - red_channel_client_clear_sent_item(rcc); + priv->clear_sent_item(); - if (red_channel_client_urgent_marshaller_is_active(rcc)) { - red_channel_client_restore_main_sender(rcc); - spice_assert(rcc->priv->send_data.header.data != NULL); - rcc->begin_send_message(); + if (priv->urgent_marshaller_is_active()) { + priv->restore_main_sender(); + spice_assert(priv->send_data.header.data != NULL); + begin_send_message(); } else { - if (g_queue_is_empty(&rcc->priv->pipe)) { + if (g_queue_is_empty(&priv->pipe)) { /* It is possible that the socket will become idle, so we may be able to test latency */ - red_channel_client_restart_ping_timer(rcc); + priv->restart_ping_timer(); } } } -static gboolean red_channel_client_pipe_remove(RedChannelClient *rcc, RedPipeItem *item) +bool RedChannelClientPrivate::pipe_remove(RedPipeItem *item) { - return g_queue_remove(&rcc->priv->pipe, item); + return g_queue_remove(&pipe, item); } bool RedChannelClient::test_remote_common_cap(uint32_t cap) const @@ -551,24 +566,23 @@ bool RedChannelClient::test_remote_cap(uint32_t cap) const cap); } -static void red_channel_client_push_ping(RedChannelClient *rcc) +void RedChannelClient::push_ping() { - spice_assert(rcc->priv->latency_monitor.state == PING_STATE_NONE); - rcc->priv->latency_monitor.state = PING_STATE_WARMUP; - rcc->priv->latency_monitor.warmup_was_sent = false; - rcc->priv->latency_monitor.id = rand(); - rcc->pipe_add_type(RED_PIPE_ITEM_TYPE_PING); - rcc->pipe_add_type(RED_PIPE_ITEM_TYPE_PING); + spice_assert(priv->latency_monitor.state == PING_STATE_NONE); + priv->latency_monitor.state = PING_STATE_WARMUP; + priv->latency_monitor.warmup_was_sent = false; + priv->latency_monitor.id = rand(); + pipe_add_type(RED_PIPE_ITEM_TYPE_PING); + pipe_add_type(RED_PIPE_ITEM_TYPE_PING); } -static void red_channel_client_ping_timer(void *opaque) +void RedChannelClient::ping_timer(void *opaque) { RedChannelClient *rcc = (RedChannelClient *) opaque; rcc->ref(); - spice_assert(rcc->priv->latency_monitor.state == PING_STATE_TIMER); - red_channel_client_cancel_ping_timer(rcc); + rcc->priv->cancel_ping_timer(); #ifdef HAVE_LINUX_SOCKIOS_H /* SIOCOUTQ is a Linux only ioctl on sockets. */ int so_unsent_size = 0; @@ -580,25 +594,25 @@ static void red_channel_client_ping_timer(void *opaque) } if (so_unsent_size > 0) { /* tcp send buffer is still occupied. rescheduling ping */ - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); + rcc->priv->start_ping_timer(PING_TEST_IDLE_NET_TIMEOUT_MS); rcc->unref(); return; } #endif /* ifdef HAVE_LINUX_SOCKIOS_H */ /* More portable alternative code path (less accurate but avoids bogus ioctls)*/ - red_channel_client_push_ping(rcc); + rcc->push_ping(); rcc->unref(); } -static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc) +inline int RedChannelClientPrivate::waiting_for_ack() { gboolean handle_acks; - g_object_get(rcc->priv->channel, + g_object_get(channel, "handle-acks", &handle_acks, NULL); - return (handle_acks && (rcc->priv->ack_data.messages_window > - rcc->priv->ack_data.client_window * 2)); + return (handle_acks && (ack_data.messages_window > + ack_data.client_window * 2)); } /* @@ -613,7 +627,7 @@ static inline int red_channel_client_waiting_for_ack(RedChannelClient *rcc) * been idle during the time that passed since the previous timer call. If the connection * has been idle, we consider the client as disconnected. */ -static void red_channel_client_connectivity_timer(void *opaque) +void RedChannelClient::connectivity_timer(void *opaque) { RedChannelClient *rcc = (RedChannelClient *) opaque; RedChannelClientConnectivityMonitor *monitor = &rcc->priv->connectivity_monitor; @@ -623,7 +637,7 @@ static void red_channel_client_connectivity_timer(void *opaque) if (monitor->state == CONNECTIVITY_STATE_BLOCKED) { if (!monitor->received_bytes && !monitor->sent_bytes) { - if (!rcc->is_blocked() && !red_channel_client_waiting_for_ack(rcc)) { + if (!rcc->is_blocked() && !rcc->priv->waiting_for_ack()) { spice_error("mismatch between rcc-state and connectivity-state"); } spice_debug("rcc is blocked; connection is idle"); @@ -643,7 +657,7 @@ static void red_channel_client_connectivity_timer(void *opaque) if (is_alive) { monitor->received_bytes = false; monitor->sent_bytes = false; - if (rcc->is_blocked() || red_channel_client_waiting_for_ack(rcc)) { + if (rcc->is_blocked() || rcc->priv->waiting_for_ack()) { monitor->state = CONNECTIVITY_STATE_BLOCKED; } else if (rcc->priv->latency_monitor.state == PING_STATE_WARMUP || rcc->priv->latency_monitor.state == PING_STATE_LATENCY) { @@ -677,20 +691,20 @@ void RedChannelClient::start_connectivity_monitoring(uint32_t timeout_ms) * on this channel. */ if (priv->latency_monitor.timer == NULL) { - priv->latency_monitor.timer = core->timer_add( - core, red_channel_client_ping_timer, this); + priv->latency_monitor.timer = + core->timer_add(core, ping_timer, this); priv->latency_monitor.roundtrip = -1; } else { - red_channel_client_cancel_ping_timer(this); + priv->cancel_ping_timer(); } priv->latency_monitor.timeout = PING_TEST_TIMEOUT_MS; if (!red_client_during_migrate_at_target(priv->client)) { - red_channel_client_start_ping_timer(this, PING_TEST_IDLE_NET_TIMEOUT_MS); + priv->start_ping_timer(PING_TEST_IDLE_NET_TIMEOUT_MS); } if (priv->connectivity_monitor.timer == NULL) { priv->connectivity_monitor.state = CONNECTIVITY_STATE_CONNECTED; - priv->connectivity_monitor.timer = core->timer_add( - core, red_channel_client_connectivity_timer, this); + priv->connectivity_monitor.timer = + core->timer_add(core, connectivity_timer, this); priv->connectivity_monitor.timeout = timeout_ms; if (!red_client_during_migrate_at_target(priv->client)) { red_timer_start(priv->connectivity_monitor.timer, @@ -803,13 +817,12 @@ bool RedChannelClient::init() red_channel_client_event, self); - if (red_stream_get_family(self->priv->stream) != AF_UNIX) { - self->priv->latency_monitor.timer = - core->timer_add(core, red_channel_client_ping_timer, self); + if (red_stream_get_family(priv->stream) != AF_UNIX) { + priv->latency_monitor.timer = + core->timer_add(core, ping_timer, this); if (!red_client_during_migrate_at_target(self->priv->client)) { - red_channel_client_start_ping_timer(self, - PING_TEST_IDLE_NET_TIMEOUT_MS); + priv->start_ping_timer(PING_TEST_IDLE_NET_TIMEOUT_MS); } self->priv->latency_monitor.roundtrip = -1; self->priv->latency_monitor.timeout = @@ -831,18 +844,17 @@ cleanup: return local_error == NULL; } -static void -red_channel_client_watch_update_mask(RedChannelClient *rcc, int event_mask) +void RedChannelClientPrivate::watch_update_mask(int event_mask) { - if (!rcc->priv->stream->watch) { + if (!stream->watch) { return; } - if (rcc->priv->block_read) { + if (block_read) { event_mask &= ~SPICE_WATCH_EVENT_READ; } - red_watch_update_mask(rcc->priv->stream->watch, event_mask); + red_watch_update_mask(stream->watch, event_mask); } void RedChannelClient::block_read() @@ -851,7 +863,7 @@ void RedChannelClient::block_read() return; } priv->block_read = true; - red_channel_client_watch_update_mask(this, SPICE_WATCH_EVENT_WRITE); + priv->watch_update_mask(SPICE_WATCH_EVENT_WRITE); } void RedChannelClient::unblock_read() @@ -860,25 +872,25 @@ void RedChannelClient::unblock_read() return; } priv->block_read = false; - red_channel_client_watch_update_mask(this, SPICE_WATCH_EVENT_READ|SPICE_WATCH_EVENT_WRITE); + priv->watch_update_mask(SPICE_WATCH_EVENT_READ|SPICE_WATCH_EVENT_WRITE); } -static void red_channel_client_seamless_migration_done(RedChannelClient *rcc) +void RedChannelClientPrivate::seamless_migration_done() { - rcc->priv->wait_migrate_data = FALSE; + wait_migrate_data = FALSE; - if (red_client_seamless_migration_done_for_channel(rcc->priv->client)) { - red_channel_client_start_ping_timer(rcc, PING_TEST_IDLE_NET_TIMEOUT_MS); - if (rcc->priv->connectivity_monitor.timer) { - red_timer_start(rcc->priv->connectivity_monitor.timer, - rcc->priv->connectivity_monitor.timeout); + if (red_client_seamless_migration_done_for_channel(client)) { + start_ping_timer(PING_TEST_IDLE_NET_TIMEOUT_MS); + if (connectivity_monitor.timer) { + red_timer_start(connectivity_monitor.timer, + connectivity_monitor.timeout); } } } void RedChannelClient::semi_seamless_migration_complete() { - red_channel_client_start_ping_timer(this, PING_TEST_IDLE_NET_TIMEOUT_MS); + priv->start_ping_timer(PING_TEST_IDLE_NET_TIMEOUT_MS); } bool RedChannelClient::is_waiting_for_migrate_data() const @@ -888,7 +900,7 @@ bool RedChannelClient::is_waiting_for_migrate_data() const void RedChannelClient::default_migrate(RedChannelClient *rcc) { - red_channel_client_cancel_ping_timer(rcc); + rcc->priv->cancel_ping_timer(); red_timer_remove(rcc->priv->latency_monitor.timer); rcc->priv->latency_monitor.timer = NULL; @@ -907,10 +919,10 @@ void RedChannelClient::shutdown() } } -static void red_channel_client_handle_outgoing(RedChannelClient *rcc) +void RedChannelClient::handle_outgoing() { - RedStream *stream = rcc->priv->stream; - OutgoingMessageBuffer *buffer = &rcc->priv->outgoing; + RedStream *stream = priv->stream; + OutgoingMessageBuffer *buffer = &priv->outgoing; ssize_t n; if (!stream) { @@ -918,7 +930,7 @@ static void red_channel_client_handle_outgoing(RedChannelClient *rcc) } if (buffer->size == 0) { - buffer->size = red_channel_client_get_out_msg_size(rcc); + buffer->size = priv->get_out_msg_size(); if (!buffer->size) { // nothing to be sent return; } @@ -927,35 +939,34 @@ static void red_channel_client_handle_outgoing(RedChannelClient *rcc) for (;;) { struct iovec vec[IOV_MAX]; int vec_size = - red_channel_client_prepare_out_msg(rcc, vec, G_N_ELEMENTS(vec), - buffer->pos); + priv->prepare_out_msg(vec, G_N_ELEMENTS(vec), buffer->pos); n = red_stream_writev(stream, vec, vec_size); if (n == -1) { switch (errno) { case EAGAIN: - red_channel_client_set_blocked(rcc); + priv->set_blocked(); break; case EINTR: continue; case EPIPE: - rcc->disconnect(); + disconnect(); break; default: - red_channel_warning(rcc->get_channel(), "%s", strerror(errno)); - rcc->disconnect(); + red_channel_warning(get_channel(), "%s", strerror(errno)); + disconnect(); break; } return; } buffer->pos += n; - red_channel_client_data_sent(rcc, n); + priv->data_sent(n); if (buffer->pos == buffer->size) { // finished writing data /* reset buffer before calling on_msg_done, since it - * can trigger another call to red_channel_client_handle_outgoing (when + * can trigger another call to RedChannelClient::handle_outgoing (when * switching from the urgent marshaller to the main one */ buffer->pos = 0; buffer->size = 0; - red_channel_client_msg_sent(rcc); + msg_sent(); return; } } @@ -995,11 +1006,11 @@ static int red_peer_receive(RedStream *stream, uint8_t *buf, uint32_t size) return pos - buf; } -static uint8_t *red_channel_client_parse(RedChannelClient *rcc, uint8_t *message, size_t message_size, - uint16_t message_type, - size_t *size_out, message_destructor_t *free_message) +uint8_t *RedChannelClient::parse(uint8_t *message, size_t message_size, + uint16_t message_type, + size_t *size_out, message_destructor_t *free_message) { - RedChannel *channel = rcc->get_channel(); + RedChannel *channel = get_channel(); RedChannelClass *klass = RED_CHANNEL_GET_CLASS(channel); return klass->parser(message, message + message_size, message_type, @@ -1010,10 +1021,10 @@ static uint8_t *red_channel_client_parse(RedChannelClient *rcc, uint8_t *message // does many calls to red_peer_receive and through it cb_read, and thus avoids pointer // arithmetic for the case where a single cb_read could return multiple messages. But // this is suboptimal potentially. Profile and consider fixing. -static void red_channel_client_handle_incoming(RedChannelClient *rcc) +void RedChannelClient::handle_incoming() { - RedStream *stream = rcc->priv->stream; - IncomingMessageBuffer *buffer = &rcc->priv->incoming; + RedStream *stream = priv->stream; + IncomingMessageBuffer *buffer = &priv->incoming; int bytes_read; uint16_t msg_type; uint32_t msg_size; @@ -1029,7 +1040,7 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc) uint8_t *parsed; size_t parsed_size; message_destructor_t parsed_free = NULL; - RedChannel *channel = rcc->get_channel(); + RedChannel *channel = get_channel(); RedChannelClass *klass = RED_CHANNEL_GET_CLASS(channel); if (buffer->header_pos < buffer->header.header_size) { @@ -1037,10 +1048,10 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc) buffer->header.data + buffer->header_pos, buffer->header.header_size - buffer->header_pos); if (bytes_read == -1) { - rcc->disconnect(); + disconnect(); return; } - red_channel_client_data_read(rcc, bytes_read); + priv->data_read(bytes_read); buffer->header_pos += bytes_read; if (buffer->header_pos != buffer->header.header_size) { @@ -1052,15 +1063,15 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc) msg_type = buffer->header.get_msg_type(&buffer->header); if (buffer->msg_pos < msg_size) { if (!buffer->msg) { - buffer->msg = rcc->alloc_recv_buf(msg_type, msg_size); - if (buffer->msg == NULL && rcc->priv->block_read) { + buffer->msg = alloc_recv_buf(msg_type, msg_size); + if (buffer->msg == NULL && priv->block_read) { // if we are blocked by flow control just return, message will be read // when data will be available return; } if (buffer->msg == NULL) { red_channel_warning(channel, "ERROR: channel refused to allocate buffer."); - rcc->disconnect(); + disconnect(); return; } } @@ -1069,41 +1080,39 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc) buffer->msg + buffer->msg_pos, msg_size - buffer->msg_pos); if (bytes_read == -1) { - rcc->release_recv_buf(msg_type, msg_size, buffer->msg); + release_recv_buf(msg_type, msg_size, buffer->msg); buffer->msg = NULL; - rcc->disconnect(); + disconnect(); return; } - red_channel_client_data_read(rcc, bytes_read); + priv->data_read(bytes_read); buffer->msg_pos += bytes_read; if (buffer->msg_pos != msg_size) { return; } } - parsed = red_channel_client_parse(rcc, - buffer->msg, msg_size, - msg_type, - &parsed_size, &parsed_free); + parsed = parse(buffer->msg, msg_size, + msg_type, &parsed_size, &parsed_free); if (parsed == NULL) { red_channel_warning(channel, "failed to parse message type %d", msg_type); - rcc->release_recv_buf(msg_type, msg_size, buffer->msg); + release_recv_buf(msg_type, msg_size, buffer->msg); buffer->msg = NULL; - rcc->disconnect(); + disconnect(); return; } - ret_handle = klass->handle_message(rcc, msg_type, + ret_handle = klass->handle_message(this, msg_type, parsed_size, parsed); if (parsed_free != NULL) { parsed_free(parsed); } buffer->msg_pos = 0; - rcc->release_recv_buf(msg_type, msg_size, buffer->msg); + release_recv_buf(msg_type, msg_size, buffer->msg); buffer->msg = NULL; buffer->header_pos = 0; if (!ret_handle) { - rcc->disconnect(); + disconnect(); return; } } @@ -1112,24 +1121,23 @@ static void red_channel_client_handle_incoming(RedChannelClient *rcc) void RedChannelClient::receive() { ref(); - red_channel_client_handle_incoming(this); + handle_incoming(); unref(); } void RedChannelClient::send() { ref(); - red_channel_client_handle_outgoing(this); + handle_outgoing(); unref(); } -static inline RedPipeItem *red_channel_client_pipe_item_get(RedChannelClient *rcc) +inline RedPipeItem *RedChannelClientPrivate::pipe_item_get() { - if (!rcc || rcc->is_blocked() - || red_channel_client_waiting_for_ack(rcc)) { + if (send_data.blocked || waiting_for_ack()) { return NULL; } - return (RedPipeItem*) g_queue_pop_tail(&rcc->priv->pipe); + return (RedPipeItem*) g_queue_pop_tail(&pipe); } void RedChannelClient::push() @@ -1147,13 +1155,13 @@ void RedChannelClient::push() } if (!no_item_being_sent() && !is_blocked()) { - red_channel_client_set_blocked(this); + priv->set_blocked(); red_channel_warning(get_channel(), "ERROR: an item waiting to be sent and not blocked"); } - while ((pipe_item = red_channel_client_pipe_item_get(this))) { - red_channel_client_send_item(this, pipe_item); + while ((pipe_item = priv->pipe_item_get())) { + send_item(pipe_item); } /* prepare_pipe_add() will reenable WRITE events when the priv->pipe is empty * ack_zero_messages_window() will reenable WRITE events @@ -1163,8 +1171,9 @@ void RedChannelClient::push() * are waiting for the ack consuming CPU in a tight loop */ if ((no_item_being_sent() && g_queue_is_empty(&priv->pipe)) || - red_channel_client_waiting_for_ack(this)) { - red_channel_client_watch_update_mask(this, SPICE_WATCH_EVENT_READ); + priv->waiting_for_ack()) { + priv->watch_update_mask(SPICE_WATCH_EVENT_READ); + /* channel has no pending data to send so now we can flush data in * order to avoid data stall into buffers in case of manual * flushing @@ -1192,31 +1201,31 @@ void RedChannelClient::init_outgoing_messages_window() push(); } -static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing *ping) +void RedChannelClientPrivate::handle_pong(SpiceMsgPing *ping) { uint64_t now; /* ignoring unexpected pongs, or post-migration pongs for pings that * started just before migration */ - if (ping->id != rcc->priv->latency_monitor.id) { + if (ping->id != latency_monitor.id) { spice_warning("ping-id (%u)!= pong-id %u", - rcc->priv->latency_monitor.id, ping->id); + latency_monitor.id, ping->id); return; } now = spice_get_monotonic_time_ns(); - if (rcc->priv->latency_monitor.state == PING_STATE_WARMUP) { - rcc->priv->latency_monitor.state = PING_STATE_LATENCY; + if (latency_monitor.state == PING_STATE_WARMUP) { + latency_monitor.state = PING_STATE_LATENCY; return; - } else if (rcc->priv->latency_monitor.state != PING_STATE_LATENCY) { + } else if (latency_monitor.state != PING_STATE_LATENCY) { spice_warning("unexpected"); return; } /* set TCP_NODELAY=0, in case we reverted it for the test*/ - if (!rcc->priv->latency_monitor.tcp_nodelay) { - red_stream_set_no_delay(rcc->priv->stream, FALSE); + if (!latency_monitor.tcp_nodelay) { + red_stream_set_no_delay(stream, FALSE); } /* @@ -1225,23 +1234,23 @@ static void red_channel_client_handle_pong(RedChannelClient *rcc, SpiceMsgPing * * threads or processes that are utilizing the network. We update the roundtrip * measurement with the minimal value we encountered till now. */ - if (rcc->priv->latency_monitor.roundtrip < 0 || - now - ping->timestamp < rcc->priv->latency_monitor.roundtrip) { - rcc->priv->latency_monitor.roundtrip = now - ping->timestamp; - spice_debug("update roundtrip %.2f(ms)", ((double)rcc->priv->latency_monitor.roundtrip)/NSEC_PER_MILLISEC); + if (latency_monitor.roundtrip < 0 || + now - ping->timestamp < latency_monitor.roundtrip) { + latency_monitor.roundtrip = now - ping->timestamp; + spice_debug("update roundtrip %.2f(ms)", ((double)latency_monitor.roundtrip)/NSEC_PER_MILLISEC); } - rcc->priv->latency_monitor.last_pong_time = now; - rcc->priv->latency_monitor.state = PING_STATE_NONE; - red_channel_client_start_ping_timer(rcc, rcc->priv->latency_monitor.timeout); + latency_monitor.last_pong_time = now; + latency_monitor.state = PING_STATE_NONE; + start_ping_timer(latency_monitor.timeout); } -static void red_channel_client_handle_migrate_flush_mark(RedChannelClient *rcc) +void RedChannelClient::handle_migrate_flush_mark() { - RedChannel *channel = rcc->get_channel(); + RedChannel *channel = get_channel(); RedChannelClass *klass = RED_CHANNEL_GET_CLASS(channel); if (klass->handle_migrate_flush_mark) { - klass->handle_migrate_flush_mark(rcc); + klass->handle_migrate_flush_mark(this); } } @@ -1252,31 +1261,28 @@ static void red_channel_client_handle_migrate_flush_mark(RedChannelClient *rcc) // 3) source migrates to target // 4) target sends data to all // So need to make all the handlers work with per channel/client data (what data exactly?) -static void red_channel_client_handle_migrate_data(RedChannelClient *rcc, - uint32_t size, - void *message) +void RedChannelClient::handle_migrate_data(uint32_t size, void *message) { - RedChannel *channel = rcc->get_channel(); + RedChannel *channel = get_channel(); RedChannelClass *klass = RED_CHANNEL_GET_CLASS(channel); - red_channel_debug(channel, "rcc %p size %u", rcc, size); + red_channel_debug(channel, "rcc %p size %u", this, size); if (!klass->handle_migrate_data) { return; } - if (!rcc->is_waiting_for_migrate_data()) { - spice_channel_client_error(rcc, "unexpected"); + if (!is_waiting_for_migrate_data()) { + spice_channel_client_error(this, "unexpected"); return; } if (klass->handle_migrate_data_get_serial) { - red_channel_client_set_message_serial(rcc, - klass->handle_migrate_data_get_serial(rcc, size, message)); + priv->set_message_serial(klass->handle_migrate_data_get_serial(this, size, message)); } - if (!klass->handle_migrate_data(rcc, size, message)) { - spice_channel_client_error(rcc, "handle_migrate_data failed"); + if (!klass->handle_migrate_data(this, size, message)) { + spice_channel_client_error(this, "handle_migrate_data failed"); return; } - red_channel_client_seamless_migration_done(rcc); + priv->seamless_migration_done(); } @@ -1290,8 +1296,7 @@ bool RedChannelClient::handle_message(RedChannelClient *rcc, uint16_t type, case SPICE_MSGC_ACK: if (rcc->priv->ack_data.client_generation == rcc->priv->ack_data.generation) { rcc->priv->ack_data.messages_window -= rcc->priv->ack_data.client_window; - red_channel_client_watch_update_mask(rcc, - SPICE_WATCH_EVENT_READ|SPICE_WATCH_EVENT_WRITE); + rcc->priv->watch_update_mask(SPICE_WATCH_EVENT_READ|SPICE_WATCH_EVENT_WRITE); rcc->push(); } break; @@ -1302,14 +1307,14 @@ bool RedChannelClient::handle_message(RedChannelClient *rcc, uint16_t type, spice_error("unexpected flush mark"); return FALSE; } - red_channel_client_handle_migrate_flush_mark(rcc); + rcc->handle_migrate_flush_mark(); rcc->priv->wait_migrate_flush_mark = FALSE; break; case SPICE_MSGC_MIGRATE_DATA: - red_channel_client_handle_migrate_data(rcc, size, message); + rcc->handle_migrate_data(size, message); break; case SPICE_MSGC_PONG: - red_channel_client_handle_pong(rcc, (SpiceMsgPing*) message); + rcc->priv->handle_pong((SpiceMsgPing*) message); break; default: red_channel_warning(rcc->get_channel(), "invalid message type %u", @@ -1339,7 +1344,7 @@ void RedChannelClient::begin_send_message() stat_inc_counter(priv->out_messages, 1); /* canceling the latency test timer till the nework is idle */ - red_channel_client_cancel_ping_timer(this); + priv->cancel_ping_timer(); spice_marshaller_flush(m); priv->send_data.size = spice_marshaller_get_total_size(m); @@ -1360,7 +1365,7 @@ SpiceMarshaller *RedChannelClient::switch_to_urgent_sender() priv->send_data.main.header_data = priv->send_data.header.data; priv->send_data.marshaller = priv->send_data.urgent.marshaller; - red_channel_client_reset_send_data(this); + priv->reset_send_data(); return priv->send_data.marshaller; } @@ -1369,30 +1374,29 @@ uint64_t RedChannelClient::get_message_serial() const return priv->send_data.last_sent_serial + 1; } -static void red_channel_client_set_message_serial(RedChannelClient *rcc, uint64_t serial) +inline void RedChannelClientPrivate::set_message_serial(uint64_t serial) { - rcc->priv->send_data.last_sent_serial = serial - 1; + send_data.last_sent_serial = serial - 1; } -static inline gboolean prepare_pipe_add(RedChannelClient *rcc, RedPipeItem *item) +inline bool RedChannelClient::prepare_pipe_add(RedPipeItem *item) { - spice_assert(rcc && item); - if (SPICE_UNLIKELY(!rcc->is_connected())) { - spice_debug("rcc is disconnected %p", rcc); + spice_assert(item); + if (SPICE_UNLIKELY(!is_connected())) { + spice_debug("rcc is disconnected %p", this); red_pipe_item_unref(item); - return FALSE; + return false; } - if (g_queue_is_empty(&rcc->priv->pipe)) { - red_channel_client_watch_update_mask(rcc, - SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE); + if (g_queue_is_empty(&priv->pipe)) { + priv->watch_update_mask(SPICE_WATCH_EVENT_READ | SPICE_WATCH_EVENT_WRITE); } - return TRUE; + return true; } void RedChannelClient::pipe_add(RedPipeItem *item) { - if (!prepare_pipe_add(this, item)) { + if (!prepare_pipe_add(item)) { return; } g_queue_push_head(&priv->pipe, item); @@ -1408,24 +1412,22 @@ void RedChannelClient::pipe_add_after_pos(RedPipeItem *item, GList *pipe_item_pos) { spice_assert(pipe_item_pos); - if (!prepare_pipe_add(this, item)) { + if (!prepare_pipe_add(item)) { return; } g_queue_insert_after(&priv->pipe, pipe_item_pos, item); } -static void -red_channel_client_pipe_add_before_pos(RedChannelClient *rcc, - RedPipeItem *item, - GList *pipe_item_pos) +void +RedChannelClient::pipe_add_before_pos(RedPipeItem *item, GList *pipe_item_pos) { spice_assert(pipe_item_pos); - if (!prepare_pipe_add(rcc, item)) { + if (!prepare_pipe_add(item)) { return; } - g_queue_insert_before(&rcc->priv->pipe, pipe_item_pos, item); + g_queue_insert_before(&priv->pipe, pipe_item_pos, item); } void RedChannelClient::pipe_add_after(RedPipeItem *item, RedPipeItem *pos) @@ -1446,7 +1448,7 @@ int RedChannelClient::pipe_item_is_linked(RedPipeItem *item) void RedChannelClient::pipe_add_tail(RedPipeItem *item) { - if (!prepare_pipe_add(this, item)) { + if (!prepare_pipe_add(item)) { return; } g_queue_push_tail(&priv->pipe, item); @@ -1500,30 +1502,29 @@ bool RedChannelClient::is_connected() const && (g_list_find(priv->channel->get_clients(), this) != NULL); } -static void red_channel_client_clear_sent_item(RedChannelClient *rcc) +void RedChannelClientPrivate::clear_sent_item() { - rcc->priv->send_data.blocked = FALSE; - rcc->priv->send_data.size = 0; - spice_marshaller_reset(rcc->priv->send_data.marshaller); + send_data.blocked = FALSE; + send_data.size = 0; + spice_marshaller_reset(send_data.marshaller); } // TODO: again - what is the context exactly? this happens in channel disconnect. but our // current red_channel_shutdown also closes the socket - is there a socket to close? // are we reading from an fd here? arghh -static void red_channel_client_pipe_clear(RedChannelClient *rcc) +void RedChannelClientPrivate::pipe_clear() { RedPipeItem *item; - red_channel_client_clear_sent_item(rcc); - while ((item = (RedPipeItem*) g_queue_pop_head(&rcc->priv->pipe)) != NULL) { + clear_sent_item(); + while ((item = (RedPipeItem*) g_queue_pop_head(&pipe)) != NULL) { red_pipe_item_unref(item); } } void RedChannelClient::ack_zero_messages_window() { - red_channel_client_watch_update_mask(this, - SPICE_WATCH_EVENT_READ|SPICE_WATCH_EVENT_WRITE); + priv->watch_update_mask(SPICE_WATCH_EVENT_READ|SPICE_WATCH_EVENT_WRITE); priv->ack_data.messages_window = 0; } @@ -1544,7 +1545,7 @@ void RedChannelClient::disconnect() if (!is_connected()) { return; } - red_channel_client_pipe_clear(this); + priv->pipe_clear(); shutdown(); @@ -1611,7 +1612,7 @@ bool RedChannelClient::wait_pipe_item_sent(GList *item_pos, int64_t timeout) red_pipe_item_init(&mark_item->base, RED_PIPE_ITEM_TYPE_MARKER); mark_item->item_sent = false; red_pipe_item_ref(&mark_item->base); - red_channel_client_pipe_add_before_pos(this, &mark_item->base, item_pos); + pipe_add_before_pos(&mark_item->base, item_pos); for (;;) { receive(); @@ -1672,7 +1673,7 @@ bool RedChannelClient::no_item_being_sent() const void RedChannelClient::pipe_remove_and_release(RedPipeItem *item) { - if (red_channel_client_pipe_remove(this, item)) { + if (priv->pipe_remove(item)) { red_pipe_item_unref(item); } } diff --git a/server/red-channel-client.h b/server/red-channel-client.h index d38cad60..ebc48cfd 100644 --- a/server/red-channel-client.h +++ b/server/red-channel-client.h @@ -46,8 +46,6 @@ public: bool monitor_latency=false); virtual bool init(); - RedChannelClientPrivate *priv = nullptr; - bool is_connected() const; static void default_migrate(RedChannelClient *rcc); bool is_waiting_for_migrate_data() const; @@ -161,9 +159,31 @@ public: virtual void on_disconnect() {}; + /* Private functions */ +private: + void send_item(RedPipeItem *item); + void handle_outgoing(); + void handle_incoming(); + void handle_migrate_flush_mark(); + void handle_migrate_data(uint32_t size, void *message); + inline bool prepare_pipe_add(RedPipeItem *item); + void pipe_add_before_pos(RedPipeItem *item, GList *pipe_item_pos); + void send_set_ack(); + void send_migrate(); + void send_empty_msg(RedPipeItem *base); + void msg_sent(); + uint8_t *parse(uint8_t *message, size_t message_size, + uint16_t message_type, + size_t *size_out, message_destructor_t *free_message); + static void ping_timer(void *opaque); + static void connectivity_timer(void *opaque); + void send_ping(); + void push_ping(); + /* Private data */ private: gint _ref = 1; + RedChannelClientPrivate *priv = nullptr; }; #define SPICE_SERVER_ERROR spice_server_error_quark()